2017-06-16 3 views
2

Ich habe ein Programm, das mit Spark JavaStreamingContext arbeitet. Ich habe gelernt, dass bei der Verwendung von DStreams nur wenige Ausgabefunktionen zugelassen sind, wie zB print(). Dies ist ein Stück CodeAusführen von Abfragen in einem JavaSparkStreamingContext

private static void analyzeHashtags() throws InterruptedException { 
    JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics); 
    JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc); 
    lines.print(); 
    jssc.start(); 
    jssc.awaitTermination(); 

} 

Nun möchte Ich mag Abfragen Operation an diesem Code hinzufügen, wie unten:

private static void analyzeHashtags() throws InterruptedException, SQLException { 
    JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics); 
    JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc); 
    lines.print(); 
    String hashtag = "#dummy"; int frequencies = 59; 
    String cql = " CREATE (n:Hashtag {name:'"+hashtag+"', freq:"+frequencies+"})"; 
    st.executeUpdate(cql); 
    jssc.start(); 
    jssc.awaitTermination(); 
} 

Aber dieser Code ausgeführt wird nur die Abfrage einmal. Ich möchte es bei jedem Looping ausführen. Wie ist es möglich, dies zu machen? Danke im Voraus.

Antwort

2

Um beliebige Operationen auf einem DStream auszuführen, verwenden wir foreachRDD. Es bietet Zugriff auf die Daten bei jedem Batch-Intervall, dargestellt durch die zugrunde liegende RDD.

Java/Scala Pseudo (mix) Code:

JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new 
Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc); 
lines.foreachRDD{ rdd => 
    .. do something with the RDD here... 
} 

Normalerweise arbeitet der do something auf die Daten des RDD. Wir können diese Daten entweder verteilt verarbeiten, indem wir RDD-Funktionen wie foreachPartition verwenden.

Aber da Sie hier eine lokale neo4j-Verbindung verwenden und die Daten in jedem Streaming-Intervall nicht sehr groß sind, können wir die Daten für den Treiber sammeln und die Operation lokal durchführen. Es scheint, dass wäre ein Sitz in diesem Fall, da die Daten bereits verteilt bestanden haben eine Verringerung Phase (reduceBykey)

Also, der foreachRDD Teil würde:

lines.foreachRDD{ rdd => 
    val localDataCollection = rdd.collect 
    localDataCollection.foreach{ keywordFreqPair => 
     val cql = "CREATE (n:Hashtag {name:'"+keywordFreqPair._1+"', freq:"+keywordFreqPair._2+"})" 
     st.executeUpdate(cql) 
} 
+0

Dank für die vollständige und nützliche Antwort. Ich weiß einfach nicht, wie man den 'foreachRDD' Teil in Java (IDK Scala) implementiert. Mit einem Lambda-Ausdruck, sollte ich 'Linien schreiben.foreRDD (rdd -> (...' mit einer Funktion anstelle von Punkten? – sirdan

+1

Ich persönlich empfehle Scala mit Spark-Streaming. Für die Java-Übersetzung der 'foreachRDD' Lambda, Ich denke, Sie finden ein Beispiel im Spark Streaming Beispiel-Paket, zB: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/ JavaSqlNetworkWordCount.java – maasg

+0

Vielen Dank, das hilft sehr – sirdan

Verwandte Themen