2016-04-26 24 views
2

Ich bin neu in der Spark-Programmierung und habe ein Szenario, um einen Wert zuzuweisen, wenn eine Reihe von Werten in meiner Eingabe erscheint. Im Folgenden finden Sie einen SQL-Code, den ich verwenden würde, um meine Aufgabe zu erfüllen. Müssen Sie dasselbe in Spark tun.Case when Anweisung mit IN-Klausel in Pyspark

Sql Code:

SELECT CASE WHEN c.Number IN ('1121231', '31242323') THEN 1 
ELSE 2 END AS Test 
FROM Input c 

Ich bin mir bewusst, mit nur einer Bedingung when in Funken verwenden.

Input.select(when(Input.Number==1121231,1).otherwise(2).alias("Test")).show() 

Antwort

3

Ich nehme an, Sie arbeiten mit Spark DataFrames, nicht RDDs. Eine Sache zu beachten ist, dass Sie SQL-Abfragen ausführen können direkt auf einem Datenrahmen:

# register the DataFrame so we can refer to it in queries 
sqlContext.registerDataFrameAsTable(df, "df") 

# put your SQL query in a string 
query = """SELECT CASE WHEN 
    df.number IN ('1121231', '31242323') THEN 1 ELSE 2 END AS test 
    FROM df""" 

result = sqlContext.sql(query) 
result.show() 

Sie auch select durch die Schaffung eines user-defined function verwenden können, die Ihre Abfrage case-Anweisung nachahmt:

from pyspark.sql.types import * 
from pyspark.sql.functions import udf 

# need to pass inner function through udf() so it can operate on Columns 
# also need to specify return type 
column_in_list = udf(
    lambda column: 1 if column in ['1121231', '31242323'] else 2, 
    IntegerType() 
) 

# call function on column, name resulting column "transformed" 
result = df.select(column_in_list(df.number).alias("transformed")) 
result.show() 
+0

Vielen Dank für Ihre Antwort. Es funktionierte. Gibt es eine Möglichkeit, die Funktion when direkt auf dem Spark-Datenrahmen zu verwenden und eine Liste von Werten zu geben? – Sid

+2

Ich glaube, ich habe gefunden, was ich machen wollte. df.when (df.char.isin ('H', 'O', 'M'), 1) .anderes (0) – Sid

+0

@ user3258274 Das funktioniert auch. Sie können die Werteliste auch aus der Spalte extrahieren, indem Sie den DataFrame in eine rdd konvertieren und wie folgt sammeln: 'rdd = df.rdd; rdd.map (Lambda x: x.transform) .collect() '. –