Ich versuche, ein Paar Rdd Elastic Search auf Elastic Cloud auf Version 2.4.0
zu schreiben. Ich benutze Plugin, um auf ES zu schreiben. Hier ist der Code, den ich zu IHM schreiben bin mit:Schreiben von RDD von Spark zu Elastic Search schlägt fehl
def predict_imgs(r):
import json
out_d = {}
out_d["pid"] = r["pid"]
out_d["other_stuff"] = r["other_stuff"]
return (r["pid"], json.dumps(out_d))
res2 = res1.map(predict_imgs)
es_write_conf = {
"es.nodes" : image_es,
#"es.port" : "9243",
"es.resource" : "index/type",
"es.nodes.wan.only":"True",
"es.write.operation":"upsert",
"es.mapping.id":"product_id",
"es.nodes.discovery" : "false",
"es.net.http.auth.user": "username",
"es.net.http.auth.pass": "pass",
"es.input.json": "true",
"es.http.timeout":"1m",
"es.scroll.size":"10",
"es.batch.size.bytes":"1mb",
"es.http.retries":"1",
"es.batch.size.entries":"5",
"es.batch.write.refresh":"False",
"es.batch.write.retry.count":"1",
"es.batch.write.retry.wait":"10s"}
res2.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
Der Fehler I ist wie folgt:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 744 in stage 26.0 failed 4 times, most recent failure: Lost task 744.3 in stage 26.0 (TID 2841, 10.181.252.29): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
Der interessante Teil ist dies funktioniert, wenn ich eine Aufnahme auf dem ersten tun aus ihm Elemente auf RDD2 und dann eine neue rdd machen und schreiben sie es auf eS, es funktioniert einwandfrei:
x = sc.parallelize([res2.take(1)])
x.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
ich bin mit Elastic Cloud (Cloud-Angebot von Elastic Search) und Databricks (Wolke Fehler bei Apache Spark) Könnte es sein, dass ES nicht in der Lage ist, mit dem Durchsatz von Spark-Schreiben an ES Schritt zu halten? Ich erhöhte unsere Elastic Cloud Größe von 2 GB RAM auf 8 GB RAM.
Gibt es empfohlene Konfigurationen für die es_write_conf
, die ich oben verwendet habe? Irgendwelche anderen confs
, die Sie sich vorstellen können? Hilft das Aktualisieren auf ES 5.0?
Jede Hilfe wird geschätzt. Habe seit ein paar Tagen damit zu kämpfen. Vielen Dank.