2017-08-14 3 views
0

Ich habe eine Dataflow-Pipeline lesen von unbegrenzten Quelle. Meine Fenstergröße beträgt 10 Stunden. Ich versuche meinen Trigger mit einem TestStream zu testen. Mein Trigger wird ein frühes Ergebnis ausgeben, wenn die Elementanzahl in einem Fenster mindestens 2 für denselben Schlüssel erreicht. Ich habe folgende Trigger dies zu erreichen:Cloud-Datenfluss: Sobald Trigger nicht funktioniert

input.apply(Window.into(FixedWindows.of(Duration.standardHours(12)))    .triggering(AfterWatermark.pastEndOfWindow() 
     .withEarlyFirings(AfterPane.elementCountAtLeast(2))) 
     .apply(Count.perElement()) 

Wir haben auch versucht:

Repeatedly.forever(AfterPane.elementCountAtLeast(2)).orFinally(AfterWatermark.pastEndOfWindow()) 

ich früh Brennen erwarten, wenn das Ergebnis zu behaupten, aber ich nicht

PAssert.that(pipeline).inWindow(..).. 
in

alle das Ergebnis bekommt

Was mache ich falsch? Wenn derselbe Test wiederholt ausgeführt wird, führt dies zu unterschiedlichen Ergebnissen, was bedeutet, dass vom Trigger verschiedene Werte zurückgegeben werden.

Antwort

2

Die Triggerung ist nicht deterministisch. Es gibt Ihnen eine frühe Zündung einige Zeit, nachdem die Triggerbedingung erfüllt ist. Es gibt Ihnen dann eine weitere frühe Zündung einige Zeit, nachdem die Triggerbedingung wieder erfüllt ist.

Die tatsächliche Auswahl nach dem Auslöser wird vom Läufer bestimmt. Wenn Sie einen Batch-Runner verwenden, kann es warten, bis alle Daten verfügbar sind. Wie viel Input erwarten Sie für jeden Schlüssel/jedes Fenster? Welchen Läufer benutzt du?

+0

Dies war im Zusammenhang mit TestStream. Es funktioniert jetzt. Danke für die Information. – Mayumi