2016-10-19 1 views
1

ich folgende Problem haben: Ich muss für jede ID aus der Spalte A, um alle Kombinationen von Werten in der Spalte B finden und die Ergebnisse als DatenrahmenVerwenden von sc.parallelize innerhalb von map() oder einer anderen Lösung?

Im folgenden Beispiel wird der Eingangsdatenrahmen

 A  B  
0  5 10  
1  1 20  
2  1 15  
3  3 50  
4  5 14  
5  1 30  
6  1 15  
7  3 33  

zurückkehren ich brauche die folgende Ausgabe Datenrahmen zu erhalten (es ist für GraphX ​​\ GraphFrame)

 src dist  A 
0  10 14  5 
1  50 33  3 
2  20 15  1 
3  30 15  1 
4  20 30  1 

die einzige Lösung, die ich bis jetzt dachte, es ist:

df_result = df.drop_duplicates().\ 
       map(lambda (A,B):(A,[B])).\ 
       reduceByKey(lambda p, q: p + q).\ 
       map(lambda (A,B_values_array):(A,[k for k in itertools.combinations(B_values_array,2)])) 

print df_result.take(3) 

Ausgabe: [(1, [(20,15), (30,20), (30,15)]), (5, [(10,14)]), (3, [(50 , 33)])]

Und hier stecke ich fest :(Wie man es zu dem Datenrahmen zurück, den ich brauche? Eine Idee war parallelize zu verwenden:

import spark_sc 

edges = df_result.map(lambda (A,B_pairs): spark_sc.sc.parallelize([(k[0],k[1],A) for k in B_pairs])) 

Für spark_sc Ich habe andere Datei mit dem Namen spark_sc.py

def init(): 
    global sc 
    global sqlContext 

    sc = SparkContext(conf=conf, 
        appName="blablabla", 
        pyFiles=['my_file_with_code.py']) 

    sqlContext = SQLContext(sc) 

aber mein Code es fehlgeschlagen:

AttributeError: 'module' object has no attribute 'sc' 

wenn ich die spark_sc.sc() verwenden nicht in map() funktioniert es.

Irgendeine Idee, was ich im letzten Schritt vermisse? ist es überhaupt möglich, parallelize() zu verwenden? oder brauche ich eine ganz andere lösung? Danke!

Antwort

1

Sie müssen auf jeden Fall eine andere Lösung, die so einfach sein könnte, wie:

from pyspark.sql.functions import greatest, least, col 

df.alias("x").join(df.alias("y"), ["A"]).select(
    least("x.B", "y.B").alias("src"), greatest("x.B", "y.B").alias("dst"), "A" 
).where(col("src") != col("dst")).distinct() 

wo:

df.alias("x").join(df.alias("y"), ["A"]) 

Tisch A mit sich selbst verbindet,

least("x.B", "y.B").alias("src") 

und

greatest("x.B", "y.B") 

Wählen Sie Wert mit einer niedrigeren id als Quelle und höhere ID als Ziel. Schließlich:

where(col("src") != col("dst")) 

Tropfen Selbst Schleifen.

Im Allgemeinen ist es nicht möglich, SparkContext aus einer Aktion oder einer Transformation zu verwenden (nicht, dass es in Ihrem Fall sinnvoll wäre, dies zu tun).

Verwandte Themen