2016-11-21 3 views
0

Ich versuche, Daten aus einer Datei in ein Kafka-Thema zu schreiben. Mein Code sieht so aus:Kafka Producer überspringt Nachrichten

Properties properties = new Properties(); 
    properties.put("bootstrap.servers", <bootstrapServers>); 
    properties.put("key.serializer", StringSerializer.class.getCanonicalName()); 
    properties.put("value.serializer", StringSerializer.class.getCanonicalName()); 
    properties.put("retries",100); 
    properties.put("linger.ms",5); 
    properties.put("acks", "all"); 

    KafkaProducer<Object, String> producer = new KafkaProducer<>(properties); 

    try (BufferedReader bf = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), "UTF-8"))) { 
     String line; 
     int count = 0; 
     while ((line = bf.readLine()) != null) { 
      count++; 
      producer.send(new ProducerRecord<>(topicName, line)); 
     } 
    producer.flush(); 
     Logger.log("Done producing data messages. Total no of records produced:" + count); 
    } catch (InterruptedException | ExecutionException | IOException e) { 
     Throwables.propagate(e); 
    } finally { 
     producer.close(); 
    } 

Die Größe der Daten liegt über 1 Million Datensätze.

Wenn prüfe ich den Offset von Daten über Broker mit folgendem Befehl ein, gibt es nur die Hälfte der Meldungen (etwa 5,00,000) auf dem Thema geschrieben:

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker_list> --time -1 --topic <topic_name> 

Ausgabe des obigen Befehls:

topic_name:1:292954 
topic_name:0:296787 

Welche Änderungen sollte ich in Ansatz tun, um sicherzustellen, dass alle über das Thema geschrieben werden.

+0

Können Sie die tatsächliche Ausgabe des GetOffsetShell-Befehls anzeigen? – C4stor

+0

Die Ausgabe in der Frage hinzugefügt. –

+0

Was ist der Wert von count in der Anwendungsprotokolldatei? Zeigt es 1m? – notionquest

Antwort

0

Die Nachricht zum Senden ist asynchron. Möglicherweise überprüfen Sie die Offsets, bevor alle Nachrichten verarbeitet werden.

+0

Die Aufbewahrungsdauer des Logs beträgt 24 Stunden. Und ich überprüfe Nachrichten direkt nach der Erstellung einer Nachricht, die kaum 4-5 Minuten dauert. –

+0

Sie haben die Antwort vollständig geändert. –

+0

Ja mein Schlechter Ich dachte darüber nach und Retention wäre kein Problem, ich habe vergessen, vor der Änderung zu aktualisieren –

Verwandte Themen