Hallo Ich habe versucht, KAFKA zu lernen und Probleme mit meinem Remote-Poller/Verbraucher.KAFKA REMOTE AWS consumer.poll
Ich habe KAFKA in AWS EC2 Instanz mit privaten und öffentlichen IP eingerichtet. Meine server.properties sieht so aus.
Hörer = KLARTEXT: //172.31.31.58: 9092 #AWS Private IP
advertised.listeners = KLARTEXT: // 35 ?? ?? ??:... 9092 #AWS öffentliche IP-Masked
Meine AWS EC2-Sicherheitsgruppe ist so konfiguriert, dass zu Testzwecken Datenverkehr über eine beliebige IP-Adresse an einem beliebigen Port zugelassen wird.
Wenn ich produzieren/konsumieren Nachrichten lokal mit in meine EC2-Instanz folgende Skripte es funktioniert perfekt
bin/kafka-console-producer.sh --broker-Liste localhost: 9092 --topic Test
ist/kafka-console-consumer.sh --bootstrap-Server localhost: 9092 --topic Test --from-Anfang
Aber wenn ich versuche zu gleicher kafka Instanz von meinem Fern Laptop Eclipse-Code ausgeführt wird mein Java zu verbinden API, mein Code hängt für immer in consumer.poll (100). Mache ich hier etwas falsch?
Properties props = new Properties();
props.put("bootstrap.servers", "35.??.??.??:9092");//my aws public ip configured in advertised.listeners
props.put("group.id", "test123");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
} }
Könnten Sie die Protokolldatei für Ihre Verbraucher posten? Es wäre hilfreich, die Debug-Meldungen zu sehen. – PragmaticProgrammer