Ist es möglich, eine HiveContext-Datenframe-Spalte in pyspark mit einer komplexen Funktion zu aktualisieren, die in einer UDF nicht möglich ist?Pyspark-Datenframe-Spalte mit komplexer Funktion aktualisieren
Ich habe einen Datenrahmen mit vielen Spalten, von denen 2 Spalten Zeitstempel und Daten genannt werden. Ich muss den Zeitstempel aus der JSON-Zeichenfolge in Daten abrufen und die Timestamp-Spalte aktualisieren, wenn der Zeitstempel in Daten bestimmte Kriterien erfüllt. Ich weiß, dass diese Datenrahmen unveränderlich sind, aber ist es möglich, irgendwie einen neuen Datenrahmen zu bauen, der alle Spalten des alten Datenrahmens behält, aber die timstamp-Spalte aktualisiert?
-Code veranschaulicht, was ich tun möchte:
def updateTime(row):
import json
THRESHOLD_TIME = 60 * 30
client_timestamp = json.loads(row['data'])
client_timestamp = float(client_timestamp['timestamp'])
server_timestamp = float(row['timestamp'])
if server_timestamp - client_timestamp <= THRESHOLD_TIME:
new_row = ..... # copy contents of row
new_row['timestamp'] = client_timestamp
return new_row
else:
return row
df = df.map(updateTime)
Ich dachte an die Abbildung der Zeileninhalte in ein Tupel und dann wieder auf einen Datenrahmen mit .toDF Umwandlung(), aber ich kann ein nicht gefunden Möglichkeit, den Zeileninhalt in ein Tupel zu kopieren und dann die Spaltennamen zurückzubekommen.
Was ist, wenn Sie eine 'UDF' verwenden? –
Vielleicht kann dieser Artikel helfen: http://www.sparkututorials.net/using-sparksql-udfs-to-create-date-times-in-spark-1.5 –
Sorry, ich meinte UDF anstelle von HDF ... Typo .. – SK2