Ich habe ein Programm, das mit Spark JavaStreamingContext
arbeitet. Ich habe gelernt, dass bei der Verwendung von DStreams nur wenige Ausgabefunktionen zugelassen sind, wie zB print()
. Dies ist ein Stück CodeAusführen von Abfragen in einem JavaSparkStreamingContext
private static void analyzeHashtags() throws InterruptedException {
JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics);
JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc);
lines.print();
jssc.start();
jssc.awaitTermination();
}
Nun möchte Ich mag Abfragen Operation an diesem Code hinzufügen, wie unten:
private static void analyzeHashtags() throws InterruptedException, SQLException {
JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics);
JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc);
lines.print();
String hashtag = "#dummy"; int frequencies = 59;
String cql = " CREATE (n:Hashtag {name:'"+hashtag+"', freq:"+frequencies+"})";
st.executeUpdate(cql);
jssc.start();
jssc.awaitTermination();
}
Aber dieser Code ausgeführt wird nur die Abfrage einmal. Ich möchte es bei jedem Looping ausführen. Wie ist es möglich, dies zu machen? Danke im Voraus.
Dank für die vollständige und nützliche Antwort. Ich weiß einfach nicht, wie man den 'foreachRDD' Teil in Java (IDK Scala) implementiert. Mit einem Lambda-Ausdruck, sollte ich 'Linien schreiben.foreRDD (rdd -> (...' mit einer Funktion anstelle von Punkten? – sirdan
Ich persönlich empfehle Scala mit Spark-Streaming. Für die Java-Übersetzung der 'foreachRDD' Lambda, Ich denke, Sie finden ein Beispiel im Spark Streaming Beispiel-Paket, zB: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/ JavaSqlNetworkWordCount.java – maasg
Vielen Dank, das hilft sehr – sirdan