2017-03-29 9 views
1

Ich habe eine einfache Pipeline, die Datensätze liest sowohl eine Textdatei und mysql und versucht, sie zu vereinbaren, dh Datensätze einfügen, wenn sie nicht in der Datenbank vorhanden sind, Datensätze in der DB mit der Datei aktualisieren, und einige andere Updates Datensätze in der DB, die nicht in der Datei vorhanden sind.Wie verteilt man Beam Tasks gleichmäßig auf Spark?

Ein Problem, das mit 2 M Aufzeichnungen in Funken bei der Ausführung auftritt, ist die folgende:

enter image description here

ist Meine Vermutung, dass der folgende Code

 final TupleTag<FileRecord> fileTag = new TupleTag<>(); 
     final TupleTag<MysqlRecord> mysqlTag = new TupleTag<>(); 
     PCollection<KV<Integer, CoGbkResult>> joinedRawCollection = 
       KeyedPCollectionTuple.of(fileTag, fileRecords) 
         .and(mysqlTag, mysqlRecords) 
         .apply(CoGroupByKey.create()); 

Hier dieses Ungleichgewicht erzeugt die Funken Executor DAG Visualisierung

enter image description here

Schließlich wird dem einen Arbeiter der Arbeitsspeicher ausgehen. Ich weiß nativ in Spark, dass man Partitioners angeben kann, um die Arbeitslast auf Worker zu verteilen. Wie mache ich das in Beam?

EDIT:

Ich vermutete, dass JDBCIo konnte nicht richtig die eine Abfrage verteilen, so teile ich es in mehrere PCollections und sie dann später abgeflacht. Ich las viel schneller von Mysql, stieß aber schließlich auf das gleiche Problem.

Hier sind die Stufen, die bearbeitet werden: enter image description here

Aber jede Stufe immer noch von diesem Ungleichgewicht leidet ?: enter image description here

+0

Eigentlich könnte es so aussehen, dass der Grund für dieses Ungleichgewicht darin liegt, dass es der Leseschritt von MySQL mit so vielen Datensätzen ist. Da JDBCIO vermutlich die eine SELECT-Abfrage nicht verteilt, sehen wir diese Konkurrenz. Lass mich versuchen, es aufzuteilen. – nambrot

Antwort

0

Um meine eigene Frage mit der Verwirklichung meiner eigenen Versagen zu beantworten zu unterscheiden zwischen Spark Stages und Aufgabe. Die Aufgaben waren in der Tat verteilt, ich habe dem Treiberprogramm eigentlich nicht genügend Speicher zugewiesen.

Verwandte Themen