2016-11-30 1 views
1

Community, können Sie mir bitte helfen zu verstehen, warum ~ 3% meiner Nachrichten nicht in HDFS enden? Ich habe einen einfachen Produzenten in JAVA geschrieben, um 10 Millionen Nachrichten zu generieren.Confluence HDFS Connector verliert Nachrichten

public static final String TEST_SCHEMA = "{" 
     + "\"type\":\"record\"," 
     + "\"name\":\"myrecord\"," 
     + "\"fields\":[" 
     + " { \"name\":\"str1\", \"type\":\"string\" }," 
     + " { \"name\":\"str2\", \"type\":\"string\" }," 
     + " { \"name\":\"int1\", \"type\":\"int\" }" 
     + "]}"; 

public KafkaProducerWrapper(String topic) throws UnknownHostException { 
    // store topic name 
    this.topic = topic; 

    // initialize kafka producer 
    Properties config = new Properties(); 
    config.put("client.id", InetAddress.getLocalHost().getHostName()); 
    config.put("bootstrap.servers", "myserver-1:9092"); 
    config.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); 
    config.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); 
    config.put("schema.registry.url", "http://myserver-1:8089"); 
    config.put("acks", "all"); 

    producer = new KafkaProducer(config); 

    // parse schema 
    Schema.Parser parser = new Schema.Parser(); 
    schema = parser.parse(TEST_SCHEMA); 
} 

public void send() { 
    // generate key 
    int key = (int) (Math.random() * 20); 

    // generate record 
    GenericData.Record r = new GenericData.Record(schema); 
    r.put("str1", "text" + key); 
    r.put("str2", "text2" + key); 
    r.put("int1", key); 

    final ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, "K" + key, (GenericRecord) r); 
    producer.send(record, new Callback() { 
     public void onCompletion(RecordMetadata metadata, Exception e) { 
      if (e != null) { 
       logger.error("Send failed for record {}", record, e); 
       messageErrorCounter++; 
       return; 
      } 
      logger.debug("Send succeeded for record {}", record); 
      messageCounter++; 
     } 
    }); 
} 

public String getStats() { return "Messages sent: " + messageCounter + "/" + messageErrorCounter; } 

public long getMessageCounter() { 
    return messageCounter + messageErrorCounter; 
} 

public void close() { 
    producer.close(); 
} 

public static void main(String[] args) throws InterruptedException, UnknownHostException { 
    // initialize kafka producer 
    KafkaProducerWrapper kafkaProducerWrapper = new KafkaProducerWrapper("my-test-topic"); 

    long max = 10000000L; 
    for (long i = 0; i < max; i++) { 
     kafkaProducerWrapper.send(); 
    } 

    logger.info("producer-demo sent all messages"); 
    while (kafkaProducerWrapper.getMessageCounter() < max) 
    { 
     logger.info(kafkaProducerWrapper.getStats()); 
     Thread.sleep(2000); 
    } 

    logger.info(kafkaProducerWrapper.getStats()); 
    kafkaProducerWrapper.close(); 
} 

Und ich benutze den Confluent HDFS Connector in Standalone-Modus Daten HDFS zu schreiben. Die Konfiguration ist wie folgt:

name=hdfs-consumer-test 
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector 
tasks.max=1 

topics=my-test-topic 

hdfs.url=hdfs://my-cluster/kafka-test 
hadoop.conf.dir=/etc/hadoop/conf/ 
flush.size=100000 
rotate.interval.ms=20000 

# increase timeouts to avoid CommitFailedException 
consumer.session.timeout.ms=300000 
consumer.request.timeout.ms=310000 

heartbeat.interval.ms= 60000 
session.timeout.ms= 100000 

Der Anschluss der Daten in HDFS schreibt, aber nach 20000 ms warten (aufgrund rotate.interval.ms) nicht alle Nachrichten empfangen werden.

scala> spark.read.avro("/kafka-test/topics/my-test-topic/partition=*/my-test-topic*") 
    .count() 
res0: Long = 9749015 

Eine Idee, was ist der Grund für dieses Verhalten? Wo ist mein Fehler? Ich verwende Confluent 3.0.1/Kafka 10.0.0.1.

+2

Sind die letzten Nachrichten nicht in HDFS verschoben? Wenn ja, ist es wahrscheinlich, dass Sie in das hier beschriebene Problem kommen https://github.com/confluentinc/kafka-connect-hdfs/pull/100 Versuchen Sie, eine weitere Nachricht an das Thema nach der rotate.interval.ms hat senden abgelaufen, um dies zu bestätigen, ist das, worauf Sie gerade stoßen. Wenn Sie basierend auf der Zeit drehen müssen, ist es wahrscheinlich eine gute Idee, das Problem zu beheben. – dawsaw

+0

Das ist die Lösung! Ich habe auf ** Confluent 3.1.1 ** aktualisiert und kann alle Nachrichten in 'HDFS' sehen. Willst du es als Antwort schreiben und ich gebe dir das Lob, das du verdienst? –

+1

Ja klar, wusste nicht, dass es wirklich einen Unterschied gab :) – dawsaw

Antwort

1

Sind die letzten Nachrichten nicht in HDFS verschoben? Wenn ja, ist es wahrscheinlich, dass Sie in das Problem, das hier beschrieben wird, sind https://github.com/confluentinc/kafka-connect-hdfs/pull/100

Versuchen Sie, eine weitere Nachricht an das Thema zu senden, nachdem die rotate.interval.ms abgelaufen ist, um zu bestätigen, dass dies ist, was Sie gerade ausführen. Wenn Sie basierend auf der Zeit drehen müssen, ist es wahrscheinlich eine gute Idee, das Problem zu beheben.