Community, können Sie mir bitte helfen zu verstehen, warum ~ 3% meiner Nachrichten nicht in HDFS
enden? Ich habe einen einfachen Produzenten in JAVA
geschrieben, um 10 Millionen Nachrichten zu generieren.Confluence HDFS Connector verliert Nachrichten
public static final String TEST_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"myrecord\","
+ "\"fields\":["
+ " { \"name\":\"str1\", \"type\":\"string\" },"
+ " { \"name\":\"str2\", \"type\":\"string\" },"
+ " { \"name\":\"int1\", \"type\":\"int\" }"
+ "]}";
public KafkaProducerWrapper(String topic) throws UnknownHostException {
// store topic name
this.topic = topic;
// initialize kafka producer
Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("bootstrap.servers", "myserver-1:9092");
config.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
config.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
config.put("schema.registry.url", "http://myserver-1:8089");
config.put("acks", "all");
producer = new KafkaProducer(config);
// parse schema
Schema.Parser parser = new Schema.Parser();
schema = parser.parse(TEST_SCHEMA);
}
public void send() {
// generate key
int key = (int) (Math.random() * 20);
// generate record
GenericData.Record r = new GenericData.Record(schema);
r.put("str1", "text" + key);
r.put("str2", "text2" + key);
r.put("int1", key);
final ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, "K" + key, (GenericRecord) r);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
logger.error("Send failed for record {}", record, e);
messageErrorCounter++;
return;
}
logger.debug("Send succeeded for record {}", record);
messageCounter++;
}
});
}
public String getStats() { return "Messages sent: " + messageCounter + "/" + messageErrorCounter; }
public long getMessageCounter() {
return messageCounter + messageErrorCounter;
}
public void close() {
producer.close();
}
public static void main(String[] args) throws InterruptedException, UnknownHostException {
// initialize kafka producer
KafkaProducerWrapper kafkaProducerWrapper = new KafkaProducerWrapper("my-test-topic");
long max = 10000000L;
for (long i = 0; i < max; i++) {
kafkaProducerWrapper.send();
}
logger.info("producer-demo sent all messages");
while (kafkaProducerWrapper.getMessageCounter() < max)
{
logger.info(kafkaProducerWrapper.getStats());
Thread.sleep(2000);
}
logger.info(kafkaProducerWrapper.getStats());
kafkaProducerWrapper.close();
}
Und ich benutze den Confluent HDFS Connector
in Standalone-Modus Daten HDFS
zu schreiben. Die Konfiguration ist wie folgt:
name=hdfs-consumer-test
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=my-test-topic
hdfs.url=hdfs://my-cluster/kafka-test
hadoop.conf.dir=/etc/hadoop/conf/
flush.size=100000
rotate.interval.ms=20000
# increase timeouts to avoid CommitFailedException
consumer.session.timeout.ms=300000
consumer.request.timeout.ms=310000
heartbeat.interval.ms= 60000
session.timeout.ms= 100000
Der Anschluss der Daten in HDFS schreibt, aber nach 20000 ms warten (aufgrund rotate.interval.ms
) nicht alle Nachrichten empfangen werden.
scala> spark.read.avro("/kafka-test/topics/my-test-topic/partition=*/my-test-topic*")
.count()
res0: Long = 9749015
Eine Idee, was ist der Grund für dieses Verhalten? Wo ist mein Fehler? Ich verwende Confluent 3.0.1/Kafka 10.0.0.1.
Sind die letzten Nachrichten nicht in HDFS verschoben? Wenn ja, ist es wahrscheinlich, dass Sie in das hier beschriebene Problem kommen https://github.com/confluentinc/kafka-connect-hdfs/pull/100 Versuchen Sie, eine weitere Nachricht an das Thema nach der rotate.interval.ms hat senden abgelaufen, um dies zu bestätigen, ist das, worauf Sie gerade stoßen. Wenn Sie basierend auf der Zeit drehen müssen, ist es wahrscheinlich eine gute Idee, das Problem zu beheben. – dawsaw
Das ist die Lösung! Ich habe auf ** Confluent 3.1.1 ** aktualisiert und kann alle Nachrichten in 'HDFS' sehen. Willst du es als Antwort schreiben und ich gebe dir das Lob, das du verdienst? –
Ja klar, wusste nicht, dass es wirklich einen Unterschied gab :) – dawsaw