2016-10-27 3 views
0

In Apache Spark kann man mehrere RDDs effizient mit sparkContext.union() Methode verbinden. Gibt es etwas Ähnliches, wenn jemand mehrere RDDs schneiden möchte? Ich habe in sparkContext Methoden gesucht und ich konnte nichts oder irgendwo anders finden. Eine Lösung könnte sein, die rdds zu vereinigen und dann die Duplikate zu holen, aber ich denke nicht, dass es so effizient sein könnte. Angenommen, ich habe folgendes Beispiel mit Schlüssel/Wert-Paar Sammlungen:Apache Spark - Kreuzung mehrerer RDDs

val rdd1 = sc.parallelize(Seq((1,1.0),(2,1.0))) 
val rdd2 = sc.parallelize(Seq((1,2.0),(3,4.0),(3,1.0))) 

Ich möchte eine neue Sammlung abzurufen, die folgende Elemente aufweist:

(1,2.0) (1,1.0) 

Aber natürlich für mehrere RDDs und nicht nur zwei .

+0

warum möchten Sie mehrere rdds schneiden? und auf welcher Grundlage? – Shankar

+0

Ich denke jetzt ist meine Frage besser zu verstehen. –

Antwort

2

Versuchen:

val rdds = Seq(
    sc.parallelize(Seq(1, 3, 5)), 
    sc.parallelize(Seq(3, 5)), 
    sc.parallelize(Seq(1, 3)) 
) 
rdds.map(rdd => rdd.map(x => (x, None))).reduce((x, y) => x.join(y).keys.map(x => (x, None))).keys 
+0

Das funktioniert, danke. Aber wenn jede Sammlung Schlüssel/Wert-Paare statt Ganzzahlen hätte, die nicht funktionieren würden, oder? Außerdem verwendet diese Methode einen Join. Normalerweise ist ein Hash-Partitionierer eine gute Übung, oder? –

+0

Sollte so lange arbeiten, wie Elemente gehashed werden können. Es sei denn, Sie möchten etwas anderes ausgeben. Versteh die zweite Frage nicht. –

+0

Eine gute Übung vor der Verwendung von Join zwischen rdds ist die Verwendung der Hash-Partitionierung, um redundante Umstellungen zu vermeiden und effizienter zu gestalten. In Ihrem Code verwenden Sie keine Hash-Partitionierung. –

2

Es gibt eine intersection method auf RDD, aber es dauert nur eine andere RDD:

def intersection(other: RDD[T]): RDD[T] 

Lassen Sie uns die Methode, die Sie in Bezug auf diese wollen implementieren.

def intersectRDDs[T](rdds: Seq[RDD[T]]): RDD[T] = { 
    rdds.reduce { case (left, right) => left.intersection(right) 
} 

Wenn Sie schon bei der Umsetzung der Funken sah Joins, können Sie die Ausführung optimieren, indem die größte RDD erste Stelle zu setzen:

def intersectRDDs[T](rdds: Seq[RDD[T]]): RDD[T] = { 
    rdds.sortBy(rdd => -1 * rdd.partitions.length) 
    .reduce { case (left, right) => left.intersection(right) 
} 

EDIT: Es sieht aus wie ich dein Beispiel falsch gelesen: Ihr Text sah aus, als ob du nach dem umgekehrten Verhalten für rdd.union gesucht hättest, aber dein Beispiel deutete an, dass du dich nach Schlüssel schneiden lassen willst. Meine Antwort bezieht sich nicht auf diesen Fall.