I am using Kafka 0.10.0. Before processing, I want to know how many records there are in a partition. How do I find the offset range for a topic partition in Kafka 0.10?

In version 0.9.0.1, I found the difference between the latest and earliest offsets for a partition with the code below. In the new version, it gets stuck in the call to the consumer#position method.
package org.apache.kafka.example.utils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.lang3.Range;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class FindTopicRange {

    private static final Logger logger = LogManager.getLogger();

    public static Map<TopicPartition, Range<Long>> getOffsets(String topic) {
        Map<TopicPartition, Range<Long>> partitionToRange = new HashMap<>();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(getConsumerConfigs())) {
            // Collect all partitions of the topic and assign them manually
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo partitionInfo : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }
            consumer.assign(partitions);
            // Seek to each end of the partition and read back the position
            for (TopicPartition partition : partitions) {
                consumer.seekToBeginning(Collections.singletonList(partition));
                long earliestOffset = consumer.position(partition);
                consumer.seekToEnd(Collections.singletonList(partition));
                long latestOffset = consumer.position(partition);
                partitionToRange.put(partition, Range.between(earliestOffset, latestOffset));
            }
        } catch (Exception e) {
            logger.error("Exception while getting offset range information for topic : {}", topic, e);
        }
        return partitionToRange;
    }

    private static Properties getConsumerConfigs() {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "test");
        configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10240);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
        return configs;
    }

    public static void main(String[] args) {
        System.out.println(getOffsets("hello"));
    }
}
The stack trace for the call above is shown below:
"main" prio=10 tid=0x00007f1750013800 nid=0x443 runnable [0x00007f1756b88000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
- locked <0x00000007c21cba00> (a sun.nio.ch.Util$2)
- locked <0x00000007c21cb9f0> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000007c21cb8d8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
at org.apache.kafka.common.network.Selector.select(Selector.java:454)
at org.apache.kafka.common.network.Selector.poll(Selector.java:277)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:183)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:324)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:306)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1405)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1196)
Is there another way to get the "latest" and "earliest" offsets without instantiating a KafkaConsumer?
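For context, if upgrading the client is an option: starting with Kafka 0.10.1.0, KafkaConsumer exposes beginningOffsets(Collection&lt;TopicPartition&gt;) and endOffsets(Collection&lt;TopicPartition&gt;), which query both ends of each partition directly and skip the assign/seek/position path (and with it the coordinator lookup visible in the stack trace above). The sketch below shows only the pure step of combining the two result maps into ranges; partition keys are modeled as plain strings here, purely as an assumption, so the snippet compiles without the Kafka client jar.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch, assuming a 0.10.1+ client. The two maps would come from:
//   Map<TopicPartition, Long> begin = consumer.beginningOffsets(partitions);
//   Map<TopicPartition, Long> end   = consumer.endOffsets(partitions);
// (keys are shown as strings like "topic-partition" so this compiles standalone)
class OffsetRanges {
    /** Pairs each partition's earliest offset with its latest offset. */
    static Map<String, long[]> merge(Map<String, Long> begin, Map<String, Long> end) {
        Map<String, long[]> ranges = new HashMap<>();
        for (Map.Entry<String, Long> e : begin.entrySet()) {
            // [earliest, latest]; latest - earliest approximates the record count
            ranges.put(e.getKey(), new long[] { e.getValue(), end.get(e.getKey()) });
        }
        return ranges;
    }
}
```

Note that this still instantiates a KafkaConsumer; it only avoids the seek/position round trips that trigger the committed-offset fetch in 0.10.0.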