Wir erzeugen einen sequentiellen Index in einem ParDo mit Beam's Java SDK 2.0.0. Genau wie das einfache Stateful Index Beispiel in Beam introduction to stateful processing verwenden wir eine ValueState<Integer>
Zelle und unsere einzige Operation ist es, den Wert und Schritt abgerufen werden, wenn wir den nächsten Index benötigen:Stateful Indizierung bewirkt, dass ParDo single-threaded auf Dataflow Runner ausgeführt wird
Integer statefulIndex = firstNonNull(index.read(), 0);
index.write(statefulIndex + 1);
Wenn mit dem Google-Datenfluss-läufer läuft, wir Auf der Dataflow-Überwachungsschnittstelle wurde festgestellt, dass sich die Wandzeit für diesen ParDo synchron mit der verstrichenen Zeit ansammelte. Wir konnten bestätigen, dass der ParDo single-threaded ausführt, indem er sich in den Worker-Knoten einfügt und top
und 1
verwendet, um die CPU-Auslastung pro Kern anzuzeigen. Das gleiche ParDo, das die Stateful-Processing-Zelle auskommentiert und den Code ansonsten unverändert lässt, verwendet alle Cores unseres Worker-Knotens.
Auch wenn der Dataflow-Runner in der Lage ist, die statusbezogene Indizierung basierend auf jedem Schlüssel und Fensterpaar zu parallelisieren (wir haben derzeit ein Fenster und einen Schlüssel), führt der Mangel an Parallelität zu einer so erheblichen Leistungsminderung, die wir nicht nutzen können es. Ist dies das erwartete Verhalten des Dataflow-Runner?
Naiv, ich erwartete, dass Beam Stateful Indizierung im Hintergrund ähnlich wie Javas AtomicInteger
funktionieren würde. Gibt es Einschränkungen, die eine Parallelverarbeitung mit einer ValueState<Integer>
Zelle verhindern oder ist diese Funktionalität noch nicht in den Runner integriert?
Es ist natürlich sinnvoll, dass der Zugriff auf den statusbehafteten Index seriell ist; Warum ist die * ganze * ParDo-Funktion auch seriell? Wenn wir atomare Integer (statt Stateful Index) verwenden, läuft der ParDo parallel; Aber wenn wir Stateful Index verwendeten, lief der ParDo seriell (dh nur ein Thread wurde ausgeführt). Wird das erwartet? (Fyi: Wir hatten numWorkers = 1, aber dieser Knoten hatte ~ 32 Kerne). –
Das Ausführen des 'ParDo' in mehreren Threads hilft dir überhaupt nicht. Wenn Sie den 'AtomicInteger' verwenden, um dies korrekt zu implementieren, indem Sie einen Algorithmus verwenden, der auf Vergleichen und Tauschen basiert, wird jedes Element weiterhin seriell verarbeitet, wobei alle außer einem Thread ständig beschäftigt sind. –