2016-05-31 5 views
0

Ich benutze Kafka 0.10.0. Vor der Verarbeitung möchte ich die Größe der Datensätze in einer Partition wissen.Wie finde ich den Offset-Bereich für eine Topic-Partition in Kafka 0.10?

In Version 0.9.0.1 habe ich den Unterschied zwischen latest und earliest Offset für eine Partition mit dem folgenden Code gefunden. In der neuen Version bleibt es beim Abrufen der consumer#position-Methode stecken.

package org.apache.kafka.example.utils; 

import java.util.ArrayList; 
import java.util.Collections; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 

import org.apache.commons.lang3.Range; 
import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.common.PartitionInfo; 
import org.apache.kafka.common.TopicPartition; 
import org.apache.kafka.common.serialization.ByteArraySerializer; 
import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 

public class FindTopicRange { 

    private static Logger logger = LogManager.getLogger(); 

    public FindTopicRange() { 
     // TODO Auto-generated constructor stub 
    } 

    public static Map<TopicPartition, Range<Long>> getOffsets(String topic) { 

     Map<TopicPartition, Range<Long>> partitionToRange = new HashMap<>(); 
     try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(getConsumerConfigs())) { 

      List<TopicPartition> partitions = new ArrayList<>(); 
      for (PartitionInfo partitionInfo : consumer.partitionsFor(topic)) { 
       partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); 
      } 
      consumer.assign(partitions); 

      for (TopicPartition partition : partitions) { 
       consumer.seekToBeginning(Collections.singletonList(partition)); 
       long earliestOffset = consumer.position(partition); 

       consumer.seekToEnd(Collections.singletonList(partition)); 
       long latestOffset = consumer.position(partition); 
       partitionToRange.put(partition, Range.between(earliestOffset, latestOffset)); 
      } 
      return partitionToRange; 
     } catch (Exception e) { 
      logger.error("Exception while getting offset range information for topic : {}", topic, e); 
     } 
     return partitionToRange; 
    } 

    private static Properties getConsumerConfigs() { 
     Properties configs = new Properties(); 
     configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
     configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "test"); 
     configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10240); 

     configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); 
     configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); 
     return configs; 
    } 

    public static void main(String[] args) { 
     System.out.println(getOffsets("hello")); 
    } 

} 

Stacktrace für den obigen Aufruf ist unten dargestellt:

"main" prio=10 tid=0x00007f1750013800 nid=0x443 runnable [0x00007f1756b88000] 
    java.lang.Thread.State: RUNNABLE 
     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) 
     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) 
     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79) 
     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87) 
     - locked <0x00000007c21cba00> (a sun.nio.ch.Util$2) 
     - locked <0x00000007c21cb9f0> (a java.util.Collections$UnmodifiableSet) 
     - locked <0x00000007c21cb8d8> (a sun.nio.ch.EPollSelectorImpl) 
     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98) 
     at org.apache.kafka.common.network.Selector.select(Selector.java:454) 
     at org.apache.kafka.common.network.Selector.poll(Selector.java:277) 
     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:183) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:324) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:306) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1405) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1196) 
+0

Gibt es noch eine andere Methode, um den "neuesten" und "frühesten" Offset zu erhalten, ohne 'KafkaConsumer' zu instanziieren? –

Antwort

0

Haben Sie zu fälschen versucht, es mit einer neuen Verbrauchergruppe? Dieser Beitrag zeigt, dass es Ihnen die Lag-Werte geben kann other method

1

Ich konnte Ihr Beispiel in scala arbeiten (arbeitete bereits an ähnlichen Code). Der einzige Zusatz, den ich gemacht habe, war das Hinzufügen einer consumer.poll zu dem Code, vorausgesetzt, dass consumer.subscribe und consumer.assign faul sind.

val partitions = new util.ArrayList[TopicPartition] 

for (partitionInfo <- consumer.partitionsFor(topic)) { 
partitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))} 

val recordTemp = consumer.poll(1000) 

for (partition <- partitions) { 
    consumer.seekToBeginning(Collections.singletonList(partition)) 
    println(consumer.position(partition)) 
    consumer.seekToEnd(Collections.singletonList(partition)) 
    println(consumer.position(partition)) 
} 
Verwandte Themen