Wie funken schreiben Nachrichten in alle Partitionen in der Kafka, so dass ich einen DirectStream verwenden und die Leistung des Streaming verbessern kann.Beim Schreiben der Nachrichten in ein Kafka-Thema mit Funken-Streaming wird nur in eine Partition schreiben
hier ist mein Code: -
object kafka {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("FlightawareSparkApp")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val lines = ssc.socketTextStream("localhost", 18436)
val topic = "test"
val props = new java.util.Properties()
props.put("metadata.broker.list", "list")
props.put("bootstrap.servers", "list")
// props.put("bootstrap.servers", "localhost:9092")
// props.put("bootstrap.servers", "localhost:9092")
props.put("client.id", "KafkaProducer")
props.put("producer.type", "async")
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
lines.foreachRDD(rdd => {
rdd.foreachPartition(part => {
val producer = new KafkaProducer[Integer, String](props)
part.foreach(msg =>{
val record = new ProducerRecord[Integer, String](topic, msg)
producer.send(record)
})
producer.close()
})
})
ssc.start()
ssc.awaitTermination()
}
}
dieser Code Nachrichten in kafka Thema schieben, aber wenn ich die Zählung mit
/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKABROKERS --topic test --time -1
bin immer Ausgang, wo sehen kann ich Sehen Sie die Nachrichten nur in einer Partition.
test:8:0
test:2:0
test:5:0
test:4:0
test:7:0
test:1:0
test:9:0
test:3:0
test:6:237629
test:0:0
Vorschläge zur Aufteilung der Daten in alle Partitionen.
Wie wird der Partitionsschlüssel standardmäßig im Programm implementiert, um die Nachrichten über die Partitionen zu verteilen.
Danke,
Ankush Reddy.
Danke für die Antwort. Wenn ich hier einen beliebigen Schlüssel übergebe, wird es funktionieren. val record = neuer ProducerRecord [Integer, String] (Thema, Schlüssel, 8, msg). –