2016-07-22 9 views
5

Ich versuche, einige XML-Daten, die in einer JMS-Warteschlange (QPID) empfangen werden, mithilfe von Spark Streaming zu verarbeiten. Nachdem ich xml als DStream erhalten habe wandle ich sie in Dataframes um, damit ich sie mit einigen meiner statischen Daten in Form von bereits geladenen Dataframes verbinden kann. Aber nach API-Dokumentation für foreachRdd-Methode auf DStream: es auf Treiber ausgeführt wird, so bedeutet dies, dass alle Verarbeitungslogik nur auf Treiber ausgeführt wird und nicht an Worker/Executoren verteilt werden.Wird foreachRDD auf dem Treiber ausgeführt?

API Dokumentation

foreachRDD(func)

Der generische Ausgabe Operator, der eine Funktion gilt, func, jeden RDD aus dem Strom erzeugt wird. Diese Funktion sollte die Daten in jeder RDD zu einem externen System schieben, wie zum Beispiel die RDD in Dateien speichern oder über das Netzwerk in eine Datenbank schreiben. Hinweis , dass die Funktion func im Treiberprozess unter der Streaming-Anwendung ausgeführt wird, und in der Regel RDD-Aktionen darin haben, dass die Berechnung der Streaming-RDDs erzwingen wird.

Antwort

6

so bedeutet das, dass alle Logikverarbeitung nur auf Treiber laufen und nicht Arbeitnehmer/Testamentsvollstrecker verteilt bekommen.

Nein, die Funktion selbst läuft auf den Fahrer, aber vergessen Sie nicht, dass es auf einem RDD arbeitet. Die inneren Funktionen, die Sie für die verwenden, wie foreachPartition, map, filter usw. werden weiterhin auf den Worker-Knoten ausgeführt. Diese wird nicht bewirken, dass alle Daten über das Netzwerk an den Treiber zurückgesendet werden, es sei denn, Sie rufen Methoden wie collect auf, die dies tun.

+0

Nun, das ist so verwirrend "Nein, die Funktion selbst läuft auf dem Treiber, aber vergiss nicht, dass sie auf einem RDD arbeitet", nehme an, dass "rdd.foreRADD (func)" aufgerufen wird, und diese "func" -Funktion schreibt Daten über a in eine Redis global var 'redisis_client', dh' func' bezieht sich auf 'redis_client', also Q ist: wird irgendeine Ausnahme in 'foreachRDD'-Aufruf ausgelöst, da' redis_client' nicht serialisierbar ist. – avocado

+0

@loganecolss Ich stimme zu, die Semantik der Ausführung ist kompliziert. 'rdd.foreachRDD' führt' func' auf den Executoren aus. Wenn 'func' * den' redis_client' durch Schließen erfasst, erhalten Sie eine 'TaskNotSerializable' Ausnahme. Wenn die Instanz von 'redis_client' in 'func' zugewiesen ist, ist alles in Ordnung. –

+0

@YuvalItzchakov Zitieren Sie Ihren Kommentar: _rdd.foreachRDD führt func auf den Executoren aus._ Aus der [docs] (https://spark.apache.org/docs/latest/streaming-programming-guide.html#output- operations-on-dstreams), führt "foreachRDD" 'func' auf dem ** Driver ** aus. 'foreach',' foreachPartition' läuft jedoch auf ** Executors **. –

1

Um dies deutlich zu machen, wenn Sie die folgenden ausführen, werden Sie „Affen“ auf dem Fahrer stdout sehen:

myDStream.foreachRDD { rdd => 
    println("monkey") 
} 

Wenn Sie die folgenden ausführen, werden Sie „Affen“ auf dem Fahrer stdout sehen, und der Filter Arbeit die rdd aus welchem ​​Testamentsvollstrecker getan werden verteilt über:

myDStream.foreachRDD { rdd => 
    println("monkey") 
    rdd.filter(element => element == "Save me!") 
} 

Lassen Sie uns die Vereinfachung hinzufügen, dass myDStream onl Sie erhalten je eine RDD, und diese RDD verteilt sich auf eine Reihe von Partitionen, die wir PartitionSetA nennen, die unter MachineSetB existieren, wo ExecutorSetC läuft.Wenn Sie Folgendes ausführen, sehen Sie "affe" auf dem Treiber-Stdout, Sie werden "turtle" auf den Stdouts aller Executoren in ExecutorSetC sehen ("turtle" erscheint einmal für jede Partition - viele Partitionen könnten sich auf der Maschine befinden wo ein Vollstrecker ausgeführt wird), und die Arbeit der beiden Filter und Additionsoperationen wird über ExecutorSetC erfolgen:

myDStream.foreachRDD { rdd => 
    println("monkey") 
    rdd.filter(element => element == "Save me!") 
    rdd.foreachPartition { partition => 
    println("turtle") 
    val x = 1 + 1 
    } 
} 

eine weitere Sache, dass in dem folgenden Code zu beachten ist, y würde am Ende über die gesendeten Netzwerk vom Treiber zu allen ExecutorSetC für jeden rdd:

val y = 2 
myDStream.foreachRDD { rdd => 
    println("monkey") 
    rdd.filter(element => element == "Save me!") 
    rdd.foreachPartition { partition => 
    println("turtle") 
    val x = 1 + 1 
    val z = x + y 
    } 
} 

Um diesen Overhead zu vermeiden, können Sie Broadcast-Variablen verwenden, die nur einmal den Wert vom Treiber an die Executoren senden. Zum Beispiel:

val y = 2 
val broadcastY = sc.broadcast(y) 
myDStream.foreachRDD { rdd => 
    println("monkey") 
    rdd.filter(element => element == "Save me!") 
    rdd.foreachPartition { partition => 
    println("turtle") 
    val x = 1 + 1 
    val z = x + broadcastY.value 
    } 
} 

Für komplexere Dinge über als Broadcast Senden von Variablen, wie Objekte, die nicht leicht serializable einmal instanziiert sind, können Sie die folgende Blog-Post sehen: https://allegro.tech/2015/08/spark-kafka-integration.html

Verwandte Themen