3

Die Doc Kafka einen Ansatz gibt etwa mit folgenden beschrieben:Wie Multithread-Verbraucher in Kafka 0.9.0 verwenden?

Einen Verbraucher pro Thema: Eine einfache Möglichkeit ist, die jeweils ihre eigenen Verbraucher> Instanz geben fädelt.

Mein Code:

public class KafkaConsumerRunner implements Runnable { 

    private final AtomicBoolean closed = new AtomicBoolean(false); 
    private final CloudKafkaConsumer consumer; 
    private final String topicName; 

    public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) { 
     this.consumer = consumer; 
     this.topicName = topicName; 
    } 

    @Override 
    public void run() { 
     try { 
      this.consumer.subscribe(topicName); 
      ConsumerRecords<String, String> records; 
      while (!closed.get()) { 
       synchronized (consumer) { 
        records = consumer.poll(100); 
       } 
       for (ConsumerRecord<String, String> tmp : records) { 
        System.out.println(tmp.value()); 
       } 
      } 
     } catch (WakeupException e) { 
      // Ignore exception if closing 
      System.out.println(e); 
      //if (!closed.get()) throw e; 
     } 
    } 

    // Shutdown hook which can be called from a separate thread 
    public void shutdown() { 
     closed.set(true); 
     consumer.wakeup(); 
    } 

    public static void main(String[] args) { 
     CloudKafkaConsumer kafkaConsumer = KafkaConsumerBuilder.builder() 
       .withBootstrapServers("172.31.1.159:9092") 
       .withGroupId("test") 
       .build(); 
     ExecutorService executorService = Executors.newFixedThreadPool(5); 
     executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log")); 
     executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log.info")); 
     executorService.shutdown(); 
    } 
} 

aber es funktioniert nicht und löst eine Ausnahme:

java.util.ConcurrentModificationException: KafkaConsumer ist nicht sicher für den Zugang Multi-Threaded

Außerdem habe ich die Quelle von Flink gelesen (eine Open-Source-Plattform für verteilte Datenströme an d Batch-Datenverarbeitung). Flink mit Multithread-Consumer ist meinem ähnlich.

long pollTimeout = Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, Long.toString(DEFAULT_POLL_TIMEOUT))); 
pollLoop: while (running) { 
    ConsumerRecords<byte[], byte[]> records; 
    //noinspection SynchronizeOnNonFinalField 
    synchronized (flinkKafkaConsumer.consumer) { 
     try { 
      records = flinkKafkaConsumer.consumer.poll(pollTimeout); 
     } catch (WakeupException we) { 
      if (running) { 
       throw we; 
      } 
      // leave loop 
      continue; 
     } 
    } 

flink code of mutli-thread

Was ist los?

Antwort

6

Kafka-Verbraucher ist nicht Thread sicher. Wie Sie in Ihrer Frage darauf hingewiesen, erklärt das Dokument, dass

Eine einfache Möglichkeit ist es, jedem Thread seine eigene Verbraucher Instanz

Aber in Ihrem Code zu geben, können Sie die gleichen Verbraucher Beispiel durch unterschiedliche gewickelt haben KafkaConsumerRunner-Instanzen. Daher greifen mehrere Threads auf dieselbe Consumer-Instanz zu. Die Kafka-Dokumentation zeigt deutlich

Der Kafka-Verbraucher ist NICHT Thread-sicher. Alle Netzwerk-E/A finden im Thread der Anwendung statt, die den Anruf tätigt. Es liegt in der Verantwortung des Benutzers , sicherzustellen, dass der Multithread-Zugriff ordnungsgemäß synchronisiert ist ( ). Unsynchronisierter Zugriff führt zu ConcurrentModificationException.

Das ist genau die Ausnahme, die Sie erhalten haben.

2

Es wirft die Ausnahme auf Ihren Anruf zu abonnieren. this.consumer.subscribe(topicName);

bewegen, die wie folgt in einem synchronisierten Block Block:

@Override 
public void run() { 
    try { 
     synchronized (consumer) { 
      this.consumer.subscribe(topicName); 
     } 
     ConsumerRecords<String, String> records; 
     while (!closed.get()) { 
      synchronized (consumer) { 
       records = consumer.poll(100); 
      } 
      for (ConsumerRecord<String, String> tmp : records) { 
       System.out.println(tmp.value()); 
      } 
     } 
    } catch (WakeupException e) { 
     // Ignore exception if closing 
     System.out.println(e); 
     //if (!closed.get()) throw e; 
    } 
} 
+0

Für mich arbeiten. – Prasath

2

Vielleicht ist nicht der Fall, aber wenn Sie die Verarbeitung von Daten von Runden Themen werden mergin, dann können Sie Daten aus mehreren Themen mit dem Lese gleicher Verbraucher. Wenn nicht, dann ist es vorzuziehen, getrennte Jobs zu erstellen, die jedes Thema verbrauchen.