Ich habe ein DataFrame
der Form:Funken 2.0.2 PySpark Fehler collect_list Importieren
+--------------+------------+----+
| s|variant_hash|call|
+--------------+------------+----+
|C1046::HG02024| 83779208| 0|
|C1046::HG02025| 83779208| 1|
|C1046::HG02026| 83779208| 0|
|C1047::HG00731| 83779208| 0|
|C1047::HG00732| 83779208| 1
...
Ich hatte gehofft, collect_list()
zu nutzen, sie in zu verwandeln:
+--------------------+-------------------------------------+
| s| feature_vector|
+--------------------+-------------------------------------+
| C1046::HG02024|[(83779208, 0), (68471259, 2)...]|
+--------------------+-------------------------------------+
Wo der Merkmalsvektor Spalte ist eine Liste von Tupeln des Formulars (variant_hash, call)
. Ich hatte geplant, auf groupBy
und agg(collect_list())
nutzt dieses Ergebnis zu erreichen, aber erhalte den folgenden Fehler:
Traceback (most recent call last):
File "/tmp/ba6a891c-529b-4c75-a76f-8ab20f4377ba/ml_on_vds.py", line 43, in <module>
vector_df = svc_df.groupBy('s').agg(func.collect_list(('variant_hash', 'call')))
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 39, in _
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 323, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.collect_list. Trace:
py4j.Py4JException: Method collect_list([class java.util.ArrayList]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Der Code unten meine Importe zeigt. Ich dachte nicht, dass es notwendig ist, HiveContext
und enableHiveSupport
in 2.0.2 zu importieren, aber ich hatte gehofft, dies zu tun, würde das Problem lösen. Leider kein Glück. Hat jemand irgendwelche Empfehlungen, um dieses Importproblem zu lösen?
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext, HiveContext
from pyspark.sql.functions import udf, hash, collect_list
from pyspark.sql.types import *
from hail import *
# Initialize the SparkSession
spark = (SparkSession.builder.appName("PopulationGenomics")
.config("spark.sql.files.openCostInBytes", "1099511627776")
.config("spark.sql.files.maxPartitionBytes", "1099511627776")
.config("spark.hadoop.io.compression.codecs", "org.apache.hadoop.io.compress.DefaultCodec,is.hail.io.compress.BGzipCodec,org.apache.hadoop.io.compress.GzipCodec")
.enableHiveSupport()
.getOrCreate())
Ich versuche, diesen Code auf einem gcloud dataproc-Cluster auszuführen.
Schätzen Sie die Antwort Pushkr. Ging voraus und machte diese Korrektur, aber immer noch die gleiche Fehlermeldung erhalten. Wenn ich meine IDE ansehe (die nicht mit meinem Spark-Cluster verbunden ist, aber die Includes sollten übereinstimmen), scheint es nicht, dass 'collect_list' in' pyspark.sql.functions' enthalten ist ... – mongolol
Können Sie einen einfachen Test in Spark durchführen? CLI, um zu überprüfen, ob Sie irgendwelche SQL-Funktionen importieren und ausführen können? – Pushkr
konnte "Hash" und "UDF" aus der Funktionsbibliothek importieren und ausführen. – mongolol