1

Niedrige Anzahl der Schreibvorgänge in elasticsearch mit Spark Java.Spark + Elastic Suche schreiben Leistungsproblem

Hier sind die Konfigurationen

Verwendung 13.xlarge Maschinen für ES-Cluster

4 instances each have 4 processors. 
Set refresh interval to -1 and replications to '0' and other basic 
configurations required for better writing. 

Funken:

2 Knoten EMR-Cluster mit

2 Core instances 
    - 8 vCPU, 16 GiB memory, EBS only storage 
    - EBS Storage:1000 GiB 

1 Master node 
    - 1 vCPU, 3.8 GiB memory, 410 SSD GB storage 

ES Index hat 16 shards im Mapping definiert.

unter Config mit, wenn der Auftrag ausgeführt wird,

executor-memory - 8g 
spark.executor.instances=2 
spark.executor.cores=4 

und mit

es.batch.size.bytes - 6MB 
es.batch.size.entries - 10000 
es.batch.write.refresh - false 

mit dieser Konfiguration, versuche ich 1 Million Dokumente zu laden (jedes Dokument eine Größe von 1300 Bytes hat), so dass es Lädt die Last bei 500 Datensätzen/Dokumenten pro ES-Knoten.

und im Funken log bin jede Aufgabe sehen

-1116 bytes result sent to driver 

Funken-Code

JavaRDD<String> javaRDD = jsc.textFile("<S3 Path>"); 
    JavaEsSpark.saveJsonToEs(javaRDD,"<Index name>"); 

Auch wenn ich auf die Grafik In-Netzwerk sucht in ES-Cluster ist es sehr niedrig ist, und ich sehe, EMR sendet keine riesigen Daten über ein Netzwerk. Gibt es eine Möglichkeit, dass ich Spark anweisen kann, eine richtige Anzahl von Daten zu senden, um das Schreiben schneller zu machen?

ODER

Gibt es eine andere Konfiguration, die ich zu zwicken bin fehlt. Ursache Ich sehe 500docs pro Sekunde pro es-Instanz ist niedriger. Kann jemand bitte leitet, was mit dieser Einstellung ist fehlt meint es schreibt

Vielen Dank im Voraus Leistung

+0

In Ihrem Verzeichnis s3, lesen Sie eine einzelne Datei oder viele Dateien? –

+0

mit vielen Dateien – camelBeginner

Antwort

0

Sie können ein Problem haben, hier zu verbessern. spark.executor.instances=2

Sie sind auf zwei Executoren beschränkt, von denen Sie 4 basierend auf Ihrer Clusterkonfiguration haben könnten. Ich würde das auf 4 oder mehr ändern. Ich könnte auch versuchen, Executor-Speicher = 1500M, Kerne = 1, Instanzen = 16. Ich mag es, ein wenig Overhead in meinem Gedächtnis zu lassen, weshalb ich von 2G auf 1.5G gefallen bin (aber Sie können 1.5G nicht tun, also müssen wir 1500M machen). Wenn Sie sich über Ihre Executoren verbinden, verbessert dies die Leistung.

Würde Code zum Debuggen weiter benötigen. Ich frage mich, ob Sie nur in Ihrem Treiber und nicht in Ihren Worker-Knoten mit der elastischen Suche verbunden sind. Das heißt, Sie erhalten nur eine Verbindung statt einer für jeden Executor.

+0

Vielen Dank, Dan, wenn Sie sagen, Executoren auf 4 erhöhen, meinen Sie, den EMR-Cluster zu erhöhen, um 4 Instanzen anstelle von 2 zu haben? Die Art, wie ich mich mit ES verbinde, ist über den untenstehenden Code. SparkConf conf = new SparkConf(). SetAppName ("SparkES-Anwendung"); – camelBeginner

+0

SparkConf conf = new SparkConf().setAppName ("SparkES-Anwendung"); conf.set ("es.nodes", ""); conf.set ("es.batch.size.bytes", "6mb"); conf.set ("es.batch.size.entries", "10000"); conf.set ("es.batch.concurrent.request", "4"); conf.set ("es.batch.write.refresh", "false"); conf.set ("spark.kryoserializer.buffer", "24"); JavaSparkContext jsc = neuer JavaSparkContext (conf); JavaRDD javaRDD = jsc.textFile ("S3 PATH"); JavaEsSpark.saveJsonToEs (javaRDD, "Indexname"); – camelBeginner

+0

und die letzte zwei Zeile oben ist in einer Methode und aufgerufen von main() und ich sende einen Parameter zur Verwendung in der Methode loadSNindex (jsc); – camelBeginner