2016-06-15 10 views
1

Mein Spark Cluster hat 1 Master und 2 Arbeiter. Die Anwendung liest CSV-Dateien von s3 in DataFrames, registriert sie als temporäre Tabellen und verwendet sqlContext, um SQL-Abfragen zum Erstellen neuer Datenrahmen auszuführen. Dann werden diese DFs in MySql DB gespeichert. Diese Jobs laufen alle auf mehreren Knoten.Warum wird nur einer von Spark-Jobs mit nur einem Executor ausgeführt?

Aber wenn ich diese Tabellen von DB zurück zu DataFrames, registrieren Sie sie als temporäre Tabellen und führen Sie sqlContext Abfrage, wird die gesamte Verarbeitung von nur einem Knoten erledigt. Was könnte das verursachen? Hier

ist ein Beispiel für meine Code:

DataFrame a = sqlContext.read().format("com.databricks.spark.csv").options(options) 
       .load("s3://s3bucket/a/part*"); 
DataFrame b = sqlContext.read().format("com.databricks.spark.csv").options(options) 
       .load("s3://s3bucket/b/part*"); 

a.registerTempTable("a"); 
b.registerTempTable("b"); 

DataFrame c = sqlContext.sql("SELECT a.name, b.name from a join b on a.id = b.a_id"); 

c.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "c", prop); 

// other jobs are similar 

Map<String, String> dOptions = new HashMap<String, String>(); 
dOptions.put("driver", MYSQL_DRIVER); 
dOptions.put("url", MYSQL_CONNECTION_URL); 

dOptions.put("dbtable", "(select * from c) AS c"); 
rC= sqlContext.read().format("jdbc").options(dOptions).load(); 
rC.cache(); 

dOptions.put("dbtable", "(select * from d) AS d"); 
rD= sqlContext.read().format("jdbc").options(dOptions).load(); 
rD.cache(); 

dOptions.put("dbtable", "(select * from f) AS f"); 
rF= sqlContext.read().format("jdbc").options(dOptions).load(); 
rF.cache(); 

rC.registerTempTable("rC"); 
rD.registerTempTable("rD"); 
rF.registerTempTable("rF"); 

DataFrame result = sqlContext.sql("SELECT rC.name, rD.name, rF.date from rC join rD on rC.name = rD.name join rF on rC.date = rF.date"); 

result.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "result_table", prop); 
+1

Was verwenden Sie, um Ihre Arbeit einzureichen? Wenn Sie "einen Knoten" meinen, meinen Sie, dass Sie nur einen Mitarbeiter auf der Master-Benutzeroberfläche sehen können? –

+0

@Hawknight Ich benutze Spark-Submit, um Job zu übermitteln. Hier ist der vollständige Befehl: "spark-submit --class MyClass --deploy-mode cluster s3: //bucket/file.jar". Ich bin monitornia über Ganglia sowie Spark UI. Beide erkennen Arbeiter und ich sehe, dass einige Jobs parallel ausgeführt werden. Während des Jobs, den ich oben gepostet habe, hält die Parallelisierung jedoch an und bestimmte Aufgaben (innerhalb der Phase) werden nur von einem Arbeiterknoten ausgeführt. Hier ist ein Screenshot von Ganglia UI, der das zeigt. http://pokit.org/get/img/2a5bcd853b97aad2bc9e86a90c9b2733.png – KikiRiki

+0

Nur basierend auf dem Code, ist es ziemlich schwer zu sagen, warum nur ein Arbeiter auf bestimmten Stufen arbeiten würde. Können Sie versuchen genau zu bestimmen, in welchem ​​Stadium und welcher Aufgabe das Problem auftritt? –

Antwort

0

Könnten Sie mit uns teilen Sie Ihre SparkConf() Objekt?

Ein SparkConf() - Objekt enthält die Konfiguration für eine Spark-Anwendung. Es werden verschiedene Parameter als Funkenschlüsselwertpaare wie einzustellen:

-Das Master

-Anzahl der Vollstrecker

-Anzahl Zieher Kerne

-heap Speicher zugewiesen

-other ..

+1

Hier mein SparkConf Objekt: SparkConf conf = neu SparkConf(). SetAppName ("org.spark.SchemaTransformer"). SetMaster ("Garn-Cluster"); Ausführungsumgebung: spark.master = Garn executor.cores = 4 executor.memory = 5120M – KikiRiki

+1

Ich denke, Sie vermissen einige Argumente. Versuchen Sie, folgende Methoden hinzuzufügen. So etwas wie: SparkConf(). Set ("master", "Garn") \ .set ("spark.submit.deployMode", "cluster") .set ("spark.executor.instances", "8 ") \ .set (" spark.executor.cores "," 4 ") \ .set (" spark.executor.memory "," 5120M ") \ .set (" spark.driver.memory "," 5120M ") \ .set (" spark.yarn.memoryOverhead“, "10000M") \ .set ("spark.yarn.driver.memoryOverhead", "10000M") – theudbald

Verwandte Themen