Ich habe unter Kafka Verbraucher, die ich unter bestimmten Bedingungen anhalten und dann später wieder aufnehmen, um alle vorherigen Nachricht zu verbrauchen. Eine Idee besteht darin, ein geteiltes Flag zu verwenden, das von einem anderen Thread aktualisiert werden kann und vor dem Verzehr, d. H. iterator.next().message()
Ich überprüfe den Wert des Flags. Wenn es wahr ist, konsumiere die Nachricht nicht. Ich wollte nur überprüfen, ob ich in die richtige Richtung denke oder ob es einen besseren Weg gibt.Pause hohe Ebene Kafka Verbraucher
class KafkaConsumer implements Runnable {
KafkaStream<byte[], byte[]> topicStream;
ConsumerConnector consumerConnectorObj;
public KafkaConsumer(final KafkaStream<byte[], byte[]> topicStream,
final ConsumerConnector consumerConnectorObj) {
this.topicStream = topicStream;
this.consumerConnectorObj = consumerConnectorObj;
}
@Override
public void run() {
if (topicStream != null) {
ConsumerIterator<byte[], byte[]> iterator = topicStream.iterator();
while (true) {
if (iterator != null) {
boolean nextFlag = true;
try {
iterator.hasNext();
} catch (ConsumerTimeoutException e) {
LOG.warn("Consumer timeout occured", e);
nextFlag = false;
}
if (nextFlag) {
byte[] msg = iterator.next().message();
}
}
}
}
}
}
danke Harald Ich werde es versuchen und werde zurückkommen –