Wie unten gezeigt, mein Code ist ein High-Level-Verbraucher holen ein Thema mit 32 Partitionen in Kafka-Server, ich bin verwirrt, warum manchmal bekomme ich eine leere Rückkehr von consumer.poll(). Ich habe versucht, Poll-Timeout zu erhöhen, und wenn ich dann das Zeitlimit auf 1000 erhöht, dann hat jede Umfrage Rückdaten, während ich Timeout auf 10 oder 0 setze, dann sehe ich eine Menge leerer Rückkehr.Wenn Kafka Consumer Poll null Datensätze zurückgeben?
Kann mir jemand sagen, wie man ein korrektes Timeout einstellt?
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put("bootstrap.servers", "kafka-01:9098")
props.put("group.id", "kch1")
props.put("enable.auto.commit", "true")
props.put("auto.commit.interval.ms", "1000")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
//props.put("max.poll.records", "1000")
val consumers = new Array[KafkaConsumer[String, String]](16)
for(i <- 0 to 15) {
consumers(i) = new KafkaConsumer[String, String](props)
consumers(i).subscribe(util.Arrays.asList("veh321"))
}
var cnt = 0
var cacheIterator: Iterator[ConsumerRecord[String, String]] = null
for(i <- 0 to 15) {
new Thread(new Runnable {
override def run(): Unit = {
var finish = false
while(!finish) {
val start = System.currentTimeMillis()
cacheIterator = consumers(i).poll(100).iterator()
val end = System.currentTimeMillis() - start
if (end > 10) {
println(s"${Thread.currentThread().getId} + Duration is ${end}, ${cacheIterator.hasNext} ${cacheIterator.size}")
}
}
}
}).start()
}
Wenn ich also ein Polltimeout auf 1000ms setze, kann ich sehen, dass alle Abfragen Daten zurückgeben, einige von ihnen kosten etwa 200 ms, während andere nur ein paar Millisekunden kosten. Was passiert also, wenn die Umfragekosten Hunderte von Sekunden benötigen? Gibt es irgendwelche Parameter, um dies zu ändern? –
Vor dem Abrufen von Datensätzen müssen viele zugrunde liegende Vorgänge ausgeführt werden, z. B. Metadaten abrufen, Verbindungen und Gruppen verwalten usw. Sie können "max.poll.records" so einstellen, dass die Anzahl der Nachrichten für eine einzelne Umfrageabfrage gesteuert wird, wenn Sie Java Consumer verwenden. – amethystic
Vielen Dank für Sie Hilfe, also wenn ich einen großen Durchsatz möchte, kann ich eine große Umfrage einstellen.Records und eine große Poll Timeout, so dass die Zeiten der Umfrage verringern, bin ich richtig? –