2016-04-07 15 views
0

(Verwendung von Apache Spark-1.6.0) Hallo,rdd.toDF() Änderungen schweben Keine

Ich habe ein SparseVector (die im Grunde von zwei numpy Arrays nämlich values und indices definiert ist. Ich möchte erhalten die höchsten Werte und ihre Indizes, es zu tun, die ich benutze:

r = df.map(lambda row: Row(**dict(row.asDict(), top=f(vec))))) 

, wo die f Funktion [ [sorted_indices], [sorted_values] ] die folgende Art und Weise zurück:

def f(v): 
    m, i = zip(*sorted(zip(v.values, v.indices), reverse=True)) 
    m = [ float(j) for j in m] 
    i = [ int(j) for j in i] 
    return [i, m] 

An diesem Punkt r ist ein pyspark.rdd.PipelinedRDD und ich kann überprüfen, ob meine Werte in Ordnung sind, zum Beispiel.

r.first().top[1] 

Das Problem kommt, wenn ich versuche, eine DataFrame zu erhalten mit:

df2 = r.toDF() 

Dann sind meine Werte nur None, das heißt

df2.first().top[1] # i.e. the highest values of the first Vector 

zeigt None.

So sieht es wirklich wie die Funktion zerstört meine Daten aus. Dies wäre ziemlich seltsam, wenn Spark nicht mit dem eingebauten Float-Typ umgehen kann.

Irgendwelche Ideen? thx

Antwort

2

Es funktioniert nicht, da die Typen nicht übereinstimmen. Wenn Sie sich die Typen ansehen, sehen Sie, dass top Spalte als array<array<bigint>> dargestellt wird, während Werte array<float> sein sollten. Ihre Funktion sollte ein Objekt sein, das in eine struct Spalte struct<array<bigint>, array<float>> konvertiert werden kann. Eine offensichtliche Wahl ist entweder ein tuple oder ein :

from pyspark.sql import Row 

def f(v): 
    m, i = zip(*sorted(zip(v.values, v.indices), reverse=True)) 
    m = [ float(j) for j in m] 
    i = [ int(j) for j in i] 
    return Row(indices=i, values=m) 

Auch wenn Vektor ist bereits in einem DataFrame ist besser, hier eine UDF zu verwenden:

from pyspark.sql.functions import udf, col 
from pyspark.sql.types import * 

schema = StructType([ 
    StructField("indices", ArrayType(IntegerType())), 
    StructField("values", ArrayType(DoubleType())) 
]) 

df.withColumn("top", udf(f, schema)(col("vec_column"))) 
+0

Schnell, präzise: beeindruckend, vielen Dank – pltrdy

Verwandte Themen