2017-05-05 2 views
1

Ich habe eine RDD der Form RDD[(string, List(Tuple))], wie unten verwenden:kann nicht rdd.toDF(), aber spark.createDataFrame (RDD) Werke

[(u'C1589::HG02922', [(83779208, 2), (677873089, 0), ...] 

Beim Versuch, den folgenden Code ausführen zu wandeln es in einem Dataframe, spark.createDataFrame(rdd) funktioniert gut, aber rdd.toDF() schlägt fehl.

vector_df1 = spark.createDataFrame(vector_rdd) # Works fine. 
vector_df1.show() 
+--------------+--------------------+ 
|   _1|     _2| 
+--------------+--------------------+ 
|C1589::HG02922|[[83779208,2], [6...| 
|  HG00367|[[83779208,0], [6...| 
| C477::HG00731|[[83779208,0], [6...| 
|  HG00626|[[83779208,0], [6...| 
|  HG00622|[[83779208,0], [6...| 
        ... 
vector_df2 = vector_rdd.toDF() # Tosses the error. 

Der Fehler ausgelöst wird:

Traceback (most recent call last): 
    File "/tmp/7ff0f62d-d849-4884-960f-bb89b5f3dd80/ml_on_vds.py", line 47, in <module> 
    vector_df2 = vector_rdd.toDF().show() 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 57, in toDF 
    File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1124, in __call__ 
    File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1094, in _build_args 
    File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 289, in get_command_part 
AttributeError: 'PipelinedRDD' object has no attribute '_get_object_id' 
ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [7ff0f62d-d849-4884-960f-bb89b5f3dd80] entered state [ERROR] while waiting for [DONE]. 

Hat jemand ein Problem ähnlich wie diese zuvor begegnet? .toDF() ist nur ein einfacher Wrapper für createDataFrame(), also verstehe ich nicht, warum es scheitern würde. Ich habe zur Laufzeit überprüft, dass ich Spark 2.0.2 verwende.

# Imports  
from pyspark.sql import SparkSession 
from pyspark.sql.functions import udf, hash 
from pyspark.sql.types import * 
from pyspark.ml.clustering import KMeans 
from pyspark.ml.linalg import Vectors 
from pyspark.ml.feature import StringIndexer 
from hail import * 

# SparkSession 
spark = (SparkSession.builder.appName("PopulationGenomics") 
     .config("spark.sql.files.openCostInBytes", "1099511627776") 
     .config("spark.sql.files.maxPartitionBytes", "1099511627776") 
     .config("spark.hadoop.io.compression.codecs", "org.apache.hadoop.io.compress.DefaultCodec,is.hail.io.compress.BGzipCodec,org.apache.hadoop.io.compress.GzipCodec") 
     .getOrCreate()) 

pro Anfrage, einige weitere der Code, der den Fehler erzeugt:

vector_rdd = (indexed_df.rdd.map(lambda r: (r[0], (r[3], r[2]))) 
       .groupByKey() 
       .mapValues(lambda l: Vectors.sparse((max_index + 1), list(l)))) 
vector_df = spark.createDataFrame(vector_rdd, ['s', 'features']) # Works 
vector_df1 = vector_rdd.toDF() 
vector_df1.show() # Fails 

indexed_df ist ein Datenrahmen des Schemas:

StructType(List(StructField(s,StringType,true),StructField(variant_hash,IntegerType,false),StructField(call,IntegerType,true),StructField(index,DoubleType,true))) 

Und es sieht aus wie ...

+--------------+------------+----+-----+ 
|    s|variant_hash|call|index| 
+--------------+------------+----+-----+ 
|C1046::HG02024| -60010252| 0|225.0| 
|C1046::HG02025| -60010252| 1|225.0| 
|C1046::HG02026| -60010252| 0|225.0| 
|C1047::HG00731| -60010252| 0|225.0| 
|C1047::HG00732| -60010252| 1|225.0| 
|C1047::HG00733| -60010252| 0|225.0| 
|C1048::HG02024| -60010252| 0|225.0| 
|C1048::HG02025| -60010252| 1|225.0| 
|C1048::HG02026| -60010252| 0|225.0| 
|C1049::HG00731| -60010252| 0|225.0| 
|C1049::HG00732| -60010252| 1|225.0| 
|C1049::HG00733| -60010252| 0|225.0| 
|C1050::HG03006| -60010252| 0|225.0| 
|C1051::HG03642| -60010252| 0|225.0| 
|C1589::HG02922| -60010252| 2|225.0| 
|C1589::HG03006| -60010252| 0|225.0| 
|C1589::HG03052| -60010252| 2|225.0| 
|C1589::HG03642| -60010252| 0|225.0| 
|C1589::NA12878| -60010252| 1|225.0| 
|C1589::NA19017| -60010252| 1|225.0| 
+--------------+------------+----+-----+ 
+0

Können Sie ein wenig größeres Beispiel RDD geben, so dass ich Testbeispiel erstellen kann, um herauszufinden? – titipata

Antwort

1

toDF Methode wird unter SparkSession in und SQLContex in 1.x-Version ausgeführt. So

spark = SparkSession(sc) 
hasattr(rdd, "toDF") 

Wenn Sie in scala sind, müssen Sie import spark.implicits._

hoffe, das hilft inport!

+0

Ich habe hinzugefügt, wie ich meine SparkSession an den unteren Rand meines Skripts initialisieren. Sollte mir das nicht Zugriff auf die 'toDF()' Methode geben? – mongolol

+0

Wenn Sie auf Scala sind, müssen Sie spark.implicits._ –

+0

importieren Ich verwende Python. Ich habe meine Importe aufgenommen. – mongolol

Verwandte Themen