Exception in thread "main" java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
    at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
    at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
    at cmb.SparkStream.kafka.kafkaOffsetTool.getTopicOffsets(kafkaOffsetTool.java:47)
    at cmb.SparkStream.LogClassify.main(LogClassify.java:95)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

How can I get the topic and partition offsets?
My code:
/**
 * Queries each broker in the comma-separated {@code zkServers} list and returns the
 * latest (log-end) offset for every partition of {@code topic} that has a live leader.
 *
 * <p>NOTE(review): despite the parameter name, {@link SimpleConsumer} must connect to
 * Kafka <em>broker</em> host:port pairs (e.g. {@code host:9092}), not ZooKeeper nodes
 * (e.g. {@code host:2181}) — connecting to ZooKeeper is a classic cause of the
 * {@code ClosedChannelException} seen in the stack trace above. Confirm what callers pass.
 *
 * @param zkServers comma-separated {@code host:port} entries to contact
 * @param topic     topic whose partition offsets are requested
 * @return map from partition to its latest offset; partitions whose leader is unknown
 *         or whose offset request errored are silently omitted
 * @throws NumberFormatException if a port in {@code zkServers} is not a valid integer
 */
public static Map<TopicAndPartition, Long> getTopicOffsets(String zkServers, String topic) {
    Map<TopicAndPartition, Long> retVals = new HashMap<TopicAndPartition, Long>();
    for (String server : zkServers.split(",")) {
        // Split once instead of twice per iteration.
        String[] hostPort = server.split(":");
        // "consumser" [sic] is the wire-visible clientId; left unchanged because brokers
        // may log/quota by it, but consider fixing the typo in a coordinated change.
        SimpleConsumer simpleConsumer = new SimpleConsumer(hostPort[0],
                Integer.valueOf(hostPort[1]), Consts.getKafkaConfigBean().getDuration(), 1024,
                "consumser");
        // try/finally guarantees the consumer's socket is released even when send()
        // throws (e.g. ClosedChannelException) — the original leaked it on that path.
        try {
            TopicMetadataRequest topicMetadataRequest =
                    new TopicMetadataRequest(Arrays.asList(topic));
            TopicMetadataResponse topicMetadataResponse =
                    simpleConsumer.send(topicMetadataRequest);
            for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) {
                for (PartitionMetadata part : metadata.partitionsMetadata()) {
                    Broker leader = part.leader();
                    // Partitions without a known leader are skipped entirely.
                    if (leader != null) {
                        TopicAndPartition topicAndPartition =
                                new TopicAndPartition(topic, part.partitionId());
                        // Only offsets[0] (the latest offset) is consumed below, so ask
                        // for 1 offset instead of 10000 — same result, less payload.
                        PartitionOffsetRequestInfo partitionOffsetRequestInfo =
                                new PartitionOffsetRequestInfo(
                                        kafka.api.OffsetRequest.LatestTime(), 1);
                        OffsetRequest offsetRequest = new OffsetRequest(
                                ImmutableMap.of(topicAndPartition, partitionOffsetRequestInfo),
                                kafka.api.OffsetRequest.CurrentVersion(),
                                simpleConsumer.clientId());
                        OffsetResponse offsetResponse =
                                simpleConsumer.getOffsetsBefore(offsetRequest);
                        if (!offsetResponse.hasError()) {
                            long[] offsets = offsetResponse.offsets(topic, part.partitionId());
                            // Guard against an empty array to avoid
                            // ArrayIndexOutOfBoundsException on an unexpected response.
                            if (offsets.length > 0) {
                                retVals.put(topicAndPartition, offsets[0]);
                            }
                        }
                        // NOTE(review): errored responses are dropped silently, matching
                        // the original behavior; consider logging offsetResponse errors.
                    }
                }
            }
        } finally {
            simpleConsumer.close();
        }
    }
    return retVals;
}