2017-01-09

GraphX VertexRDD NullPointerException

I am trying to write a message-passing routine over a graph to compute recursive features. I get an error when I define a graph whose vertices are the output of aggregateMessages. Code for context:

> val newGraph = Graph(newVertices, edges) 

newGraph: org.apache.spark.graphx.Graph[List[Double],Int] = org.apache.spark.graphx.impl.GraphImpl@...

//This is the RDD that causes the problem 
> val result = newGraph.aggregateMessages[List[Double]](
    {triplet => triplet.sendToDst(triplet.srcAttr)}, 
    {(a,b) => a.zip(b).map { case (x, y) => x + y }}, 
    {TripletFields.Src}) 

result: org.apache.spark.graphx.VertexRDD[List[Double]] = VertexRDDImpl[1990] at RDD at VertexRDD.scala:57 

> result.take(1) 
res121: Array[(org.apache.spark.graphx.VertexId, List[Double])] = Array((1944425548,List(0.0, 0.0, 137.0, 292793.0))) 

No problem so far, but when I try

> val newGraph2 = Graph(result, edges) 

newGraph2: org.apache.spark.graphx.Graph[List[Double],Int] = org.apache.spark.graphx.impl.GraphImpl@...

> val result2 = newGraph2.aggregateMessages[List[Double]](
    {triplet => triplet.sendToDst(triplet.srcAttr)}, 
    {(a,b) => a.zip(b).map { case (x, y) => x + y }}, 
    {TripletFields.Src}) 

> result2.count 

I get the following (trimmed) error:

result2: org.apache.spark.graphx.VertexRDD[List[Double]] = VertexRDDImpl[2009] at RDD at VertexRDD.scala:57 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4839.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4839.0 (TID 735, 10.0.2.15): java.lang.NullPointerException 
    at $anonfun$2.apply(<console>:62) 
    at $anonfun$2.apply(<console>:62) 
    at org.apache.spark.graphx.impl.AggregatingEdgeContext.send(EdgePartition.scala:536) 
    at org.apache.spark.graphx.impl.AggregatingEdgeContext.sendToDst(EdgePartition.scala:531) 
    at $anonfun$1.apply(<console>:61) 
    at $anonfun$1.apply(<console>:61) 
    at org.apache.spark.graphx.impl.EdgePartition.aggregateMessagesEdgeScan(EdgePartition.scala:409) 
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:237) 
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:207) 
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
... 
Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
... 
Caused by: java.lang.NullPointerException 
    at $anonfun$2.apply(<console>:62) 
    at $anonfun$2.apply(<console>:62) 
    at org.apache.spark.graphx.impl.AggregatingEdgeContext.send(EdgePartition.scala:536) 
    at org.apache.spark.graphx.impl.AggregatingEdgeContext.sendToDst(EdgePartition.scala:531) 
    at $anonfun$1.apply(<console>:61) 
    at $anonfun$1.apply(<console>:61) 
    at org.apache.spark.graphx.impl.EdgePartition.aggregateMessagesEdgeScan(EdgePartition.scala:409) 
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:237) 
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:207) 
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    ... 3 more 

I don't think this is a type mismatch error, because aggregateMessages returns a VertexRDD. Any ideas why I am getting this problem?

Answer


aggregateMessages does not return every vertex in the graph, only those that received a message. The NullPointerException is caused by edges in the graph that still reference the vertices missing from that result, combined with the absence of a default vertex value in the graph definition: Graph(result, edges) gives those vertices a null attribute, which is then sent as a message and passed to the merge function.
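
Two ways to avoid the null attributes can be sketched as follows. This is only a minimal sketch, not a tested fix for the asker's data: the toy graph, the object name DefaultVertexAttrSketch, the helper propagate, and the choice of a zero list as the neutral value are all assumptions made for illustration. Whether zeros are the right fallback depends on what the recursive features should mean for a vertex with no incoming messages.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, TripletFields, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD

// Hypothetical stand-alone example; in spark-shell only the body of main is needed.
object DefaultVertexAttrSketch {

  // Same message passing as in the question: each vertex receives the
  // element-wise sum of the attributes of its in-neighbours.
  def propagate(g: Graph[List[Double], Int]): VertexRDD[List[Double]] =
    g.aggregateMessages[List[Double]](
      triplet => triplet.sendToDst(triplet.srcAttr),
      (a, b) => a.zip(b).map { case (x, y) => x + y },
      TripletFields.Src)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("default-vertex-attr-sketch"))

    // Toy data (assumed): vertex 1 has no incoming edge, so it receives no message.
    val vertices: RDD[(VertexId, List[Double])] = sc.parallelize(Seq(
      (1L, List(1.0, 2.0)), (2L, List(3.0, 4.0)), (3L, List(5.0, 6.0))))
    val edges: RDD[Edge[Int]] = sc.parallelize(Seq(
      Edge(1L, 2L, 1), Edge(1L, 3L, 1), Edge(2L, 3L, 1)))
    val graph = Graph(vertices, edges)

    // Vertex 1 is absent here because nothing was sent to it.
    val result = propagate(graph)

    // Assumed neutral value; its length must match the feature lists.
    val zeros: List[Double] = List.fill(2)(0.0)

    // Option 1: pass a default attribute for vertices that appear in the
    // edges but not in `result`, instead of the implicit null.
    val viaDefault = Graph(result, edges, zeros)

    // Option 2: keep the vertex set of the previous graph and fall back to
    // the neutral value wherever `result` has no entry.
    val viaJoin = graph.outerJoinVertices(result) {
      (id, oldAttr, msgOpt) => msgOpt.getOrElse(zeros)
    }

    // Both graphs can now be used for the next round of message passing.
    propagate(viaDefault).collect().foreach(println)
    propagate(viaJoin).collect().foreach(println)

    sc.stop()
  }
}
```

Applied to the question, the first option corresponds to replacing Graph(result, edges) with Graph(result, edges, someDefault), where someDefault is a List[Double] of the same length as the feature lists. The second option has the advantage that vertices without any edges are not dropped from the graph between rounds.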