2015-07-14 14 views
5

Ich habe einen Datenrahmen aus df Hive Tabelle geladen und es hat eine Zeitmarkenspalte, sagt ts, mit Zeichenfolge Formattyp dd-MMM-yy hh.mm.ss.MS a (umgewandelt in Python Datumzeit-Bibliothek, das ist %d-%b-%y %I.%M.%S.%f %p).Column Filterung in PySpark

Jetzt mag ich Zeilen aus dem Datenrahmen filtern, die aus den letzten fünf Minuten sind:

only_last_5_minutes = df.filter(
    datetime.strptime(df.ts, '%d-%b-%y %I.%M.%S.%f %p') > datetime.now() - timedelta(minutes=5) 
) 

Dies ist jedoch nicht funktioniert und ich erhalte diese Mitteilung

TypeError: strptime() argument 1 must be string, not Column 

Es ist wie ich aussieht habe falsche Anwendung der Spaltenoperation und es scheint mir, ich muss eine Lambda-Funktion erstellen, um jede Spalte zu filtern, die die gewünschte Bedingung erfüllt, aber ein Neuling für Python und Lambda-Ausdruck insbesondere bin, weiß ich nicht, wie ich meinen Filter erstellen richtig. Bitte beraten.

P.S. Ich bevorzuge es, meine Filter als native Python (oder SparkSQL) anstelle eines Filters innerhalb des Hive-SQL-Abfrageausdrucks 'WHERE' auszudrücken.

bevorzugt:

df = sqlContext.sql("SELECT * FROM my_table") 
df.filter(// filter here) 

nicht bevorzugt:

df = sqlContext.sql("SELECT * FROM my_table WHERE...") 

Antwort

16

Es ist möglich, benutzerdefinierte Funktion zu verwenden.

from datetime import datetime, timedelta 
from pyspark.sql.types import BooleanType, TimestampType 
from pyspark.sql.functions import udf, col 

def in_last_5_minutes(now): 
    def _in_last_5_minutes(then): 
     then_parsed = datetime.strptime(then, '%d-%b-%y %I.%M.%S.%f %p') 
     return then_parsed > now - timedelta(minutes=5) 
    return udf(_in_last_5_minutes, BooleanType()) 

einige Dummy-Daten verwenden:

df = sqlContext.createDataFrame([ 
    (1, '14-Jul-15 11.34.29.000000 AM'), 
    (2, '14-Jul-15 11.34.27.000000 AM'), 
    (3, '14-Jul-15 11.32.11.000000 AM'), 
    (4, '14-Jul-15 11.29.00.000000 AM'), 
    (5, '14-Jul-15 11.28.29.000000 AM') 
], ('id', 'datetime')) 

now = datetime(2015, 7, 14, 11, 35) 
df.where(in_last_5_minutes(now)(col("datetime"))).show() 

Und da wir nur drei Einträge erwartet erhalten:

+--+--------------------+ 
|id|   datetime| 
+--+--------------------+ 
| 1|14-Jul-15 11.34.2...| 
| 2|14-Jul-15 11.34.2...| 
| 3|14-Jul-15 11.32.1...| 
+--+--------------------+ 

Datetime-Zeichenfolge Parsen alle wieder vorbei ist ziemlich ineffizient, so dass Sie TimestampType Speicherung betrachten kann stattdessen.

def parse_dt(): 
    def _parse(dt): 
     return datetime.strptime(dt, '%d-%b-%y %I.%M.%S.%f %p') 
    return udf(_parse, TimestampType()) 

df_with_timestamp = df.withColumn("timestamp", parse_dt()(df.datetime)) 

def in_last_5_minutes(now): 
    def _in_last_5_minutes(then): 
     return then > now - timedelta(minutes=5) 
    return udf(_in_last_5_minutes, BooleanType()) 

df_with_timestamp.where(in_last_5_minutes(now)(col("timestamp"))) 

und Ergebnis:

+--+--------------------+--------------------+ 
|id|   datetime|   timestamp| 
+--+--------------------+--------------------+ 
| 1|14-Jul-15 11.34.2...|2015-07-14 11:34:...| 
| 2|14-Jul-15 11.34.2...|2015-07-14 11:34:...| 
| 3|14-Jul-15 11.32.1...|2015-07-14 11:32:...| 
+--+--------------------+--------------------+ 

Schließlich ist es möglich, rohe SQL-Abfrage mit Zeitstempel zu verwenden:

query = """SELECT * FROM df 
    WHERE unix_timestamp(datetime, 'dd-MMM-yy HH.mm.ss.SSSSSS a') > {0} 
    """.format(time.mktime((now - timedelta(minutes=5)).timetuple())) 

sqlContext.sql(query) 

Wie oben, ist es effizienter wäre, wenn Datumsstrings zu analysieren.

Wenn Spalte ist bereits ein timestamp es möglich datetime Literale zu verwenden:

from pyspark.sql.functions import lit 

df_with_timestamp.where(
    df_with_timestamp.timestamp > lit(now - timedelta(minutes=5))) 

EDIT

Seit Spark-1.5 Sie Datumszeichenfolge analysieren kann, wie folgt:

from pyspark.sql.functions import from_unixtime, unix_timestamp 
from pyspark.sql.types import TimestampType 

df.select((from_unixtime(unix_timestamp(
    df.datetime, "yy-MMM-dd h.mm.ss.SSSSSS aa" 
))).cast(TimestampType()).alias("datetime"))