2017-12-16 2 views
0

Bitte helfen Sie mir, eine angepasste Partitioniererfunktion in Python für Spark zu schreiben.Benutzerdefinierter Partitionierer konnte lokale Variable in Spark nicht tragen

Ich habe eine Datei, die die Zuordnung zwischen dem Eintrag Datenschlüssel und Partitions-ID zu sagen, ich laden Sie es zuerst in eine dict Variable „data_to_partition_map“ in main.py

dann in Funken

sc.parallelize(input_lines).partitionBy(numPartitions=xx, partitionFunc=lambda x : data_to_partition_map[x]) 

Wenn ich betreibe diesen Code lokal, es gibt Fehler:

Traceback (most recent call last): 
    File "/home/weiyu/workspace/dice/process_platform_spark/process/roadCompile/main.py", line 111, in <module> 
    .partitionBy(numPartitions=tile_partitioner.num_partitions, partitionFunc=lambda x: tile_tasks_in_partitions[x]) 
    File "/home/weiyu/app/odps-spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1785, in partitionBy 
    File "/home/weiyu/app/odps-spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1392, in __call__ 
    File "/home/weiyu/app/odps-spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 289, in get_command_part 
AttributeError: 'function' object has no attribute '_get_object_id' 

es scheint Funke kann nicht Lambda-Objekt serialisiert, hat jemand eine Idee zu diesem Fehler haben und mir sagen, wie man es beheben ? Vielen Dank

Antwort

1

Haben Sie versucht, eine Funktion zu verwenden, die einfach das dict-Element zurückgibt und als Teilfunktion übergeben?

def return_key(x): 
     return your_dict[x] 

Übergeben Sie es als partitionFunction.