2016-03-22 10 views
1

Ich habe ein Fenster in Spark Streaming mit 5 ganzzahligen Werten erstellt. Jedes Mal, wenn eine Sekunde verstrichen ist, erhält das Fenster einen neuen Wert und verliert den ältesten. Jedes Mal, wenn ein neuer Wert kommt, berechne ich den Durchschnitt der Werte im Fenster wie folgt aus:Vergleiche neuen Wert mit dem vorherigen Durchschnitt in Spark Streaming

JavaDStream<Integer> average = values.map(new Function<Integer, Integer>() { 
     @Override 
     public Integer call(Integer a) throws Exception { 
      int b = a/5; 


      return b; 


    } }); 

So ist die durchschnittliche Veränderung hält.

Das Problem ist, jedes Mal, wenn ein neuer Wert zum Fenster kommt, möchte ich es mit dem vorherigen Durchschnitt vergleichen. Wenn dieser neue Wert viel größer oder kleiner als der Durchschnittswert ist, möchte ich ihn fallen lassen und den alten Durchschnitt beibehalten. Wenn nicht, kann die Durchschnitt aktualisiert werden.

Meine Frage ist, wie kann ich diesen 'alten Durchschnitt' speichern, damit ich ihn mit dem neuen Wert im Fenster vergleichen kann?

Vielen Dank.

+0

Berechnung sind die durchschnittlichen basierend auf einem Schlüssel? Oder einfach einen Strom von eingehenden ganzen Zahlen verarbeiten? –

+0

Eine Möglichkeit zum Speichern und Abrufen für einen Vergleich besteht darin, einen der verschiedenen Datenspeicher zu verwenden, die entweder mit Spark Streaming oder mit einem Spark-Connector integriert sind. Beispiele sind [SnappyData] (https://github.com/SnappyDataInc/snappydata), [Redis] (https://github.com/RedisLabs/spark-redis), [MemSQL] (https://github.com/). memsql/memsql-spark-connector), [Kassandra] (https://github.com/datastax/spark-cassandra-connector), [HBase] (https://github.com/nerdammer/spark-hbase-connector) – plambre

+0

@YuvalItzchakov, es ist der zweite Fall genau, ich verarbeite einen Strom von ganzen Zahlen (5 ankommen pro Sekunde) und mit 'map' Methoden, um den Mittelwert dieser ganzen Zahlen zu berechnen –

Antwort

0

Sie etwas versuchen könnten, ähnlich den folgenden:

  1. einen „Schlüssel-Wert DSTREAM“ erstellen mit einer Pseudo-Taste (zB 1)
  2. Verwendung mapWithState() den letzten Durchschnitt zu speichern und zu vergleichen, wie du erwähntest; Rückkehr der Durchschnitt Sie

es wie folgt aussehen könnte weiter verwenden möchten:

JavaPairDStream<Integer, Integer> pairedIntStream = values.mapToPair(
new PairFunction<Integer, Integer, Integer>() { 
     @Override 
     public Tuple2<Integer, Integer> call(Integer a) throws Exception { 
      return new Tuple2<Integer, Integer>(1, a); 
} }); 

Function3<Integer, Optional<Integer>, State<Integer>, Integer> mappingFunction = 
     new Function3<Integer, Optional<Integer>, State<Integer>, Integer>() { 
      @Override 
      public Integer call(Integer s, Optional<Integer> value, State<Integer> state) { 

       int avg = value.orNull()/5; 

       if(!state.exists() || state.get() < avg) { 
        state.update(avg); 
        return avg; 
       } 
       return state.get(); 
      } 
     }; 

    JavaMapWithStateDStream<Integer, Integer, Integer, Integer> mapWithStateDStream = 
     pairedIntStream.mapWithState(StateSpec.function(mappingFunc));