2015-12-11 2 views
5

Ich benutze PySpark. Ich habe eine Spalte ('dt') in einem Datenrahmen ('canon_evt'), dass dies ein Zeitstempel ist. Ich versuche, Sekunden von einem DateTime-Wert zu entfernen. Es wird ursprünglich aus Parkett als String eingelesen. Ich versuche es dann überPySpark 1.5 So kürzen Timestamp auf die nächste Minute von Sekunden

canon_evt = canon_evt.withColumn('dt',to_date(canon_evt.dt)) 
canon_evt= canon_evt.withColumn('dt',canon_evt.dt.astype('Timestamp')) 

zu zu konvertieren Dann würde Ich mag die Sekunden entfernen. Ich habe 'trunc', 'date_format' versucht oder versucht, Teile wie unten zusammen zu verketten. Ich denke, es erfordert eine Art Map- und Lambda-Kombination, aber ich bin mir nicht sicher, ob Timestamp ein geeignetes Format ist und ob es möglich ist, Sekunden loszuwerden.

canon_evt = canon_evt.withColumn('dyt',year('dt') + '-' + month('dt') + 
    '-' + dayofmonth('dt') + ' ' + hour('dt') + ':' + minute('dt')) 

[Row(dt=datetime.datetime(2015, 9, 16, 0, 0),dyt=None)] 
+0

Könnten Sie posten, wie es aussieht, wenn Sie von Parkett lesen? – WoodChopper

+0

[Zeile (dt = '2015-09-16 05:39:46']], Reihe (dt = '2015-09-16 05:40:46')] – PR102012

+0

'zero323', danke für die super schnelle Hilfe! – PR102012

Antwort

6

Umstellung auf Unix-Zeitstempel und Grundrechenarten sollte der Trick:

from pyspark.sql import Row 
from pyspark.sql.functions import col, unix_timestamp, round 

df = sc.parallelize([ 
    Row(dt='1970-01-01 00:00:00'), 
    Row(dt='2015-09-16 05:39:46'), 
    Row(dt='2015-09-16 05:40:46'), 
    Row(dt='2016-03-05 02:00:10'), 
]).toDF() 


## unix_timestamp converts string to Unix timestamp (bigint/long) 
## in seconds. Divide by 60, round, multiply by 60 and cast 
## should work just fine. 
## 
dt_truncated = ((round(unix_timestamp(col("dt"))/60) * 60) 
    .cast("timestamp")) 

df.withColumn("dt_truncated", dt_truncated).show(10, False) 
## +-------------------+---------------------+ 
## |dt     |dt_truncated   | 
## +-------------------+---------------------+ 
## |1970-01-01 00:00:00|1970-01-01 00:00:00.0| 
## |2015-09-16 05:39:46|2015-09-16 05:40:00.0| 
## |2015-09-16 05:40:46|2015-09-16 05:41:00.0| 
## |2016-03-05 02:00:10|2016-03-05 02:00:00.0| 
## +-------------------+---------------------+ 
+0

Wenn ich nur Zugriff auf Spark 1.3 und daher keine 'unix_timestamp'-Funktion hätte, wäre es in Spark SQL oder DataFrame immer noch einfach durchzuführen? – PR102012

+0

Verwenden Sie einfach [Hive UDF] (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions) – zero323

1

Ich denke zero323 die beste Antwort hat. Es ist irgendwie ärgerlich, dass Spark dies nicht nativ unterstützt, da es einfach zu implementieren ist. Für die Nachwelt, hier ist eine Funktion, die ich verwende:

def trunc(date, format): 
    """Wraps spark's trunc fuction to support day, minute, and hour""" 
    import re 
    import pyspark.sql.functions as func 

    # Ghetto hack to get the column name from Column object or string: 
    try: 
     colname = re.match(r"Column<.?'(.*)'>", str(date)).groups()[0] 
    except AttributeError: 
     colname = date 

    alias = "trunc(%s, %s)" % (colname, format) 

    if format in ('year', 'YYYY', 'yy', 'month', 'mon', 'mm'): 
     return func.trunc(date, format).alias(alias) 
    elif format in ('day', 'DD'): 
     return func.date_sub(date, 0).alias(alias) 
    elif format in ('min',): 
     return ((func.round(func.unix_timestamp(date)/60) * 60).cast("timestamp")).alias(alias) 
    elif format in ('hour',): 
     return ((func.round(func.unix_timestamp(date)/3600) * 3600).cast("timestamp")).alias(alias) 
+0

Vielen Dank! Deine Antwort gab mir genau das, was ich finden wollte. – Paul

Verwandte Themen