2016-11-21 6 views
1

Ich bin ziemlich neu auf Spark Streaming und ich bin stecken stecken versucht, um herauszufinden, wie dieses Problem zu behandeln, da ich viele Beispiele für einzelne (K, V) Paare, aber alles weitere. Ich würde mich über Hilfe freuen, um mit Sparks Transformationen mit Java den besten Ansatz zu finden.Spark-Streaming reduzieren durch mehrere Schlüssel Java

ich kurz das Szenario beschreiben lassen,

Das Ziel ist es, die Fehlerquote eines Satzes von Elementen innerhalb eines Zeitfensters zu erhalten.

die folgende Eingabe gegeben,

(A, Error) 
(B, Success) 
(B, Error) 
(B, Success) 
(C, Success) 
(C, Error) 

Es wird von Element aggregieren wird und (Element, (Number of Success, Number of Error)) der Status dann. Auf diesem Fall würde das Ergebnis der Transformation sein,

(A, (0,1)) 
(B, (2,1)) 
(C, (1,1)) 

Und schließlich eine Verhältnisberechnung eine Funktion wie (i1, i2) mit -> i1/(i1 + i2).

(A, 100%) 
(B, 33.3%) 
(C, 50%) 

Soweit ich verstehe, wäre das Ergebnis von reduceByKeyAndWindow() Funktion zum Beispiel gegeben werden

JavaPairDStream<String, Double> res = 
pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(1)); 

Nach dem Rückfluss der Anwendung, meine Fragen sind,

Wie definiert man ein Paar auf einem JavaPairDStream mit mehr als einem Wert oder Schlüssel (vielleicht so etwas wie JavaPairDStream<String, Tuple2<Integer,Integer>>)?

Welches ist der beste Ansatz für die reduceFunc gegeben ein Paar mit mehreren Schlüsseln?

Welches ist der beste Weg, um den ursprünglichen DStream (vielleicht so etwas wie JavaDStream<Tuple2<String, String>> line = input.map(func)) abzubilden?

Vielen Dank im Voraus für Ihre Hilfe.

Antwort

2

Ich habe bereits die Lösung gefunden. Wenn Sie mit Funktionsklassen und Tupeln arbeiten, können Sie jede Kombination finden, die Sie mit Scala erstellen würden. Das Problem ist, dass ich in Java keine Dokumentation oder Beispiele dazu gefunden habe. Im Folgenden finden Sie meine Lösung für den Fall, dass es in der Zukunft helfen kann.

JavaPairDStream<String,String> samples = lines.flatMapToPair(new PairFlatMapFunction<String,String, String>() { 
      public Iterator<Tuple2<String,String>> call(String s) throws Exception { 
       return Arrays.asList(new Tuple2<String, String>(//Some logic on my data//).iterator(); 
      } 
     }); 


JavaPairDStream<Tuple2<String,String>, Integer> samplePairs = samples.mapToPair(
       new PairFunction<Tuple2<String,String>, Tuple2<String,String>, Integer>() { 
        public Tuple2<Tuple2<String,String>, Integer> call(Tuple2<String,String> t) { 
         return new Tuple2<Tuple2<String,String>, Integer>(t, 1); 
        } 
       }); 

     JavaPairDStream<String, Integer> countErrors = samplePairs.filter(new Function<Tuple2<Tuple2<String,String>,Integer>,Boolean>() { 
      public Boolean call(Tuple2<Tuple2<String,String>, Integer> t) 
      { 
       return (t._1._2.equals("Error")); 
      }}).mapToPair(new PairFunction<Tuple2<Tuple2<String,String>,Integer>, String, Integer>() { 
      public Tuple2<String,Integer> call(Tuple2<Tuple2<String,String>,Integer> t) { 
       return new Tuple2(t._1._1,t._2); 
      } 
     }).reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() { 
      public Integer call(Integer i1, Integer i2) { 
       return i1 + i2; 
      }}, Durations.seconds(30), Durations.seconds(1)); 

     JavaPairDStream<String, Integer> countSuccess= samplePairs.filter(new Function<Tuple2<Tuple2<String,String>,Integer>,Boolean>() { 
      public Boolean call(Tuple2<Tuple2<String,String>, Integer> t) 
      { 
       return (t._1._2.equals("Success")); 
      }}).mapToPair(new PairFunction<Tuple2<Tuple2<String,String>,Integer>, String, Integer>() { 
      public Tuple2<String,Integer> call(Tuple2<Tuple2<String,String>,Integer> t) { 
       return new Tuple2(t._1._1,t._2); 
      } 
     }).reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() { 
      public Integer call(Integer i1, Integer i2) { 
       return i1 + i2; 
      }}, Durations.seconds(30), Durations.seconds(1)); 

     JavaPairDStream<String,Tuple2<Optional<Integer>,Optional<Integer>>> countPairs = countSuccess.fullOuterJoin(countErrors); 

     JavaPairDStream<String, Double> mappedRDD = countPairs 
       .mapToPair(new PairFunction<Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>>, String, Double>() { 
        public Tuple2<String, Double> call(Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>> stringTuple2Tuple2) throws Exception { 
         if ((stringTuple2Tuple2._2()._2().isPresent()) && (stringTuple2Tuple2._2()._1().isPresent())) { 
          return new Tuple2<String, Double>(stringTuple2Tuple2._1(), ((double)stringTuple2Tuple2._2()._1().get()/
            ((double)stringTuple2Tuple2._2()._2().get()+(double)stringTuple2Tuple2._2()._1().get()))); 
         } else if (stringTuple2Tuple2._2()._2().isPresent()){ 
          return new Tuple2<String, Double>(stringTuple2Tuple2._1(), 1.0); 
         } else { 
          return new Tuple2<String, Double>(stringTuple2Tuple2._1(), 0.0); 
         } 
        } 
       }); 
Verwandte Themen