Ich hatte ein ähnliches Problem, wo mein Scheduler-Prozess war ein uWSGI MULE Prozess und es gab eine separate App, wo ich neue Jobs hinzufügen wollte.
bei der BaseScheduler ‚s add_job()
Funktion Suchen:
with self._jobstores_lock:
if not self.running:
self._pending_jobs.append((job, jobstore, replace_existing))
self._logger.info('Adding job tentatively -- it will be properly scheduled when the scheduler starts')
else:
self._real_add_job(job, jobstore, replace_existing, True)
Sie das Problem sehen: der Scheduler fügt Arbeitsplätze nur dann, wenn es bereits gestartet wird.
Die Lösung zum Glück ganz einfach ist, sollten wir unsere eigenen "Add-Job-only" Scheduler definieren:
class JobAddScheduler(BlockingScheduler):
def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None, misfire_grace_time=undefined,
coalesce=undefined, max_instances=undefined, next_run_time=undefined, jobstore='default',
executor='default', replace_existing=False, **trigger_args):
job_kwargs = {
'trigger': self._create_trigger(trigger, trigger_args),
'executor': executor,
'func': func,
'args': tuple(args) if args is not None else(),
'kwargs': dict(kwargs) if kwargs is not None else {},
'id': id,
'name': name,
'misfire_grace_time': misfire_grace_time,
'coalesce': coalesce,
'max_instances': max_instances,
'next_run_time': next_run_time
}
job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if value is not undefined)
job = Job(self, **job_kwargs)
# Add jobs to job store
with self._jobstores_lock:
self._real_add_job(job, jobstore, replace_existing, True)
return job
def start(self):
pass
def shutdown(self, wait=True):
pass
def _main_loop(self):
pass
def wakeup(self):
pass
Dann können wir Cron-Jobs sofort hinzufügen:
jobscheduler = JobAddScheduler()
jobscheduler.add_job(...)
Don vergiss nicht, den scheduler zu konfigurieren! In meinem Fall habe ich SQLAlchemy-MySQL-Backend für die Speicherung von Jobs:
jobstores=dict(default=SQLAlchemyJobStore(url='mysql+pymsql://USER:[email protected]/DATABASE'))
jobscheduler.configure(jobstores=jobstores)
ich über die anderen jobstores bin nicht sicher, aber nachdem ich einen neuen Job hinzugefügt, hatte ich die wakeup()
Funktion des separaten Scheduler-Prozess aufrufen um den Job "aktiv" zu machen. Das habe ich mit dem Signalsystem von uWSGI erreicht.
Ich bin ziemlich sicher, dass 'mit self._jobstores_lock' nicht wirklich das Richtige macht, da es in einem separaten Prozess ist. – scribu