2016-11-11 3 views
3

Ich versuche, ein Paar Rdd Elastic Search auf Elastic Cloud auf Version 2.4.0 zu schreiben. Ich benutze Plugin, um auf ES zu schreiben. Hier ist der Code, den ich zu IHM schreiben bin mit:Schreiben von RDD von Spark zu Elastic Search schlägt fehl

def predict_imgs(r): 
    import json 
    out_d = {} 
    out_d["pid"] = r["pid"] 
    out_d["other_stuff"] = r["other_stuff"] 

    return (r["pid"], json.dumps(out_d)) 

res2 = res1.map(predict_imgs) 

es_write_conf = { 
"es.nodes" : image_es, 
#"es.port" : "9243", 
"es.resource" : "index/type", 
"es.nodes.wan.only":"True", 
"es.write.operation":"upsert", 
"es.mapping.id":"product_id", 
"es.nodes.discovery" : "false", 
"es.net.http.auth.user": "username", 
"es.net.http.auth.pass": "pass", 
"es.input.json": "true", 
"es.http.timeout":"1m", 
"es.scroll.size":"10", 
"es.batch.size.bytes":"1mb", 
"es.http.retries":"1", 
"es.batch.size.entries":"5", 
"es.batch.write.refresh":"False", 
"es.batch.write.retry.count":"1", 
"es.batch.write.retry.wait":"10s"} 

res2.saveAsNewAPIHadoopFile(
path='-', 
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
keyClass="org.apache.hadoop.io.NullWritable", 
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
conf=es_write_conf) 

Der Fehler I ist wie folgt:

Py4JJavaError: An error occurred while calling  z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 744 in stage 26.0 failed 4 times, most recent failure: Lost task 744.3 in stage 26.0 (TID 2841, 10.181.252.29): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) 

Der interessante Teil ist dies funktioniert, wenn ich eine Aufnahme auf dem ersten tun aus ihm Elemente auf RDD2 und dann eine neue rdd machen und schreiben sie es auf eS, es funktioniert einwandfrei:

x = sc.parallelize([res2.take(1)]) 
x.saveAsNewAPIHadoopFile(
path='-', 
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
keyClass="org.apache.hadoop.io.NullWritable", 
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
conf=es_write_conf) 

ich bin mit Elastic Cloud (Cloud-Angebot von Elastic Search) und Databricks (Wolke Fehler bei Apache Spark) Könnte es sein, dass ES nicht in der Lage ist, mit dem Durchsatz von Spark-Schreiben an ES Schritt zu halten? Ich erhöhte unsere Elastic Cloud Größe von 2 GB RAM auf 8 GB RAM.

Gibt es empfohlene Konfigurationen für die es_write_conf, die ich oben verwendet habe? Irgendwelche anderen confs, die Sie sich vorstellen können? Hilft das Aktualisieren auf ES 5.0?

Jede Hilfe wird geschätzt. Habe seit ein paar Tagen damit zu kämpfen. Vielen Dank.

Antwort

2

Es sieht aus wie ein Problem mit pyspark Berechnungen, nicht unbedingt elasticsearch speichern Prozess. Stellen Sie sicher, Ihre RDDs OK sind von:

  1. count() auf RDD1 ausüben (zu "materialisieren" Ergebnisse)
  2. Performing count() auf RDD2

Wenn Zählungen in Ordnung sind, versuchen mit Caching-Ergebnisse, bevor sie in ES Speichern :

res2.cache() 
res2.count() # to fill the cache 
res2.saveAsNewAPIHadoopFile(... 

Es ist das Problem scheint immer noch versuchen, auf toten Testamentsvollstrecker stderr und stdout suchen (Sie können sie auf Testamentsvollstrecker Registerkarte in S finden ParkUI).

Ich bemerkte auch die sehr kleine Losgröße in es_write_conf, versuchen Sie, es auf 500 oder 1000 zu erhöhen, um bessere Leistung zu erhalten.

Verwandte Themen