2016-04-27 11 views
0

Ich habe ein einfaches Programm, weil ich versuche, Daten mit kafka zu empfangen. Wenn ich einen Kafka-Producer starte und Daten sende, zum Beispiel: "Hallo", bekomme ich das, wenn ich die Nachricht drucke: (null, Hello). Und ich weiß nicht, warum diese Null erscheint. Gibt es eine Möglichkeit, diese Null zu vermeiden? Ich denke es liegt an Tuple2<String, String>, der erste Parameter, aber ich möchte nur den zweiten Parameter drucken. Und eine andere Sache, wenn ich das mit System.out.println("inside map "+ message); drucken es erscheint keine Nachricht, weiß jemand warum? Vielen Dank.Null Wert in Spark Streaming von Kafka

public static void main(String[] args){ 

    SparkConf sparkConf = new SparkConf().setAppName("org.kakfa.spark.ConsumerData").setMaster("local[4]"); 
    // Substitute 127.0.0.1 with the actual address of your Spark Master (or use "local" to run in local mode 
    sparkConf.set("spark.cassandra.connection.host", "127.0.0.1"); 
    // Create the context with 2 seconds batch size 
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 

    Map<String, Integer> topicMap = new HashMap<>(); 
    String[] topics = KafkaProperties.TOPIC.split(","); 
    for (String topic: topics) { 
     topicMap.put(topic, KafkaProperties.NUM_THREADS); 
    } 
    /* connection to cassandra */ 
    CassandraConnector connector = CassandraConnector.apply(sparkConf); 
    System.out.println("+++++++++++ cassandra connector created ++++++++++++++++++++++++++++"); 

    /* Receive kafka inputs */ 
    JavaPairReceiverInputDStream<String, String> messages = 
      KafkaUtils.createStream(jssc, KafkaProperties.ZOOKEEPER, KafkaProperties.GROUP_CONSUMER, topicMap); 
    System.out.println("+++++++++++++ streaming-kafka connection done +++++++++++++++++++++++++++"); 

    JavaDStream<String> lines = messages.map(
      new Function<Tuple2<String, String>, String>() { 
       public String call(Tuple2<String, String> message) { 
        System.out.println("inside map "+ message); 
        return message._2(); 
       } 
      } 
    ); 

    messages.print(); 
    jssc.start(); 
    jssc.awaitTermination(); 
} 

Antwort

0

Q1) Nullwerte: Nachrichten in Kafka verkeilt sind, das heißt, sie haben alle eine (Schlüssel, Wert) Struktur. Wenn Sie sehen, (null, Hello) ist, weil der Produzent einen (null,"Hello") Wert in einem Thema veröffentlicht. Wenn Sie den Schlüssel in Ihrem Prozess verzichten wollen, Karte die ursprüngliche Dtream den Schlüssel zu entfernen: kafkaDStream.map(new Function<String,String>() {...})

Q2) System.out.println("inside map "+ message); druckt nicht. Ein paar klassischer Gründe:

  1. Transformationen in dem Testamentsvollstrecker angelegt werden, so dass, wenn in einem Cluster ausgeführt wird, wird die Ausgabe in den Testamentsvollstrecker erscheinen und nicht auf dem Master.

  2. Operationen sind faul und DStreams müssen materialisiert werden, damit Operationen angewendet werden können.

In diesem speziellen Fall wird die JavaDStream<String> lines nie materialisiert das heißt für eine Ausgabeoperation nicht verwendet. Daher wird die map nie ausgeführt.

+0

Okay danke. Für Q1, wie kann ich den Schlüssel in Java weglassen (ich bin nicht mit Skala vertraut). Und was kann ich für Q2 tun, um die Nachricht zu drucken? Danke nochmal –

+0

Q1-> Verwenden Sie eine Kartenfunktion. Es gibt viele Beispiele herum. Q2 -> Mach dasselbe wie du für die 'messages' DStream getan hast. – maasg

+0

Okay ich suche nach der Kartenfunktion von Q1, ich hoffe etwas zu finden. Für Q2 verstehe ich dich nicht, was muss ich genau machen? –