Ich bin ein Apache Kafka Verbraucher zu einem anderen bereits laufenden Kafka abonnieren. Nun, mein Problem ist, dass, wenn mein Produzent Nachrichten an einen Server sendet ... mein Konsument sie nicht empfängt. Here I Producer Code geben,Consumer keine Nachricht in Apache Kafka
Properties properties = new Properties();
properties.put("metadata.broker.list","Running kafka ip addr:9092");
properties.put("serializer.class","kafka.serializer.StringEncoder");
ProducerConfig producerConfig = new ProducerConfig(properties);
kafka.javaapi.producer.Producer<String,String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);
String filePath="filepath";
File rootFile= new File(filePath);
Collection<File> allFiles = FileUtils.listFiles(rootFile, CanReadFileFilter.CAN_READ, TrueFileFilter.INSTANCE);
for(File file : allFiles) {
StringBuilder sb = new StringBuilder();
sb.append(file);
KeyedMessage<String, String> message =new KeyedMessage<String, String>(TOPIC,sb.toString());
System.out.println("sending msg from producer.."+sb.toString());
producer.send(message);
}
producer.close();
Hier Code Consumer,
properties.put("bootstrap.servers","Running zookeaper ip addr:2181");
properties.put("group.id","test-group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections.singletonList(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.println("topic = "+record.topic());
System.out.println("topic = "+record.partition());
System.out.println("topic = "+record.offset());
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
System.out.printf("commit failed", e) ;
}
}
Ich benutze diese Abhängigkeit:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.0</version>
</dependency>
ich alle Informationen aus dieser Verbindung erhalten:
https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
Wenn wir Verbraucher betrieben, erhielten wir keine Benachrichtigung von der Verbraucherseite. Bitte gib mir eine Idee.
versuchen Sie herauszufinden, wo das Problem ist: auf Verbraucher- oder Herstellergröße. Dafür: Offsets im Thema überprüfen. Es kann von der Kommandozeile – Natalia
ausgeführt werden Sie führen es als eine JAR-Datei im Cluster? .. Bitte überprüfen Sie Ihren Port zookeeper. –
@Natalia: Ich kann die Nachrichten über den Produzenten posten. Ich kann sehen, dass die Nummer der Nachricht zusammen mit der Protokollgröße zunimmt. Aber Offset wird nicht erhöht ... –