2015-08-31 6 views
6

Ich versuche, eine große CSV an Kafka zu senden. Die Grundstruktur besteht darin, eine Zeile der CSV-Datei zu lesen und sie mit der Kopfzeile zu komprimieren.Senden von großen CSV an Kafka mit Python Spark

a = dict(zip(header, line.split(",") 

Diese dann in ein Json mit umgewandelt wird:

message = json.dumps(a) 

dann verwende ich kafka-Python-Bibliothek, die Nachricht zu senden

from kafka import SimpleProducer, KafkaClient 
kafka = KafkaClient("localhost:9092") 
producer = SimpleProducer(kafka) 
producer.send_messages("topic", message) 

Mit PYSPARK ich einfach eine RDD erstellt haben von Nachrichten aus der CSV-Datei

sc = SparkContext() 
text = sc.textFile("file.csv") 
header = text.first().split(',') 
def remove_header(itr_index, itr): 
    return iter(list(itr)[1:]) if itr_index == 0 else itr 
noHeader = text.mapPartitionsWithIndex(remove_header) 

messageRDD = noHeader.map(lambda x: json.dumps(dict(zip(header, x.split(",")) 

Jetzt möchte ich diese Nachrichten senden: Ich habe eine Funktion

def sendkafka(message): 
    kafka = KafkaClient("localhost:9092") 
    producer = SimpleProducer(kafka) 
    return producer.send_messages('topic',message) 

Dann definiere ich eine neue RDD erstellen, um die Nachrichten zu senden

sentRDD = messageRDD.map(lambda x: kafkasend(x)) 

ich dann rufen sentRDD.count()

Welche beginnt zu wühlen und Nachrichten zu senden

Leider ist dies sehr langsam. Es sendet 1000 Nachrichten pro Sekunde. Dies ist in einem Cluster von 10 Knoten mit jeweils 4 CPUs und 8 GB Speicher.

Im Vergleich dauert das Erstellen der Nachrichten ca. 7 Sekunden auf einer 10 Millionen Zeile csv. ~ ungefähr 2gb

Ich denke, das Problem ist, dass ich einen Kafka-Produzenten innerhalb der Funktion instanziiere. Allerdings, wenn ich nicht spreche dann beklagt sich, dass der Produzent nicht existiert, obwohl ich versucht habe, es global zu definieren.

Vielleicht kann jemand etwas Licht darauf werfen, wie dieses Problem angegangen werden kann.

Danke,

Antwort

6

Sie einen einzelnen Hersteller pro Partition erstellen und entweder mapPartitions oder foreachPartition verwenden:

def sendkafka(messages): 
    kafka = KafkaClient("localhost:9092") 
    producer = SimpleProducer(kafka) 
    for message in messages: 
     yield producer.send_messages('topic', message) 

sentRDD = messageRDD.mapPartitions(sendkafka) 

Wenn oben allein werden Sie es nicht helfen, können versuchen, zu verlängern, um ein asynchronous producer verwenden.

In Spark 2.x ist es auch möglich, eine Kafka-Datenquelle zu verwenden. Sie werden spark-sql-kafka Glas, passend Funken- und Scala-Version (hier 2.2.0 und 2.11 jeweils) umfassen haben:

spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 

convert Daten an einen DataFrame (wenn es nicht DataFrame ist bereits):

messageDF = spark.createDataFrame(messageRDD, "string") 

und schreiben DataFrameWriter mit: zero323

(messageDF.write 
    .format("kafka") 
    .option("topic", topic_name) 
    .option("kafka.bootstrap.servers", bootstrap_servers) 
    .save()) 
+0

Dank. mit einem einzigen Produzenten außerhalb des Funkens mit einem Async könnte ich 8000 pro Sekunde bekommen. Also habe ich etwas optimiert. Ich entdeckte, dass ich 15 Partitionen für diese csv hatte, also gab ich dem Job 15 Kerne.Ich spielte dann mit den asynchronen Optionen, bis die Stapelgröße 20000 war. Das gab mir einen maximalen Durchsatz von 225 Tausend pro Sekunde. Mit etwas Tuning habe ich also einen vernünftigen Preis bekommen. Das sind 45 Sekunden, um eine 10 Millionen Zeile csv zu streamen. –

+1

@PhineasDashevsky, wäre es sehr hilfreich, wenn Sie den Code für Ihre endgültige Lösung teilen könnten. – Picarus

+0

https://iabdb.me/2015/09/09/kafka-on-the-shore-my-experiences-benchmarking-apache-kafka-part-i/ In diesem Artikel habe ich den Code und eine längere Beschreibung wie es geht. –