2016-06-22 17 views
0

Ich habe zwei RDD [Array [String]], nennen wir sie rdd1 und rdd2. Ich würde eine neue RDD erstellen, die nur die Einträge von rdd2 nicht in rdd1 (basierend auf einem Schlüssel) enthält. Ich benutze Spark auf Scala via Intellij.Vergleichen von zwei RDDs

gruppiert I RDD1 und RDD2 durch einen Schlüssel (Ich werde nur die Schlüssel der beiden RDDs vergleichen):

val rdd1Grouped = rdd1.groupBy(line => line(0)) 
val rdd2Grouped = rdd2.groupBy(line => line(0)) 

Dann habe ich eine leftOuterJoin verwendet:

val output = rdd1Grouped.leftOuterJoin(rdd2Grouped).collect { 
    case (k, (v, None)) => (k, v) 
} 

aber das doesn‘ t scheint das richtige Ergebnis zu geben.

Was ist los damit? Irgendwelche schlägt vor?

Beispiel RDDS (jede Zeile ist ein Array [String], OFC):

rdd1      rdd2     output (in some form) 

1,18/6/2016    2,9/6/2016     2,9/6/2016 
1,18/6/2016    2,9/6/2016 
1,18/6/2016    2,9/6/2016 
1,18/6/2016    2,9/6/2016 
1,18/6/2016    1,20/6/2016 
3,18/6/2016    1,20/6/2016 
3,18/6/2016    1,20/6/2016 
3,18/6/2016 
3,18/6/2016 
3,18/6/2016 

In diesem Fall möchte ich nur hinzufügen, um den Eintrag "2,9/6/2016", weil der Schlüssel "2" ist nicht in rdd1.

Antwort

1

neue RDD nur die Einträge von RDD2, die nicht in RDD1

verbinden links alle Schlüssel in RDD1 behalten würde und fügen Sie Spalten von RDD2 Schlüsselwerten entsprechen. Also klar links Join/Outer Join ist nicht die Lösung.

rdd1Grouped.subtractByKey(rdd2Grouped) wäre in Ihrem Fall geeignet.

P.S. : Beachte auch, dass, wenn rdd1 kleiner ist, es besser gesendet wird. Auf diese Weise würde zum Zeitpunkt des Subtrahierens nur die zweite rdd gestreamt werden.

+0

seit 'rdd1' und' rdd2' sind nicht RDDs von Tupeln, subtractByKey kann nicht wirklich auf sie genannt werden, wie sie ist. Vermutlich fehlen einige Aufrufe von 'keyBy', z. 'rdd1.keyBy (_ (0)) .subtractByKey (rdd2.keyBy (_ (0))). values' –

+0

richtig, ich sollte rdd1Grouped und rdd2Grouped verwenden, anstatt nur rdd1 und rdd2. Korrigiere jetzt in meiner Antwort. –

+1

tatsächlich ist die Gruppierung redundant (und oft teuer), und sie ändert den Ergebnistyp - ich denke, 'keyBy' macht hier mehr Sinn. –

0

Schalter rdd1Grouped und rdd2Grouped und dann filter verwenden:

val output = rdd2Grouped.leftOuterJoin(rdd1Grouped).filter(line => { 
    line._2._2.isEmpty 
}).collect