2015-05-22 12 views
11

Ich versuche eine Pub/Sub auf Mongo's Oplog Sammlung zu implementieren. Bereitgestellter Code funktioniert, ohnetailable = True Option gesetzt (es wird alle Dokumente zurückgeben), aber sobald ich es an den Cursor übergeben wird es nichts abholen (auch nach Änderungen in der gewünschten Sammlung).Pymongo - tailing oplog

Ich bin mit pymongo 2.7.2

while(True): 
    with self.database.connect() as connection: 
     cursor = connection['local'].oplog.rs.find(
      {'ns': self.collection}, 
      await_data = True, 
      tailable = True 
     ) 

     cursor.add_option(_QUERY_OPTIONS['oplog_replay']) 

     while cursor.alive: 
      try: 
       doc = cursor.next() 

       print doc 
      except(AutoReconnect, StopIteration): 
       time.sleep(1) 

ich einige Lösungen ausprobiert haben, aber es immer noch nicht so schnell wie tailable Option hinzugefügt wird. Oplog ist richtig eingerichtet, da mongo-oplog Modul von Nodejs wie erwartet funktioniert.

Mögliche duplicate (keine akzeptierte Antwort)

Antwort

4

Sie müssen sich auf der ‚ts‘ oplog Feld abfragen und verfolgen die letzte Dokument, das Sie (über den Zeitstempel) für den Fall, lesen Sie den Cursor neu erstellt werden muss. Hier ein Beispiel, das Sie an Ihre Bedürfnisse anpassen können:

import time 

import pymongo 

c = pymongo.MongoClient() 
# Uncomment this for master/slave. 
# oplog = c.local.oplog['$main'] 
# Uncomment this for replica sets. 
oplog = c.local.oplog.rs 
first = oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1).next() 
ts = first['ts'] 

while True: 
    cursor = oplog.find({'ts': {'$gt': ts}}, tailable=True, await_data=True) 
    # oplogReplay flag - not exposed in the public API 
    cursor.add_option(8) 
    while cursor.alive: 
     for doc in cursor: 
      ts = doc['ts'] 
      # Do something... 
     time.sleep(1)