Ich schreibe ein Programm, das regelmäßig alte Daten aus einer RethinkDB-Datenbank in eine Datei ablegt und aus der Datenbank entfernt. Derzeit werden die Daten in einer einzigen Datei gespeichert, die unbegrenzt wächst. Ich möchte dies ändern, so dass die maximale Dateigröße beispielsweise 250 MB beträgt und das Programm beginnt, in eine neue Ausgabedatei zu schreiben, kurz bevor diese Größe überschritten wird.Wie JSON dump zu einem rotierenden Dateiobjekt
Es scheint wie Pythons RotingFileHandler-Klasse für loggers tut ungefähr was ich will; Ich bin mir jedoch nicht sicher, ob die Protokollierung auf ein JSON-Dump-Objekt oder nur auf Strings angewendet werden kann.
Ein anderer möglicher Ansatz wäre, Mike Penningtons RotatingFile-Klasse (eine Variante von) zu verwenden (siehe python: outfile to another text file if exceed certain file size).
Welcher dieser Ansätze ist wahrscheinlich der fruchtbarste?
Als Referenz mein aktuelles Programm ist wie folgt:
import os
import sys
import json
import rethinkdb as r
import pytz
from datetime import datetime, timedelta
import schedule
import time
import functools
from iclib import RethinkDB
import msgpack
''' The purpose of the Controller is to periodically archive data from the "sensor_data" table so that it does not grow without limit.'''
class Controller(RethinkDB):
def __init__(self, db_address=(os.environ['DB_ADDR'], int(os.environ['DB_PORT'])), db_name=os.environ['DB_NAME']):
super(Controller, self).__init__(db_address=db_address, db_name=db_name) # Initialize the IperCronComponent with the default logger name (in this case, "Controller")
self.db_table = RethinkDB.SENSOR_DATA_TABLE # The table name is "sensor_data" and is stored as a class variable in RethinkDBMixIn
def generate_archiving_query(self, retention_period=timedelta(days=3)):
expiry_time = r.now() - retention_period.total_seconds() # Timestamp before which data is to be archived
if "timestamp" in r.table(self.db_table).index_list().run(self.db): # If "timestamp" is a secondary index
beginning_of_time = r.time(1400, 1, 1, 'Z') # The minimum time of a ReQL time object (i.e., the year 1400 in the UTC timezone)
data_to_archive = r.table(self.db_table).between(beginning_of_time, expiry_time, index="timestamp") # Generate query using "between" (faster)
else:
data_to_archive = r.table(self.db_table).filter(r.row['timestamp'] < expiry_time) # Generate the same query using "filter" (slower, but does not require "timestamp" to be a secondary index)
return data_to_archive
def archiving_job(self, data_to_archive=None, output_file="archived_sensor_data.json"):
if data_to_archive is None:
data_to_archive = self.generate_archiving_query() # By default, the call the "generate_archiving_query" function to generate the query
old_data = data_to_archive.run(self.db, time_format="raw") # Without time_format="raw" the output does not dump to JSON
with open(output_file, 'a') as f:
ids_to_delete = []
for item in old_data:
print item
# msgpack.dump(item, f)
json.dump(item, f)
f.write('\n') # Separate each document by a new line
ids_to_delete.append(item['id'])
r.table(self.db_table).get_all(r.args(ids_to_delete)).delete().run(self.db) # Delete based on ID. It is preferred to delete the entire batch in a single operation rather than to delete them one by one in the for loop.
def test_job_1():
db_name = "ipercron"
table_name = "sensor_data"
port_offset = 1 # To avoid interference of this testing program with the main program, all ports are initialized at an offset of 1 from the default ports using "rethinkdb --port_offset 1" at the command line.
conn = r.connect("localhost", 28015 + port_offset)
r.db(db_name).table(table_name).delete().run(conn)
import rethinkdb_add_data
controller = Controller(db_address=("localhost", 28015+port_offset))
archiving_job = functools.partial(controller.archiving_job, data_to_archive=controller.generate_archiving_query())
return archiving_job
if __name__ == "__main__":
archiving_job = test_job_1()
schedule.every(0.1).minutes.do(archiving_job)
while True:
schedule.run_pending()
Es ist nicht vollständig ‚runnable‘ aus dem Teil gezeigt, aber der entscheidende Punkt ist, dass ich möchte die Linie
json.dump(item, f)
ersetzen
mit einer ähnlichen Zeile, in der f
ist ein rotierendes, und nicht fest, Dateiobjekt.
Sie können [json.dumps] (https://docs.python.org/3/library/json.html#json.dumps) verwenden, um json als 'str' zu bekommen, das sollte mit' RotatingFileHandler' funktionieren. –