2016-05-10 12 views
0

Ist es möglich, eine HiveContext-Datenframe-Spalte in pyspark mit einer komplexen Funktion zu aktualisieren, die in einer UDF nicht möglich ist?Pyspark-Datenframe-Spalte mit komplexer Funktion aktualisieren

Ich habe einen Datenrahmen mit vielen Spalten, von denen 2 Spalten Zeitstempel und Daten genannt werden. Ich muss den Zeitstempel aus der JSON-Zeichenfolge in Daten abrufen und die Timestamp-Spalte aktualisieren, wenn der Zeitstempel in Daten bestimmte Kriterien erfüllt. Ich weiß, dass diese Datenrahmen unveränderlich sind, aber ist es möglich, irgendwie einen neuen Datenrahmen zu bauen, der alle Spalten des alten Datenrahmens behält, aber die timstamp-Spalte aktualisiert?

-Code veranschaulicht, was ich tun möchte:

def updateTime(row): 
    import json 

    THRESHOLD_TIME = 60 * 30 
    client_timestamp = json.loads(row['data']) 
    client_timestamp = float(client_timestamp['timestamp']) 
    server_timestamp = float(row['timestamp']) 
    if server_timestamp - client_timestamp <= THRESHOLD_TIME: 
     new_row = ..... # copy contents of row 
     new_row['timestamp'] = client_timestamp 
     return new_row 
    else: 
     return row 

df = df.map(updateTime) 

Ich dachte an die Abbildung der Zeileninhalte in ein Tupel und dann wieder auf einen Datenrahmen mit .toDF Umwandlung(), aber ich kann ein nicht gefunden Möglichkeit, den Zeileninhalt in ein Tupel zu kopieren und dann die Spaltennamen zurückzubekommen.

+0

Was ist, wenn Sie eine 'UDF' verwenden? –

+0

Vielleicht kann dieser Artikel helfen: http://www.sparkututorials.net/using-sparksql-udfs-to-create-date-times-in-spark-1.5 –

+0

Sorry, ich meinte UDF anstelle von HDF ... Typo .. – SK2

Antwort

0

Wenn Sie Ihre updateTime Funktion anpassen einen Zeitstempel als Parameter und gibt die neue verarbeitete Zeitstempel zu erhalten, können Sie eine UDF erstellen und direkt auf die Spaltendatenrahmen verwenden:

from pyspark.sql.functions import * 
from pyspark.sql.types import TimestampType 

myUDF = udf(updateTime, TimestampType()) 
df = df.withColumn("timestamp", myUDF(col("timestamp")) 

Allerdings habe ich in Ihrem Fall denke, es ist ein wenig komplexer:

from pyspark.sql.functions import * 
from pyspark.sql.types import TimestampType 

myUDF = udf(getClientTime, TimestampType()) 
client_timestamp = myUDF(col("data")) 
server_timestamp = col("timestamp") 
condition = server_timestamp.cast("float") - client_timestamp.cast("float") <= THRESHOLD_TIME  

newCol = when(condition, client_timestamp).otherwise(server_timestamp) 
newDF = df.withColumn("new_timestamp", newCol) 

Mit diesem zweiten Ansatz wird die Funktion getClientTime einen Wert aus der Spalte data empfängt und die Client-Zeitstempel für diesen Wert zurückgibt. Anschließend können Sie eine neue Spalte (client_timestamp) erstellen, die diese Informationen enthält. Schließlich können Sie when verwenden, um die neue Spalte bedingt auf der Grundlage der Werte der Spalte server_timestamp und der neu erstellten Spalte client_timestamp zu erstellen.

Referenz:

+1

Danke! Die Methode funktioniert mit einigen kleineren Änderungen. Sieht aus, als hätte ich nicht verstanden, wie UDFs früher funktioniert haben. Bearbeitete myUDF, um stattdessen StringType() zurückzugeben, und verwendete df ['column'] anstelle von col ('column') – SK2