2017-12-06 3 views
0

Ich habe ein Problem in Apache Funken GraphX ​​drucken, habe ich versucht, ein Diagramm mit dieser Methode in der Haupt zu partitionieren:Wie man val zu PartitionBy

graph.partitionBy(HDRF, 128) 

HDRF ist eine Methode zu tun, Partitionierung, würde Ich mag einen val zu drucken, die in ihm ist, habe ich versucht zu drucken, aber es ausdrucken nicht alles

/EDIT/

package app 

import org.apache.spark.graphx._ 
import org.apache.spark._ 
import org.apache.spark.rdd.RDD 


/** 
    * Main del sistema 
    */ 

object Main{ 


    def main(args: Array[String]) { 

    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("HDRF")) 

    // mostra solo i log in caso di errore 
    sc.setLogLevel("ERROR") 

    //modifico il file di testo preso in ingresso 
    val edges:RDD[Edge[String]]= 
     sc.textFile("data/u1.base").map{ line => 
     val fields= line.split("\t") 
     Edge(fields(0).toLong,fields(1).toLong,fields(2)) 
     } 

    val graph: Graph[Any,String] =Graph.fromEdges(edges,"defaultProperty") 

    graph.partitionBy(HDRF,128) 


    } 
} 

.

package app 

import org.apache.spark.graphx._ 
import scala.collection.concurrent.TrieMap 

object HDRF extends PartitionStrategy{ 
    private var init=0; //lo puoi usare per controllare una fase di inizializzazione che viene eseguita solo la prima volta 

    private var partitionsLoad:Array[Long] = Array.empty[Long] //carico (numero di archi) di ogni partizione 
    private val vertexIdListPartitions: TrieMap[Long, List[Long]] = TrieMap() //lista di partizioni associate a ogni vertice 
    private val vertexIdEdges: TrieMap[Long, Long] = TrieMap() //grado di ogni vertice 

    private var edges = 0 

    private var sum :Long= 0 

    override def getPartition(src:VertexId,dst:VertexId,numParts:Int): PartitionID ={ 
    var valoreMax:Long =Int.MaxValue 
    var partScarica:Int = -1 
    var c:Int = 0 
    if(init==0){ 
     init=1 
     partitionsLoad=Array.fill[Long](numParts)(0) 
    } 


    //AGGIORNA IL GRADO CONOSCIUTO DEI VERTICI src E dst NELLA VARIABILE vertexIdEdges 
    vertexIdEdges(src)=vertexIdEdges(src)+1 
    vertexIdEdges(dst)=vertexIdEdges(dst)+1 
    sum=vertexIdEdges(src) + vertexIdEdges(dst) 

    //PARTIZIONA IL GRAFO 
    if((!vertexIdListPartitions.contains(src))&&(!vertexIdListPartitions.contains(dst))){ 
     //NESSUNO DEI DUE VERTICI E' STATO MAI INSERITO IN QUALCHE PARTIZIONE 
     //SCELGO LA PARTZIIONE PIU' SCARICA E LI ASSEGNO A QUELLA 
     while(c==numParts){ 
     if(partitionsLoad(c)<valoreMax){ 
      valoreMax=partitionsLoad(c) 
      partScarica=c 
     } 
     c=c+1 
     } 
     if(partScarica != -1) { 
     partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1 
     vertexIdListPartitions(partScarica).union(List(src, dst)) 
     } 
     return partScarica 

    }else if(((vertexIdListPartitions.contains(src))&&(!vertexIdListPartitions.contains(dst)))||((!vertexIdListPartitions.contains(src))&&(vertexIdListPartitions.contains(dst)))){ 
     //UNO SOLO DEI DUE VERTICI E' GIA' PRESENTE IN ALMENO UNA PARTIZIONE 
     if((vertexIdListPartitions.contains(src))&&(!vertexIdListPartitions.contains(dst))){ 
     //SI TRATTA DI src 
     //SCELGO LA PARTIZIONE PIU' SCARICA TRA QUELLE IN CUI E' PRESENTE src E CI REPLICO dst 
     while(c==numParts){ 
      if(partitionsLoad(c)<valoreMax){ 
      if(vertexIdListPartitions(c).contains(src)) { 
       valoreMax = partitionsLoad(c) 
       partScarica = c 
      } 
      } 
      c=c+1 
     } 
     if(partScarica != -1) { 
      partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1 
      vertexIdListPartitions(partScarica).union(List(dst)) 
     } 

     }else{ 
     //SI TRATTA DI dst 
     //SCELGO LA PARTZIIONE PIU' SCARICA TRA QUELLE IN CUI E' PRESENTE dst E CI REPLICO src 

     while(c==numParts){ 
      if(partitionsLoad(c)<valoreMax){ 
      if(vertexIdListPartitions(c).contains(src)) { 
       valoreMax = partitionsLoad(c) 
       partScarica = c 
      } 
      } 
      c=c+1 
     } 
     if(partScarica != -1) { 
      partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1 
      vertexIdListPartitions(partScarica).union(List(src)) 
     } 

     } 
    }else if(!vertexIdListPartitions(src).intersect(vertexIdListPartitions(dst)).isEmpty){ 
     //ENTRAMBI I VERTICI SONO PRESENTI IN DIVERSE PARTIZIONI ED ESISTE UNA INTERSEZIONE DEI SET NON NULLA (CIOE' ESISTE ALMENO UNA PARTIZIONE CHE LI CONTIENE ENTRAMBI) 
     //SCELGO NELL'INTERSEZIONE DEI SET LA PARTIZIONE PIU' SCARICA 

     while(c==numParts) { 
     if (partitionsLoad(c) < valoreMax) { 
      if (vertexIdListPartitions(c).contains(src) && vertexIdListPartitions(c).contains(dst)) { 
      valoreMax = partitionsLoad(c) 
      partScarica = c 
      } 
     } 
     c = c + 1 
     } 
     if(partScarica != -1) { 
     partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1 
     vertexIdListPartitions(partScarica).union(List(src)) 
     } 

    }else { 
     //ENTRAMBI I VERTICI SONO PRESENTI IN DIVERSE PARTIZIONI MA L'INTERSEZIONE DEI SET E' NULLA (CIOE' NON ESISTE ALCUNA PARTIZIONE CHE LI CONTIENE ENTRAMBI) 
     if((vertexIdEdges(src))>=(vertexIdEdges(dst))){ 
     //SCELGO TRA LE PARTIZIONI A CUI E' ASSEGNATO dst QUELLA PIU' SCARICA E CI COPIO src 

     while(c==numParts){ 
      if(partitionsLoad(c)<valoreMax){ 
      if(vertexIdListPartitions(c).contains(dst)) { 
       valoreMax = partitionsLoad(c) 
       partScarica = c 
      } 
      } 
      c=c+1 
     } 
     if(partScarica != -1) { 
      partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1 
      vertexIdListPartitions(partScarica).union(List(src)) 
     } 

     }else{ 
     //SCELGO TRA LE PARTIZIONI A CUI E' ASSEGNATO src QUELLA PIU' SCARICA E CI COPIO dst 

     while(c==numParts){ 
      if(partitionsLoad(c)<valoreMax){ 
      if(vertexIdListPartitions(c).contains(src)) { 
       valoreMax = partitionsLoad(c) 
       partScarica = c 
      } 
      } 
      c=c+1 
     } 
     if(partScarica != -1) { 
      partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1 
      vertexIdListPartitions(partScarica).union(List(dst)) 
     } 
     } 

    } 
    edges=edges+1 
    if(edges==80000) { 
     print(sum) 
    } 
    return partScarica 
    } 
} 

Ich muss Summe drucken, aber ich verstehe nicht, warum es nicht erscheint.

+0

Erstens ist es schwierig, die Frage zu beantworten, ohne die Implementierung von 'HDRF' zu sehen. Zweitens, seien Sie bitte genau zu Ihrer Erwartung - wann und wo erwarten Sie diesen Ausdruck? 'HDRF' wird von Spark auf den verschiedenen Arbeiterknoten verwendet (d. H. Nicht notwendigerweise auf dem Treiber, der Ihr" main "ausführt) und nur dann, wenn das Ergebnis dieser Partitionierung in einer _action_ verwendet wird (was die träge Berechnung des Graphen auslöst). –

+0

Ich habe den Beitrag bearbeitet und den Code hier eingefügt –

Antwort

0

partitionBy, wie viele Graph Funktionen, eine träge bewertet Operation, die ein neues Graph Objekt erzeugt, sondern berechnen nicht wirklich, dass die Grafik, bis es notwendig ist - das heißt, bis einige Aktion wird auf das Ergebnis durchgeführt (zB Zählen , persistierend oder sammelnd).

Mit einem einfacheren Beispiel das können wir sehen, ob wir auf das Ergebnis wirken, diese Drucke zu sehen sein wird:

object SimpleExample extends PartitionStrategy { 
    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { 
    println("partitioning!") 
    numParts 
    } 
} 

val result = graph.partitionBy(SimpleExample, 128) // nothing printed so far... 

result.edges.count() // now that we act on the result, 
// we see "paritioning!" printed (several times). 

HINWEIS, die von einem PartitionStrategy Druck (oder jede weitergegeben Transformationsfunktion Spark zu sein durchgeführt an einem , Graph oder Dataset) ist nicht zu hilfreich: diese Funktionen werden auf dem Worker-Knoten ausgeführt, daher werden diese Drucke in Ausgaben von verschiedenen Prozessen auf verschiedenen Maschinen "gestreut" und würde wahrscheinlich nicht sein sichtbar in der Ausgabe der Treiber-App lierung (Ihre Hauptfunktion).

Verwandte Themen