2017-11-23 4 views

Antwort

3

Ja, das ist möglich.

Wenn Sie die Ereigniszeit verwenden, können Sie die Fenster einfach mit zunehmenden Zeitintervallen kaskadieren. So können Sie tun:

DataStream<String> data = ... 
// append a Long 1 to each record to count it. 
DataStream<Tuple2<String, Long>> withOnes = data.map(new AppendOne); 

DataStream<Tuple2<String, Long>> 1minCnts = withOnes 
    // key by String field 
    .keyBy(0) 
    // define time window 
    .timeWindow(Time.of(1, MINUTES)) 
    // sum ones of the Long field 
    // in practice you want to use an incrementally aggregating ReduceFunction and 
    // a WindowFunction to extract the start/end timestamp of the window 
    .sum(1); 

// emit 1-min counts to wherever you need it 
1minCnts.addSink(new YourSink()); 

// compute 5-min counts based on 1-min counts 
DataStream<Tuple2<String, Long>> 5minCnts = 1minCnts 
    // key by String field 
    .keyBy(0) 
    // define time window of 5 minutes 
    .timeWindow(Time.of(5, MINUTES)) 
    // sum the 1-minute counts in the Long field 
    .sum(1); 

// emit 5-min counts to wherever you need it 
5minCnts.addSink(new YourSink()); 

// continue with 1 day window and 1 week window 

Beachten Sie, dass dies möglich ist, weil:

  1. Summe ist eine assoziative Funktion (Sie können eine Summe berechnen, indem Teilsumme summieren).
  2. Die taumelnden Fenster sind gut ausgerichtet und überlappen sich nicht.

In Bezug auf den Kommentar über die schrittweise Aggregation ReduceFunction:

Normalerweise möchten Sie den Start- und/oder End-Zeitstempel des Fensters in der Ausgabe eines Fensters Betrieb haben (sonst alle Ergebnisse für den gleichen Schlüssel gleich aussehen). Auf die Start- und Endzeit eines Fensters kann über den window Parameter der apply() Methode einer WindowFunction zugegriffen werden. Ein WindowFunction aggregiert jedoch Datensätze nicht inkrementell, sondern sammelt sie und aggregiert die Datensätze am Ende des Fensters. Daher ist es effizienter, eine ReduceFunction für inkrementelle Aggregation und eine WindowFunction zu verwenden, um die Start- und/oder Endzeit des Fensters an das Ergebnis anzuhängen. Die documentation diskutiert die Details.

Wenn Sie dies mit Verarbeitungszeit berechnen möchten, können Sie die Fenster nicht kaskadieren, sondern müssen aus dem Eingabedatenstrom auf vier Fensterfunktionen auffächern.

+0

Ausgezeichnet - sobald ich Code zuweisenTimestampsAndWatermarks et al kann ich EventTime verwenden, was immer der Plan war. Könntest du ein bisschen auf den 'Best Practice reduceFunction' Kommentar erweitern? Klingt interessant ... –

+0

Sicher habe ich meine Antwort erweitert –