Ich habe zwei Kafka Consumer ConsumerA
und ConsumerB
. Ich möchte diese zwei Kafka-Konsumenten unabhängig voneinander auf derselben Maschine laufen lassen. Es gibt überhaupt keine Beziehung zwischen ihnen. Diese beiden Kafka-Konsumenten werden zu verschiedenen Themen auf derselben Maschine arbeiten.Wie können mehrere Kafka-Konsumenten auf derselben Box unabhängig voneinander ausgeführt werden?
- Jeder Verbraucher sollte ein anderes Properties-Objekt haben.
- Jeder Verbraucher sollte eine andere Thread-Pool-Konfiguration haben, da sie bei Bedarf unabhängig von anderen Konsumenten auf Multithreading-Weise (Verbrauchergruppe) ausgeführt werden können.
Unten ist mein Design:
Consumer-Klasse (Auszug):
public abstract class Consumer implements Runnable {
private final Properties consumerProps;
private final String consumerName;
public Consumer(String consumerName, Properties consumerProps) {
this.consumerName = consumerName;
this.consumerProps = consumerProps;
}
protected abstract void shutdown();
protected abstract void run(String consumerName, Properties consumerProps);
@Override
public final void run() {
run(consumerName, consumerProps);
}
}
ConsumerA Klasse:
public class ConsumerA extends Consumer {
private final AtomicBoolean closed = new AtomicBoolean(false);
private KafkaConsumer<byte[], byte[]> consumer;
public ConsumerA(String consumerName, Properties consumerProps) {
super(consumerName, consumerProps);
}
@Override
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
@Override
protected void run(String consumerName, Properties consumerProps) {
consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(getTopicsBasisOnConsumerName());
Map<String, Object> config = new HashMap<>();
config.put(Config.URLS, TEST_URL);
GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);
try {
while (!closed.get()) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<byte[], byte[]> record : records) {
GenericRecord payload = decoder.decode(record.value());
// extract data from payload
System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
} catch (WakeupException ex) {
// Ignore exception if closing
System.out.println("error= ", ex);
if (!closed.get()) throw e;
} catch (Exception ex) {
System.out.println("error= ", ex);
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
}
}
ConsumerA B-Klasse:
// similar to `ConsumerA` but with specific details of B
ConsumerHandler Klasse:
public final class ConsumerHandler {
private final ExecutorService executorServiceConsumer;
private final Consumer consumer;
private final List<Consumer> consumers = new ArrayList<>();
public ConsumerHandler(Consumer consumer, int poolSize) {
this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
this.consumer = consumer;
for (int i = 0; i < poolSize; i++) {
this.consumers.add(consumer);
executorServiceConsumer.submit(consumer);
}
}
public void shutdown() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
for (Consumer consumer : consumers) {
consumer.shutdown();
}
executorServiceConsumer.shutdown();
try {
executorServiceConsumer.awaitTermination(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
});
}
}
Unten ist meine Hauptklasse in einem meinem Projekt, bei dem, wenn ich meine Server starten, wird kommen, ruft zunächst automatisch und von diesem Ort, wo ich meint alle kafka starten Verbraucher, wo ich meine ConsumerA
und ConsumerB
ausführen. Und sobald Shutdown aufgerufen wird, gebe ich alle Ressourcen frei, indem ich bei allen meinen Kafka-Konsumenten den Shutdown anrufe.
Ist das der richtige Entwurf für diese Art von Problem, wo ich mehrere kafka Verbraucher auf der gleichen Box laufen lassen möchte? Lassen Sie mich wissen, ob es eine bessere und effizientere Lösung für dieses Problem gibt. Im Allgemeinen werde ich drei oder vier Kafka-Konsumenten maximal auf der gleichen Box betreiben und jeder Konsument kann bei Bedarf eine eigene Konsumentengruppe haben.
Hier ist die Javadoc für KafkaConsumer, die ich in beiden meinen Verbraucher verwende. Und basierend auf dieser article Ich habe meinen Verbraucher erstellt, es ist nur, dass ich abstrakte Klasse verwendet habe, um es zu erweitern. Suche nach "Alles zusammenfügen" in diesem Link.
In den Dokumenten wird erwähnt, dass Verbraucher nicht Thread-sicher sind, aber es sieht so aus, als ob mein Code die gleiche Verbraucherinstanz für jeden Thread im Pool wiederverwendet.
Was ist der beste Weg, um dieses Thema Threadsicherheit zu lösen und immer noch die gleichen Funktionen zu erreichen?
Wenn dies Code funktioniert, gehört die Frage auf http://codereview.stackexchange.com/ – jaco0646