Ich habe ein Problem in Apache Funken GraphX drucken, habe ich versucht, ein Diagramm mit dieser Methode in der Haupt zu partitionieren:Wie man val zu PartitionBy
graph.partitionBy(HDRF, 128)
HDRF ist eine Methode zu tun, Partitionierung, würde Ich mag einen val zu drucken, die in ihm ist, habe ich versucht zu drucken, aber es ausdrucken nicht alles
/EDIT/
package app
import org.apache.spark.graphx._
import org.apache.spark._
import org.apache.spark.rdd.RDD
/**
* Main del sistema
*/
object Main{
def main(args: Array[String]) {
val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("HDRF"))
// mostra solo i log in caso di errore
sc.setLogLevel("ERROR")
//modifico il file di testo preso in ingresso
val edges:RDD[Edge[String]]=
sc.textFile("data/u1.base").map{ line =>
val fields= line.split("\t")
Edge(fields(0).toLong,fields(1).toLong,fields(2))
}
val graph: Graph[Any,String] =Graph.fromEdges(edges,"defaultProperty")
graph.partitionBy(HDRF,128)
}
}
.
package app
import org.apache.spark.graphx._
import scala.collection.concurrent.TrieMap
object HDRF extends PartitionStrategy{
private var init=0; //lo puoi usare per controllare una fase di inizializzazione che viene eseguita solo la prima volta
private var partitionsLoad:Array[Long] = Array.empty[Long] //carico (numero di archi) di ogni partizione
private val vertexIdListPartitions: TrieMap[Long, List[Long]] = TrieMap() //lista di partizioni associate a ogni vertice
private val vertexIdEdges: TrieMap[Long, Long] = TrieMap() //grado di ogni vertice
private var edges = 0
private var sum :Long= 0
override def getPartition(src:VertexId,dst:VertexId,numParts:Int): PartitionID ={
var valoreMax:Long =Int.MaxValue
var partScarica:Int = -1
var c:Int = 0
if(init==0){
init=1
partitionsLoad=Array.fill[Long](numParts)(0)
}
//AGGIORNA IL GRADO CONOSCIUTO DEI VERTICI src E dst NELLA VARIABILE vertexIdEdges
vertexIdEdges(src)=vertexIdEdges(src)+1
vertexIdEdges(dst)=vertexIdEdges(dst)+1
sum=vertexIdEdges(src) + vertexIdEdges(dst)
//PARTIZIONA IL GRAFO
if((!vertexIdListPartitions.contains(src))&&(!vertexIdListPartitions.contains(dst))){
//NESSUNO DEI DUE VERTICI E' STATO MAI INSERITO IN QUALCHE PARTIZIONE
//SCELGO LA PARTZIIONE PIU' SCARICA E LI ASSEGNO A QUELLA
while(c==numParts){
if(partitionsLoad(c)<valoreMax){
valoreMax=partitionsLoad(c)
partScarica=c
}
c=c+1
}
if(partScarica != -1) {
partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
vertexIdListPartitions(partScarica).union(List(src, dst))
}
return partScarica
}else if(((vertexIdListPartitions.contains(src))&&(!vertexIdListPartitions.contains(dst)))||((!vertexIdListPartitions.contains(src))&&(vertexIdListPartitions.contains(dst)))){
//UNO SOLO DEI DUE VERTICI E' GIA' PRESENTE IN ALMENO UNA PARTIZIONE
if((vertexIdListPartitions.contains(src))&&(!vertexIdListPartitions.contains(dst))){
//SI TRATTA DI src
//SCELGO LA PARTIZIONE PIU' SCARICA TRA QUELLE IN CUI E' PRESENTE src E CI REPLICO dst
while(c==numParts){
if(partitionsLoad(c)<valoreMax){
if(vertexIdListPartitions(c).contains(src)) {
valoreMax = partitionsLoad(c)
partScarica = c
}
}
c=c+1
}
if(partScarica != -1) {
partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
vertexIdListPartitions(partScarica).union(List(dst))
}
}else{
//SI TRATTA DI dst
//SCELGO LA PARTZIIONE PIU' SCARICA TRA QUELLE IN CUI E' PRESENTE dst E CI REPLICO src
while(c==numParts){
if(partitionsLoad(c)<valoreMax){
if(vertexIdListPartitions(c).contains(src)) {
valoreMax = partitionsLoad(c)
partScarica = c
}
}
c=c+1
}
if(partScarica != -1) {
partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
vertexIdListPartitions(partScarica).union(List(src))
}
}
}else if(!vertexIdListPartitions(src).intersect(vertexIdListPartitions(dst)).isEmpty){
//ENTRAMBI I VERTICI SONO PRESENTI IN DIVERSE PARTIZIONI ED ESISTE UNA INTERSEZIONE DEI SET NON NULLA (CIOE' ESISTE ALMENO UNA PARTIZIONE CHE LI CONTIENE ENTRAMBI)
//SCELGO NELL'INTERSEZIONE DEI SET LA PARTIZIONE PIU' SCARICA
while(c==numParts) {
if (partitionsLoad(c) < valoreMax) {
if (vertexIdListPartitions(c).contains(src) && vertexIdListPartitions(c).contains(dst)) {
valoreMax = partitionsLoad(c)
partScarica = c
}
}
c = c + 1
}
if(partScarica != -1) {
partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
vertexIdListPartitions(partScarica).union(List(src))
}
}else {
//ENTRAMBI I VERTICI SONO PRESENTI IN DIVERSE PARTIZIONI MA L'INTERSEZIONE DEI SET E' NULLA (CIOE' NON ESISTE ALCUNA PARTIZIONE CHE LI CONTIENE ENTRAMBI)
if((vertexIdEdges(src))>=(vertexIdEdges(dst))){
//SCELGO TRA LE PARTIZIONI A CUI E' ASSEGNATO dst QUELLA PIU' SCARICA E CI COPIO src
while(c==numParts){
if(partitionsLoad(c)<valoreMax){
if(vertexIdListPartitions(c).contains(dst)) {
valoreMax = partitionsLoad(c)
partScarica = c
}
}
c=c+1
}
if(partScarica != -1) {
partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
vertexIdListPartitions(partScarica).union(List(src))
}
}else{
//SCELGO TRA LE PARTIZIONI A CUI E' ASSEGNATO src QUELLA PIU' SCARICA E CI COPIO dst
while(c==numParts){
if(partitionsLoad(c)<valoreMax){
if(vertexIdListPartitions(c).contains(src)) {
valoreMax = partitionsLoad(c)
partScarica = c
}
}
c=c+1
}
if(partScarica != -1) {
partitionsLoad(partScarica) = partitionsLoad(partScarica) + 1
vertexIdListPartitions(partScarica).union(List(dst))
}
}
}
edges=edges+1
if(edges==80000) {
print(sum)
}
return partScarica
}
}
Ich muss Summe drucken, aber ich verstehe nicht, warum es nicht erscheint.
Erstens ist es schwierig, die Frage zu beantworten, ohne die Implementierung von 'HDRF' zu sehen. Zweitens, seien Sie bitte genau zu Ihrer Erwartung - wann und wo erwarten Sie diesen Ausdruck? 'HDRF' wird von Spark auf den verschiedenen Arbeiterknoten verwendet (d. H. Nicht notwendigerweise auf dem Treiber, der Ihr" main "ausführt) und nur dann, wenn das Ergebnis dieser Partitionierung in einer _action_ verwendet wird (was die träge Berechnung des Graphen auslöst). –
Ich habe den Beitrag bearbeitet und den Code hier eingefügt –