2017-04-26 2 views
0

Ich versuche, 3 Dateien beizutreten und die endgültige Datei in der Konsole mit Pyspark auszugeben. Ich habe sie konvertiert, um RDDs zu paaren und ich kann 2 von ihnen ohne jedes Problem verbinden. Aber aus irgendeinem Grund bin ich nicht in der Lage, eine dritte gepaarte RDD mit der zuvor verbundenen RDD zu verbinden. Unten ist die Struktur von 3 Dateien.Join mehrere gepaarte RDDs in pyspark

EmployeeManager.csv

E01,John 
E02,Kate 
E03,Emily 

EmployeeName.csv

E01,Brick 
E02,Blunt 
E03,Leo 

EmployeeSalary.csv

E01,50000 
E02,50000 
E03,45000 

Unten ist der pyspark Code, die ich bisher habe.

from pyspark import SparkConf, SparkContext 
sc = SparkContext(conf=SparkConf()) 

manager = sc.textFile('spark1/EmployeeManager.csv') 
name = sc.textFile('spark1/EmployeeName.csv') 
salary = sc.textFile('spark1/EmployeeSalary.csv') 

managerPairRDD = manager.map(lambda x: x.split(',')) 
namePairRDD = name.map(lambda x: x.split(',')) 
salaryPairRDD = salary.map(lambda x: x.split(',')) 

ns = namePairRDD.join(salaryPairRDD) 
print 'After name and salary join: \n %s' %ns.collect() 

nsm = managerPairRDD.join(ns) 
print 'After joining 3 files: %s' %nsm.collect() 

Das Programm wird während des letzten Schritts gestoppt. Unten ist der Konsolenausgang

[[email protected] Spark]$ pyspark q7.py 
WARNING: Running python applications through 'pyspark' is deprecated as of Spark 1.0. 
Use ./bin/spark-submit <python file> 
SLF4J: Class path contains multiple SLF4J bindings. 
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 
After name and salary join:              
[(u'E02', (u'Blunt', u'50000')), (u'E03', (u'Leo', u'45000')), (u'E01', (u'Brick', u'50000'))] 
[Stage 3:=======================================>     (2 + 0)/3] 

Bitte lassen Sie mich wissen, wie man vorgeht, um dieses Problem zu lösen. Jede Hilfe wird sehr geschätzt.

Danke,

+0

Können Sie weitere Informationen zur Spark-Version geben? Ich mache das auf Spark 2.1.0 und es funktioniert für mich. –

+0

Ich bin auf Spark 1.6.0 auf meiner Cloudera VM ausgeführt. –

+0

Sie können die Details über die Phase überprüfen, in der der Auftrag fehlschlägt, die Funken verwenden, um mehr zu debuggen – voldy

Antwort

0

Schließlich dachte ich dies aus, indem die Eingabedateien als Datenrahmen umgewandelt werden.

from pyspark import SparkConf, SparkContext 
from pyspark import SQLContext 

sc = SparkContext(conf=SparkConf()) 
sqlContext = SQLContext(sc) 

manager = sc.textFile('spark1/EmployeeManager.csv') 
name = sc.textFile('spark1/EmployeeName.csv') 
salary = sc.textFile('spark1/EmployeeSalary.csv') 

manager_df = manager.map(lambda x: list(x.split(','))).toDF(["col1","col2"]) 
name_df = name.map(lambda x: list(x.split(','))).toDF(["col1","col2"]) 
salary_df = salary.map(lambda x: list(x.split(','))).toDF(["col1","col2"]) 

nsm = name_df.alias('name_df') \ 
.join(salary_df.alias('salary_df'), name_df.col1==salary_df.col1) \ 
.join(manager_df.alias('manager_df'), name_df.col1==manager_df.col1) \ 
.select(name_df.col1, name_df.col2, salary_df.col2, manager_df.col2) 

nsm.saveAsTextFile('/spark1/q7sol')