Kontext: Ich verwende Apache Spark, um eine laufende Anzahl von verschiedenen Ereignistypen aus Protokollen zu aggregieren. Die Protokolle werden sowohl in Cassandra für historische Analysezwecke als auch in Kafka für Echtzeitanalysezwecke gespeichert. Jedes Protokoll hat einen Datums- und Ereignistyp. Nehmen wir an, ich wollte aus Gründen der Einfachheit die Anzahl der Logs eines einzelnen Typs für jeden Tag im Auge behalten.Kombinieren Sie Ergebnisse von Batch-RDD mit Streaming-RDD in Apache Spark
Wir haben zwei RDDs, eine RDD von Batch-Daten von Cassandra und eine andere Streaming RDD von Kafka. Pseudocode:
CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type");
JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() {
@Override
public Tuple2<String, Integer> call(CassandraRow row) {
return new Tuple2<String, Integer>(row.getString("date"), 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
});
save(batchRDD) // Assume this saves the batch RDD somewhere
...
// Assume we read a chunk of logs from the Kafka stream every x seconds.
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(...);
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() {
@Override
public Iterator<Tuple2<String, Integer> call(Tuple2<String, String> data) {
String jsonString = data._2;
JSON jsonObj = JSON.parse(jsonString);
Date eventDate = ... // get date from json object
// Assume startTime is broadcast variable that is set to the time when the job started.
if (eventDate.after(startTime.value())) {
ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>();
pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1));
return pairs;
} else {
return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs
}
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
}).updateStateByKey(new Function2<List<Integer>, Optional<List<Integer>>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) {
Integer previousValue = state.or(0l);
Integer currentValue = ... // Sum of counts
return Optional.of(previousValue + currentValue);
}
});
save(streamRDD); // Assume this saves the stream RDD somewhere
sc.start();
sc.awaitTermination();
Frage: Wie kombiniere ich die Ergebnisse aus der streamRDD mit dem batchRDD? Lassen Sie sich sagen, dass batchRDD
folgende Daten hat und diese Arbeit wurde am 2014.10.16 laufen:
("2014-10-15", 1000000)
("2014-10-16", 2000000)
Da die Cassandra Abfrage alle nur die Daten an die Startzeit der Stapelabfrage oben enthielt, müssen wir Lesen Sie von Kafka, wenn die Abfrage beendet ist, und berücksichtigen Sie nur Protokolle nach der Startzeit des Jobs. Wir nehmen an, dass die Abfrage sehr lange dauert. Dies bedeutet, dass ich die historischen Ergebnisse mit den Streaming-Ergebnissen kombinieren muss.
Zur Veranschaulichung:
|------------------------|-------------|--------------|--------->
tBatchStart tStreamStart streamBatch1 streamBatch2
Dann nehme an, dass in dem ersten Strom Batch wir diese Daten bekommen:
("2014-10-19", 1000)
Dann habe ich die Batch-RDD mit diesem Strom RDD kombinieren möchten, so dass der Strom RDD hat jetzt den Wert:
("2014-10-19", 2001000)
Dann nehme an, dass in der zweiten Stream-Charge w e erhielt diese Daten:
("2014-10-19", 4000)
den Wert haben, dann sollte der Strom RDD aktualisiert werden:
("2014-10-19", 2005000)
Und so weiter ...
Es ist möglich, streamRDD.transformToPair(...)
zu verwenden, um die streamRDD zu kombinieren Daten mit den batchRDD-Daten unter Verwendung einer join
, aber wenn wir dies für jeden Stream-Chunk machen, dann würden wir die Zählung von der batchRDD für jeden Stream-Chunk hinzufügen, der den Statuswert "doppelt gezählt" macht, wenn er nur dem hinzugefügt werden soll erster Strombrocken.
Danke. Ich möchte nur hinzufügen, dass anstelle der Verwendung von 'rdd.union (defaultRdd)' in der Transformation endete ich mit 'rdd.leftOuterJoin (defaultRdd)' nur so das 'runningTotal' nicht Paare, die nicht geändert wurden. Dann muss ich nur die Paare speichern, deren Werte sich geändert haben. – Bobby