2016-04-20 6 views
0

Ich versuche eine optimierte Möglichkeit zu finden, eine Liste von einzigartigen Co-Location-Paarungen zu generieren. Ich habe versucht, dies mit einer Reihe von Flatmaps und verschiedenen Abfragen zu tun, aber ich habe festgestellt, dass die Flatmap beim Ausführen von Millionen von Datensätzen nicht übermäßig performant ist. Jede Hilfe bei der Optimierung wäre dankbar.Spark Aufgabenoptimierung

Der Datensatz ist (Geohash, ID) und ich bin dies auf 30 Node Cluster ausgeführt.

val rdd = sc.parallelize(Seq(("gh5", "id1"), ("gh4", "id1"), ("gh5", "id2"),("gh5", "id3")) 

val uniquePairings = rdd.groupByKey().map(value => 
    value._2.toList.sorted.combinations(2).map{ 
    case Seq(x, y) => (x, y)}.filter(id => 
    id._1 != id._2)).flatMap(x => x).distinct()  

voutput = Array(("id1","id2"),("id1","id3"),("id2","id3")) 

Antwort

1

Eine einfache join sollte hier mehr als genug sein. Zum Beispiel mit DataFrames:

val df = rdd.toDF 
df.as("df1").join(df.as("df2"), 
    ($"df1._1" === $"df2._1") && 
    ($"df1._2" < $"df2._2") 
).select($"df1._2", $"df2._2") 

oder Datensätze

val ds = rdd.toDS 
ds.as("ds1").joinWith(ds.as("ds2"), 
    ($"ds1._1" === $"ds2._1") && 
    ($"ds1._2" < $"ds2._2") 
).map{ case ((_, x), (_, y)) => (x, y)} 
+0

Danke für die Antwort, die Datenrahmen bieten irgendwelche Leistungsvorteile des Joins in Bezug auf das Shuffling. – SChorlton

+0

Nicht eine, die ich kenne, aber es ist effizienter als Standard-Co-Gruppe 'Join' auf RDDs. – zero323

+0

Danke Ich habe eine leicht veränderte Version ausgeführt, um die Schlüssel zuerst zu sortieren, so dass die Suche eher lokal stattfinden sollte. Ich nehme an, Spark wird zunächst lokal aussehen? – SChorlton

0

Schauen Sie in die kartesische Funktion. Es erzeugt eine RDD, die alle möglichen Kombinationen der Eingangs-RDDs ist. Beachten Sie, dass dies eine teure Operation (N^2 in der Größe des RDD)

Cartesian example

+0

Vielen Dank für die Antwort, aber läuft die gleiche Funktion eines cartesianischen mit erscheint viel langsamer. Ich nehme an, das liegt daran, dass es kein echtes n^2 Problem ist. – SChorlton

+0

Sicher könnte sein. Es könnte einen Versuch wert sein, Ihre RDD vor der kartesischen Operation zwischenzuspeichern. Wenn Sie darüber nachdenken, fühlt sich Ihr ursprünglicher Ansatz am besten an, um diese Aufgabe zu erfüllen. Sie sollten jedoch reduceByKey anstelle von groupByKey verwenden. Diese Aufgabe ist assoziativ (Sie brauchen also nicht groupByKey) und reduceByKey speichert einige Datenmischungen (https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html) – David

+0

Danke wieder habe ich einen combreyByKey in mein vollständiges Beispiel geschrieben, da die Datenstruktur etwas komplexer ist als im Beispiel, aber danke für die heads-ups. Die Aufgabe scheint ungefähr 4 bis 5 Stunden für 11 Millionen Datensätze zu dauern, was nur ein bisschen auf der hohen Seite scheint. Nicht sicher, ob das tatsächlich realistisch ist. – SChorlton