2016-10-18 4 views
5

Ich habe einen Datenrahmen wie folgt aus:explodieren Array von Array- (Datenrahmen) pySpark

+-----+--------------------+ 
|index|    merged| 
+-----+--------------------+ 
| 0|[[2.5, 2.4], [3.5...| 
| 1|[[-1.0, -1.0], [-...| 
| 2|[[-1.0, -1.0], [-...| 
| 3|[[0.0, 0.0], [0.5...| 
| 4|[[0.5, 0.5], [1.0...| 
| 5|[[0.5, 0.5], [1.0...| 
| 6|[[-1.0, -1.0], [0...| 
| 7|[[0.0, 0.0], [0.5...| 
| 8|[[0.5, 0.5], [1.0...| 
+-----+--------------------+ 

Und ich will das fusionierte Spalte in

+-----+-------+-------+ 
|index|Column1|Column2| 
+-----+-------+-------+ 
| 0| 2.5| 2.4 | 
| 1| 3.5| 0.5| 
| 2| -1.0| -1.0| 
| 3| -1.0| -1.0| 
| 4| 0.0 | 0.0 | 
| 5| 0.5| 0.74| 
+-----+-------+-------+ 

jedes Tupel zu explodieren [[2.5, 2.4] , [3,5,0,5]] represente zwei Spalten, in dem Wissen, dass 2,5 und 3,5 in Spalte 1 gespeichert werden und (2,4,0,5) wird in zweiter Spalte gespeichert

Also habe ich das versucht

df= df.withColumn("merged", df["merged"].cast("array<array<float>>")) 
df= df.withColumn("merged",explode('merged')) 

dann werde ich ein UDF gelten andere DF

zu schaffen, aber ich kann nicht auf die Daten werfen oder anwenden explodieren, und ich erhielt den Fehler

pyspark.sql.utils.AnalysisException: u"cannot resolve 'cast(merged as array<array<float>)' due to data type mismatch: cannot cast StringType to ArrayType(StringType,true) 

Ich habe auch versucht

df= df.withColumn("merged", df["merged"].cast("array<string>")) 

aber nichts funktioniert und wenn ich explodieren ohne Besetzung, erhalte ich

pyspark.sql.utils.AnalysisException: u"cannot resolve 'explode(merged)' due to data type mismatch: input to function explode should be array or map type, not StringType; 
+0

können Sie das Schema von df geben? Es sieht so aus, als wäre die Zusammenführung tatsächlich eine Zeichenfolge, nicht das, was Sie im Argument haben. Sie können 'split' verwenden, um eine Zeichenfolge durch ein Trennzeichen zu trennen. Es scheint auch, dass es Tippfehler in Ihrer Frage gibt: Ist der Index für explodierte Werte in Ihrem Beispiel des erwarteten Ergebnisses nicht gleich? Oder hast du das gegeben, was du wirklich willst? – Wilmerton

+0

Thx, ich lese meinen Code, und ich habe festgestellt, dass ich vergessen habe, Rückgabetyp ArrayType (ArrayType (FloatType())) in meine Lambda-Funktion (wer meine Spalten zusammenführen) – MrGildarts

+0

so ... Problem gelöst? – Wilmerton

Antwort

0

Sie könnten den folgenden Code versuchen:

from pyspark import SparkConf, SparkContext       
from pyspark.sql import SparkSession        

from pyspark.sql.types import FloatType, StringType, IntegerType 
from pyspark.sql.functions import udf, col       


def col1_calc(merged):            
    return merged[0][0]            

def col2_calc(merged):            
    return merged[0][1]            

if __name__ == '__main__':           
    spark = SparkSession \           
     .builder \             
     .appName("Python Spark SQL Hive integration example") \  
     .getOrCreate()            

    df = spark.createDataFrame([         
     (0, [[2.5,2.4],[3.5]]),          
     (1, [[-1.0,-1.0],[3.5]]),         
     (2, [[-1.0,-1.0],[3.5]]),         
    ], ["index", "merged"])           

    df.show()              

    column1_calc = udf(col1_calc, FloatType())      
    df = df.withColumn('Column1', column1_calc(df['merged']))  
    column2_calc = udf(col2_calc, FloatType())      
    df = df.withColumn('Column2', column2_calc(df['merged']))  

    df = df.select(['Column1', 'Column2', 'index'])     
    df.show()   

Ausgang:

+-------+-------+-----+ 
|Column1|Column2|index| 
+-------+-------+-----+ 
| 2.5| 2.4| 0| 
| -1.0| -1.0| 1| 
| -1.0| -1.0| 2| 
+-------+-------+-----+