Ich versuche parallel das Lesen von Kafka-Nachrichten parallel zu verarbeiten. Mein Kafka-Thema hat 10 Partitionen. Ich versuche, 5 DStreams zu erstellen und Union
Methode anzuwenden, um auf einem einzelnen DStream zu arbeiten. Hier ist der Code, den ich bisher versucht:Lesen und Verarbeiten der Parallelität in Kafka Spark streaming
def main(args: scala.Array[String]): Unit = {
val properties = readProperties()
val streamConf = new SparkConf().setMaster("local[2]").setAppName("KafkaStream")
val ssc = new StreamingContext(streamConf, Seconds(1))
// println("defaultParallelism: "+ssc.sparkContext.defaultParallelism)
ssc.sparkContext.setLogLevel("WARN")
val numPartitionsOfInputTopic = 5
val group_id = Random.alphanumeric.take(4).mkString("consumer_group")
val kafkaStream = {
val kafkaParams = Map("zookeeper.connect" -> properties.getProperty("zookeeper_connection_str"),
"group.id" -> group_id,
"zookeeper.connection.timeout.ms" -> "3000")
val streams = (1 to numPartitionsOfInputTopic).map { _ =>
KafkaUtils.createStream[scala.Array[Byte], String, DefaultDecoder, StringDecoder](
ssc, kafkaParams, Map("kafka_topic" -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)
}
val unifiedStream = ssc.union(streams)
val sparkProcessingParallelism = 5
unifiedStream.repartition(sparkProcessingParallelism)
}
kafkaStream.foreachRDD { x =>
x.foreach {
msg => println("Message: "+msg)
processMessage(msg)
}
}
ssc.start()
ssc.awaitTermination()
}
Bei der Ausführung, es ist nicht einmal eine einzige Nachricht empfangen, geschweige denn es weiter verarbeitet wird. Fehle ich hier etwas? Bitte schlagen Sie für Änderungen vor, falls erforderlich. Vielen Dank.
Hallo Gaweda, Danke, dass du eine Alternative vorgeschlagen hast. Ich habe gerade Direct Stream-Methode versucht. Trotzdem werden die Nachrichten nacheinander abgearbeitet. Die Verarbeitungszeit ist tatsächlich weniger als eine Sekunde. Um es zu testen, hatte ich 'Thread.sleep (10000)' in die Nachrichtenverarbeitungsfunktion eingefügt. Hier ist die Karte von KafkaParams, die ich formiere: 'val kafkaParams = Karte (" metadata.broker.list "-> localhost: 9092, " group.id "->" dsdc ", " auto.offset. reset "->" large ")' Muss ich hier etwas ändern? – Arjun