2017-02-02 4 views
-1

Ich versuche, eine Spark Join mit Textdateien zu schreiben. Aber mein Beitritt funktioniert nicht so, wie ich es erwarte.Spark Join funktioniert nicht

val sc = new SparkContext("local[*]", "employeedata") 

val empoyees= sc.textFile("../somewhere/employee.data") 


val reputations= sc.textFile("../somewhere/reputations.data") 



val employeesRdd= empoyees.map(x=> (x.toString().split(",")(0), x)) 

val reputationsRdd= reputations.map(y=> (y.toString().split(",")(0), y)) 


val joineddata = employeesRdd.join(reputationsRdd).map(_._2) 

employee.data wäre wie unter

emp_id, Vorname, Nachname, Alter, Land, Bildung

reputations.data wäre wie unter

emp_id, Ruf

Aber meine Ergebnisse, die ich bekomme, wären wie folgt:

(empid, Vorname, Nachname, Alter, Land, Bildung ,, Employeeid, Ruf)

Aber ich brauche die unten stehende Ausgabe

(empid, Vorname, Nachname, Alter, Land, Bildung, auch entfernt Ruf)

zusätzliches Komma zwischen Mitarbeiter-ID und der Bildung sollten vor und die Mitarbeiter-ID entfernt werden der Ruf sollte

werden müssen

Kann jemand mir bitte helfen? Hier

Antwort

0

ist einige psuedo-Code (es kompilieren könnte und sogar funktionieren, wenn wir Glück haben!), Um Ihnen ein wenig Hilfe:

// split the fields and key by id 
// you could map the arrays to case classes here 
val employeesRdd= empoyees.map(x=> x.toString().split(",")) 
    .keyBy(e => e(0)) 

val reputationsRdd= reputations.map(y=> y.toString().split(",")) 
    .keyBy(r => r(0)) 

val joineddata = employeesRdd.join(reputationsRdd) 
    .map { case (key, (Array(emp_id, firstname,lastname,age,country,Education), Array(employee_id, reputation))) => 
     (empid,first name, last name, age, country,education,reputation) 
    }