4

Ich habe zwei Kafka Consumer ConsumerA und ConsumerB. Ich möchte diese zwei Kafka-Konsumenten unabhängig voneinander auf derselben Maschine laufen lassen. Es gibt überhaupt keine Beziehung zwischen ihnen. Diese beiden Kafka-Konsumenten werden zu verschiedenen Themen auf derselben Maschine arbeiten.Wie können mehrere Kafka-Konsumenten auf derselben Box unabhängig voneinander ausgeführt werden?

  • Jeder Verbraucher sollte ein anderes Properties-Objekt haben.
  • Jeder Verbraucher sollte eine andere Thread-Pool-Konfiguration haben, da sie bei Bedarf unabhängig von anderen Konsumenten auf Multithreading-Weise (Verbrauchergruppe) ausgeführt werden können.

Unten ist mein Design:

Consumer-Klasse (Auszug):

public abstract class Consumer implements Runnable { 
    private final Properties consumerProps; 
    private final String consumerName; 

    public Consumer(String consumerName, Properties consumerProps) { 
     this.consumerName = consumerName; 
     this.consumerProps = consumerProps; 
    } 

    protected abstract void shutdown(); 
    protected abstract void run(String consumerName, Properties consumerProps); 

    @Override 
    public final void run() { 
     run(consumerName, consumerProps); 
    } 
} 

ConsumerA Klasse:

public class ConsumerA extends Consumer { 
    private final AtomicBoolean closed = new AtomicBoolean(false); 
    private KafkaConsumer<byte[], byte[]> consumer; 

    public ConsumerA(String consumerName, Properties consumerProps) { 
     super(consumerName, consumerProps); 
    } 

    @Override 
    public void shutdown() { 
     closed.set(true); 
     consumer.wakeup(); 
    } 

    @Override 
    protected void run(String consumerName, Properties consumerProps) { 
     consumer = new KafkaConsumer<>(consumerProps); 
     consumer.subscribe(getTopicsBasisOnConsumerName()); 

     Map<String, Object> config = new HashMap<>(); 
     config.put(Config.URLS, TEST_URL); 
     GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config); 

     try { 
      while (!closed.get()) { 
       ConsumerRecords<byte[], byte[]> records = consumer.poll(Long.MAX_VALUE); 
       for (ConsumerRecord<byte[], byte[]> record : records) { 
        GenericRecord payload = decoder.decode(record.value()); 
        // extract data from payload 
        System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", 
             record.topic(), record.partition(), record.offset(), record.key(), record.value()); 
       } 
       consumer.commitAsync(); 
      } 
     } catch (WakeupException ex) { 
      // Ignore exception if closing 
      System.out.println("error= ", ex); 
      if (!closed.get()) throw e;    
     } catch (Exception ex) { 
      System.out.println("error= ", ex);  
     } finally { 
      try { 
       consumer.commitSync(); 
      } finally { 
       consumer.close(); 
      } 
     } 
    } 
} 

ConsumerA B-Klasse:

// similar to `ConsumerA` but with specific details of B 

ConsumerHandler Klasse:

public final class ConsumerHandler { 
    private final ExecutorService executorServiceConsumer; 
    private final Consumer consumer; 
    private final List<Consumer> consumers = new ArrayList<>(); 

    public ConsumerHandler(Consumer consumer, int poolSize) { 
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize); 
    this.consumer = consumer; 
    for (int i = 0; i < poolSize; i++) { 
     this.consumers.add(consumer); 
     executorServiceConsumer.submit(consumer); 
    } 
} 
    public void shutdown() { 
    Runtime.getRuntime().addShutdownHook(new Thread() { 
     @Override 
     public void run() { 
     for (Consumer consumer : consumers) { 
      consumer.shutdown(); 
     } 
     executorServiceConsumer.shutdown(); 
     try { 
      executorServiceConsumer.awaitTermination(1000, TimeUnit.MILLISECONDS); 
     } catch (InterruptedException ex) { 
      Thread.currentThread().interrupt(); 
     } 
     } 
    }); 
    } 
} 

Unten ist meine Hauptklasse in einem meinem Projekt, bei dem, wenn ich meine Server starten, wird kommen, ruft zunächst automatisch und von diesem Ort, wo ich meint alle kafka starten Verbraucher, wo ich meine ConsumerA und ConsumerB ausführen. Und sobald Shutdown aufgerufen wird, gebe ich alle Ressourcen frei, indem ich bei allen meinen Kafka-Konsumenten den Shutdown anrufe.

Ist das der richtige Entwurf für diese Art von Problem, wo ich mehrere kafka Verbraucher auf der gleichen Box laufen lassen möchte? Lassen Sie mich wissen, ob es eine bessere und effizientere Lösung für dieses Problem gibt. Im Allgemeinen werde ich drei oder vier Kafka-Konsumenten maximal auf der gleichen Box betreiben und jeder Konsument kann bei Bedarf eine eigene Konsumentengruppe haben.

Hier ist die Javadoc für KafkaConsumer, die ich in beiden meinen Verbraucher verwende. Und basierend auf dieser article Ich habe meinen Verbraucher erstellt, es ist nur, dass ich abstrakte Klasse verwendet habe, um es zu erweitern. Suche nach "Alles zusammenfügen" in diesem Link.

In den Dokumenten wird erwähnt, dass Verbraucher nicht Thread-sicher sind, aber es sieht so aus, als ob mein Code die gleiche Verbraucherinstanz für jeden Thread im Pool wiederverwendet.

Was ist der beste Weg, um dieses Thema Threadsicherheit zu lösen und immer noch die gleichen Funktionen zu erreichen?

+1

Wenn dies Code funktioniert, gehört die Frage auf http://codereview.stackexchange.com/ – jaco0646

Antwort

-1

Versuchen Sie Apache Samza. Es löst diese Verbraucherprobleme. Kein unordentlicher (und manchmal problematischer) Umgang mit Threads, Redundanz durch Clustering, bewährte Lösung durch Billionen von bewährten verarbeiteten Messages usw. Wir führen derzeit mehrere Jobs im Cluster aus. Unser Code ist viel weniger komplex als das, was Sie hier haben.

+0

Wie hilft mir Samza hier? Es ist eine Art Wrapper, der Daten von Kafka verbraucht? – john

0

Ein schneller Vorschlag, Entschuldigung, wenn Sie schon darüber wissen. Variablen auf Klassenebene sind niemals Thread-sicher. Wenn Sie für jeden Thread ein anderes Properties-Objekt benötigen, deklarieren Sie sie lieber auf Methodenebene und stellen Sie sie als Parameter anderen Methoden zur Verfügung, auf die Sie auf das Properties-Objekt zugreifen müssen.

0

Einfachste zu lösende Lösung "Was ist der beste Weg, um dieses Thread-Sicherheitsproblem zu lösen und dennoch dieselben Funktionen zu erreichen?" :

Implementieren Sie kein Multi Threading (Thread-API/Executor-Dienst), sondern verwenden Sie jeden Consumer als einen einzelnen Consumer in seinem eigenen separaten JVM-Prozess, also wenn Sie 4 Consumer auf demselben Computer benötigen und nicht möchten beschäftigen Sie sich mit Mutli Threading Kopfschmerzen dann haben Sie Ihre Kafka Consumer Code JAR in eigenen 4 separaten Java-Prozesse laufen.

Verwandte Themen