2017-04-25 3 views
0

Wie unten gezeigt, mein Code ist ein High-Level-Verbraucher holen ein Thema mit 32 Partitionen in Kafka-Server, ich bin verwirrt, warum manchmal bekomme ich eine leere Rückkehr von consumer.poll(). Ich habe versucht, Poll-Timeout zu erhöhen, und wenn ich dann das Zeitlimit auf 1000 erhöht, dann hat jede Umfrage Rückdaten, während ich Timeout auf 10 oder 0 setze, dann sehe ich eine Menge leerer Rückkehr.Wenn Kafka Consumer Poll null Datensätze zurückgeben?

Kann mir jemand sagen, wie man ein korrektes Timeout einstellt?

def main(args: Array[String]): Unit = { 
    val props = new Properties() 
    props.put("bootstrap.servers", "kafka-01:9098") 
    props.put("group.id", "kch1") 
    props.put("enable.auto.commit", "true") 
    props.put("auto.commit.interval.ms", "1000") 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 

    //props.put("max.poll.records", "1000") 
    val consumers = new Array[KafkaConsumer[String, String]](16) 
    for(i <- 0 to 15) { 
     consumers(i) = new KafkaConsumer[String, String](props) 
     consumers(i).subscribe(util.Arrays.asList("veh321")) 
    } 
    var cnt = 0 
    var cacheIterator: Iterator[ConsumerRecord[String, String]] = null 
    for(i <- 0 to 15) { 
     new Thread(new Runnable { 
     override def run(): Unit = { 
      var finish = false 
      while(!finish) { 
      val start = System.currentTimeMillis() 
      cacheIterator = consumers(i).poll(100).iterator() 
      val end = System.currentTimeMillis() - start 
      if (end > 10) { 
       println(s"${Thread.currentThread().getId} + Duration is ${end}, ${cacheIterator.hasNext} ${cacheIterator.size}") 
      } 
      } 
     } 
     }).start() 
    } 

Antwort

0

Java Verbraucher beschäftigt Linux epoll als zugrunde liegende Implementierung Schema von java.nio.channels.Selector.select Aufruf (Timeout). Es ist sehr wahrscheinlich, dass nichts zurückgegeben wird, wenn Sie ihm nur 100 ms geben, um zu versuchen, wie viele SelectionKeys während dieses kurzen Zeitintervalls bereit sind.

Außerdem wird der Kunde während der gleichen 100 ms einige andere Aufgaben ausführen, einschließlich des Abrufkoordinatorstatus, sodass das Echtzeitintervall für die Datensatzabfrage offensichtlich weniger als 100 ms beträgt, was das Abrufen einiger wirklich nützlicher Dinge erschwert.

+0

Wenn ich also ein Polltimeout auf 1000ms setze, kann ich sehen, dass alle Abfragen Daten zurückgeben, einige von ihnen kosten etwa 200 ms, während andere nur ein paar Millisekunden kosten. Was passiert also, wenn die Umfragekosten Hunderte von Sekunden benötigen? Gibt es irgendwelche Parameter, um dies zu ändern? –

+0

Vor dem Abrufen von Datensätzen müssen viele zugrunde liegende Vorgänge ausgeführt werden, z. B. Metadaten abrufen, Verbindungen und Gruppen verwalten usw. Sie können "max.poll.records" so einstellen, dass die Anzahl der Nachrichten für eine einzelne Umfrageabfrage gesteuert wird, wenn Sie Java Consumer verwenden. – amethystic

+0

Vielen Dank für Sie Hilfe, also wenn ich einen großen Durchsatz möchte, kann ich eine große Umfrage einstellen.Records und eine große Poll Timeout, so dass die Zeiten der Umfrage verringern, bin ich richtig? –