Skip to content
This repository was archived by the owner on Dec 17, 2019. It is now read-only.

Task leases (fixes #18) #28

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions xenserver/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ def abspath(*args):
# XenServer host
XENZEN_XENAPI_IGNORE_SSL = False

# The number of seconds a task lease will live before it expires
XENZEN_TASK_LEASE_SECONDS = 3600

try:
from local_settings import * # noqa: F401, F403
except ImportError:
Expand Down
51 changes: 51 additions & 0 deletions xenserver/task_lease.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
This module (ab)uses the database session store to provide task leases that
can be used to prevent a buildup of slow tasks.
"""

from __future__ import absolute_import

from datetime import timedelta

from celery.utils.log import get_task_logger
from django.conf import settings
from django.contrib.sessions.models import Session
from django.utils import timezone

logger = get_task_logger(__name__)


def _key(task, args):
return '::'.join(['lease', task, args])


def acquire(task, args, ttl=None):
"""
Attempt to acquire a task lease for the given task. This should be done by
the thing that queues the task for execution. Returns `True` is the lease
has been acquired, `False` if an existing active lease is found.
"""
key = _key(task, args)
if Session.objects.filter(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would get_or_create be simpler here? It returns a tuple of (object, created)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_or_create doesn't let me check the expire_date in the query, but it does require me to pass it as a default for the creation step. While this makes the creation of a new object easier (it happens as part of get_or_create), it makes handling an existing but expired object harder because I'd have to check and update expire_date as appropriate. It's about the same amount of code as doing it this way, but the logic for checking if we properly got the lease is split up and less readable.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, yes.

session_key=key, expire_date__gt=timezone.now()).exists():
# There's an active lease, move on.
logger.info("Active lease found for %s::%s." % (task, args))
return False
if ttl is None:
ttl = settings.XENZEN_TASK_LEASE_SECONDS
exp = timezone.now() + timedelta(seconds=ttl)
lease = Session(session_key=key, session_data="", expire_date=exp)
lease.save()
return True


def release(task, args):
"""
Release the task lease for given task, if it exists. This should be done by
the task itself once it's finished.
"""
key = _key(task, args)
try:
Session.objects.get(session_key=key).delete()
except Session.DoesNotExist:
pass
126 changes: 126 additions & 0 deletions xenserver/tests/test_task_lease.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""
Tests for task leases.
"""

from datetime import timedelta

from django.contrib.sessions.models import Session
from django.utils import timezone
import pytest

from xenserver.task_lease import _key, acquire, release


def session_object_exists(task, args, **filterkw):
return Session.objects.filter(
session_key=_key(task, args), **filterkw).exists()


def session_object_expires_in(task, args, ttl):
# Allow a couple of seconds either side because of variable test timings.
now = timezone.now()
range_start = now + timedelta(seconds=ttl - 2)
range_end = now + timedelta(seconds=ttl + 2)
return session_object_exists(
task, args, expire_date__gt=range_start, expire_date__lt=range_end)


def session_object_expired(task, args):
return session_object_exists(task, args, expire_date__lt=timezone.now())


def session_object_unexpired(task, args):
return session_object_exists(task, args, expire_date__gt=timezone.now())


@pytest.mark.django_db
class TestTaskLease(object):
def test_acquire_new_lease(self, settings):
"""
We can acquire a new lease for a task.
"""
settings.XENZEN_TASK_LEASE_SECONDS = 10
assert not session_object_exists('some_task', 'new_lease_1')
assert acquire('some_task', 'new_lease_1')
assert session_object_unexpired('some_task', 'new_lease_1')

def test_acquire_existing_lease(self, settings):
"""
We can't acquire an existing lease for a task.
"""
settings.XENZEN_TASK_LEASE_SECONDS = 10
assert acquire('some_task', 'existing_lease_1')
assert session_object_unexpired('some_task', 'existing_lease_1')
assert not acquire('some_task', 'existing_lease_1')

def test_acquire_expired_lease(self, settings):
"""
We can acquire an expired lease for a task.
"""
settings.XENZEN_TASK_LEASE_SECONDS = -1
assert acquire('some_task', 'expired_lease_1')
assert session_object_expired('some_task', 'expired_lease_1')
settings.XENZEN_TASK_LEASE_SECONDS = 10
assert acquire('some_task', 'expired_lease_1')
assert session_object_unexpired('some_task', 'expired_lease_1')

def test_release_lease(self, settings):
"""
Releasing a lease deletes it.
"""
settings.XENZEN_TASK_LEASE_SECONDS = 10
assert acquire('some_task', 're_lease_1')
assert session_object_unexpired('some_task', 're_lease_1')
release('some_task', 're_lease_1')
assert not session_object_exists('some_task', 're_lease_1')

def test_release_expired_lease(self, settings):
"""
Releasing an expired lease deletes it.
"""
settings.XENZEN_TASK_LEASE_SECONDS = -1
assert acquire('some_task', 'expired_lease_1')
assert session_object_expired('some_task', 'expired_lease_1')
release('some_task', 'expired_lease_1')
assert not session_object_exists('some_task', 'expired_lease_1')

def test_release_missing_lease(self, settings):
"""
Releasing a lease that does not exist does nothing.
"""
settings.XENZEN_TASK_LEASE_SECONDS = 10
assert not session_object_exists('some_task', 'missing_lease_1')
release('some_task', 'missing_lease_1')
assert not session_object_exists('some_task', 'missing_lease_1')

def test_acquire_release_acquire(self, settings):
"""
We can acquire a lease, release it, then acquire it again.
"""
settings.XENZEN_TASK_LEASE_SECONDS = 10
assert acquire('some_task', 'lease_1')
assert session_object_unexpired('some_task', 'lease_1')
release('some_task', 'lease_1')
assert not session_object_exists('some_task', 'lease_1')
assert acquire('some_task', 'lease_1')
assert session_object_unexpired('some_task', 'lease_1')

def test_default_ttl(self, settings):
"""
By default, a new lease has a TTL configured from settings.
"""
settings.XENZEN_TASK_LEASE_SECONDS = 10
assert acquire('some_task', 'new_lease_1')
assert session_object_expires_in('some_task', 'new_lease_1', 10)
assert not session_object_expires_in('some_task', 'new_lease_1', 0)
assert not session_object_expires_in('some_task', 'new_lease_1', 20)

def test_custom_ttl(self, settings):
"""
A custom TTL can be provided if desired.
"""
settings.XENZEN_TASK_LEASE_SECONDS = 10
assert acquire('some_task', 'new_lease_1', 20)
assert session_object_expires_in('some_task', 'new_lease_1', 20)
assert not session_object_expires_in('some_task', 'new_lease_1', 10)
assert not session_object_expires_in('some_task', 'new_lease_1', 30)