0

Wie funken schreiben Nachrichten in alle Partitionen in der Kafka, so dass ich einen DirectStream verwenden und die Leistung des Streaming verbessern kann.Beim Schreiben der Nachrichten in ein Kafka-Thema mit Funken-Streaming wird nur in eine Partition schreiben

hier ist mein Code: -

object kafka { 
def main(args: Array[String]) { 
    val sparkConf = new SparkConf().setAppName("FlightawareSparkApp") 
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
    val ssc = new StreamingContext(sparkConf, Seconds(3)) 
    val lines = ssc.socketTextStream("localhost", 18436) 
         val topic = "test" 
         val props = new java.util.Properties() 
         props.put("metadata.broker.list", "list") 
         props.put("bootstrap.servers", "list") 
         // props.put("bootstrap.servers", "localhost:9092") 
         // props.put("bootstrap.servers", "localhost:9092") 
         props.put("client.id", "KafkaProducer") 
         props.put("producer.type", "async") 
         props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer") 
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
      lines.foreachRDD(rdd => { 
       rdd.foreachPartition(part => { 
        val producer = new KafkaProducer[Integer, String](props) 
        part.foreach(msg =>{ 
         val record = new ProducerRecord[Integer, String](topic, msg) 
         producer.send(record) 
        }) 
        producer.close() 
       }) 
      }) 
      ssc.start() 
      ssc.awaitTermination() 
} 

}

dieser Code Nachrichten in kafka Thema schieben, aber wenn ich die Zählung mit

/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKABROKERS --topic test --time -1 

bin immer Ausgang, wo sehen kann ich Sehen Sie die Nachrichten nur in einer Partition.

test:8:0 
test:2:0 
test:5:0 
test:4:0 
test:7:0 
test:1:0 
test:9:0 
test:3:0 
test:6:237629 
test:0:0 

Vorschläge zur Aufteilung der Daten in alle Partitionen.

Wie wird der Partitionsschlüssel standardmäßig im Programm implementiert, um die Nachrichten über die Partitionen zu verteilen.

Danke,

Ankush Reddy.

Antwort

2

Es ist, weil Sie den Schlüssel nicht einstellen. Sie finden die folgenden Details in Kafka FAQ [1].

Warum werden Daten nicht gleichmäßig auf Partitionen verteilt, wenn kein Partitionierungsschlüssel angegeben wird?

Im Kafka-Producer kann ein Partitionsschlüssel angegeben werden, um die Zielpartition der Nachricht anzugeben. Standardmäßig wird ein Hashing-basierter Partitionierer verwendet, um die Partitions-ID für den Schlüssel zu ermitteln, und Benutzer können auch angepasste Partitionierer verwenden.

Um die Anzahl der offenen Sockets in 0.8.0 (https://issues.apache.org/jira/browse/KAFKA-1017) zu reduzieren, wenn ein Partitionierungsschlüssel nicht oder null ist, wird ein Produzent eine zufällige Partition auswählen und sich für einige Zeit (Standard ist 10 Minuten) davor zu einem anderen wechseln. Wenn zu einem bestimmten Zeitpunkt weniger Hersteller als Partitionen vorhanden sind, erhalten einige Partitionen möglicherweise keine Daten. Um dieses Problem zu beheben, kann entweder das Aktualisierungsintervall für die Metadaten reduziert oder ein Nachrichtenschlüssel und ein benutzerdefinierter zufälliger Partitionierer angegeben werden. Genauere Angaben finden Sie dieses Themas http://mail-archives.apache.org/mod_mbox/kafka-dev/201310.mbox/%3CCAFbh0Q0aVh%2Bvqxfy7H-%2BMnRFBt6BnyoZk1LWBoMspwSmTqUKMg%40mail.gmail.com%3E

[1] https://cwiki.apache.org/confluence/display/KAFKA/FAQ

+0

Danke für die Antwort. Wenn ich hier einen beliebigen Schlüssel übergebe, wird es funktionieren. val record = neuer ProducerRecord [Integer, String] (Thema, Schlüssel, 8, msg). –

Verwandte Themen