2017-06-28 3 views
2

Wir erzeugen einen sequentiellen Index in einem ParDo mit Beam's Java SDK 2.0.0. Genau wie das einfache Stateful Index Beispiel in Beam introduction to stateful processing verwenden wir eine ValueState<Integer> Zelle und unsere einzige Operation ist es, den Wert und Schritt abgerufen werden, wenn wir den nächsten Index benötigen:Stateful Indizierung bewirkt, dass ParDo single-threaded auf Dataflow Runner ausgeführt wird

Integer statefulIndex = firstNonNull(index.read(), 0); 
index.write(statefulIndex + 1); 

Wenn mit dem Google-Datenfluss-läufer läuft, wir Auf der Dataflow-Überwachungsschnittstelle wurde festgestellt, dass sich die Wandzeit für diesen ParDo synchron mit der verstrichenen Zeit ansammelte. Wir konnten bestätigen, dass der ParDo single-threaded ausführt, indem er sich in den Worker-Knoten einfügt und top und 1 verwendet, um die CPU-Auslastung pro Kern anzuzeigen. Das gleiche ParDo, das die Stateful-Processing-Zelle auskommentiert und den Code ansonsten unverändert lässt, verwendet alle Cores unseres Worker-Knotens.

Auch wenn der Dataflow-Runner in der Lage ist, die statusbezogene Indizierung basierend auf jedem Schlüssel und Fensterpaar zu parallelisieren (wir haben derzeit ein Fenster und einen Schlüssel), führt der Mangel an Parallelität zu einer so erheblichen Leistungsminderung, die wir nicht nutzen können es. Ist dies das erwartete Verhalten des Dataflow-Runner?

Naiv, ich erwartete, dass Beam Stateful Indizierung im Hintergrund ähnlich wie Javas AtomicInteger funktionieren würde. Gibt es Einschränkungen, die eine Parallelverarbeitung mit einer ValueState<Integer> Zelle verhindern oder ist diese Funktionalität noch nicht in den Runner integriert?

Antwort

1

Dies ist nicht nur das erwartete Verhalten des Dataflow-Runner, sondern eine logische Notwendigkeit in jedem Kontext. Es spielt keine Rolle, ob Sie den Status in Beam oder AtomicInteger in einem single-process Java-Programm verwenden: Wenn Operation "A" einen Wert schreibt und Operation "B" den Wert liest, muss "B" nach "A" ausgeführt werden ". Der gebräuchliche Ausdruck hierfür ist Beziehung "passiert vor".

Diese Form der Stateful-Berechnung ist das Gegenteil der parallelen Berechnung. Per Definition hat ein Lesevorgang, der einen Schreibvorgang beobachtet, einen kausalen Zusammenhang. Per Definition haben zwei parallele Operationen keinen kausalen Zusammenhang.

Jetzt erwarten Sie möglicherweise parallele Threads, die gleichzeitig auf die Statuszelle zugreifen, wie im Standardmuster der Multithread-Programmierung mit einem gemeinsamen Status mit Gleichzeitigkeitskontrolle. Wenn diese Threads in diesem Beispiel tatsächlich parallel wären, würden Sie doppelte Indizes erhalten. Einen Schritt zurück schreitend, zielt Beam auf massive "peinlich parallele" Berechnungen, die transparent über eine große Gruppe von Maschinen verteilt sind. Feinkörnige Gleichzeitigkeitskontrollen können, abgesehen davon, dass sie extrem schwierig zu korrigieren sind, nicht ohne weiteres auf massive verteilte Berechnungen übertragen werden.

+0

Es ist natürlich sinnvoll, dass der Zugriff auf den statusbehafteten Index seriell ist; Warum ist die * ganze * ParDo-Funktion auch seriell? Wenn wir atomare Integer (statt Stateful Index) verwenden, läuft der ParDo parallel; Aber wenn wir Stateful Index verwendeten, lief der ParDo seriell (dh nur ein Thread wurde ausgeführt). Wird das erwartet? (Fyi: Wir hatten numWorkers = 1, aber dieser Knoten hatte ~ 32 Kerne). –

+0

Das Ausführen des 'ParDo' in mehreren Threads hilft dir überhaupt nicht. Wenn Sie den 'AtomicInteger' verwenden, um dies korrekt zu implementieren, indem Sie einen Algorithmus verwenden, der auf Vergleichen und Tauschen basiert, wird jedes Element weiterhin seriell verarbeitet, wobei alle außer einem Thread ständig beschäftigt sind. –

Verwandte Themen