ich folgende Problem haben: Ich muss für jede ID aus der Spalte A, um alle Kombinationen von Werten in der Spalte B finden und die Ergebnisse als DatenrahmenVerwenden von sc.parallelize innerhalb von map() oder einer anderen Lösung?
Im folgenden Beispiel wird der Eingangsdatenrahmen
A B
0 5 10
1 1 20
2 1 15
3 3 50
4 5 14
5 1 30
6 1 15
7 3 33
zurückkehren ich brauche die folgende Ausgabe Datenrahmen zu erhalten (es ist für GraphX \ GraphFrame)
src dist A
0 10 14 5
1 50 33 3
2 20 15 1
3 30 15 1
4 20 30 1
die einzige Lösung, die ich bis jetzt dachte, es ist:
df_result = df.drop_duplicates().\
map(lambda (A,B):(A,[B])).\
reduceByKey(lambda p, q: p + q).\
map(lambda (A,B_values_array):(A,[k for k in itertools.combinations(B_values_array,2)]))
print df_result.take(3)
Ausgabe: [(1, [(20,15), (30,20), (30,15)]), (5, [(10,14)]), (3, [(50 , 33)])]
Und hier stecke ich fest :(Wie man es zu dem Datenrahmen zurück, den ich brauche? Eine Idee war parallelize zu verwenden:
import spark_sc
edges = df_result.map(lambda (A,B_pairs): spark_sc.sc.parallelize([(k[0],k[1],A) for k in B_pairs]))
Für spark_sc
Ich habe andere Datei mit dem Namen spark_sc.py
def init():
global sc
global sqlContext
sc = SparkContext(conf=conf,
appName="blablabla",
pyFiles=['my_file_with_code.py'])
sqlContext = SQLContext(sc)
aber mein Code es fehlgeschlagen:
AttributeError: 'module' object has no attribute 'sc'
wenn ich die spark_sc.sc()
verwenden nicht in map()
funktioniert es.
Irgendeine Idee, was ich im letzten Schritt vermisse? ist es überhaupt möglich, parallelize()
zu verwenden? oder brauche ich eine ganz andere lösung? Danke!