Ich versuche, Daten aus einer Datei in ein Kafka-Thema zu schreiben. Mein Code sieht so aus:Kafka Producer überspringt Nachrichten
Properties properties = new Properties();
properties.put("bootstrap.servers", <bootstrapServers>);
properties.put("key.serializer", StringSerializer.class.getCanonicalName());
properties.put("value.serializer", StringSerializer.class.getCanonicalName());
properties.put("retries",100);
properties.put("linger.ms",5);
properties.put("acks", "all");
KafkaProducer<Object, String> producer = new KafkaProducer<>(properties);
try (BufferedReader bf = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), "UTF-8"))) {
String line;
int count = 0;
while ((line = bf.readLine()) != null) {
count++;
producer.send(new ProducerRecord<>(topicName, line));
}
producer.flush();
Logger.log("Done producing data messages. Total no of records produced:" + count);
} catch (InterruptedException | ExecutionException | IOException e) {
Throwables.propagate(e);
} finally {
producer.close();
}
Die Größe der Daten liegt über 1 Million Datensätze.
Wenn prüfe ich den Offset von Daten über Broker mit folgendem Befehl ein, gibt es nur die Hälfte der Meldungen (etwa 5,00,000) auf dem Thema geschrieben:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker_list> --time -1 --topic <topic_name>
Ausgabe des obigen Befehls:
topic_name:1:292954
topic_name:0:296787
Welche Änderungen sollte ich in Ansatz tun, um sicherzustellen, dass alle über das Thema geschrieben werden.
Können Sie die tatsächliche Ausgabe des GetOffsetShell-Befehls anzeigen? – C4stor
Die Ausgabe in der Frage hinzugefügt. –
Was ist der Wert von count in der Anwendungsprotokolldatei? Zeigt es 1m? – notionquest