Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion oauth2_provider/management/commands/cleartokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from ...models import clear_expired


class Command(BaseCommand):
class Command(BaseCommand): # pragma: no cover
help = "Can be run as a cronjob or directly to clean out expired tokens"

def handle(self, *args, **options):
Expand Down
83 changes: 58 additions & 25 deletions tests/test_models.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import timedelta

import pytest
from django.contrib.auth import get_user_model
from django.core.exceptions import ImproperlyConfigured, ValidationError
Expand Down Expand Up @@ -294,31 +296,62 @@ def test_str(self):
class TestClearExpired(BaseTestModels):
def setUp(self):
super().setUp()
# Insert two tokens on database.
# Insert many tokens, both expired and not, and grants.
now = timezone.now()
earlier = now - timedelta(seconds=100)
later = now + timedelta(seconds=100)
app = Application.objects.create(
name="test_app",
redirect_uris="http://localhost http://example.com http://example.org",
user=self.user,
client_type=Application.CLIENT_CONFIDENTIAL,
authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE,
)
AccessToken.objects.create(
token="555",
expires=timezone.now(),
scope=2,
application=app,
user=self.user,
created=timezone.now(),
updated=timezone.now(),

expired_access_tokens = AccessToken.objects.bulk_create(
AccessToken(token="expired AccessToken {}".format(i), expires=earlier) for i in range(100)
)
AccessToken.objects.create(
token="666",
expires=timezone.now(),
scope=2,
application=app,
user=self.user,
created=timezone.now(),
updated=timezone.now(),
current_access_tokens = AccessToken.objects.bulk_create(
AccessToken(token="current AccessToken {}".format(i), expires=later) for i in range(100)
)

RefreshToken.objects.bulk_create(
RefreshToken(
token="expired AT's refresh token {}".format(i),
application=app,
access_token=expired_access_tokens[i],
user=self.user,
)
for i in range(100, 2)
)
RefreshToken.objects.bulk_create(
RefreshToken(
token="current AT's refresh token {}".format(i),
application=app,
access_token=expired_access_tokens[i],
user=self.user,
)
for i in range(49, 100, 2)
)
Grant.objects.bulk_create(
Grant(
user=self.user,
code="old grant code {}".format(i),
application=app,
expires=expired_access_tokens[i],
redirect_uri="https://localhost/redirect",
)
for i in range(100)
)
Grant.objects.bulk_create(
Grant(
user=self.user,
code="new grant code {}".format(i),
application=app,
expires=current_access_tokens[i],
redirect_uri="https://localhost/redirect",
)
for i in range(100)
)

def test_clear_expired_tokens(self):
Expand All @@ -333,15 +366,15 @@ def test_clear_expired_tokens_incorect_timetype(self):
assert result == "ImproperlyConfigured"

def test_clear_expired_tokens_with_tokens(self):
self.client.login(username="test_user", password="123456")
self.oauth2_settings.REFRESH_TOKEN_EXPIRE_SECONDS = 0
ttokens = AccessToken.objects.count()
expiredt = AccessToken.objects.filter(expires__lte=timezone.now()).count()
assert ttokens == 2
assert expiredt == 2
self.oauth2_settings.CLEAR_EXPIRED_TOKENS_BATCH_SIZE = 10
self.oauth2_settings.CLEAR_EXPIRED_TOKENS_BATCH_INTERVAL = 0.0
assert AccessToken.objects.count() == 200
assert RefreshToken.objects.count() == 100
assert Grant.objects.count() == 200
clear_expired()
expiredt = AccessToken.objects.filter(expires__lte=timezone.now()).count()
assert expiredt == 0
assert AccessToken.objects.count() == 150
assert RefreshToken.objects.count() == 100
assert Grant.objects.count() == 100


@pytest.mark.django_db
Expand Down