Ich versuche, 3 Dateien beizutreten und die endgültige Datei in der Konsole mit Pyspark auszugeben. Ich habe sie konvertiert, um RDDs zu paaren und ich kann 2 von ihnen ohne jedes Problem verbinden. Aber aus irgendeinem Grund bin ich nicht in der Lage, eine dritte gepaarte RDD mit der zuvor verbundenen RDD zu verbinden. Unten ist die Struktur von 3 Dateien.Join mehrere gepaarte RDDs in pyspark
EmployeeManager.csv
E01,John
E02,Kate
E03,Emily
EmployeeName.csv
E01,Brick
E02,Blunt
E03,Leo
EmployeeSalary.csv
E01,50000
E02,50000
E03,45000
Unten ist der pyspark Code, die ich bisher habe.
from pyspark import SparkConf, SparkContext
sc = SparkContext(conf=SparkConf())
manager = sc.textFile('spark1/EmployeeManager.csv')
name = sc.textFile('spark1/EmployeeName.csv')
salary = sc.textFile('spark1/EmployeeSalary.csv')
managerPairRDD = manager.map(lambda x: x.split(','))
namePairRDD = name.map(lambda x: x.split(','))
salaryPairRDD = salary.map(lambda x: x.split(','))
ns = namePairRDD.join(salaryPairRDD)
print 'After name and salary join: \n %s' %ns.collect()
nsm = managerPairRDD.join(ns)
print 'After joining 3 files: %s' %nsm.collect()
Das Programm wird während des letzten Schritts gestoppt. Unten ist der Konsolenausgang
[[email protected] Spark]$ pyspark q7.py
WARNING: Running python applications through 'pyspark' is deprecated as of Spark 1.0.
Use ./bin/spark-submit <python file>
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
After name and salary join:
[(u'E02', (u'Blunt', u'50000')), (u'E03', (u'Leo', u'45000')), (u'E01', (u'Brick', u'50000'))]
[Stage 3:=======================================> (2 + 0)/3]
Bitte lassen Sie mich wissen, wie man vorgeht, um dieses Problem zu lösen. Jede Hilfe wird sehr geschätzt.
Danke,
Können Sie weitere Informationen zur Spark-Version geben? Ich mache das auf Spark 2.1.0 und es funktioniert für mich. –
Ich bin auf Spark 1.6.0 auf meiner Cloudera VM ausgeführt. –
Sie können die Details über die Phase überprüfen, in der der Auftrag fehlschlägt, die Funken verwenden, um mehr zu debuggen – voldy