2016-12-13 4 views
0

Arbeiten mit einer Streaming-unbegrenzten PCollection in Google Dataflow, die aus einem Cloud PubSub-Abonnement stammt. Wir verwenden dies als Firehose, um Events einfach an BigTable zu liefern. Alles mit der Lieferung funktioniert gut.Cloud-Datenfluss: Nebeneffekt bei fortschreitendem Wasserzeichen

Unser Problem ist, dass wir nachgelagerte Batch-Jobs haben, die erwarten, dass sie einen Tag lang Daten aus BigTable auslesen, sobald sie ausgeliefert sind. Ich würde gerne Fensterung und Triggerung verwenden, um einen Nebeneffekt zu implementieren, der eine Markierzeile nach bigtable schreibt, wenn das Wasserzeichen über den Tagesschwellenwert hinausgeht, was darauf hinweist, dass der Datenfluss Grund zu der Annahme hat, dass die meisten Ereignisse geliefert wurden. t brauchen starke Garantien auf Vollständigkeit, nur vernünftige) und dass die Downstream-Verarbeitung beginnen kann.

Wir haben versucht, die rohen Ereignisse als eine Senke in der Pipeline zu schreiben, und dann Fenster in eine andere Senke unter Verwendung the timing information in the pane, um festzustellen, ob das Wasserzeichen fortgeschritten ist. Das Problem bei diesem Ansatz besteht darin, dass er erneut auf die rohen Ereignisse selbst einwirkt, was unerwünscht ist, da es das Schreiben der Ereigniszeilen wiederholen würde. Wir können dieses Schreiben verhindern, aber der parallele Pfad in der Pipeline würde immer noch über die gefensterten Ströme von Ereignissen arbeiten.

Gibt es eine effektive Möglichkeit, einen Rückruf an das Wasserzeichen anzuhängen, sodass wir eine einzige Aktion ausführen können, wenn das Wasserzeichen vorrückt?

Antwort

1

Die allgemeine Fähigkeit, einen Timer in Ereigniszeit einzustellen und einen Rückruf zu erhalten, ist definitiv ein wichtiger Feature-Request, wie BEAM-27 eingereicht, die unter aktiver Entwicklung ist.

Aber tatsächlich scheint Ihr Ansatz der Fenstereinblendung in FixedWindows.of(Duration.standardDays(1)) wie es Ihr Ziel nur mit den Funktionen des Dataflow Java SDK 1.x erreichen wird. Anstatt die Pipeline zu verzweigen, können Sie das "Firehose" -Verhalten beibehalten, indem Sie den Trigger AfterPane.elementCountAtLeast(1) hinzufügen. Es kostet die Kosten eines GroupByKey, aber dupliziert nichts.

Der komplette Pipeline könnte wie folgt aussehen:

pipeline 
    // Read your data from Cloud Pubsub and parse to MyValue 
    .apply(PubsubIO.Read.topic(...).withCoder(MyValueCoder.of()) 

    // You'll need some keys 
    .apply(WithKeys.<MyKey, MyValue>of(...)) 

    // Window into daily windows, but still output as fast as possible 
    .apply(Window.into(FixedWindows.of(Duration.standardDays(1))) 
       .triggering(AfterPane.elementCountAtLeast(1))) 

    // GroupByKey adds the necessary EARLY/ON_TIME/LATE labeling 
    .apply(GroupByKey.<MyKey, MyValue>create()) 

    // Convert KV<MyKey, Iterable<MyValue>> 
    // to KV<ByteString, Iterable<Mutation>> 
    // where the iterable of mutations has the "end of day" marker if 
    // it was ON_TIME 
    .apply(MapElements.via(new MessageToMutationWithEndOfWindow()) 

    // Write it! 
    .apply(BigTableIO.Write.to(...); 

Bitte kommentieren meine Antwort, wenn ich einige Detail Ihres Anwendungsfall verpasst.