2016-03-20 4 views
5

My Datenrahmen enthält ein Feld, das ein Datum ist, und es erscheint in dem String-Format, wie in BeispielPySpark: Filtern einen Datenrahmens von Datumsfeld in Bereich, in dem Datum ist Zeichenfolge

'2015-07-02T11:22:21.050Z' 

ich den Datenrahmen filtern, muß sich auf das Datum, um nur die Datensätze in der letzten Woche zu erhalten. Also, ich habe versucht, eine Karte Ansatz, wo ich die Zeichenfolge Datums Datetime-Objekte mit strptime transformiert:

def map_to_datetime(row): 
    format_string = '%Y-%m-%dT%H:%M:%S.%fZ' 
    row.date = datetime.strptime(row.date, format_string) 

df = df.map(map_to_datetime) 

und dann würde ich einen Filter als

df.filter(lambda row: 
    row.date >= (datetime.today() - timedelta(days=7))) 

ich das Mapping Arbeits erhalten verwalten anzuwenden aber die Filter nicht mit

TypeError: condition should be string or Column

gibt es eine Möglichkeit, eine Filterung in eine Art und Weise zu verwenden, das funktioniert, oder soll ich den Ansatz ändern und wie?

Antwort

5

Sie diese Python-Code Arbeiter Seite ohne Verwendung und die Umstellung auf RDDs lösen können. Vor allem, da Sie ISO 8601-String verwenden, können Sie Ihre Daten direkt an Datum oder Zeitstempel gegossen werden:

from pyspark.sql.functions import col 

df = sc.parallelize([ 
    ('2015-07-02T11:22:21.050Z',), 
    ('2016-03-20T21:00:00.000Z',) 
]).toDF(("d_str",)) 

df_casted = df.select("*", 
    col("d_str").cast("date").alias("dt"), 
    col("d_str").cast("timestamp").alias("ts")) 

Dieses Hin- und Rück zwischen JVM und Python sparen. Es gibt auch einige Möglichkeiten, wie Sie sich dem zweiten Teil nähern können. Nur Datum:

from pyspark.sql.functions import current_date, datediff, unix_timestamp 

df_casted.where(datediff(current_date(), col("dt")) < 7) 

Zeitstempel:

def days(i: int) -> int: 
    return 60 * 60 * 24 * i 

df_casted.where(unix_timestamp() - col("ts").cast("long") < days(7)) 

Sie können auch einen Blick auf current_timestamp nehmen und date_sub

Hinweis: Ich möchte vermeiden, DataFrame.map verwenden. Es ist besser, stattdessen DataFrame.rdd.map zu verwenden. Es wird Ihnen beim Umschalten auf 2.0+

etwas Arbeit ersparen
5

dachte ich einen Ausweg mein Problem zu lösen, indem die SparkSQL API mit Daten als Strings und tun dies gehalten:

last_week = (datetime.today() - timedelta(days=7)).strftime(format='%Y-%m-%d') 

new_df = df.where(df.date >= last_week) 
Verwandte Themen