2016-03-29 21 views
1

Ich möchte die indirekten Knoten finden, die mit einem bestimmten Knoten verbunden sind. I wie unten ...So finden Sie die indirekten Knoten, die mit einem bestimmten Knoten in Spark Graphx verbunden sind

graph.connectedComponents 

jedoch mit Hilfe der angeschlossenen Komponenten Klasse von Graph versucht, wird es geben für die alle graph..but i für einen bestimmten Knoten soll.

Ich habe versucht, wie unten auch zu tun.

Dies gibt die direkten Knoten eines bestimmten Knotens und ich rekursive dies nur mit RDD-Operationen. Könnte jemand bitte dabei helfen?

Antwort

1

versuchen, etwas wie folgt aus:

graph.edges.filter(_.srcId == x).map(e => (e.dstId, null)).join(
    graph.collectNeighborIds(EdgeDirection.Either) 
).flatMap{t => t._2._2}.collect.toSet 

Wenn Sie als diese tiefer gehen wollen, würde ich so etwas wie die Pregel API verwenden. Im Wesentlichen können Sie damit wiederholt Nachrichten von Knoten zu Knoten senden und die Ergebnisse aggregieren.

Edit: Pregel Lösung

Ich habe endlich die die Iterationen auf eigene Faust zu stoppen. Bearbeitungen unten. Vor diesem Diagramm:

graph.vertices.collect 
res46: Array[(org.apache.spark.graphx.VertexId, Array[Long])] = Array((4,Array()), (8,Array()), (1,Array()), (9,Array()), (5,Array()), (6,Array()), (2,Array()), (3,Array()), (7,Array()))  

graph.edges.collect 
res47: Array[org.apache.spark.graphx.Edge[Double]] = Array(Edge(1,2,0.0), Edge(2,3,0.0), Edge(3,4,0.0), Edge(5,6,0.0), Edge(6,7,0.0), Edge(7,8,0.0), Edge(8,9,0.0), Edge(4,2,0.0), Edge(6,9,0.0), Edge(7,9,0.0)) 

Wir werden Nachrichten vom Typ senden Array[Long] - ein Array aller VertexIds von angeschlossenen Knoten. Nachrichten werden stromaufwärts gehen - die dst sendet die src ihre VertexId zusammen mit allen anderen nachgeschalteten VertexIds. Wenn der Upstream-Knoten die Verbindung bereits kennt, wird keine Nachricht gesendet. Schließlich weiß jeder Knoten über jeden verbundenen Knoten und es werden keine weiteren Nachrichten gesendet.

Zuerst definieren wir unsere vprog. Nach der Dokumentation:

das benutzerdefinierte Vertex-Programm, das auf jedem Scheitelpunkt verläuft und empfängt die eingehende Nachricht und berechnet einen neuen Scheitelwert. Bei der ersten Iteration wird das Vertex-Programm an allen Scheitelpunkten aufgerufen und übergibt die Standardmeldung . Bei nachfolgenden Iterationen wird das Vertex-Programm nur an den Knoten aufgerufen, die Nachrichten empfangen.

def vprog(id: VertexId, orig: Array[Long], newly: Array[Long]) : Array[Long] = { 
    (orig ++ newly).toSet.toArray 
} 

Dann definieren wir unsere sendMsg - bearbeitet:src vertauscht & dst

einen Benutzer bereitgestellte Funktion, die aus Kanten von Vertices angewendet wird, die Nachrichten in der aktuellen Iteration empfangenen

def sendMsg(trip: EdgeTriplet[Array[Long],Double]) : Iterator[(VertexId, Array[Long])] = { 
    if (trip.srcAttr.intersect(trip.dstAttr ++ Array(trip.dstId)).length != (trip.dstAttr ++ Array(trip.dstId)).toSet.size) { 
    Iterator((trip.srcId, (Array(trip.dstId) ++ trip.dstAttr).toSet.toArray)) 
    } else Iterator.empty } 

Next unser mergeMsg:

ein Benutzer bereitgestellte Funktion, die zwei eingehende Nachrichten vom Typ nimmt A und verschmilzt sie zu einer einzigen Nachricht vom Typ A. Diese Funktion muss kommutativ und assoziativ sein und idealerweise die Größe A sollte nicht erhöhen.

Leider werden wir die Regel im letzten Satz brechen oben:

def mergeMsg(a: Array[Long], b: Array[Long]) : Array[Long] = { 
    (a ++ b).toSet.toArray 
} 

Dann führen wir pregel - bearbeitet: entfernt maxIterations, standardmäßig Int.MaxValue

val result = graph.pregel(Array[Long]())(vprog, sendMsg, mergeMsg) 

Und Sie können die Ergebnisse betrachten:

result.vertices.collect 
res48: Array[(org.apache.spark.graphx.VertexId, Array[Long])] = Array((4,Array(4, 2, 3)), (8,Array(8, 9)), (1,Array(1, 2, 3, 4)), (9,Array(9)), (5,Array(5, 6, 9, 7, 8)), (6,Array(6, 7, 9, 8)), (2,Array(2, 3, 4)), (3,Array(3, 4, 2)), (7,Array(7, 8, 9))) 
+0

Gut. Das ist überhaupt kein einfaches Problem. Es gibt eine aggregateMessages-API, die Sie sich ansehen sollten, und einen Algorithmus namens "Pregel", den Sie verwenden könnten. Im Wesentlichen können Sie damit wiederholt Nachrichten von Knoten zu Knoten senden und die Ergebnisse aggregieren. Es ist schwierig, dies in einem Kommentar zu erklären - aber stellen Sie sich vor, dass Knoten 7 in der ersten Iteration des Algorithmus (8,9) an Knoten 2 senden würde und Knoten 8 (10,11) an Knoten 7 senden würde In der zweiten Iteration würde der Knoten 7 (10, 11) an den Knoten 2 senden. So haben Sie zwei Wiederholungen von Pregel, die Sie beantworten müssen. –

+0

Sicher David .. ich werde damit arbeiten ... und werde Sie aktualisieren ... Danke – Devndra

+0

Also, großartig - jetzt akzeptiere und upvote meine Antwort !! –

Verwandte Themen