Ich konnte Kafka Struktur Streaming-Programmierung zuvor ausführen. Aber plötzlich versagen alle meine Struktur-Streaming-Python-Programme mit einem Fehler. Ich nahm grundlegende Kafka-Struktur-Streaming-Programmierung von Spark-Website, die auch mit demselben Fehler fehlschlägt.Kafka Structured Streaming Fehler
py4j.protocol.Py4JJavaError: An error occurred while calling o31.load. : java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala:376) at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala)
Funken einreichen Ich bin mit
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 C:\Users\ranjith.gangam\PycharmProjects\sparktest\Structured_streaming.py
Dies ist ein Code, den ich von Spark-Github nahm
spark = SparkSession\
.builder\
.appName("StructuredKafkaWordCount")\
.getOrCreate()
# Create DataSet representing the stream of input lines from kafka
lines = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", bootstrapServers)\
.option(subscribeType, topics)\
.load()\
.selectExpr("CAST(value AS STRING)")
words = lines.select(
# explode turns each item in an array into a separate row
explode(
split(lines.value, ' ')
).alias('word')
)
# Generate running word count
wordCounts = words.groupBy('word').count()
# Start running the query that prints the running counts to the console
query = wordCounts\
.writeStream\
.outputMode('complete')\
.format('console')\
.start()
query.awaitTermination()
Es funktioniert nach dem Hinzufügen des Pakets spark-streaming-kafka-0-10-assembly_2.10: 2.2.0 –