2017-01-03 3 views
4

Ich versuche, die Summe der Knotenwerte in einem Spark Graph Graph zu berechnen. Kurz gesagt ist das Diagramm ein Baum und der oberste Knoten (Wurzel) sollte alle Kinder und ihre Kinder summieren. Mein Graph ist eigentlich ein Baum, wie diese und die erwarteten summierten Wert 1850 sollte sieht: auf diese sieht wie folgt ausSpark GraphX ​​Aggregation Summation

         +----+ 
        +---------------> | VertexID 14 
        |    | | Value: 1000 
       +---+--+   +----+ 
    +------------>  | VertexId 11 
    |   |  | Value:  +----+ 
    |   +------+ Sum of 14 & 24 | VertexId 24 
+---++    +--------------> | Value: 550 
| | VertexId 20     +----+ 
| | Value: 
+----++Sum of 11 & 911 
     | 
     |   +-----+ 
     +----------->  | VertexId 911 
        |  | Value: 300 
        +-----+ 

Der erste Stich:

val vertices: RDD[(VertexId, Int)] = 
     sc.parallelize(Array((20L, 0) 
     , (11L, 0) 
     , (14L, 1000) 
     , (24L, 550) 
     , (911L, 300) 
    )) 

    //note that the last value in the edge is for factor (positive or negative) 
    val edges: RDD[Edge[Int]] = 
     sc.parallelize(Array(
     Edge(14L, 11L, 1), 
     Edge(24L, 11L, 1), 
     Edge(11L, 20L, 1), 
     Edge(911L, 20L, 1) 
    )) 

    val dataItemGraph = Graph(vertices, edges) 


    val sum: VertexRDD[(Int, BigDecimal, Int)] = dataItemGraph.aggregateMessages[(Int, BigDecimal, Int)](
     sendMsg = { triplet => triplet.sendToDst(1, triplet.srcAttr, 1) }, 
     mergeMsg = { (a, b) => (a._1, a._2 * a._3 + b._2 * b._3, 1) } 
    ) 

    sum.collect.foreach(println) 

Dies gibt die folgende :

(20,(1,300,1)) 
(11,(1,1550,1)) 

Es tut die Summe für Vertex 11, aber es wird nicht auf den Stammknoten aufgerollt (Ecke 20). Was fehlt mir oder gibt es einen besseren Weg? Natürlich kann der Baum eine beliebige Größe haben und jeder Eckpunkt kann eine beliebige Anzahl von Kinderkanten aufweisen.

+0

Ich frage mich jetzt, ob Pregel die richtige Methode ist zu verwenden. – will

Antwort

2

Gegeben ist der Graph gerichtet es (wie in Beispiel Sie es zu sein scheint) möglich sein sollte, ein Pregel-Programm zu schreiben, das tut, was Sie fragen nach:

val result = 
dataItemGraph.pregel(0, activeDirection = EdgeDirection.Out)(
    (_, vd, msg) => msg + vd, 
    t => Iterator((t.dstId, t.srcAttr)), 
    (x, y) => x + y 
) 

result.vertices.collect().foreach(println) 

// Output is: 
// (24,550) 
// (20,1850) 
// (14,1000) 
// (11,1550) 
// (911,300) 

Ich verwende EdgeDirection.Out so dass die Nachrichten nur von unten nach oben gesendet werden (sonst würden wir in eine Endlosschleife geraten).

Verwandte Themen