2016-06-02 7 views
8

Ich versuche, die SimpleConsumer in Kafka 9 zu verwenden, um Benutzer zu ermöglichen, versetzt von einer Zeit, Ereignisse zu wiederholen - aber die Nachrichten ich von Kafka sind in einer sehr seltsamen Codierung empfange:Kafka Java SimpleConsumer seltsame Codierung

7icf-test-testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7\�W>8������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819330373,"context":{"userid":0,"username":"testUser"}}�!}�a�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819331637,"context":{"userid":1,"username":"testUser"}}���r�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819332754,"context":{"userid":2,"username":"testUser"}}��������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819333868,"context":{"userid":3,"username":"testUser"}}�p= 
          ������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819334997,"context":{"userid":4,"username" 

Mit dem KafkaConsumer lassen sich diese Meldungen gut parsen. Hier ist der Code, den ich Nachrichten mit dem SimpleConsumer abzurufen bin mit:

for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) { 
     long currentOffset = messageAndOffset.offset(); 
     if (currentOffset < readOffset) { 
      log.debug("Found an old offset - skip"); 
      continue; 
     } 

     readOffset = messageAndOffset.nextOffset(); 

     int payloadOffset = 14 + messageAndOffset.message().keySize(); // remove first x bytes, schema Id 
     byte[] data = messageAndOffset.message().payload().array(); 
     byte[] realData = Arrays.copyOfRange(data, payloadOffset, data.length - payloadOffset); 
     log.debug("Read " + new String(realData, "UTF-8")); 
} 

ich den Code hinzugefügt, um den ersten x Bytes zu überspringen, nachdem ich immer UTF-32 Fehler immer über Bytes zu hoch zu sein, die ich nehme an, weil Kafka fügt den Nutzdaten Informationen wie Nachrichtengröße voran. Ist das ein Avro-Artefakt?

+0

Sieht nicht wie Avro aus - zumindest nicht binär Avro-Codierung. Bei der Binärcodierung würden Sie die Schema-Informationen nicht im Datensatz erhalten. –

+0

Mein Code ist etwas anders - Anstatt 'payload() .array()' zu verwenden, mache ich es so, wie es hier gemacht wird: https://cwiki.apache.org/confluence/display/KAFKA/0.8. 0 + SimpleConsumer + Beispiel ZB: 'payload(). Get (Bytes)' wobei 'Bytes' vom Typ' byte [] 'ist. Die 'get()' Methode kopiert die Daten, während 'array()' das tatsächliche Array zurückgibt und in den Javadocs für 'ByteBuffer' steht:" Änderungen am Inhalt dieses Puffers bewirken, dass der Inhalt des zurückgegebenen Arrays geändert wird, und und umgekehrt." Vielleicht passiert so etwas? –

+0

@Gandalf Würden Sie bitte Ihre Nachricht nur in Notepad ++ öffnen. Wenn Sie es mit anderen Wordpad oder Notizblock öffnen, wird es gefährlich aussehen. Also öffne es in Notepad ++ und lass es uns wissen. – SkyWalker

Antwort

0

Ich fand nie eine gute Antwort auf diese - (... Pro Partition, obwohl die Umsetzung ist schlecht), aber ich schaltete den SimpleConsumerzur Verwendung von Kafka Abfrage für die Offsets ich brauchte, und dann die native KafkaConsumer seek(TopicPartition, offset) mit verwenden oder seekToBeginning(TopicPartition), um die Nachrichten abzurufen. Hoffentlich werden sie dem nativen Client die Fähigkeit hinzufügen, Nachrichten von einem bestimmten Zeitstempel in der nächsten Version abzurufen.

0

Suchst du das?

readOffset = messageAndOffset.nextOffset(); 
ByteBuffer payload = messageAndOffset.message().payload(); 

    if(payload == null) { 
     System.err.println("Message is null : " + readOffset); 
     continue; 
    } 

final byte[] realData = new byte[payload.limit()]; 
payload.get(realData); 
System.out.println("Read " + new String(realData, "UTF-8")); 
0

Sie können in regelmäßigen Abständen die Partition lügen einen Offset Sie mit dem Zeitstempel der Nachricht begehen (vielleicht nicht jeder begehen) und dann kann man ein gewisses Maß in der Zukunft zu Ihrem Verbraucher Offsets eingestellt. Ich nehme an, dass dies für das Debugging der Produktion ist.

Ich bezweifle, dass sie eine solche Funktion hinzufügen, scheint es nicht machbar, wenn man bedenkt, wie Kafka funktioniert, obwohl ich mich irren kann, gibt es immer geniales Zeug. Ich würde das Logging machen.