Ich kann nicht herausfinden, wie man einen Datenframe zu elasticsearch mit Python von Spark schreiben. Ich folgte den Schritten von here.Python Spark Dataframe zu Elasticsearch
Hier ist mein Code:
# Read file
df = sqlContext.read \
.format('com.databricks.spark.csv') \
.options(header='true') \
.load('/vagrant/data/input/input.csv', schema = customSchema)
df.registerTempTable("data")
# KPIs
kpi1 = sqlContext.sql("SELECT * FROM data")
es_conf = {"es.nodes" : "10.10.10.10","es.port" : "9200","es.resource" : "kpi"}
kpi1.rdd.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_conf)
Above Code gibt
Verursacht durch: net.razorvine.pickle.PickleException: erwartete Null Argumente für den Bau von ClassDict (für pyspark.sql .types._create_row)
ich begann auch das Skript aus: spark-submit --master spark://aggregator:7077 --jars ../jars/elasticsearch-hadoop-2.4.0/dist/elasticsearch-hadoop-2.4.0.jar /vagrant/scripts/aggregation.py
um sicherzustellen, dass elasticsearch-hadoop
welche version von elasticseach benutzen sie? – eliasah
@ eliasah'2.4.0', versuchte auch mit 'elasticsearch-hadoop-5.0.0-alpha5.jar' für die 2.x-Versionen von es – dimzak