How to JSON dump to a rotating file object

I am writing a program that periodically archives old data from a RethinkDB database to a file and removes it from the database. Currently the data is written to a single file that grows without limit. I would like to change this so that the maximum file size is, say, 250 MB, and the program starts writing to a new output file just before this size would be exceeded.

It seems that Python's RotatingFileHandler class for loggers does roughly what I want; however, I am not sure whether logging can be applied to a JSON-dumped object or only to strings.

Another possible approach would be to use (a variant of) Mike Pennington's RotatingFile class (see python: outfile to another text file if exceed certain file size).
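
I have in mind something like the following minimal sketch (not Pennington's actual code; the class name, file-naming scheme, and constructor arguments are made up for illustration):

import os

class RotatingWriter(object):
    """Naive file-like writer that starts a new numbered file before max_bytes is exceeded.

    Hypothetical sketch -- not Mike Pennington's RotatingFile and not a standard-library class.
    """

    def __init__(self, base_name, max_bytes=250 * 1024 * 1024):
        self.base_name = base_name  # e.g. "archived_sensor_data.json"
        self.max_bytes = max_bytes
        self.index = 0
        self._open_current()

    def _current_name(self):
        return "%s.%d" % (self.base_name, self.index)

    def _open_current(self):
        name = self._current_name()
        self._size = os.path.getsize(name) if os.path.exists(name) else 0
        self._file = open(name, 'a')

    def write(self, data):
        # Roll over *before* writing if this chunk would push the file past the limit,
        # so a document passed in a single write() call is never split across two files.
        if self._size > 0 and self._size + len(data) > self.max_bytes:
            self._file.close()
            self.index += 1
            self._open_current()
        self._file.write(data)
        self._size += len(data)

    def close(self):
        self._file.close()

    # Context-manager support, so it can be used much like a regular file object
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False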

Which of these approaches is likely to be the most fruitful?

For reference, my current program is as follows:

import os 
import sys 
import json 
import rethinkdb as r 
import pytz 
from datetime import datetime, timedelta 
import schedule 
import time 
import functools 
from iclib import RethinkDB 
import msgpack 

''' The purpose of the Controller is to periodically archive data from the "sensor_data" table so that it does not grow without limit.''' 

class Controller(RethinkDB):
    def __init__(self, db_address=(os.environ['DB_ADDR'], int(os.environ['DB_PORT'])), db_name=os.environ['DB_NAME']):
        super(Controller, self).__init__(db_address=db_address, db_name=db_name)  # Initialize the IperCronComponent with the default logger name (in this case, "Controller")
        self.db_table = RethinkDB.SENSOR_DATA_TABLE  # The table name is "sensor_data" and is stored as a class variable in RethinkDBMixIn

    def generate_archiving_query(self, retention_period=timedelta(days=3)):
        expiry_time = r.now() - retention_period.total_seconds()  # Timestamp before which data is to be archived

        if "timestamp" in r.table(self.db_table).index_list().run(self.db):  # If "timestamp" is a secondary index
            beginning_of_time = r.time(1400, 1, 1, 'Z')  # The minimum time of a ReQL time object (i.e., the year 1400 in the UTC timezone)
            data_to_archive = r.table(self.db_table).between(beginning_of_time, expiry_time, index="timestamp")  # Generate query using "between" (faster)
        else:
            data_to_archive = r.table(self.db_table).filter(r.row['timestamp'] < expiry_time)  # Generate the same query using "filter" (slower, but does not require "timestamp" to be a secondary index)

        return data_to_archive

    def archiving_job(self, data_to_archive=None, output_file="archived_sensor_data.json"):
        if data_to_archive is None:
            data_to_archive = self.generate_archiving_query()  # By default, call the "generate_archiving_query" method to generate the query
        old_data = data_to_archive.run(self.db, time_format="raw")  # Without time_format="raw" the output does not dump to JSON
        with open(output_file, 'a') as f:
            ids_to_delete = []
            for item in old_data:
                print item
                # msgpack.dump(item, f)
                json.dump(item, f)
                f.write('\n')  # Separate each document by a newline
                ids_to_delete.append(item['id'])

        r.table(self.db_table).get_all(r.args(ids_to_delete)).delete().run(self.db)  # Delete based on ID. It is preferred to delete the entire batch in a single operation rather than to delete the documents one by one in the for loop.

def test_job_1(): 
    db_name = "ipercron" 
    table_name = "sensor_data" 
    port_offset = 1   # To avoid interference of this testing program with the main program, all ports are initialized at an offset of 1 from the default ports using "rethinkdb --port_offset 1" at the command line. 
    conn = r.connect("localhost", 28015 + port_offset) 
    r.db(db_name).table(table_name).delete().run(conn) 
    import rethinkdb_add_data 
    controller = Controller(db_address=("localhost", 28015+port_offset)) 
    archiving_job = functools.partial(controller.archiving_job, data_to_archive=controller.generate_archiving_query()) 
    return archiving_job 

if __name__ == "__main__": 

    archiving_job = test_job_1() 
    schedule.every(0.1).minutes.do(archiving_job) 

    while True:
        schedule.run_pending()

It is not fully 'runnable' from the part shown, but the crucial point is that I would like to replace the line

json.dump(item, f) 

with a similar line in which f is a rotating, rather than a fixed, file object.
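
For example, with a size-capped writer along the lines of the RotatingWriter sketch above (again a hypothetical helper, not an existing library class), the inner loop would become roughly:

import json

# Stand-in for the cursor returned by data_to_archive.run(self.db, time_format="raw")
old_data = [{"id": str(i), "timestamp": 1476662400 + i, "name": "sensor"} for i in range(5)]

ids_to_delete = []
with RotatingWriter("archived_sensor_data.json", max_bytes=250 * 1024 * 1024) as f:
    for item in old_data:
        f.write(json.dumps(item) + '\n')  # one write() per document, so no document is split across files
        ids_to_delete.append(item['id'])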


You can use [json.dumps](https://docs.python.org/3/library/json.html#json.dumps) to get the JSON as a 'str'; that should work with 'RotatingFileHandler'. – Stanislav Ivanov
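
In isolation, that suggestion amounts to something like the following minimal sketch (the file name, size limit, and logger name are arbitrary placeholders):

import json
import logging
from logging.handlers import RotatingFileHandler

# A bare logger whose only job is to append JSON lines to a size-capped, rotating file.
json_logger = logging.getLogger("json_archive")
json_logger.setLevel(logging.DEBUG)
json_logger.addHandler(RotatingFileHandler("archive.json", maxBytes=250 * 1024 * 1024, backupCount=10))

def dump_document(document):
    json_logger.debug(json.dumps(document))  # the handler takes care of the rollover

dump_document({"id": 1, "timestamp": 1476662400, "name": "example"})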

Answer


Following Stanislav Ivanov's suggestion, I used json.dumps to convert each RethinkDB document to a string and wrote it to a RotatingFileHandler:

import os 
import sys 
import json 
import rethinkdb as r 
import pytz 
from datetime import datetime, timedelta 
import schedule 
import time 
import functools 
from iclib import RethinkDB 
import msgpack 
import logging 
from logging.handlers import RotatingFileHandler 
from random_data_generator import RandomDataGenerator 

''' The purpose of the Controller is to periodically archive data from the "sensor_data" table so that it does not grow without limit.''' 

os.environ['DB_ADDR'] = 'localhost' 
os.environ['DB_PORT'] = '28015' 
os.environ['DB_NAME'] = 'ipercron' 

class Controller(RethinkDB):
    def __init__(self, db_address=None, db_name=None):
        if db_address is None:
            db_address = (os.environ['DB_ADDR'], int(os.environ['DB_PORT']))  # The default host ("rethinkdb") and port (28015) are stored as environment variables
        if db_name is None:
            db_name = os.environ['DB_NAME']  # The default database is "ipercron" and is stored as an environment variable
        super(Controller, self).__init__(db_address=db_address, db_name=db_name)  # Initialize the instance of the RethinkDB class. IperCronComponent will be initialized with its default logger name (in this case, "Controller")
        self.db_name = db_name
        self.db_table = RethinkDB.SENSOR_DATA_TABLE  # The table name is "sensor_data" and is stored as a class variable of RethinkDBMixIn
        self.table = r.db(self.db_name).table(self.db_table)
        self.archiving_logger = logging.getLogger("archiving_logger")
        self.archiving_logger.setLevel(logging.DEBUG)
        self.archiving_handler = RotatingFileHandler("archived_sensor_data.log", maxBytes=2000, backupCount=10)
        self.archiving_logger.addHandler(self.archiving_handler)

    def generate_archiving_query(self, retention_period=timedelta(days=3)):
        expiry_time = r.now() - retention_period.total_seconds()  # Timestamp before which data is to be archived

        if "timestamp" in self.table.index_list().run(self.db):
            beginning_of_time = r.time(1400, 1, 1, 'Z')  # The minimum time of a ReQL time object (namely, the year 1400 in UTC)
            data_to_archive = self.table.between(beginning_of_time, expiry_time, index="timestamp")  # Generate query using "between" (faster, requires "timestamp" to be a secondary index)
        else:
            data_to_archive = self.table.filter(r.row['timestamp'] < expiry_time)  # Generate query using "filter" (slower, but does not require "timestamp" to be a secondary index)

        return data_to_archive

    def archiving_job(self, data_to_archive=None):
        if data_to_archive is None:
            data_to_archive = self.generate_archiving_query()  # By default, call the "generate_archiving_query" method to generate the query
        old_data = data_to_archive.run(self.db, time_format="raw")  # Without time_format="raw" the output does not dump to JSON or msgpack

        ids_to_delete = []
        for item in old_data:
            print item
            self.dump(item)
            ids_to_delete.append(item['id'])

        self.table.get_all(r.args(ids_to_delete)).delete().run(self.db)  # Delete based on ID. It is preferred to delete the entire batch in a single operation rather than to delete the documents one by one in the for loop.

    def dump(self, item, mode='json'):
        if mode == 'json':
            dump_string = json.dumps(item)
        elif mode == 'msgpack':
            dump_string = msgpack.packb(item)
        self.archiving_logger.debug(dump_string)


def populate_database(db_name, table_name, conn): 

    if db_name not in r.db_list().run(conn):
        r.db_create(db_name).run(conn)  # Create the database if it does not yet exist

    if table_name not in r.db(db_name).table_list().run(conn):
        r.db(db_name).table_create(table_name).run(conn)  # Create the table if it does not yet exist

    r.db(db_name).table(table_name).delete().run(conn)   # Empty the table to start with a clean slate 

    # Generate random data with timestamps uniformly distributed over the past 6 days 
    random_data_time_interval = timedelta(days=6) 
    start_random_data = datetime.utcnow().replace(tzinfo=pytz.utc) - random_data_time_interval 

    random_generator = RandomDataGenerator(seed=0) 
    packets = random_generator.packets(N=100, start=start_random_data) 
    # print packets 
    print "Adding data to the database..." 
    r.db(db_name).table(table_name).insert(packets).run(conn) 


if __name__ == "__main__": 
    db_name = "ipercron" 
    table_name = "sensor_data" 
    port_offset = 1   # To avoid interference of this testing program with the main program, all ports are initialized at an offset of 1 from the default ports using "rethinkdb --port_offset 1" at the command line. 
    host = "localhost" 
    port = 28015 + port_offset 
    conn = r.connect(host, port)  # RethinkDB connection object 

    populate_database(db_name, table_name, conn) 

    # import rethinkdb_add_data 
    controller = Controller(db_address=(host, port)) 
    archiving_job = functools.partial(controller.archiving_job, data_to_archive=controller.generate_archiving_query())  # This ensures that the query is only generated once. (This is sufficient since r.now() is re-evaluated every time a connection is made). 

    schedule.every(0.1).minutes.do(archiving_job) 

    while True:
        schedule.run_pending()

In this context, the RethinkDB class does little more than define the class variable SENSOR_DATA_TABLE and the RethinkDB connection, self.db = r.connect(self.address[0], self.address[1]); a rough sketch of such a base class is given after the listing below. It is run together with a module for generating fake data, random_data_generator.py:

import random 
import faker 
from datetime import datetime, timedelta 
import pytz 
import rethinkdb as r 

class RandomDataGenerator(object):
    def __init__(self, seed=None):
        self._seed = seed
        self._random = random.Random()
        self._random.seed(seed)
        self.fake = faker.Faker()
        self.fake.random.seed(seed)

    def __getattr__(self, x):
        return getattr(self._random, x)

    def name(self):
        return self.fake.name()

    def datetime(self, start=None, end=None):
        if start is None:
            start = datetime(2000, 1, 1, tzinfo=pytz.utc)  # Jan 1st 2000
        if end is None:
            end = datetime.utcnow().replace(tzinfo=pytz.utc)

        if isinstance(end, datetime):
            dt = end - start
        elif isinstance(end, timedelta):
            dt = end
        assert isinstance(dt, timedelta)

        random_dt = timedelta(microseconds=self._random.randrange(int(dt.total_seconds() * (10 ** 6))))
        return start + random_dt

    def packets(self, N=1, start=None, end=None):
        return [{'name': self.name(), 'timestamp': self.datetime(start=start, end=end)} for _ in range(N)]

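For completeness, here is a rough sketch of what the (not shown) RethinkDB base class from iclib might look like, reconstructed only from what is stated above (it defines SENSOR_DATA_TABLE and opens the connection self.db); the real class is private, so this is purely illustrative:

import rethinkdb as r

class RethinkDB(object):
    """Illustrative stand-in for iclib.RethinkDB, based solely on the description above."""

    SENSOR_DATA_TABLE = "sensor_data"  # Table name used by the Controller

    def __init__(self, db_address=("localhost", 28015), db_name="ipercron"):
        self.address = db_address
        self.db_name = db_name
        self.db = r.connect(self.address[0], self.address[1])  # Connection used throughout the Controller as self.db
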
When I run controller, it produces several rolled-over output logs, each at most 2 kB in size, as expected (RotatingFileHandler names them archived_sensor_data.log, archived_sensor_data.log.1, and so on, up to the backupCount of 10):

(Screenshot of the rotated output log files.)