2016-07-23 14 views
0

Ich habe die folgenden RDD enthält Sätze von Elementen, die ich nach Artikel Ähnlichkeit gruppieren möchte (Elemente in der gleichen Gruppe gelten als ähnlich. Ähnlichkeit ist transitiv und alle Elemente in Mengen, die atleast haben ein gemeinsames Element ist auch ähnlich)Reduzieren Spark RDD, um mehrere Werte zurückzugeben

Eingang RDD betrachtet:

Set(w1, w2) 
Set(w1, w2, w3, w4) 
Set(w5, w2, w6) 
Set(w7, w8, w9) 
Set(w10, w5, w8) --> All the first 5 set elements are similar as each of the sets have atleast one common item 
Set(w11, w12, w13) 

ich die oben RDD möchte

Set(w1, w2, w3, w4, w5, w6, w7, w8, w9, w10) 
Set(w11, w12, w13) 

Jede sugge zu reduziert werden Wie könnte ich das machen? Ich kann etwas wie unten nicht ausführen, wo ich ignorieren könnte, wenn ich zwei Sätze reduziere, wenn sie keine gemeinsamen Elemente enthalten:

Danke.

Antwort

0

Ihr reduce Algorithmus ist tatsächlich falsch. Zum Beispiel, was passiert, wenn ein Set nicht mit dem nächsten Set zusammengeführt werden kann, aber immer noch mit einem anderen Set im Collect zusammengeführt werden kann.

Es gibt wahrscheinlich bessere Möglichkeiten, aber ich denke mir eine Lösung aus, indem ich es in ein Graphproblem umwandle und Graphx verwende.

val data = Array(Set("w1", "w2", "w3"), Set("w5", "w6"), Set("w7"), Set("w2", "w3", "w4")) 
val setRdd = sc.parallelize(data).cache 

// Generate an unique id for each item to use as vertex's id in the graph 
val itemToId = setRdd.flatMap(_.toSeq).distinct.zipWithUniqueId.cache 
val idToItem = itemToId.map { case (item, itemId) => (itemId, item) } 

// Convert to a RDD of set of itemId 
val newSetRdd = setRdd.zipWithUniqueId 
    .flatMap { case (sets, setId) => 
    sets.map { item => (item, setId) } 
    }.join(itemToId).values.groupByKey().values 

// Create an RDD containing edges of the graph 
val edgeRdd = newSetRdd.flatMap { set => 
    val seq = set.toSeq 
    val head = seq.head 
    // Add an edge from the first item to each item in a set, 
    // including itself 
    seq.map { item => Edge[Long](head, item)} 
    } 

val graph = Graph.fromEdges(edgeRdd, Nil) 

// Run connected component algorithm to check which items are similar. 
// Items in the same component are similar 
val verticesRDD = graph.connectedComponents().vertices 

verticesRDD.join(idToItem).values.groupByKey.values.collect.foreach(println) 
+0

Ausgezeichnet. Vielen Dank. Ich habe Sparks Graphx-Bibliothek nie erforscht und es ist Zeit, dass ich es mache. – soontobeared

Verwandte Themen