2016-04-22 4 views
1

Ich versuche, serialisierte Nachrichten über einen Kafka-Broker mit Python 2.7 und Apache Avro (Python-Client) auszutauschen. Ich würde gerne wissen, ob es eine Möglichkeit gibt, Nachrichten auszutauschen, ohne vorher ein Schema zu erstellen.Python - Schema-lose Apache Avro Datenserialisierung

Dies ist der Code (unter Verwendung eines Schemas, sensor.avsc, die Sache, die ich vermeiden wollen):

from kafka import SimpleProducer, KafkaClient 
import avro.schema 
import io, random 
from avro.io import DatumWriter 

# To send messages synchronously 
kafka = KafkaClient('localhost:9092') 
producer = SimpleProducer(kafka, async = False) 

# Kafka topic 
topic = "sensor_network_01" 

# Path to user.avsc avro schema that i don't want 
schema_path="sensor.avsc" 
schema = avro.schema.parse(open(schema_path).read()) 


for i in xrange(100): 
    writer = avro.io.DatumWriter(schema) 
    bytes_writer = io.BytesIO() 
    encoder = avro.io.BinaryEncoder(bytes_writer) 
    # creation of random data 
    writer.write({"sensor_network_name": "Sensor_1", "value": random.randint(0,10), "threshold_value":10 }, encoder) 

    raw_bytes = bytes_writer.getvalue() 
    producer.send_messages(topic, raw_bytes) 

Dies ist die sensor.avsc Datei:

{ 
    "namespace": "sensors.avro", 
    "type": "record", 
    "name": "Sensor", 
    "fields": [ 
     {"name": "sensor_network_name", "type": "string"}, 
     {"name": "value", "type": ["int", "null"]}, 
     {"name": "threshold_value", "type": ["int", "null"]} 
    ] 
} 

Antwort

0

I haven Ich habe noch niemanden gesehen, der das gemacht hat, aber ich habe es selbst gewollt. Sie müssen es vielleicht selbst schreiben, aber es sollte nicht so schlimm sein - vorausgesetzt, das zu serialisierende Objekt ist einfach; Alles, was Sie tun müssen, ist, die Felder durchzulaufen und eine Karte von Python-Typen zu Avro-Typen zu haben. Verschachtelte Felder erfordern eine Rekursion, um in jedes Objekt zu graben.

1

Dieser Code:

import avro.schema 
import io, random 
from avro.io import DatumWriter, DatumReader 
import avro.io 

# Path to user.avsc avro schema 
schema_path="user.avsc" 
schema = avro.schema.Parse(open(schema_path).read()) 


for i in xrange(1): 
    writer = avro.io.DatumWriter(schema) 
    bytes_writer = io.BytesIO() 
    encoder = avro.io.BinaryEncoder(bytes_writer) 
    writer.write({"name": "123", "favorite_color": "111", "favorite_number": random.randint(0,10)}, encoder) 
    raw_bytes = bytes_writer.getvalue() 

    print(raw_bytes) 

    bytes_reader = io.BytesIO(raw_bytes) 
    decoder = avro.io.BinaryDecoder(bytes_reader) 
    reader = avro.io.DatumReader(schema) 
    user1 = reader.read(decoder) 
    print(" USER = {}".format(user1)) 

für dieses Schema zu tun

{"namespace": "example.avro", 
"type": "record", 
"name": "User", 
"fields": [ 
    {"name": "name", "type": "string"}, 
    {"name": "favorite_number", "type": ["int", "null"]}, 
    {"name": "favorite_color", "type": ["string", "null"]} 
] 
} 

ist das, was Sie brauchen.

Kredit geht an this gist