Ich versuche, die SimpleConsumer in Kafka 9 zu verwenden, um Benutzer zu ermöglichen, versetzt von einer Zeit, Ereignisse zu wiederholen - aber die Nachrichten ich von Kafka sind in einer sehr seltsamen Codierung empfange:Kafka Java SimpleConsumer seltsame Codierung
7icf-test-testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7\�W>8������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819330373,"context":{"userid":0,"username":"testUser"}}�!}�a�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819331637,"context":{"userid":1,"username":"testUser"}}���r�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819332754,"context":{"userid":2,"username":"testUser"}}��������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819333868,"context":{"userid":3,"username":"testUser"}}�p=
������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819334997,"context":{"userid":4,"username"
Mit dem KafkaConsumer lassen sich diese Meldungen gut parsen. Hier ist der Code, den ich Nachrichten mit dem SimpleConsumer abzurufen bin mit:
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
long currentOffset = messageAndOffset.offset();
if (currentOffset < readOffset) {
log.debug("Found an old offset - skip");
continue;
}
readOffset = messageAndOffset.nextOffset();
int payloadOffset = 14 + messageAndOffset.message().keySize(); // remove first x bytes, schema Id
byte[] data = messageAndOffset.message().payload().array();
byte[] realData = Arrays.copyOfRange(data, payloadOffset, data.length - payloadOffset);
log.debug("Read " + new String(realData, "UTF-8"));
}
ich den Code hinzugefügt, um den ersten x Bytes zu überspringen, nachdem ich immer UTF-32 Fehler immer über Bytes zu hoch zu sein, die ich nehme an, weil Kafka fügt den Nutzdaten Informationen wie Nachrichtengröße voran. Ist das ein Avro-Artefakt?
Sieht nicht wie Avro aus - zumindest nicht binär Avro-Codierung. Bei der Binärcodierung würden Sie die Schema-Informationen nicht im Datensatz erhalten. –
Mein Code ist etwas anders - Anstatt 'payload() .array()' zu verwenden, mache ich es so, wie es hier gemacht wird: https://cwiki.apache.org/confluence/display/KAFKA/0.8. 0 + SimpleConsumer + Beispiel ZB: 'payload(). Get (Bytes)' wobei 'Bytes' vom Typ' byte [] 'ist. Die 'get()' Methode kopiert die Daten, während 'array()' das tatsächliche Array zurückgibt und in den Javadocs für 'ByteBuffer' steht:" Änderungen am Inhalt dieses Puffers bewirken, dass der Inhalt des zurückgegebenen Arrays geändert wird, und und umgekehrt." Vielleicht passiert so etwas? –
@Gandalf Würden Sie bitte Ihre Nachricht nur in Notepad ++ öffnen. Wenn Sie es mit anderen Wordpad oder Notizblock öffnen, wird es gefährlich aussehen. Also öffne es in Notepad ++ und lass es uns wissen. – SkyWalker