2015-07-14 8 views
9

Was ist die richtige Methode zum Filtern von Datenrahmen nach Zeitstempel Feld?datetime Bereich Filter in PySpark SQL

ich unterschiedliche Datumsformate und Formen der Filterung versucht haben, nichts hilft: entweder pyspark liefert 0 Objekte oder einen Fehler wirft, dass es nicht Datetime-Format

Hier versteht ist, was ich so weit gekommen:

from pyspark import SparkContext 
from pyspark.sql import SQLContext 

from django.utils import timezone 
from django.conf import settings 

from myapp.models import Collection 

sc = SparkContext("local", "DjangoApp") 
sqlc = SQLContext(sc) 
url = "jdbc:postgresql://%(HOST)s/%(NAME)s?user=%(USER)s&password=%(PASSWORD)s" % settings.DATABASES['default'] 
sf = sqlc.load(source="jdbc", url=url, dbtable='myapp_collection') 

Bereich für die Zeitstempel-Feld:

system_tz = timezone.pytz.timezone(settings.TIME_ZONE) 
date_from = datetime.datetime(2014, 4, 16, 18, 30, 0, 0, tzinfo=system_tz) 
date_to = datetime.datetime(2015, 6, 15, 18, 11, 59, 999999, tzinfo=system_tz) 

Versuch 1

date_filter = "my_col >= '%s' AND my_col <= '%s'" % (
    date_from.isoformat(), date_to.isoformat() 
) 
sf = sf.filter(date_filter) 
sf.count() 

Out[12]: 0 

Versuch 2

sf = sf.filter(sf.my_col >= date_from).filter(sf.my_col <= date_to) 
sf.count() 

--------------------------------------------------------------------------- 
Py4JJavaError: An error occurred while calling o63.count. 
: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 4.0 failed 1 times, most recent failure: 
Lost task 0.0 in stage 4.0 (TID 3, localhost): org.postgresql.util.PSQLException: 
ERROR: syntax error at or near "18" 
# 
# ups.. JDBC doesn't understand 24h time format?? 

Versuch 3

sf = sf.filter("my_col BETWEEN '%s' AND '%s'" % \ 
    (date_from.isoformat(), date_to.isoformat()) 
    ) 
--------------------------------------------------------------------------- 
Py4JJavaError: An error occurred while calling o97.count. 
: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 17.0 failed 1 times, most recent failure: 
Lost task 0.0 in stage 17.0 (TID 13, localhost): org.postgresql.util.PSQLException: 
ERROR: syntax error at or near "18" 

die Daten in der Tabelle existieren, aber:

django_filters = { 
    'my_col__gte': date_from, 
    'my_col__lte': date_to 
    } 
Collection.objects.filter(**django_filters).count() 

Out[17]: 1093436 

Oder diese Weise

django_range_filter = {'my_col__range': (date_from, date_to)} 
Collection.objects.filter(**django_range_filter).count() 

Out[19]: 1093436 

Antwort

6

Ermöglicht Datenrahmen übernehmen sieht wie folgt aus:

sf = sqlContext.createDataFrame([ 
    [datetime.datetime(2013, 6, 29, 11, 34, 29)], 
    [datetime.datetime(2015, 7, 14, 11, 34, 27)], 
    [datetime.datetime(2012, 3, 10, 19, 00, 11)], 
    [datetime.datetime(2016, 2, 8, 12, 21)], 
    [datetime.datetime(2014, 4, 4, 11, 28, 29)] 
], ('my_col',)) 

mit Schema:

root 
|-- my_col: timestamp (nullable = true) 

und Sie möchten Daten in einem folgenden Bereich finden:

import datetime, time 
dates = ("2013-01-01 00:00:00", "2015-07-01 00:00:00") 

timestamps = (
    time.mktime(datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S").timetuple()) 
    for s in dates) 

Es ist möglich, Abfrage unter Verwendung von Zeitstempeln, die entweder auf einer Treiberseite berechnet wurden:

q1 = "CAST(my_col AS INT) BETWEEN {0} AND {1}".format(*timestamps) 
sf.where(q1).show() 

oder mit unix_timestamp Funktion:

q2 = """CAST(my_col AS INT) 
     BETWEEN unix_timestamp('{0}', 'yyyy-MM-dd HH:mm:ss') 
     AND unix_timestamp('{1}', 'yyyy-MM-dd HH:mm:ss')""".format(*dates) 

sf.where(q2).show() 

Es ist auch möglich UDF in einer ähnlichen Art und Weise zu verwenden, die ich in einem another answer beschrieben.

Wenn Sie rohe SQL verwenden ist es möglich, verschiedene Elemente des Zeitstempels mit year, date usw.

sqlContext.sql("""SELECT * FROM sf 
    WHERE YEAR(my_col) BETWEEN 2014 AND 2015").show() 

EDIT zu extrahieren:

Da Spark-1.5 können Sie integrierte Funktionen nutzen :

dates = ("2013-01-01", "2015-07-01") 
date_from, date_to = [to_date(lit(s)).cast(TimestampType()) for s in dates] 

sf.where((sf.my_col > date_from) & (sf.my_col < date_to)) 
+0

die erste Lösung funktioniert, vielen Dank! – funkifunki

+0

Die zweite Lösung schlägt mit Fehler fehl: Py4JJavaError: Beim Aufruf von o32.filter ist ein Fehler aufgetreten. : java.util.NoSuchElementException: Schlüssel nicht gefunden: unix_timestamp es ist kein Problem, obwohl wahrscheinlich durch meine spezifische Einrichtung verursacht – funkifunki

+1

Es ist möglich. Soweit es mich betrifft, ist 'unix_timestamp' Teil von Hive UDFs. – zero323

0

Wie wäre es mit so etwas:

import pyspark.sql.functions as func 

df = sf.select(func.to_date(sf.my_col).alias("time")) 
sf = df.filter(sf.time > date_from).filter(sf.time < date_to) 
+0

sf = sf.select .... sollte df = sf.select sein ... –