So verwenden: - Funken Structured Streaming (2.1.0) - Kafka 0.10.2.0 - Scala 2.11Spark-Streaming Illegal: Dieser Verbraucher wurde bereits geschlossen
ich mit dem Standard-API von kafka bin raubend, also grundsätzlich:
val df = spark.readStream
.format("kafka")
.option(...)
Einrichten der Optionen (über SSL) und alles. Dann setze ich einige Aktionen usw. ein und starte den Stream usw. (es läuft einwandfrei). Von Zeit zu Zeit löst es jedoch eine Ausnahme aus:
17/05/30 11:05:23 WARN TaskSetManager: Lost task 23.0 in stage 77.0 (TID 3329, spark-worker-3, executor 0): java.lang.IllegalStateException: This consumer has already been closed.
at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1611)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1622)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1198)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:278)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.fetchData(CachedKafkaConsumer.scala:177)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:89)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:147)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:136)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:52)
at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Alle Tipps, warum könnte dies fehlschlagen?
Könnten Sie meine andere Nachricht auf den Beitrag überprüfen? Vielen Dank im Voraus – buggy
Hallo nochmal, ich bekomme es immer noch in 2.2.0. Ich habe ein Problem geöffnet und es wurde abgelehnt. Könnten Sie mir helfen? ist ein bisschen blockiert für mich atm: https://issues.apache.org/jira/browse/SPARK-21453 – buggy