2016-05-06 5 views
0

Ich habe unter Kafka Verbraucher, die ich unter bestimmten Bedingungen anhalten und dann später wieder aufnehmen, um alle vorherigen Nachricht zu verbrauchen. Eine Idee besteht darin, ein geteiltes Flag zu verwenden, das von einem anderen Thread aktualisiert werden kann und vor dem Verzehr, d. H. iterator.next().message() Ich überprüfe den Wert des Flags. Wenn es wahr ist, konsumiere die Nachricht nicht. Ich wollte nur überprüfen, ob ich in die richtige Richtung denke oder ob es einen besseren Weg gibt.Pause hohe Ebene Kafka Verbraucher

class KafkaConsumer implements Runnable { 
     KafkaStream<byte[], byte[]> topicStream; 
     ConsumerConnector consumerConnectorObj; 

     public KafkaConsumer(final KafkaStream<byte[], byte[]> topicStream, 
       final ConsumerConnector consumerConnectorObj) { 
      this.topicStream = topicStream; 
      this.consumerConnectorObj = consumerConnectorObj; 
     } 

     @Override 
     public void run() { 
      if (topicStream != null) { 
       ConsumerIterator<byte[], byte[]> iterator = topicStream.iterator(); 
       while (true) { 
         if (iterator != null) { 
          boolean nextFlag = true; 
          try { 
           iterator.hasNext(); 
          } catch (ConsumerTimeoutException e) { 
           LOG.warn("Consumer timeout occured", e); 
           nextFlag = false; 
          } 

          if (nextFlag) { 
           byte[] msg = iterator.next().message(); 
          } 
         } 
       } 
      } 
     } 
    } 

Antwort

0

Normalerweise sollten Sie nicht zwischendurch manuell synchronisieren müssen, weil Sie eine gute Chance haben, es falsch zu verstehen. Schauen Sie sich java.util.concurrent für Helfer an. In Ihrem Fall könnte es ein Semaphor sein. Am einfachsten wäre es, den Semaphor vor dem Verarbeiten einer Nachricht zu erfassen, ihn anschließend zurückzugeben und in der nächsten Schleife erneut zu erfassen.

Meine Ahnung ist, dass dies nicht der beste Weg ist, es zu tun. Ich würde lieber availablePermits() anrufen und weiter verbrauchen, während die Nummer größer ist. Erst wenn es auf Null fällt, versuche, den Semaphor zu erhalten. Dies blockiert den Thread, bis der andere Thread erneut eine Genehmigung erteilt hat. Dadurch wird Ihr Arbeitsthread entsperrt und die Genehmigung, die Sie sofort wieder zurückgeben sollten, übergeben und mit der Schleife wie oben begonnen.

while (true) { 
    if (semaphore.availablePermits()<=0) { 
    semaphore.acquire(); // will block 
    // eventually another thread increments the semaphore again 
    // and we arrive here 
    semaphore.release(); 
    } 
    // keep processing messages 
} 

können Sie zusätzliche Ideen in Antworten auf this question finden.

+0

danke Harald Ich werde es versuchen und werde zurückkommen –