2014-10-23 8 views
8

Kontext: Ich verwende Apache Spark, um eine laufende Anzahl von verschiedenen Ereignistypen aus Protokollen zu aggregieren. Die Protokolle werden sowohl in Cassandra für historische Analysezwecke als auch in Kafka für Echtzeitanalysezwecke gespeichert. Jedes Protokoll hat einen Datums- und Ereignistyp. Nehmen wir an, ich wollte aus Gründen der Einfachheit die Anzahl der Logs eines einzelnen Typs für jeden Tag im Auge behalten.Kombinieren Sie Ergebnisse von Batch-RDD mit Streaming-RDD in Apache Spark

Wir haben zwei RDDs, eine RDD von Batch-Daten von Cassandra und eine andere Streaming RDD von Kafka. Pseudocode:

CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type"); 

JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() { 
    @Override 
    public Tuple2<String, Integer> call(CassandraRow row) { 
     return new Tuple2<String, Integer>(row.getString("date"), 1); 
    } 
}).reduceByKey(new Function2<Integer, Integer, Integer>() { 
    @Override 
    public Integer call(Integer count1, Integer count2) { 
     return count1 + count2; 
    } 
}); 

save(batchRDD) // Assume this saves the batch RDD somewhere 

... 

// Assume we read a chunk of logs from the Kafka stream every x seconds. 
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(...); 
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() { 
    @Override 
    public Iterator<Tuple2<String, Integer> call(Tuple2<String, String> data) { 
     String jsonString = data._2; 
     JSON jsonObj = JSON.parse(jsonString); 
     Date eventDate = ... // get date from json object 
     // Assume startTime is broadcast variable that is set to the time when the job started. 
     if (eventDate.after(startTime.value())) { 
      ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>(); 
      pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1)); 
      return pairs; 
     } else { 
      return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs 
     } 
    } 
}).reduceByKey(new Function2<Integer, Integer, Integer>() { 
    @Override 
    public Integer call(Integer count1, Integer count2) { 
     return count1 + count2; 
    } 
}).updateStateByKey(new Function2<List<Integer>, Optional<List<Integer>>, Optional<Integer>>() { 
    @Override 
    public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) { 
     Integer previousValue = state.or(0l); 
     Integer currentValue = ... // Sum of counts 
     return Optional.of(previousValue + currentValue); 
    } 
}); 
save(streamRDD); // Assume this saves the stream RDD somewhere 

sc.start(); 
sc.awaitTermination(); 

Frage: Wie kombiniere ich die Ergebnisse aus der streamRDD mit dem batchRDD? Lassen Sie sich sagen, dass batchRDD folgende Daten hat und diese Arbeit wurde am 2014.10.16 laufen:

("2014-10-15", 1000000) 
("2014-10-16", 2000000) 

Da die Cassandra Abfrage alle nur die Daten an die Startzeit der Stapelabfrage oben enthielt, müssen wir Lesen Sie von Kafka, wenn die Abfrage beendet ist, und berücksichtigen Sie nur Protokolle nach der Startzeit des Jobs. Wir nehmen an, dass die Abfrage sehr lange dauert. Dies bedeutet, dass ich die historischen Ergebnisse mit den Streaming-Ergebnissen kombinieren muss.

Zur Veranschaulichung:

|------------------------|-------------|--------------|---------> 
tBatchStart    tStreamStart streamBatch1 streamBatch2 

Dann nehme an, dass in dem ersten Strom Batch wir diese Daten bekommen:

("2014-10-19", 1000) 

Dann habe ich die Batch-RDD mit diesem Strom RDD kombinieren möchten, so dass der Strom RDD hat jetzt den Wert:

("2014-10-19", 2001000) 

Dann nehme an, dass in der zweiten Stream-Charge w e erhielt diese Daten:

("2014-10-19", 4000) 

den Wert haben, dann sollte der Strom RDD aktualisiert werden:

("2014-10-19", 2005000) 

Und so weiter ...

Es ist möglich, streamRDD.transformToPair(...) zu verwenden, um die streamRDD zu kombinieren Daten mit den batchRDD-Daten unter Verwendung einer join, aber wenn wir dies für jeden Stream-Chunk machen, dann würden wir die Zählung von der batchRDD für jeden Stream-Chunk hinzufügen, der den Statuswert "doppelt gezählt" macht, wenn er nur dem hinzugefügt werden soll erster Strombrocken.

Antwort

4

diesen Fall zu begegnen, würde ich Gewerkschaft die Basis mit dem Ergebnis der aggregierten StateDStream RDD, die die Summen der Streaming-Daten halten. Dies stellt effektiv eine Grundlinie für Daten bereit, die in jedem Streaming-Intervall berichtet werden, ohne die x-Zeiten der Grundlinie zu zählen.

Ich habe diese Idee mit dem Beispiel WordCount versucht und es funktioniert.Fallen diese auf dem REPL für ein anschauliches Beispiel:

(Verwendung nc -lk 9876 auf einer separate Schale Eingang zum socketTextStream zur Verfügung zu stellen)

import org.apache.spark.SparkConf 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.storage.StorageLevel 

@transient val defaults = List("magic" -> 2, "face" -> 5, "dust" -> 7) 
val defaultRdd = sc.parallelize(defaults) 

@transient val ssc = new StreamingContext(sc, Seconds(10)) 
ssc.checkpoint("/tmp/spark") 

val lines = ssc.socketTextStream("localhost", 9876, StorageLevel.MEMORY_AND_DISK_SER) 
val words = lines.flatMap(_.split(" ")) 
val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _) 
val historicCount = wordCount.updateStateByKey[Int]{(newValues: Seq[Int], runningCount: Option[Int]) => 
    Some(newValues.sum + runningCount.getOrElse(0)) 
} 
val runningTotal = historicCount.transform{ rdd => rdd.union(defaultRdd)}.reduceByKey(_+_) 

wordCount.print() 
historicCount.print() 
runningTotal.print() 
ssc.start() 
+1

Danke. Ich möchte nur hinzufügen, dass anstelle der Verwendung von 'rdd.union (defaultRdd)' in der Transformation endete ich mit 'rdd.leftOuterJoin (defaultRdd)' nur so das 'runningTotal' nicht Paare, die nicht geändert wurden. Dann muss ich nur die Paare speichern, deren Werte sich geändert haben. – Bobby

0

Sie updateStateByKey einen Versuch geben könnten:

def main(args: Array[String]) { 

    val updateFunc = (values: Seq[Int], state: Option[Int]) => { 
     val currentCount = values.foldLeft(0)(_ + _) 
     val previousCount = state.getOrElse(0) 
     Some(currentCount + previousCount) 
    } 

    // stream 
    val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1)) 
    ssc.checkpoint(".") 
    val lines = ssc.socketTextStream("127.0.0.1", 9999) 
    val words = lines.flatMap(_.split(" ")) 
    val pairs = words.map(word => (word, 1)) 
    val stateWordCounts = pairs.updateStateByKey[Int](updateFunc) 
    stateWordCounts.print() 
    ssc.start() 
    ssc.awaitTermination() 
} 
+0

ich schon bin mit ihm. Das Problem ist, dass, wenn der optionale Statuswert null ist, ich einen Standardwert haben muss. Idealerweise wäre dies der Wert, der aus der Chargen-RDD berechnet wird. Das Problem besteht darin, dass 'updateStateByKey()' den Schlüssel nicht übergibt. Daher kann ich nicht nach dem Wert suchen, der aus der Batch-RDD berechnet wurde. – Bobby