2017-06-16 2 views
0

Hallo Ich habe versucht, KAFKA zu lernen und Probleme mit meinem Remote-Poller/Verbraucher.KAFKA REMOTE AWS consumer.poll

Ich habe KAFKA in AWS EC2 Instanz mit privaten und öffentlichen IP eingerichtet. Meine server.properties sieht so aus.

Hörer = KLARTEXT: //172.31.31.58: 9092 #AWS Private IP

advertised.listeners = KLARTEXT: // 35 ?? ?? ??:... 9092 #AWS öffentliche IP-Masked

Meine AWS EC2-Sicherheitsgruppe ist so konfiguriert, dass zu Testzwecken Datenverkehr über eine beliebige IP-Adresse an einem beliebigen Port zugelassen wird.

Wenn ich produzieren/konsumieren Nachrichten lokal mit in meine EC2-Instanz folgende Skripte es funktioniert perfekt

bin/kafka-console-producer.sh --broker-Liste localhost: 9092 --topic Test

ist/kafka-console-consumer.sh --bootstrap-Server localhost: 9092 --topic Test --from-Anfang

Aber wenn ich versuche zu gleicher kafka Instanz von meinem Fern Laptop Eclipse-Code ausgeführt wird mein Java zu verbinden API, mein Code hängt für immer in consumer.poll (100). Mache ich hier etwas falsch?

 Properties props = new Properties(); 
    props.put("bootstrap.servers", "35.??.??.??:9092");//my aws public ip configured in advertised.listeners 
    props.put("group.id", "test123"); 
    props.put("enable.auto.commit", "false"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
    consumer.subscribe(Arrays.asList("test")); 
    while (true) { 
     ConsumerRecords<String, String> records = consumer.poll(100); 
     for (ConsumerRecord<String, String> record : records) 
      System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); 

    } } 
+0

Könnten Sie die Protokolldatei für Ihre Verbraucher posten? Es wäre hilfreich, die Debug-Meldungen zu sehen. – PragmaticProgrammer

Antwort

1

Sind Sie sicher, dass es in poll() hängt? oder ist poll() nur eine leere ConsumerRecords zurück und es ist in der while(true) Looping?

Wenn Sie für die Gruppe keine Offsets festgelegt haben, startet der Consumer standardmäßig am Ende des Themas, sodass nur neue Nachrichten empfangen werden. In diesem Fall, wenn Sie Nachrichten konsumieren bereits im Thema möchten, müssen Sie auto.offset.reset auf earliest setzen (wie Sie in der Konsole-Verbraucher mit --from-beginning tat)

Edit:

Wenn es in tatsächlich festsitzt poll() Es könnte ein Verbindungsproblem sein. Um dies herauszufinden, sollten Sie Ihren Client mit aktivierter Protokollierung ausführen. Erstellen Sie eine Datei mit:

log4j.rootLogger=DEBUG, stdout 
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n 

und starten Sie Ihren Client mit -Dlog4j.configuration=file:PATH_TO_FILE

+0

Ja Mickael, es hängt in der Umfrage, im Grunde geht es nicht weiter zur nächsten Zeile, das habe ich verifiziert. –

+0

Hallo Mickael, das ist definitiv ein Netzwerkproblem. Ich habe das früher in meinem Büronetzwerk versucht und es funktionierte nicht und funktioniert perfekt in einem anderen Netzwerk. Infact Ich lief mein Programm mit Debug-Protokoll aktiviert, wie Sie Ihren Vorschlag und nichts wurde protokolliert, die Netzwerkproblem angezeigt. Aber das funktioniert jetzt in einem anderen Netzwerk, und Ihr Vorschlag zu Netzwerkproblemen hat mich dazu gebracht zu denken, dies durch ein anderes Netzwerk zu tun. Vielen Dank. –

Verwandte Themen