2

Ich konnte Kafka Struktur Streaming-Programmierung zuvor ausführen. Aber plötzlich versagen alle meine Struktur-Streaming-Python-Programme mit einem Fehler. Ich nahm grundlegende Kafka-Struktur-Streaming-Programmierung von Spark-Website, die auch mit demselben Fehler fehlschlägt.Kafka Structured Streaming Fehler

py4j.protocol.Py4JJavaError: An error occurred while calling o31.load. : java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala:376) at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala)

Funken einreichen Ich bin mit

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 C:\Users\ranjith.gangam\PycharmProjects\sparktest\Structured_streaming.py

Dies ist ein Code, den ich von Spark-Github nahm

spark = SparkSession\ 
     .builder\ 
     .appName("StructuredKafkaWordCount")\ 
     .getOrCreate() 

    # Create DataSet representing the stream of input lines from kafka 
    lines = spark\ 
     .readStream\ 
     .format("kafka")\ 
     .option("kafka.bootstrap.servers", bootstrapServers)\ 
     .option(subscribeType, topics)\ 
     .load()\ 
     .selectExpr("CAST(value AS STRING)") 

    words = lines.select(
     # explode turns each item in an array into a separate row 
     explode(
      split(lines.value, ' ') 
     ).alias('word') 
    ) 

    # Generate running word count 
    wordCounts = words.groupBy('word').count() 
    # Start running the query that prints the running counts to the console 
    query = wordCounts\ 
     .writeStream\ 
     .outputMode('complete')\ 
     .format('console')\ 
     .start() 

    query.awaitTermination() 

Antwort

0

Sie sind in der richtigen Art und Weise, aber leider Kafka 0,10 wird noch nicht unterstützt von PySpark noch nicht. Wie Sie in der SPARK-16534 sehen können.

Die einzige Unterstützung für pySpark ist Kafka 0.8 bis jetzt. Sie können also auf den Wert 0.8 migrieren oder Ihren Code in Scala ändern.

+0

Es funktioniert nach dem Hinzufügen des Pakets spark-streaming-kafka-0-10-assembly_2.10: 2.2.0 –

Verwandte Themen