2016-10-17 8 views
1

Wie Liste der verbundenen Verbraucher auf Kafka erhalten? Wie Verbraucher auf Broker verbunden sind, gibt es irgendein Java-Dienstprogramm wie ZkClient/ZkUtils, um Liste der verbundenen Verbraucher in Kafka 0.9.0.x zu erhalten? Wie wir verwenden, um die Liste der Broker unter Dienstprogramm zu erhalten:Wie überprüft man, ob ein bestimmter Verbraucher über Java mit Kafka 0.9.0.x verbunden ist?

 ZkClient zkClient = new ZkClient(endpoint.getZookeeperConnect(), 60000); 

     if(zkClient!=null){ 
      List<String> brokerIds = zkClient.getChildren(ZkUtils.BrokerIdsPath()); 
      if(CollectionUtils.isNotEmpty(brokerIds) && brokerIds.contains(brokerId)){ 
       logger.debug("Broker:{{}} is connected to Zookeeper.",brokerId); 
       flag = true;  
      } 
      else{ 
       logger.error("ERROR:Broker:{{}} is not connected to Zookeeper.",brokerId); 
      } 
      zkClient.close(); 
     } 

Ich bin mit Kafka 0.9.0.x mit unter Java lib von Maven:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.9.0.1</version> 
</dependency> 

AKTUALISIERT:

Ich öffnete eine 'Kafka-Konsole-Consumer.bat' und lief es einmal dann überquerte die Eingabeaufforderung cmd. Dann ging weiter zu "zoekeeper-shell.bat" und ls/verbraucher es zeigte dann [console-consumer-6008], aber meine programmierten Verbraucher werden nicht angezeigt. mit zkClient.getChildren(ZkUtils.ConsumersPath()) kann ich jetzt nur genannten Verbraucher anzeigen.

Antwort

3

Nicht sicher genau das, was Information benötigen Sie, aber ich habe ein Beispielprogramm, das die gleichen Informationen als kafka-consumer-groups.sh --describe gibt.

Um diesen Code zu verwenden, fügen Sie diese Abhängigkeiten zu Ihrem Pom hinzu.

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.9.0.1</version> 
</dependency> 

Dann:

import org.apache.kafka.clients.CommonClientConfigs; 
import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.common.TopicPartition; 
import kafka.admin.AdminClient; 
import kafka.coordinator.GroupOverview; 

Properties props = new Properties(); 
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092"); 
AdminClient adminClient = AdminClient.create(props); 

List<GroupOverview> groups = scala.collection.JavaConversions.seqAsJavaList(
     adminClient.listAllConsumerGroupsFlattened()); 
for (GroupOverview group : groups) { 
    String groupId = group.groupId(); 

    Properties consProps = new Properties(); 
    consProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092"); 
    consProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 
    consProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 
    consProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); 
    consProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 
    consProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer consumer = new KafkaConsumer(consProps); 

    List<AdminClient.ConsumerSummary> groupSummaries = scala.collection.JavaConversions.seqAsJavaList(
      adminClient.describeConsumerGroup(groupId)); 

    System.out.println("GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER"); 

    for (AdminClient.ConsumerSummary summary : groupSummaries) { 
     String owner = summary.clientId() + "_" + summary.clientHost(); 
     List<TopicPartition> topicPartitions = scala.collection.JavaConversions.seqAsJavaList(
       summary.assignment()); 
     for (TopicPartition tp : topicPartitions) { 

      // Get current offset 
      long currentOffset = consumer.committed(tp).offset(); 

      // get log end offset 
      consumer.assign(Arrays.asList(tp)); 
      consumer.seekToEnd(); 
      long logEndOffset = consumer.position(tp); 

      long lag = logEndOffset - currentOffset; 

      System.out.println(groupId + ", " + tp.topic() + ", " + tp.partition() + ", " + 
        currentOffset + ", " + logEndOffset + ", " + lag + ", " + owner); 
     } 
    } 
} 
+0

Danke, genau das, was ich brauchte, um eine Liste der laufenden Verbraucher zu bekommen. Dies wurde mit der Methode "AdminClient" + "listAllConsumerGroupsFlattened()" erreicht. Die Dinge sind im Kafak noch ziemlich versteckt. – usman

0

Es ist ziemlich das Gleiche, aber Sie müssen ZkUtils.ConsumersPath (=/Consumers) überprüfen.

Die Konsumentenstruktur in Zookeeper ist die nächste/consumers/[groupId]/ids/[consumerId], so dass Sie die Gruppen und Verbraucher für jede Gruppe navigieren können.

+0

ZkUtils.ConsumersPath (/ Verbraucher) immer [] zurückgegeben. Ich denke, dass Verbrauchergruppeninformationen jetzt auf kafka gespeichert sind. Ich habe diesen Teil bereits durchgegangen, um die Verbraucherliste zu überprüfen. – usman

+1

In 0.9.x und 0.10.x bestehen weiterhin Consumer-Gruppen und Consumer. Sie können es im Code überprüfen ZkUtils.getConsumers erhalten die Kinder des ConsumersPath. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala#L864 – gasparms

+0

zkClient.getChildren (ZkUtils.ConsumersPath()) gibt leer [] zurück. – usman

0

Für 0.9.x neue Verbraucher und listen alle aktiven Verbrauchergruppen:

  1. alle Makler finden und „Listgroups“ Anfrage an jeden Broker und erhalten alle Gruppeninformationen senden;

Details dazu finden Sie auf $KAFKA_HOME/bin/kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService.list()) beziehen

Für 0.9.x neue Verbraucher und bestimmte detaillierte Verbrauchergruppe Informationen beschreiben:

  1. finden Sie Verbraucher Gruppenkoordinator und senden Sie "DescribeGroups" Anfrage an ihn und erhalten alle Gruppenmitglieder Informationen und Partition Zuweisungsinformationen;
  2. rufen Sie KafkaConsumer.committed (TopicPartition-Partition) auf, um den letzten festgeschriebenen Offset für die angegebene Partition abzurufen.

Einzelheiten können Sie beziehen sich auf $KAFKA_HOME/bin/kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService.describe())

Bitte beachtet werden, dass alte Verbraucher und neue Verbraucher haben ganz andere Implementierung darüber. (Beide Logik ist in kafka.admin.ConsumerGroupCommand implementiert.

+0

diese Bearbeitung war ein Fehler musste in meiner eigenen Frage hinzufügen, tut mir leid für sie weiß nicht, wie Sie es verwerfen. – usman

+0

Übrigens verwende ich Java-Code auf Windows, Kafka Windows haben nicht "kafka-consumer-groups.bat", was jetzt zu tun ist. – usman

Verwandte Themen