2016-03-22 10 views
0

Ich habe ein Problem mit einer Funktion (die ich nicht ändern kann), die eine RDD als Eingabe benötigt, aber meine Daten sind in einem solchen Format, dass ich nicht scheinen kann, nur eine RDD in die Funktion zu bekommen.Iterieren über ein RDD-Paar, um eine Funktion auf einer RDD ab dem zweiten Wert auszuführen.

Betrachten Sie eine RDD, die von einer Gruppe erstellt wurde, so dass es aus ("Name", Daten) -Paaren besteht, genannt coolRdd. Die Daten sind ein Iterable [String] und der Name ist ein String. Allerdings muss ich CoolFunction darauf ausführen, die Typ (Rdd [String], String) übernimmt. Hier war mein Versuch:

coolRdd.foreach{ case (name, data) => sc.CoolFunction(data.toList, name) } 

die

found : List[String] 
required: org.apache.spark.rdd.RDD[String] 

kehrt Ich habe auch versucht sc.parallelize auf dem data.toList läuft, aber das gibt einen Nullpointer, weil es eine RDD von RDDs die Spark-doesn schaffen würde nicht erlauben.

Ich frage mich, ob es möglich ist, eine andere Funktion zu schreiben, die die Konvertierung von Daten durchführen kann, und rufen Sie dann die erforderliche CoolFunction. Es wäre besser, wenn ich das nicht mit dem Fahrer machen müsste, aber wenn nötig, ist das machbar.

Als Bonus: Ich mache das tatsächlich mit dem Streaming, also wird dieses ganze Durcheinander in einem Anruf für foreachRDD sein, aber ich erwarte, dass, wenn ich das im Normalfall arbeiten kann, ich es schaffen kann Arbeit im Streaming-Fall.

+0

Wenn Sie darüber nachdenken, teilen Sie im Wesentlichen eine RDD in kleinere RDDs. Wie in einer der Antworten hier diskutiert: http://stackoverflow.com/questions/32970709/how-to-split-a-rdd-into-two-or-mor-rdds können Sie eine RDD nicht teilen. Das Beste, was Sie tun können, ist es zu filtern. In Ihrem Fall müssten Sie es in einzelne Zeilen aufteilen. Es gibt wahrscheinlich keinen guten Weg, es zu tun - Ihre Antwort unten funktioniert gut. Eine mögliche Optimierung wäre, nur ein Array von "name" in den Treiber zu ziehen, dann die RDD einzeln nach "name" zu filtern und diese der Funktion zuzuführen. Je nach Datensatz kann das hilfreich sein. –

Antwort

0

konnte ich eine Lösung finden:

coolRdd. 
collect. 
foreach{ case (name, data) => 
val data_list = data.toList 
sc.coolFunction(sc.parallelize(data_list), pid) 
} 

Wo sammle ich versagte irrte auf. Da nur der Fahrer RDDs kennt, ist hier Sammeln erforderlich.

+0

Lassen Sie Frage für ein Bit offen, falls jemand eine größere Lösung hinzufügen muss. – BBischof

+0

Das ist sehr schlechte Praxis. Wenn Ihre Daten auf den Treiber passen, warum möchten Sie sie in einer RDD verteilen? Es gibt noch mehr, das Sammeln einer RDD kann den Treiber überfordern, wenn Ihr Datenvolumen groß ist, was zu OOME führen kann. – eliasah

+0

Sie haben völlig Recht, ich wollte dazu bemerken. Eine Sache, die mich rettet, ist, dass die einzelnen RDDS in diesen Fällen klein sein sollten. Kannst du hier eine bessere Strategie sehen? – BBischof

Verwandte Themen