Skip to content

Commit

Permalink
chore: add docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
yashlamba committed May 31, 2024
1 parent 89ee655 commit 85693ab
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 0 deletions.
11 changes: 11 additions & 0 deletions invenio_jobs/services/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@


class JobEntry(ScheduleEntry):
"""Entry for celery beat."""

job = None

def __init__(self, job, *args, **kwargs):
"""Initialise entry."""
self.job = job
super().__init__(*args, **kwargs)

@classmethod
def from_job(cls, job):
"""Create JobEntry from job."""
return cls(
job=job,
name=job.title,
Expand All @@ -38,6 +41,8 @@ def from_job(cls, job):


class RunScheduler(Scheduler):
"""Custom beat scheduler for runs."""

Entry = JobEntry
entries = {}

Expand All @@ -47,17 +52,21 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:

@property
def schedule(self):
"""Get currently scheduled entries."""
return self.entries

def setup_schedule(self):
"""Setup schedule."""
# TODO Check whether we need the celery backend task?
self.sync()

def reserve(self, entry):
"""Update entry to next run execution time."""
new_entry = self.schedule[entry.job.id] = next(entry)
return new_entry

def apply_entry(self, entry, producer=None):
"""Create and apply a JobEntry."""
with self.app.flask_app.app_context():
logger.info("Scheduler: Sending due task %s (%s)", entry.name, entry.task)
try:
Expand All @@ -80,6 +89,7 @@ def apply_entry(self, entry, producer=None):
logger.debug("%s sent.", entry.task)

def sync(self):
"""Sync Jobs from db to the scheduler."""
# TODO Should we also have a cleaup task for runs? "stale" run (status running, starttime > hour, Run pending for > 1 hr)
with self.app.flask_app.app_context():
jobs = Job.query.filter(Job.active == True).all()
Expand All @@ -88,6 +98,7 @@ def sync(self):
self.entries[job.id] = JobEntry.from_job(job)

def create_run(self, entry):
"""Create run from a JobEntry."""
job = Job.query.filter_by(id=entry.job.id).one()
run = Run(
job=job,
Expand Down
1 change: 1 addition & 0 deletions invenio_jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

# TODO 1. Move to service? 2. Don't use kwargs?
def update_run(run, **kwargs):
"""Method to update and commit run updates."""
if not run:
return
for kw, value in kwargs.items():
Expand Down

0 comments on commit 85693ab

Please sign in to comment.