2017-01-27 4 views
1

Ich habe ein Skript, ein paar hundert GB Daten zu verarbeiten, und ich habe Probleme, wenn ich versuche, über 500 GB zu verarbeiten, darunter alles funktioniert es gut. Debugging der App zuerst habe ich den Fehler über die Grenze der spark.driver.maxResultSize Wert, so dass ich diesen Wert auf 4g erhöht und die Aufgabe, die jetzt scheiterte funktioniert, aber, jetzt habe ich ein anderes Problem, wenn ich versuchen, die Ergebnisse in eine Parkett-Datei zu speichern, schlägt die Aufgabe und diesen Fehler zu werfenSet 2 Konfigurationswerte mit sparkConf(). Set

17/01/27 06:35:27 INFO DAGScheduler: Job 7 failed: parquet at NativeMethodAccessorImpl.java:-2, took 12.106390 s 
17/01/27 06:35:27 ERROR InsertIntoHadoopFsRelation: Aborting job. 
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 146:0 was 765207245 bytes, which exceeds max allowed: spark.akka.frameSize (134217728 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize 

So scheint ich den spark.akka.frameSize Wert erhöhen müssen

Meine Frage ist, ich bin schon Erhöhung von maxResultSize mit der Funktion sparkConf(). set, aber ich weiß nicht wie (oder die Syntax), um beide Werte in der sparkConf() .set zu erhöhen. Diese

ist, wie mein Code in jenen Teilen aussehen:

conf = (SparkConf().set("spark.driver.maxResultSize", "4g")) 
sc = SparkContext(conf=conf) 
sqlContext = HiveContext(sc) 

Und die Aufgabe, das heißt Fehler:

sqlContext.sql(sql).coalesce(5).write.parquet(sys.argv[3], mode='overwrite') 

Nur eine Sache, ich kann die conf Dateien im Funken Cluster nicht ändern Außerdem verwenden wir luigi, um die Aufgabe zu funken, also kann ich die Zeichenkette zum Senden von Funken zum Zeitpunkt der Ausführung des Skripts nicht ändern (deshalb modifiziere ich die Parameter direkt aus dem Skript)

jede Anleitung wird geschätzt.

+1

Was ist 'sc = SparkContext (conf = SparkConf(). Satz ("spark.driver.maxResultSize", "4g") schreiben. Satz (" spark.akka.frameSize "," 256M "))"? –

+1

Hoppla ... _ "spark.akka.frameSize ... Maximale Nachrichtengröße (in MB)" _ >> es sollte '.set (" spark.akka.frameSize "," 256 ") sein' –

Antwort

1

RTFM - direkt aus der Spark 1.6.3 Python API documentation ...

Klasse pyspark. SparkConf(...)

Alle Setter-Methoden in dieser Klasse Unterstützung Verkettungs.
Für Beispiel können Sie conf.setMaster"local").setAppName("My app")