2017-06-07 3 views
0

Ich erhalte von einem Remote-Server Kafka Avro Nachrichten in Python (mit dem Verbraucher von Confluent Kafka Python-Bibliothek), die Clickstream-Daten mit JSON-Wörterbüchern mit Feldern wie Benutzer-Agent, Standort darstellen , URL, usw. Hier ist, wie eine Nachricht aussieht:So dekodieren/deserialisieren Kafka Avro Strings mit Python

b'\x01\x00\x00\xde\x9e\xa8\xd5\x8fW\xec\x9a\xa8\xd5\x8fW\x1axxx.xxx.xxx.xxx\x02:https://website.in/rooms/\x02Hhttps://website.in/wellness-spa/\x02\xaa\x14\x02\x9c\n\x02\xaa\x14\x02\xd0\x0b\x02V0:j3lcu1if:rTftGozmxSPo96dz1kGH2hvd0CREXmf2\x02V0:j3lj1xt7:YD4daqNRv_Vsea4wuFErpDaWeHu4tW7e\x02\x08null\x02\nnull0\x10pageview\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x10Thailand\x02\xa6\x80\xc4\x01\x02\x0eBangkok\x02\x8c\xba\xc4\x01\x020*\xa9\x13\xd0\[email protected]\x02\xec\xc09#J\[email protected]\x02\x8a\x02Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36\x02\x10Chromium\x02\x10Chromium\x028Google Inc. and contributors\x02\x0eBrowser\x02\x1858.0.3029.96\x02"Personal computer\x02\nLinux\x02\x00\x02\x1cCanonical Ltd.' 

Wie kann man es entschlüsseln? Ich habe versucht, Bson decode, aber die Zeichenfolge wurde nicht als UTF-8 erkannt, da es eine spezifische Avro-Codierung ist, denke ich. Ich fand https://github.com/verisign/python-confluent-schemaregistry, aber es unterstützt nur Python 2.7. Idealerweise würde ich gerne mit Python 3.5+ und MongoDB arbeiten, um die Daten zu verarbeiten und zu speichern, da es meine aktuelle Infrastruktur ist.

Antwort

0

Ich dachte Avro-Bibliothek war nur Avro-Dateien lesen, aber es tatsächlich gelöst das Problem der Decodierung Kafka-Nachrichten, wie folgt: Ich zuerst importieren Sie die Bibliotheken und geben Sie die Schemadatei als Parameter und erstellen Sie dann eine Funktion zum Decodieren der Nachricht in ein Wörterbuch, das ich in der Verbraucherschleife verwenden kann.

from confluent_kafka import Consumer, KafkaError 
from avro.io import DatumReader, BinaryDecoder 
import avro.schema 

schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read()) 
reader = DatumReader(schema) 

def decode(msg_value): 
    message_bytes = io.BytesIO(msg_value) 
    decoder = BinaryDecoder(message_bytes) 
    event_dict = reader.read(decoder) 
    return event_dict 

c = Consumer() 
c.subscribe(topic) 
running = True 
while running: 
    msg = c.poll() 
    if not msg.error(): 
     msg_value = msg.value() 
     event_dict = decode(msg_value) 
     print(event_dict) 
    elif msg.error().code() != KafkaError._PARTITION_EOF: 
     print(msg.error()) 
     running = False 
Verwandte Themen