Ich habe ein Spark-Programm in Python geschrieben, das richtig funktioniert.Überprüfen, ob in einem RDD Wert existiert
Allerdings ist es in Bezug auf Speicherverbrauch ineffizient & Ich versuche, es zu optimieren. Ich führe es auf AWS EMR und EMR tötet den Job für zu viel Speicher verbrauchen.
Lost executor 11 on ip-*****: Container killed by YARN for exceeding memory limits. 11.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Ich glaube, das Speicherproblem auf die Tatsache zurückzuführen ist, dass ich meine RDDs sammle (dh mit .collect()) bei verschiedenen Anlässen, weil in späteren Stadien, ich brauche zu testen, ob irgendein Wert in der Liste vorhanden ist aus diesen RDDs gemacht oder nicht.
Also, momentan sieht mein Code wie folgt aus:
myrdd = data.map(lambda word: (word,1)) \
.reduceByKey(lambda a,b: a+b) \
.filter(lambda (a, b): b >= 5) \
.map(lambda (a,b) : a) \
.collect()
und später irgendwann im Code
if word in myrdd:
mylist.append(word)
myrdd2 = data2.map(lambda word: (word,1)) \
.reduceByKey(lambda a,b: a+b) \
.filter(lambda (a, b): b >= 5) \
.map(lambda (a,b) : a) \
.collect()
if word in myrdd2:
mylist2.append(word)
und dann bin ich dieses Muster mehrere Male wiederholen.
Gibt es eine Möglichkeit, den Betrieb
if word in myrdd:
do something
ohne Sammeln des rdd zuerst zu tun?
Gibt es eine Funktion wie rdd.contains()?
P.S: Ich speichere nichts im Speicher. Mein Funke Zusammenhang sieht wie folgt aus:
jobName = "wordcount"
sc = SparkContext(appName = jobName)
......
......
sc.stop()
nicht verwenden .collect() es bringt alle Daten zum Fahrer, der ein Problem verursacht, wenn Sie größere Datenmenge haben. benutze myrdd2.foreachRDD und überprüfe, ob der Wert vorhanden ist – Backtrack
word = sc.broadcast ([w1, w2, w3]) valuepresent = myrd.filter {lambda x: x in wort} sowas wird das auch eine workaround würde ich mir vorstellen – Backtrack