diff --git a/invenio_jobs/services/scheduler.py b/invenio_jobs/services/scheduler.py index 4932a71..6739591 100644 --- a/invenio_jobs/services/scheduler.py +++ b/invenio_jobs/services/scheduler.py @@ -17,15 +17,18 @@ class JobEntry(ScheduleEntry): + """Entry for celery beat.""" job = None def __init__(self, job, *args, **kwargs): + """Initialise entry.""" self.job = job super().__init__(*args, **kwargs) @classmethod def from_job(cls, job): + """Create JobEntry from job.""" return cls( job=job, name=job.title, @@ -38,6 +41,8 @@ def from_job(cls, job): class RunScheduler(Scheduler): + """Custom beat scheduler for runs.""" + Entry = JobEntry entries = {} @@ -47,17 +52,21 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: @property def schedule(self): + """Get currently scheduled entries.""" return self.entries def setup_schedule(self): + """Setup schedule.""" # TODO Check whether we need the celery backend task? self.sync() def reserve(self, entry): + """Update entry to next run execution time.""" new_entry = self.schedule[entry.job.id] = next(entry) return new_entry def apply_entry(self, entry, producer=None): + """Create and apply a JobEntry.""" with self.app.flask_app.app_context(): logger.info("Scheduler: Sending due task %s (%s)", entry.name, entry.task) try: @@ -80,6 +89,7 @@ def apply_entry(self, entry, producer=None): logger.debug("%s sent.", entry.task) def sync(self): + """Sync Jobs from db to the scheduler.""" # TODO Should we also have a cleaup task for runs? "stale" run (status running, starttime > hour, Run pending for > 1 hr) with self.app.flask_app.app_context(): jobs = Job.query.filter(Job.active == True).all() @@ -88,6 +98,7 @@ def sync(self): self.entries[job.id] = JobEntry.from_job(job) def create_run(self, entry): + """Create run from a JobEntry.""" job = Job.query.filter_by(id=entry.job.id).one() run = Run( job=job, diff --git a/invenio_jobs/tasks.py b/invenio_jobs/tasks.py index 657e311..ec8f3ca 100644 --- a/invenio_jobs/tasks.py +++ b/invenio_jobs/tasks.py @@ -16,6 +16,7 @@ # TODO 1. Move to service? 2. Don't use kwargs? def update_run(run, **kwargs): + """Method to update and commit run updates.""" if not run: return for kw, value in kwargs.items():