2017-05-05 4 views
0

Ich habe ein DataFrame der Form:Funken 2.0.2 PySpark Fehler collect_list Importieren

+--------------+------------+----+ 
|    s|variant_hash|call| 
+--------------+------------+----+ 
|C1046::HG02024| 83779208| 0| 
|C1046::HG02025| 83779208| 1| 
|C1046::HG02026| 83779208| 0| 
|C1047::HG00731| 83779208| 0| 
|C1047::HG00732| 83779208| 1 
       ... 

Ich hatte gehofft, collect_list() zu nutzen, sie in zu verwandeln:

+--------------------+-------------------------------------+ 
|     s|      feature_vector| 
+--------------------+-------------------------------------+ 
|  C1046::HG02024|[(83779208, 0), (68471259, 2)...]| 
+--------------------+-------------------------------------+ 

Wo der Merkmalsvektor Spalte ist eine Liste von Tupeln des Formulars (variant_hash, call). Ich hatte geplant, auf groupBy und agg(collect_list()) nutzt dieses Ergebnis zu erreichen, aber erhalte den folgenden Fehler:

Traceback (most recent call last): 
    File "/tmp/ba6a891c-529b-4c75-a76f-8ab20f4377ba/ml_on_vds.py", line 43, in <module> 
    vector_df = svc_df.groupBy('s').agg(func.collect_list(('variant_hash', 'call'))) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 39, in _ 
    File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__ 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco 
    File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 323, in get_return_value 
py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.collect_list. Trace: 
py4j.Py4JException: Method collect_list([class java.util.ArrayList]) does not exist 
     at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) 
     at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339) 
     at py4j.Gateway.invoke(Gateway.java:274) 
     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
     at py4j.commands.CallCommand.execute(CallCommand.java:79) 
     at py4j.GatewayConnection.run(GatewayConnection.java:214) 
     at java.lang.Thread.run(Thread.java:745) 

Der Code unten meine Importe zeigt. Ich dachte nicht, dass es notwendig ist, HiveContext und enableHiveSupport in 2.0.2 zu importieren, aber ich hatte gehofft, dies zu tun, würde das Problem lösen. Leider kein Glück. Hat jemand irgendwelche Empfehlungen, um dieses Importproblem zu lösen?

from pyspark.sql import SparkSession 
from pyspark import SparkConf, SparkContext, HiveContext 
from pyspark.sql.functions import udf, hash, collect_list 
from pyspark.sql.types import * 
from hail import * 
# Initialize the SparkSession 
spark = (SparkSession.builder.appName("PopulationGenomics") 
     .config("spark.sql.files.openCostInBytes", "1099511627776") 
     .config("spark.sql.files.maxPartitionBytes", "1099511627776") 
     .config("spark.hadoop.io.compression.codecs", "org.apache.hadoop.io.compress.DefaultCodec,is.hail.io.compress.BGzipCodec,org.apache.hadoop.io.compress.GzipCodec") 
     .enableHiveSupport() 
     .getOrCreate()) 

Ich versuche, diesen Code auf einem gcloud dataproc-Cluster auszuführen.

Antwort

1

so wirft es Fehler in dieser Zeile -

vector_df = svc_df.groupBy('s').agg(func.collect_list(('variant_hash', 'call'))) 

Sie collect_list als func.collect_list fordern aber Sie importieren Funktionen wie -

from pyspark.sql.functions import udf, hash, collect_list

können Sie Importfunktionen gemeint wie 'func' wie

from pyspark.sql import functions as func,

+0

Schätzen Sie die Antwort Pushkr. Ging voraus und machte diese Korrektur, aber immer noch die gleiche Fehlermeldung erhalten. Wenn ich meine IDE ansehe (die nicht mit meinem Spark-Cluster verbunden ist, aber die Includes sollten übereinstimmen), scheint es nicht, dass 'collect_list' in' pyspark.sql.functions' enthalten ist ... – mongolol

+0

Können Sie einen einfachen Test in Spark durchführen? CLI, um zu überprüfen, ob Sie irgendwelche SQL-Funktionen importieren und ausführen können? – Pushkr

+0

konnte "Hash" und "UDF" aus der Funktionsbibliothek importieren und ausführen. – mongolol

Verwandte Themen