2015-06-17 12 views
14

Ich habe ein DataFrame, die in etwa so aussehen. Ich möchte am Tag des date_time Feldes operieren.PySpark eine Spalte zu einem Datenrahmen aus einer TimeStampType-Spalte hinzufügen

root 
|-- host: string (nullable = true) 
|-- user_id: string (nullable = true) 
|-- date_time: timestamp (nullable = true) 

Ich habe versucht, eine Spalte hinzuzufügen, um den Tag zu extrahieren. Bis jetzt sind meine Versuche gescheitert.

df = df.withColumn("day", df.date_time.getField("day")) 

org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type TimestampType; 

Dies hat auch nicht

df = df.withColumn("day", df.select("date_time").map(lambda row: row.date_time.day)) 

AttributeError: 'PipelinedRDD' object has no attribute 'alias' 

Jede Idee, wie dies getan werden kann?

Antwort

30

können Sie einfach verwenden map:

df.rdd.map(lambda row: 
    Row(row.__fields__ + ["day"])(row + (row.date_time.day,)) 
) 

Eine weitere Option ist eine Funktion, und führen Sie SQL-Abfrage registrieren:

sqlContext.registerFunction("day", lambda x: x.day) 
sqlContext.registerDataFrameAsTable(df, "df") 
sqlContext.sql("SELECT *, day(date_time) as day FROM df") 

Schließlich können Sie UDF wie folgt definieren:

from pyspark.sql.functions import udf 
from pyspark.sql.types import IntegerType 

day = udf(lambda date_time: date_time.day, IntegerType()) 
df.withColumn("day", day(df.date_time)) 

BEARBEITEN:

Eigentlich, wenn Sie Raw SQL verwenden day Funktion ist bereits definiert (zumindest in Spark 1.4), so dass Sie UDF Registrierung weglassen können. Es bietet auch eine Reihe von verschiedenen Datumsverarbeitungsfunktionen einschließlich:

Es ist auch möglich, wie einfache Zeitpunkt Ausdrücke zu verwenden:

current_timestamp() - expr("INTERVAL 1 HOUR") 

bedeutet es Ihnen, ohne Übergabe von Daten an Python relativ komplexe Abfragen erstellen können. Zum Beispiel:

df = sc.parallelize([ 
    (1, "2016-01-06 00:04:21"), 
    (2, "2016-05-01 12:20:00"), 
    (3, "2016-08-06 00:04:21") 
]).toDF(["id", "ts_"]) 

now = lit("2016-06-01 00:00:00").cast("timestamp") 
five_months_ago = now - expr("INTERVAL 5 MONTHS") 

(df 
    # Cast string to timestamp 
    # For Spark 1.5 use cast("double").cast("timestamp") 
    .withColumn("ts", unix_timestamp("ts_").cast("timestamp")) 
    # Find all events in the last five months 
    .where(col("ts").between(five_months_ago, now)) 
    # Find first Sunday after the event 
    .withColumn("next_sunday", next_day(col("ts"), "Sun")) 
    # Compute difference in days 
    .withColumn("diff", datediff(col("ts"), col("next_sunday")))) 
+0

Es gibt viele Spalten und ich möchte nur noch eine hinzufügen. Die Zuordnungsmethode ist möglicherweise zu umständlich, um alle vorhandenen Spalten aufzulisten. Ich werde versuchen, den Register-Funktion Weg. Danke. –

+0

Sie müssen nicht alle vorhandenen Spalten in der Karte auflisten. Es ist möglich, die Zeile einfach neu zu erstellen. Ich habe die Antwort aktualisiert, um das zu reflektieren. Bei diesem Ansatz gibt es jedoch zwei Probleme. Sie gibt RDD von Rows und nicht von DataFrame zurück und ist wahrscheinlich langsamer als ein optimiertes SQL. – zero323

+1

Definition von UDF scheint zu den saubersten Weg zu sein, den ich bisher gefunden habe. Zu der Antwort hinzugefügt. – zero323

Verwandte Themen