2017-05-16 6 views
1

Ich verwende PySpark mit MongoDB und möchte meine Datenbank mit einer Pipeline mit einem Filter von Datum abfragen. In Mongo meine Abfrage sieht wie folgt aus:PySpark MongoDB Abfrage Datum

db.collection.aggregate([{$match:{"creation":{$lte:new Date("Jan 1, 2016")}}},{$sort:{"creation":1}}]) 

Aber ich weiß nicht, wie die gleiche Sache in Python zu tun. Zum Beispiel habe ich versucht:

pipeline = [{'$match': {'creation': {'$lte': datetime.datetime(2016, 1, 1, 0, 0)}}}, {'$sort': {'creation': 1}}] 
df = context.read.format("com.mongodb.spark.sql").options(pipeline=pipeline).load() 

und ich habe einen Fehler: org.bson.json.JsonParseException: JSON-Reader wurde ein Wert erwartet, fand aber ' Datetime '.

(Ich will alles in der Pipeline tun und nicht in einer SQL-Abfrage)

Antwort

0

Sie MongoDB extended JSON nutzen können das Datum angeben. Zum Beispiel:

pipeline = [{'$match':{'creation':{'$lte': {'$date': "2016-01-01T00:00:00Z" }}}}] 
df_pipeline = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource") 
          .option("pipeline", pipeline).load() 
df_pipeline.first()