2015-08-10 14 views
6

Ich bin sehr neu zu funken, aber ich möchte ein Diagramm aus Beziehungen erstellen, die ich von einer Hive-Tabelle erhalten. Ich habe eine Funktion gefunden, die dies erlaubt, ohne die Eckpunkte zu definieren, aber ich kann sie nicht zum Laufen bringen.Wie erstellt man ein Diagramm aus Array [(Beliebige, Beliebige)] mit Graph.fromEdgeTuples

Ich weiß, dass dies nicht ein reproduzierbares Beispiel ist aber hier ist mein Code:

import org.apache.spark.SparkContext 
import org.apache.spark.graphx._ 
import org.apache.spark.rdd.RDD 
val sqlContext= new org.apache.spark.sql.hive.HiveContext(sc) 
val data = sqlContext.sql("select year, trade_flow, reporter_iso, partner_iso, sum(trade_value_us) from comtrade.annual_hs where length(commodity_code)='2' and not partner_iso='WLD' group by year, trade_flow, reporter_iso, partner_iso").collect() 
val data_2010 = data.filter(line => line(0)==2010) 
val couples = data_2010.map(line=>(line(2),line(3)) //country to country 

val graph = Graph.fromEdgeTuples(couples, 1) 

Die letzte Zeile erzeugt den folgenden Fehler:

val graph = Graph.fromEdgeTuples(sc.parallelize(couples), 1) 
<console>:31: error: type mismatch; 
found : Array[(Any, Any)] 
required: Seq[(org.apache.spark.graphx.VertexId,org.apache.spark.graphx.VertexId)] 
Error occurred in an application involving default arguments. 
val graph = Graph.fromEdgeTuples(sc.parallelize(couples), 1) 

Paare wie folgt aussehen:

couples: Array[(Any, Any)] = Array((MWI,MOZ), (WSM,AUS), (MDA,CRI), (KNA,HTI), (PER,ERI), (SWE,CUB), (DEU,PRK), (THA,DJI), (BIH,SVK), (RUS,THA), (SGP,BLR), (MEX,TGO), (TUR,ZAF), (ZWE,SYC), (UGA,GHA), (OMN,SVN), (NZL,SYR), (CHE,SLV), (CZE,LUX), (TGO,COM), (TTO,WLF), (NGA,PAN), (FJI,UKR), (BRA,ECU), (EGY,SWE), (ITA,ARG), (MUS,MLT), (MDG,DZA), (ARE,SUR), (CAN,GUY), (OMN,COG), (NAM,FIN), (ITA,HMD), (SWE,CHE), (SDN,NER), (TUN,USA), (THA,GMB), (HUN,TTO), (FRA,BEN), (NER,TCD), (CHN,JPN), (DNK,ZAF), (MLT,UKR), (ARM,OMN), (PRT,IDN), (BEN,PER), (TTO,BRA), (KAZ,SMR), (CPV,""), (ARG,ZAF), (BLR,TJK), (AZE,SVK), (ITA,STP), (MDA,IRL), (POL,SVN), (PRY,ETH), (HKG,MOZ), (QAT,GAB), (THA,MUS), (PHL,MOZ), (ITA,SGS), (ARM,KHM), (ARG,KOR), (AUT,GMB), (SYR,COM), (CZE,GBR), (DOM,USA), (CYP,LAO), (USA,LBR) 

Wie kann ich in das geeignete Format konvertieren?

Antwort

7

Als erstes können Sie nicht String als VertexId verwenden, so dass Sie Etiketten zuordnen müssen. Dann müssen wir eine Zuordnung von Label zu ID vorbereiten. Solange die Anzahl der eindeutigen Werte relativ klein ist, ist der einfachste Ansatz ein Broadcast-Variable zu erstellen:

val idMap = sc.broadcast(couples // -> Array[(Any, Any)] 
    // Make sure we use String not Any returned from Row.apply 
    // And convert to Seq so we can flatten results 
    .flatMap{case (x: String, y: String) => Seq(x, y)} // -> Array[String] 
    // Get different keys 
    .distinct // -> Array[String] 
    // Create (key, value) pairs 
    .zipWithIndex // -> Array[(String, Int)] 
    // Convert values to Long so we can use it as a VertexId 
    .map{case (k, v) => (k, v.toLong)} // -> Array[(String, Long)] 
    // Create map 
    .toMap) // -> Map[String,Long] 

Als nächstes können wir die oben verwenden Mapping auszuführen:

val edges: RDD[(VertexId, VertexId)] = sc.parallelize(couples 
    .map{case (x: String, y: String) => (idMap.value(x), idMap.value(y))} 
) 

Endlich haben wir eine bekommen graph:

val graph = Graph.fromEdgeTuples(edges, 1) 
+0

Waw, danke! Ich werde das als erstes morgen versuchen. Würde es Ihnen etwas ausmachen, die verschiedenen Methoden auszuarbeiten, die Sie verwendet haben? Ich bekomme die allgemeine Idee, aber es wäre sehr nützlich für mich in der Lage zu sein, jeden Schritt zu verstehen und nicht nur das zu kopieren –

+1

Sicher, ich habe einige Kommentare hinzugefügt und Informationen eingeben. – zero323

+0

Total arbeiten, vielen Dank für die Erklärung. Weißt du, ob das möglich ist, um einen Graphen mit Spark zu visualisieren, ich arbeite mit der Konsole, also gibt es keine grafische Schnittstelle, denke ich? –

Verwandte Themen