0

Ich versuche parallel das Lesen von Kafka-Nachrichten parallel zu verarbeiten. Mein Kafka-Thema hat 10 Partitionen. Ich versuche, 5 DStreams zu erstellen und Union Methode anzuwenden, um auf einem einzelnen DStream zu arbeiten. Hier ist der Code, den ich bisher versucht:Lesen und Verarbeiten der Parallelität in Kafka Spark streaming

def main(args: scala.Array[String]): Unit = { 

    val properties = readProperties() 

    val streamConf = new SparkConf().setMaster("local[2]").setAppName("KafkaStream") 
    val ssc = new StreamingContext(streamConf, Seconds(1)) 
    // println("defaultParallelism: "+ssc.sparkContext.defaultParallelism) 
    ssc.sparkContext.setLogLevel("WARN") 
    val numPartitionsOfInputTopic = 5 
    val group_id = Random.alphanumeric.take(4).mkString("consumer_group") 
    val kafkaStream = { 
     val kafkaParams = Map("zookeeper.connect" -> properties.getProperty("zookeeper_connection_str"), 
          "group.id" -> group_id, 
          "zookeeper.connection.timeout.ms" -> "3000")      

     val streams = (1 to numPartitionsOfInputTopic).map { _ => 
          KafkaUtils.createStream[scala.Array[Byte], String, DefaultDecoder, StringDecoder](
          ssc, kafkaParams, Map("kafka_topic" -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2) 
     } 
    val unifiedStream = ssc.union(streams) 
    val sparkProcessingParallelism = 5 
    unifiedStream.repartition(sparkProcessingParallelism) 
    } 

    kafkaStream.foreachRDD { x => 
    x.foreach {  
     msg => println("Message: "+msg) 
     processMessage(msg) 
    }  
    } 

    ssc.start() 
    ssc.awaitTermination() 
} 

Bei der Ausführung, es ist nicht einmal eine einzige Nachricht empfangen, geschweige denn es weiter verarbeitet wird. Fehle ich hier etwas? Bitte schlagen Sie für Änderungen vor, falls erforderlich. Vielen Dank.

Antwort

0

Ich empfehle dringend, zu Direct Stream zu wechseln. Warum?

Direct Stream legt die Parallelität standardmäßig auf die Anzahl der Partitionen fest, die Sie in Kafka haben. Nichts mehr getan werden muss - nur Direct Stream erstellen und tun Sie Ihre Arbeit :)

Wenn Sie 5 DStreams erstellen, wird standardmäßig eingelesen 5 Faden, eine nicht-Direct-DSTREAM = ein Thread

+0

Hallo Gaweda, Danke, dass du eine Alternative vorgeschlagen hast. Ich habe gerade Direct Stream-Methode versucht. Trotzdem werden die Nachrichten nacheinander abgearbeitet. Die Verarbeitungszeit ist tatsächlich weniger als eine Sekunde. Um es zu testen, hatte ich 'Thread.sleep (10000)' in die Nachrichtenverarbeitungsfunktion eingefügt. Hier ist die Karte von KafkaParams, die ich formiere: 'val kafkaParams = Karte (" metadata.broker.list "-> localhost: 9092, " group.id "->" dsdc ", " auto.offset. reset "->" large ")' Muss ich hier etwas ändern? – Arjun

Verwandte Themen