2017-06-13 9 views
3

Ich benutze unten Code, um von Kafka Thema zu lesen, und verarbeiten Sie die Daten.Kafkaconsumer ist nicht sicher für Multithreading-Zugriff

JavaDStream<Row> transformedMessages = messages.flatMap(record -> processData(record)) 
       .transform(new Function<JavaRDD<Row>, JavaRDD<Row>>() { 
        //JavaRDD<Row> records = ss.emptyDataFrame().toJavaRDD(); 
        StructType schema = DataTypes.createStructType(fields); 

        public JavaRDD<Row> call(JavaRDD<Row> rdd) throws Exception { 
         records = rdd.union(records); 
         return rdd; 
        } 
     }); 

     transformedMessages.foreachRDD(record -> { 
      //System.out.println("Aman" +record.count()); 
      StructType schema = DataTypes.createStructType(fields); 

      Dataset ds = ss.createDataFrame(records, schema); 
      ds.createOrReplaceTempView("trades"); 
      System.out.println(ds.count()); 
      ds.show(); 

     }); 

während der Code ausgeführt wird, i unter Ausnahme bin immer:

Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access 
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1624) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1197) 
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95) 
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69) 
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228) 
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194) 
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 

Die Tatsache, dass ich nur ein DSTREAM haben, bin ich nicht sicher, warum ich diese Ausnahme immer bin. Ich lese von 3 Partitionen in einem Kafka-Thema. Ich nehme an, dass der "createDirectStream" 3 Verbraucher erstellen wird, um die Daten zu lesen.

Unten ist der Code für für KafkaConsumer erwerben Methode:

private void acquire() { 
     this.ensureNotClosed(); 
     long threadId = Thread.currentThread().getId(); 
     if(threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) { 
      throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access"); 
     } else { 
      this.refcount.incrementAndGet(); 
     } 
    } 
+0

Dies ist seltsam. Laufen Sie lokal oder ein Cluster? Wenn Cluster, welche Art? Könnten Sie den Code, in dem der Stream erstellt wird, und die Implementierung von 'processData' hinzufügen? – maasg

+0

Es scheint, es ist ein böser Bug: https://issues.apache.org/jira/browse/SPARK-19185 – maasg

+0

Ich bin auf lokalen, aber das Kafka-Thema ist zentralisiert. Die Methode "processData" deserialisiert gerade die Nachrichten, die wir im Stream erhalten. Nach meinem Verständnis liest ein Verbraucher von einer Kafka-Partition. In diesem Fall greifen entweder mehrere Konsumenten auf die gleiche Kafka-Partition zu oder die Konsumenten werden gemischt. –

Antwort

0

Funken 2.2.0 hat eine Abhilfe keinen Cache verwenden. Verwenden Sie einfach spark.streaming.kafka.consumer.cache.enabled zu false. Werfen Sie einen Blick auf diese pull Anfrage

Verwandte Themen