können Sie einfach verwenden map
:
df.rdd.map(lambda row:
Row(row.__fields__ + ["day"])(row + (row.date_time.day,))
)
Eine weitere Option ist eine Funktion, und führen Sie SQL-Abfrage registrieren:
sqlContext.registerFunction("day", lambda x: x.day)
sqlContext.registerDataFrameAsTable(df, "df")
sqlContext.sql("SELECT *, day(date_time) as day FROM df")
Schließlich können Sie UDF wie folgt definieren:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
day = udf(lambda date_time: date_time.day, IntegerType())
df.withColumn("day", day(df.date_time))
BEARBEITEN:
Eigentlich, wenn Sie Raw SQL verwenden day
Funktion ist bereits definiert (zumindest in Spark 1.4), so dass Sie UDF Registrierung weglassen können. Es bietet auch eine Reihe von verschiedenen Datumsverarbeitungsfunktionen einschließlich:
Es ist auch möglich, wie einfache Zeitpunkt Ausdrücke zu verwenden:
current_timestamp() - expr("INTERVAL 1 HOUR")
bedeutet es Ihnen, ohne Übergabe von Daten an Python relativ komplexe Abfragen erstellen können. Zum Beispiel:
df = sc.parallelize([
(1, "2016-01-06 00:04:21"),
(2, "2016-05-01 12:20:00"),
(3, "2016-08-06 00:04:21")
]).toDF(["id", "ts_"])
now = lit("2016-06-01 00:00:00").cast("timestamp")
five_months_ago = now - expr("INTERVAL 5 MONTHS")
(df
# Cast string to timestamp
# For Spark 1.5 use cast("double").cast("timestamp")
.withColumn("ts", unix_timestamp("ts_").cast("timestamp"))
# Find all events in the last five months
.where(col("ts").between(five_months_ago, now))
# Find first Sunday after the event
.withColumn("next_sunday", next_day(col("ts"), "Sun"))
# Compute difference in days
.withColumn("diff", datediff(col("ts"), col("next_sunday"))))
Es gibt viele Spalten und ich möchte nur noch eine hinzufügen. Die Zuordnungsmethode ist möglicherweise zu umständlich, um alle vorhandenen Spalten aufzulisten. Ich werde versuchen, den Register-Funktion Weg. Danke. –
Sie müssen nicht alle vorhandenen Spalten in der Karte auflisten. Es ist möglich, die Zeile einfach neu zu erstellen. Ich habe die Antwort aktualisiert, um das zu reflektieren. Bei diesem Ansatz gibt es jedoch zwei Probleme. Sie gibt RDD von Rows und nicht von DataFrame zurück und ist wahrscheinlich langsamer als ein optimiertes SQL. – zero323
Definition von UDF scheint zu den saubersten Weg zu sein, den ich bisher gefunden habe. Zu der Antwort hinzugefügt. – zero323