2016-04-27 10 views
3

Ich bin neu bei Spark und GraphX ​​und habe einige Experimente mit seinem Algorithmus durchgeführt, um verbundene Komponenten zu finden. Ich habe bemerkt, dass die Struktur des Graphen einen starken Einfluss auf die Performance hat.Spark: GraphX ​​findet keine verbundenen Komponenten in Graphen mit wenigen Kanten und langen Pfaden

Es war in der Lage, Grafiken mit Millionen von Scheitelpunkten und Kanten zu berechnen, aber für eine bestimmte Gruppe von Diagrammen wurde der Algorithmus nicht rechtzeitig beendet, aber scheitert schließlich mit einem OutOfMemoryError: GC overhead limit exceeded.

Der Algorithmus scheint Probleme mit Graphen zu haben, die lange Pfade enthalten. Zum Beispiel für diesen Graph { (i,i+1) | i <- {1..200} } schlägt die Berechnung fehl. Allerdings, wenn ich transitive Kanten hinzugefügt, beendet die Berechnung sofort:

{ (i,j) | i <- {1..200}, j <- {i+1,200} } 

Auch wie diese Diagramme wurden kein Problem:

{ (i,1) | i <- {1..200} } 

Hier ist ein minimales Beispiel, das Problem zu reproduzieren:

import org.apache.spark._ 
import org.apache.spark.graphx._ 
import org.apache.spark.graphx.lib._ 
import org.apache.spark.storage.StorageLevel 

import scala.collection.mutable 

object Matching extends Logging { 

    def main(args: Array[String]): Unit = { 
    val fname = "input.graph" 
    val optionsList = args.drop(1).map { arg => 
     arg.dropWhile(_ == '-').split('=') match { 
     case Array(opt, v) => opt -> v 
     case _ => throw new IllegalArgumentException("Invalid argument: " + arg) 
     } 
    } 
    val options = mutable.Map(optionsList: _*) 

    val conf = new SparkConf() 
    GraphXUtils.registerKryoClasses(conf) 

    val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy") 
     .map(PartitionStrategy.fromString(_)) 
    val edgeStorageLevel = options.remove("edgeStorageLevel") 
     .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY) 
    val vertexStorageLevel = options.remove("vertexStorageLevel") 
     .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY) 

    val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")")) 
    val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, 
     edgeStorageLevel = edgeStorageLevel, 
     vertexStorageLevel = vertexStorageLevel).cache() 
    log.info("Loading graph...") 
    val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) 
    log.info("Loading graph...done") 

    log.info("Computing connected components...") 
    val cc = ConnectedComponents.run(graph) 
    log.info("Computed connected components...done") 

    sc.stop() 
    } 
} 

Die Datei input.graph kann dies aussehen (10 Knoten, 9 Kanten verbinden sie):

1 2 
2 3 
3 4 
4 5 
5 6 
6 7 
7 8 
8 9 
9 10 

Wenn es fehlschlägt, hängt es in ConnectedComponents.run(graph). Die Fehlermeldung lautet:

Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: GC overhead limit exceeded 
    at java.util.regex.Pattern.compile(Pattern.java:1054) 
    at java.lang.String.replace(String.java:2239) 
    at org.apache.spark.util.Utils$.getFormattedClassName(Utils.scala:1632) 
    at org.apache.spark.storage.RDDInfo$$anonfun$1.apply(RDDInfo.scala:58) 
    at org.apache.spark.storage.RDDInfo$$anonfun$1.apply(RDDInfo.scala:58) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.storage.RDDInfo$.fromRdd(RDDInfo.scala:58) 
    at org.apache.spark.scheduler.StageInfo$$anonfun$1.apply(StageInfo.scala:80) 
    at org.apache.spark.scheduler.StageInfo$$anonfun$1.apply(StageInfo.scala:80) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
    at org.apache.spark.scheduler.StageInfo$.fromStage(StageInfo.scala:80) 
    at org.apache.spark.scheduler.Stage.<init>(Stage.scala:99) 
    at org.apache.spark.scheduler.ShuffleMapStage.<init>(ShuffleMapStage.scala:44) 
    at org.apache.spark.scheduler.DAGScheduler.newShuffleMapStage(DAGScheduler.scala:317) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$newOrUsedShuffleStage(DAGScheduler.scala:352) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage$1.apply(DAGScheduler.scala:286) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage$1.apply(DAGScheduler.scala:285) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:742) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at scala.collection.mutable.Stack.foreach(Stack.scala:170) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:285) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:389) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:386) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:386) 
    at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:398) 

ich einen lokalen Spark-Knoten leite und die JVM mit den folgenden Optionen starten:

-Dspark.master=local -Dspark.local.dir=/home/phil/tmp/spark-tmp -Xms8g -Xmx8g 

Können Sie mir helfen, zu verstehen, warum es Probleme mit diesem Spielzeug Graph hat (201 Knoten und 200 Kanten), aber andererseits kann ein realistischer Graph mit mehreren Millionen Kanten in etwa 80 Sekunden gelöst werden? (In beiden Beispielen verwende ich die gleiche Einrichtung und Konfiguration.)

UPDATE:

Kann auch in der Funkenschale wiedergegeben werden:

import org.apache.spark.graphx._ 
import org.apache.spark.graphx.lib._ 

val graph = GraphLoader.edgeListFile(sc, "input.graph").cache() 
ConnectedComponents.run(graph) 

ich einen Fehlerbericht erstellt: SPARK-15042

+1

Es ist großartig! es scheint, dass es auch in meinem Fall scheitern wird (ich warte immer noch). Ich nahm diesen Fall {(i, i + 1) | i <- {1..200}}. Anzahl der Überspringungsaufgaben wächst exponentiell. Ich verstehe, warum es 200 Stufen macht, aber weiß nicht, warum es scheitert – Hlib

Antwort

0

Nach SPARK-15042 besteht das Problem immer noch in 2.1.0-Snapshot.

Der Fortschritt bei der Fehlerbehebung ist in SPARK-5484 zu sehen.

Verwandte Themen