Ich habe ein einfaches Programm, weil ich versuche, Daten mit kafka
zu empfangen. Wenn ich einen Kafka-Producer starte und Daten sende, zum Beispiel: "Hallo", bekomme ich das, wenn ich die Nachricht drucke: (null, Hello)
. Und ich weiß nicht, warum diese Null erscheint. Gibt es eine Möglichkeit, diese Null zu vermeiden? Ich denke es liegt an Tuple2<String, String>
, der erste Parameter, aber ich möchte nur den zweiten Parameter drucken. Und eine andere Sache, wenn ich das mit System.out.println("inside map "+ message);
drucken es erscheint keine Nachricht, weiß jemand warum? Vielen Dank.Null Wert in Spark Streaming von Kafka
public static void main(String[] args){
SparkConf sparkConf = new SparkConf().setAppName("org.kakfa.spark.ConsumerData").setMaster("local[4]");
// Substitute 127.0.0.1 with the actual address of your Spark Master (or use "local" to run in local mode
sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
// Create the context with 2 seconds batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
Map<String, Integer> topicMap = new HashMap<>();
String[] topics = KafkaProperties.TOPIC.split(",");
for (String topic: topics) {
topicMap.put(topic, KafkaProperties.NUM_THREADS);
}
/* connection to cassandra */
CassandraConnector connector = CassandraConnector.apply(sparkConf);
System.out.println("+++++++++++ cassandra connector created ++++++++++++++++++++++++++++");
/* Receive kafka inputs */
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, KafkaProperties.ZOOKEEPER, KafkaProperties.GROUP_CONSUMER, topicMap);
System.out.println("+++++++++++++ streaming-kafka connection done +++++++++++++++++++++++++++");
JavaDStream<String> lines = messages.map(
new Function<Tuple2<String, String>, String>() {
public String call(Tuple2<String, String> message) {
System.out.println("inside map "+ message);
return message._2();
}
}
);
messages.print();
jssc.start();
jssc.awaitTermination();
}
Okay danke. Für Q1, wie kann ich den Schlüssel in Java weglassen (ich bin nicht mit Skala vertraut). Und was kann ich für Q2 tun, um die Nachricht zu drucken? Danke nochmal –
Q1-> Verwenden Sie eine Kartenfunktion. Es gibt viele Beispiele herum. Q2 -> Mach dasselbe wie du für die 'messages' DStream getan hast. – maasg
Okay ich suche nach der Kartenfunktion von Q1, ich hoffe etwas zu finden. Für Q2 verstehe ich dich nicht, was muss ich genau machen? –