2017-09-21 1 views
0

Ich habe Kafka Verbraucher mit spring-kafka lib implementiert. Ich habe ein Kafka-Thema mit 2 Partitionen und auch ich verwende ConcurrentKafkaListenerContainerFactory mit Gleichzeitigkeitsstufe auf 2 als Ergebnis festgelegt, jede Containerinstanz sollte von der einzelnen Partition gemäß Spring-Kafka documentation konsumieren.Frühling-Kafka Hörer Konkurenz

Der KafkaMessageListenerContainer empfängt alle Nachrichten von allen Themen/Partitionen in einem einzigen Thread. Der ConcurrentMessageListenerContainer delegiert an 1 oder mehr KafkaMessageListenerContainers, um Multithread-Verbrauch bereitzustellen.

Es ist meine Consumer-Klasse:

@Component 
public class KafkaConsumer { 
    private HashMap<String, LinkedBlockingQueue<Event>> hashMap = new HashMap<>(); 

    @KafkaListener(topics = "${kafka.topic}", groupId = "events_group") 
    public void receive(ConsumerRecord<?, ?> record, Consumer consumer) throws InterruptedException { 
     String message = record.value().toString(); 
     Event event = EventFactory.createEvent(message); 
     String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID); 
     // add event to hashMap 
     LinkedBlockingQueue<Event> queue = hashMap.get(customerId); 
     if (queue == null) { 
      queue = new LinkedBlockingQueue<>(); 
      queue.add(event); 
      hashMap.put(customerId, queue); 
     } else { 
      queue.add(event); 
     } 
    } 
} 

Wie Sie sehen, ich 'Hashmap Sammlung haben, dass ich meine Ereignisse in einer entsprechenden Warteschlange auf Nachricht basiert setzen ‚CUSTOMER_ID‘ Attribut. Diese Funktionalität erfordert eine zusätzliche Synchronisation im Falle des Zugriffs mit mehreren Threads. Wie ich sehe, erstellt Spring-kafka nur eine Bean-Instanz für alle Container anstelle einer separaten Bean-Instanz für jeden Container, um Probleme mit der Parallelität zu vermeiden.

Wie kann ich diese Logik programmgesteuert ändern?

Ich sehe die einzige seltsame Möglichkeit, dieses Problem zu beheben, ist die Verwendung von zwei JVMs eine separate Anwendung mit Single-Threaded Consumer drin, als Ergebnis der Zugriff auf KafkaConsumer-Klasse mit # receive-Methode wird single-threaded sein.

Antwort

1

Das ist richtig. So funktioniert es. Das Framework stützt sich wirklich nicht auf eine Bean, sondern nur auf seine Methode, Nachrichten an die Funktion zu senden.

Sie können zwei @KafkaListener Methoden für jede Partition in Ihrem Thema verwenden. Das stimmt, dass Datensätze von einer Partition an die @KafkaListener in einem einzigen Thread geliefert werden. Also, wenn Sie wirklich nicht mit diesem Zustand leben können, können Sie zwei HashMap für jeden Thread verwenden.

Die allgemeine Idee hinter dieser Hörer Abstraktion ist genau statesless Verhalten. Das KafkaConsumer ist die reguläre Feder Singleton Bean. Sie müssen damit leben und Ihre Lösung entsprechend dieser Situation neu gestalten.

+0

thx für die Erklärung – MeetJoeBlack