2017-05-25 4 views
1

Ich implementiere eine Dataflow-Pipeline, die Nachrichten aus Pubsub liest und TableRows mit Apache Beam SDK 2.0.0 für Java in BigQuery (BQ) schreibt.Akkumulation von Elementen in GroupByKey-Subtask beim Schreiben in BigQuery Apache Beam v2.0

Dies ist der damit verbundene Teil des Codes:

tableRowPCollection 
      .apply(BigQueryIO.writeTableRows().to(this.tableId) 
       .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER) 
       .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); 

Dieser Code erzeugt eine Gruppe von Aufgaben unter der Haube in der Datenfluss-Pipeline. Eine dieser Aufgaben ist der GroupByKey. Diese Aufgabe sammelt Elemente in der Pipeline, wie in diesem Druckbildschirm zu sehen ist: GBK elements accumulation image. Nach dem Lesen der Dokumente vermute ich, dass dieses Problem auf die Window-Konfiguration bezieht. Aber ich konnte keine Möglichkeit finden, die Window-Konfiguration zu ändern, da es implizit ist, die von Window.Assign in der Umgruppierung-Aufgabe transformiert wird.

Gibt es eine Möglichkeit zum Festlegen der Fensterparameter und/oder das Hinzufügen von Triggern zu diesem impliziten Fenster oder soll ich mein eigenes DoFn erstellen, das ein TableRow in BQ einfügt?

Vielen Dank im Voraus!


[Update]

ich die Pipeline für einen Tag laufen gelassen ungefähr und danach die GroupByKey Unteraufgabe schneller geworden und die Anzahl der Elemente kommen und (manchmal einander angenähert herauskommen waren das Gleiche). Außerdem habe ich festgestellt, dass sich die Watermark dem aktuellen Datum näherte und schneller anstieg. So wurde das "Problem" gelöst.

+0

Was ist das Problem? GroupByKey gruppiert die Werte, die einem bestimmten Schlüssel zugeordnet sind. Wenn Sie also N Schlüssel haben, würden Sie N Elemente erwarten, die aus dem GroupByKey kommen. Was Sie beschrieben haben, klingt so, als ob es wie beabsichtigt funktioniert. –

+0

@BenChambers Der Punkt ist, dass ich nicht zu lange warten möchte, um die Daten in BQ einzufügen. Ich möchte den Trigger auswählen, der in dieser Unteraufgabe verwendet werden soll. –

Antwort

0

Es gibt keine Wartezeiten durch die Reshuffle in der BigQuery-Senke eingeführt. Es wird vielmehr zum Erstellen der Stapel für Zeilen verwendet, die in BigQuery geschrieben werden sollen. Die Anzahl der Elemente, die aus dem GroupByKey herauskommen, ist kleiner, da jedes Ausgabeelement einen Stapel (oder eine Gruppe) von Eingabeelementen darstellt.

Sie sollten in der Lage sein, die Gesamtzahl der Elemente zu sehen, die als Ausgabe des ExpandIterable ausgegeben werden (die Ausgabe des Reshuffle).

+0

Ich ließ die Pipeline ungefähr einen Tag lang laufen und danach wurde die Unteraufgabe 'GroupByKey' schneller und die Anzahl der ein- und ausgehenden Elemente approximierte sich (manchmal waren sie gleich). Außerdem habe ich bemerkt, dass das 'Watermark' dem aktuellen Datum näher kam und schneller zunahm. –

Verwandte Themen