2016-06-15 8 views
3

ich einen Datensatz wie dieses:withColumn erlaubt mir nicht max() Funktion zu verwenden, um eine neue Spalte zu erzeugen

a = sc.parallelize([[1,2,3],[0,2,1],[9,8,7]]).toDF(["one", "two", "three"]) 

Ich möchte einen Datensatz haben, die eine neue Spalte hinzufügt, die auf den größten Wert gleich in den anderen drei Spalten. Die Ausgabe würde wie folgt aussehen:

+----+----+-----+-------+ 
|one |two |three|max_col| 
+----+----+-----+-------+ 
| 1| 2| 3|  3| 
| 0| 2| 1|  2| 
| 9| 8| 7|  9| 
+----+----+-----+-------+ 

Ich dachte, ich withColumn, wie so verwenden würde:

b = a.withColumn("max_col", max(a["one"], a["two"], a["three"])) 

aber dies ergibt den Fehler

Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/opt/spark152/python/pyspark/sql/column.py", line 418, in __nonzero__ 
    raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', " 
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions. 

Odd. Gibt max eine bool zurück? Nicht nach the documentation on max. Okay. Seltsam.

Ich finde es seltsam, dass dies funktioniert:

b = a.withColumn("max_col", a["one"] + a["two"] + a["three"])) 

Und die Tatsache, dass es funktioniert, lässt mich denken, noch stärker, dass max in gewisser Weise verhält Ich verstehe nicht.

Ich versuchte auch b = a.withColumn("max_col", max([a["one"], a["two"], a["three"]])), die in den drei Spalten als eine Liste statt 3 septe Elemente übergibt. Dies ergibt denselben Fehler wie oben.

Antwort

4

Eigentlich, was Sie hier brauchen, ist greatest nicht max:

from pyspark.sql.functions import greatest 

a.withColumn("max_col", greatest(a["one"], a["two"], a["three"])) 

Und nur der Vollständigkeit halber können Sie least verwenden das Minimum zu finden:

from pyspark.sql.functions import least 

a.withColumn("min_col", least(a["one"], a["two"], a["three"])) 

In Bezug auf den Fehler, den Sie sehen, es ist ganz einfach. max hängt von den reichen Vergleichen ab. Wenn Sie zwei Spalten vergleichen erhalten Sie einen Column:

type(col("a") < col("b") 
## pyspark.sql.column.Column 

PySpark explizit Spalten booleans Umwandlung verbietet (können Sie Column.__nonzero__ Quelle überprüfen), weil es einfach sinnlos. Es ist nur ein logischer Ausdruck, der im Treiberkontext nicht ausgewertet werden kann.

1

Wenn ich es richtig verstehe, stimmen Sie max einer Spalte und max einer Zeile nicht überein. In der Tat .withColumn benötigt eine Spalte, was Sie brauchen, ist eine Zeilenoperation.

b=a.map(lambda row: (row.one, row.two, row.three, max(row))) 

b ist dann ein rdd, können Sie wandeln es

zu Datenrahmen
b.toDF('one','two','three','max') 
0

Sie nicht max von Python verwenden können, da es nicht die erwartete pyspark.sql.Column zurückgibt. Ein Beispiel für pyspark Dataframe-Funktionen ist array, die eine Liste von einigen Säulen baut, notieren Sie die Rückkehr:

http://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html#array

Um das zu erreichen, was Sie brauchen, könnten Sie eine benutzerdefinierte-Funktion wie (ungetestet)

schreiben
from pyspark.sql.types import IntegerType 
from pyspark.sql.functions import udf 

def my_max(*cols): 
    return max(cols) 

udf_my_max = udf(my_max, IntegerType) 

df.withColumn('max_col', udf_my_max(a.columns)) 
+0

Leider hat das nicht für mich funktioniert. Sicher, es ist ein kleines Problem/Bug, da du keine Chance zum Testen bekommen hast. Ich würde lieber DataFrames anstelle von RDDs verwenden, also wenn Sie eine funktionierende Lösung finden, würde ich es schätzen! –

Verwandte Themen