2016-11-25 8 views
2

Ich habe ein Spark-Programm in Python geschrieben, das richtig funktioniert.Überprüfen, ob in einem RDD Wert existiert

Allerdings ist es in Bezug auf Speicherverbrauch ineffizient & Ich versuche, es zu optimieren. Ich führe es auf AWS EMR und EMR tötet den Job für zu viel Speicher verbrauchen.

Lost executor 11 on ip-*****: Container killed by YARN for exceeding memory limits. 11.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 

Ich glaube, das Speicherproblem auf die Tatsache zurückzuführen ist, dass ich meine RDDs sammle (dh mit .collect()) bei verschiedenen Anlässen, weil in späteren Stadien, ich brauche zu testen, ob irgendein Wert in der Liste vorhanden ist aus diesen RDDs gemacht oder nicht.

Also, momentan sieht mein Code wie folgt aus:

myrdd = data.map(lambda word: (word,1))  \ 
     .reduceByKey(lambda a,b: a+b) \ 
     .filter(lambda (a, b): b >= 5) \ 
     .map(lambda (a,b) : a)   \ 
     .collect() 

und später irgendwann im Code

if word in myrdd: 
    mylist.append(word) 

myrdd2 = data2.map(lambda word: (word,1))  \ 
     .reduceByKey(lambda a,b: a+b) \ 
     .filter(lambda (a, b): b >= 5) \ 
     .map(lambda (a,b) : a)   \ 
     .collect() 

if word in myrdd2: 
    mylist2.append(word) 

und dann bin ich dieses Muster mehrere Male wiederholen.

Gibt es eine Möglichkeit, den Betrieb

if word in myrdd: 
    do something 

ohne Sammeln des rdd zuerst zu tun?

Gibt es eine Funktion wie rdd.contains()?

P.S: Ich speichere nichts im Speicher. Mein Funke Zusammenhang sieht wie folgt aus:

jobName = "wordcount" 
sc = SparkContext(appName = jobName) 

...... 
...... 

sc.stop() 
+2

nicht verwenden .collect() es bringt alle Daten zum Fahrer, der ein Problem verursacht, wenn Sie größere Datenmenge haben. benutze myrdd2.foreachRDD und überprüfe, ob der Wert vorhanden ist – Backtrack

+0

word = sc.broadcast ([w1, w2, w3]) valuepresent = myrd.filter {lambda x: x in wort} sowas wird das auch eine workaround würde ich mir vorstellen – Backtrack

Antwort

1

Fehlermeldung aus GARN sagt, dass collect ist kein Problem, weil Ihr Testamentsvollstrecker (und kein Treiber) Gedächtnisprobleme haben.

Zuerst versuchen Sie, den Fehler Vorschlag und Boost spark.yarn.executor.memoryOverhead zu folgen - wenn Sie pyspark auf YARN laufen lassen, können Sie YARN sagen, um ein wenig größere Behälter für den Arbeitsspeicher der Pythonarbeiter zuzuteilen.

Als nächstes betrachten Sie Operationen, für die Executoren große Mengen an Speicher benötigen. Sie verwenden reduceByKey, vielleicht können Sie die Anzahl der Partitionen erhöhen, um sie hinsichtlich des verwendeten Speichers kleiner zu machen. Schauen Sie sich numPartitions Parameter: http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey

Schließlich, wenn Sie überprüfen möchten, ob rdd einen Wert enthält dann filtern, dass nur durch diesen Wert und überprüfen Sie es mit count oder first, zum Beispiel:

looking_for = "....." 
contains = rdd.filter(lambda a: a == looking_for).count() > 0 
+0

Danke. Werden die Executoren durch viele RDDs belastet? Für z. Wenn ich etwas wie myrddalias = myrdd mache, belastet das die Erinnerung zusätzlich oder ist das in Ordnung? – Piyush

+1

es kopiert nur die Referenz, rdds selbst wird nicht geklont – Mariusz

+0

Das Problem ist, dass Looking_for ist eine RDD und wenn ich einen Filter auf eine andere RDD es zeigt mir einen Fehler, dass ich nicht eine Transformation in eine andere setzen kann. Looking_for ist eine Liste und ich möchte meine rdd auf der Grundlage eines bestimmten Wertes, der in der Suche nach for RDD existiert oder nicht, zurückschneiden.Der genaue Fehler - Ausnahme: Es scheint, dass Sie versuchen, eine RDD zu senden oder eine RDD von einer Aktion oder einer Umwandlung zu referenzieren. RDD-Transformationen und -Aktionen können nur vom Treiber und nicht innerhalb anderer Transformationen aufgerufen werden. – Piyush

Verwandte Themen