2015-09-03 6 views
6

Ich habe eine einfache Anforderung. Ich betreibe Apscheduler als separaten Prozess. Ich habe ein anderes jobproducer-Skript, von dem ich einen Job dem Scheduler hinzufügen und es ausführen möchte.APScheduler wie Job außerhalb des Schedulers hinzufügen?

Dies ist mein Scheduler Code,

# appsched.py 
from apscheduler.schedulers.blocking import BlockingScheduler 
scheduler = BlockingScheduler() 
scheduler.start() 

Das ist mein Job Produzent Skript,

# jobproducer.py 
from appsched import scheduler 

def say_hello_job(): 
    print "Hello" 

scheduler.add_job(say_hello_job, 'interval', minutes=1) 

Unnötig zu sagen, dass dies nicht funktioniert. Gibt es eine Möglichkeit, dies zu erledigen, indem Sie vielleicht einen Jobstore verwenden? Wie teile ich einen Scheduler mit mehreren Job-Herstellern?

Antwort

1

Ich hatte ein ähnliches Problem, wo mein Scheduler-Prozess war ein uWSGI MULE Prozess und es gab eine separate App, wo ich neue Jobs hinzufügen wollte.

bei der BaseScheduler ‚s add_job() Funktion Suchen:

with self._jobstores_lock: 
if not self.running: 
    self._pending_jobs.append((job, jobstore, replace_existing)) 
    self._logger.info('Adding job tentatively -- it will be properly scheduled when the scheduler starts') 
else: 
    self._real_add_job(job, jobstore, replace_existing, True) 

Sie das Problem sehen: der Scheduler fügt Arbeitsplätze nur dann, wenn es bereits gestartet wird.

Die Lösung zum Glück ganz einfach ist, sollten wir unsere eigenen "Add-Job-only" Scheduler definieren:

class JobAddScheduler(BlockingScheduler): 
    def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None, misfire_grace_time=undefined, 
       coalesce=undefined, max_instances=undefined, next_run_time=undefined, jobstore='default', 
       executor='default', replace_existing=False, **trigger_args): 

    job_kwargs = { 
     'trigger': self._create_trigger(trigger, trigger_args), 
     'executor': executor, 
     'func': func, 
     'args': tuple(args) if args is not None else(), 
     'kwargs': dict(kwargs) if kwargs is not None else {}, 
     'id': id, 
     'name': name, 
     'misfire_grace_time': misfire_grace_time, 
     'coalesce': coalesce, 
     'max_instances': max_instances, 
     'next_run_time': next_run_time 
    } 
    job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if value is not undefined) 
    job = Job(self, **job_kwargs) 

    # Add jobs to job store 
    with self._jobstores_lock: 
     self._real_add_job(job, jobstore, replace_existing, True) 

    return job 

    def start(self): 
    pass 

    def shutdown(self, wait=True): 
    pass 

    def _main_loop(self): 
    pass 

    def wakeup(self): 
    pass 

Dann können wir Cron-Jobs sofort hinzufügen:

jobscheduler = JobAddScheduler() 
jobscheduler.add_job(...) 

Don vergiss nicht, den scheduler zu konfigurieren! In meinem Fall habe ich SQLAlchemy-MySQL-Backend für die Speicherung von Jobs:

jobstores=dict(default=SQLAlchemyJobStore(url='mysql+pymsql://USER:[email protected]/DATABASE')) 
jobscheduler.configure(jobstores=jobstores) 

ich über die anderen jobstores bin nicht sicher, aber nachdem ich einen neuen Job hinzugefügt, hatte ich die wakeup() Funktion des separaten Scheduler-Prozess aufrufen um den Job "aktiv" zu machen. Das habe ich mit dem Signalsystem von uWSGI erreicht.

+0

Ich bin ziemlich sicher, dass 'mit self._jobstores_lock' nicht wirklich das Richtige macht, da es in einem separaten Prozess ist. – scribu

Verwandte Themen