2

Ich kann nicht herausfinden, wie man einen Datenframe zu elasticsearch mit Python von Spark schreiben. Ich folgte den Schritten von here.Python Spark Dataframe zu Elasticsearch

Hier ist mein Code:

# Read file 
df = sqlContext.read \ 
    .format('com.databricks.spark.csv') \ 
    .options(header='true') \ 
    .load('/vagrant/data/input/input.csv', schema = customSchema) 

df.registerTempTable("data") 

# KPIs 
kpi1 = sqlContext.sql("SELECT * FROM data") 

es_conf = {"es.nodes" : "10.10.10.10","es.port" : "9200","es.resource" : "kpi"} 
kpi1.rdd.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_conf) 

Above Code gibt

Verursacht durch: net.razorvine.pickle.PickleException: erwartete Null Argumente für den Bau von ClassDict (für pyspark.sql .types._create_row)

ich begann auch das Skript aus: spark-submit --master spark://aggregator:7077 --jars ../jars/elasticsearch-hadoop-2.4.0/dist/elasticsearch-hadoop-2.4.0.jar /vagrant/scripts/aggregation.py um sicherzustellen, dass elasticsearch-hadoop

+0

welche version von elasticseach benutzen sie? – eliasah

+0

@ eliasah'2.4.0', versuchte auch mit 'elasticsearch-hadoop-5.0.0-alpha5.jar' für die 2.x-Versionen von es – dimzak

Antwort

1

Für den Anfang geladen wird saveAsNewAPIHadoopFilethis may happen only accidentally ein RDD von (key, value) Paaren und in Ihrem Fall erwartet. Das gleiche gilt für das von Ihnen deklarierte Wertformat.

Ich bin nicht vertraut mit Elastic sondern nur auf der Grundlage der Argumente, die Sie wahrscheinlich etwas ähnliches wie dies versuchen sollten:

kpi1.rdd.map(lambda row: (None, row.asDict()).saveAsNewAPIHadoopFile(...) 

Da Elastic-Hadoop bereitstellen SQL-Datenquelle sollten Sie auch in der Lage sein, das überspringen und speichern direkt Daten:

df.write.format("org.elasticsearch.spark.sql").save(...) 
0

zero323 Wie gesagt, ist der einfachste Weg, um einen Datenrahmen von PySpark zu Elasticsearch laden ist, mit der Methode

Dataframe.write.format("org.elasticsearch.spark.sql").save("index/type")