1

So verwenden: - Funken Structured Streaming (2.1.0) - Kafka 0.10.2.0 - Scala 2.11Spark-Streaming Illegal: Dieser Verbraucher wurde bereits geschlossen

ich mit dem Standard-API von kafka bin raubend, also grundsätzlich:

val df = spark.readStream 
    .format("kafka") 
    .option(...) 

Einrichten der Optionen (über SSL) und alles. Dann setze ich einige Aktionen usw. ein und starte den Stream usw. (es läuft einwandfrei). Von Zeit zu Zeit löst es jedoch eine Ausnahme aus:

17/05/30 11:05:23 WARN TaskSetManager: Lost task 23.0 in stage 77.0 (TID 3329, spark-worker-3, executor 0): java.lang.IllegalStateException: This consumer has already been closed. 
at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1611) 
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1622) 
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1198) 
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:278) 
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.fetchData(CachedKafkaConsumer.scala:177) 
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:89) 
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:147) 
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:136) 
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:52) 
at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:49) 
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925) 
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
at org.apache.spark.scheduler.Task.run(Task.scala:99) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

Alle Tipps, warum könnte dies fehlschlagen?

Antwort

0

https://issues.apache.org/jira/browse/SPARK-18682 repariert es bei der Implementierung der Batch-Kafka-Quelle. Sie sollten es in Spark 2.1.1 nicht sehen können. Wenn dieser Fehler weiterhin in Spark 2.1.1 angezeigt wird, erstellen Sie ein Spark-Ticket unter https://issues.apache.org/jira/browse/SPARK

+0

Könnten Sie meine andere Nachricht auf den Beitrag überprüfen? Vielen Dank im Voraus – buggy

+0

Hallo nochmal, ich bekomme es immer noch in 2.2.0. Ich habe ein Problem geöffnet und es wurde abgelehnt. Könnten Sie mir helfen? ist ein bisschen blockiert für mich atm: https://issues.apache.org/jira/browse/SPARK-21453 – buggy

Verwandte Themen