Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tests #316

Merged
merged 11 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"]
redis-version: [4, 5, 6, 7]
redis-py-version: [3.5.0]
redis-py-version: [4]

steps:
- uses: actions/checkout@v3
Expand All @@ -28,13 +28,14 @@ jobs:
python-version: ${{ matrix.python-version }}

- name: Start Redis
uses: supercharge/redis-github-action@1.5.0
uses: supercharge/redis-github-action@1.8.0
with:
redis-version: ${{ matrix.redis-version }}

- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install setuptools
pip install redis==${{ matrix.redis-py-version }}
python setup.py install

Expand Down
10 changes: 6 additions & 4 deletions rq_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from itertools import repeat

from rq.exceptions import NoSuchJobError
from rq.job import Job
from rq.job import Job, JobStatus
from rq.queue import Queue
from rq.utils import backend_class, import_attribute

Expand All @@ -30,8 +30,9 @@ class Scheduler(object):

def __init__(self, queue_name='default', queue=None, interval=60, connection=None,
job_class=None, queue_class=None, name=None):
from rq.connections import resolve_connection
self.connection = resolve_connection(connection)
if connection is None:
raise ValueError('`connection` argument is required')
self.connection = connection
self._queue = queue
if self._queue is None:
self.queue_name = queue_name
Expand Down Expand Up @@ -143,7 +144,8 @@ def _create_job(self, func, args=None, kwargs=None, commit=True,
func, args=args, connection=self.connection,
kwargs=kwargs, result_ttl=result_ttl, ttl=ttl, id=id,
description=description, timeout=timeout, meta=meta,
depends_on=depends_on,on_success=on_success,on_failure=on_failure,
depends_on=depends_on, on_success=on_success, on_failure=on_failure,
status=JobStatus.SCHEDULED
)
if queue_name:
job.origin = queue_name
Expand Down
5 changes: 2 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,19 @@
rqscheduler = rq_scheduler.scripts.rqscheduler:main
''',
package_data={'': ['README.rst']},
install_requires=['crontab>=0.23.0', 'rq>=0.13', 'python-dateutil', 'freezegun'],
install_requires=['crontab>=0.23.0', 'rq>=2', 'python-dateutil', 'freezegun'],
classifiers=[
'Development Status :: 4 - Beta',
'Environment :: Console',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Topic :: Software Development :: Libraries :: Python Modules',
],
)
18 changes: 2 additions & 16 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import unittest
from redis import StrictRedis
from rq import push_connection, pop_connection


def find_empty_redis_database():
Expand All @@ -27,12 +26,7 @@ class RQTestCase(unittest.TestCase):

@classmethod
def setUpClass(cls):
# Set up connection to Redis
testconn = find_empty_redis_database()
push_connection(testconn)

# Store the connection (for sanity checking)
cls.testconn = testconn
cls.testconn = find_empty_redis_database()

def setUp(self):
# Flush beforewards (we like our hygiene)
Expand Down Expand Up @@ -64,12 +58,4 @@ def assertIsInstance(self, obj, cls, msg=None):

@classmethod
def tearDownClass(cls):

# Pop the connection to Redis
testconn = pop_connection()
assert testconn == cls.testconn, 'Wow, something really nasty ' \
'happened to the Redis connection stack. Check your setup.'

# for python < 2.7, which doesn't have setUpClass
if not hasattr(unittest.TestCase, 'setUpClass'):
RQTestCase.setUpClass()
pass
36 changes: 2 additions & 34 deletions tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
from multiprocessing import Process

from redis import Redis
from rq import Connection, get_current_job, get_current_connection, Queue
from rq import get_current_job, Queue
from rq.decorators import job
from rq.compat import text_type
from rq.worker import HerokuWorker, Worker


Expand All @@ -39,7 +38,7 @@ async def say_hello_async(name=None):

def say_hello_unicode(name=None):
"""A job with a single argument and a return value."""
return text_type(say_hello(name)) # noqa
return say_hello(name)


def do_nothing():
Expand Down Expand Up @@ -67,17 +66,6 @@ def some_calculation(x, y, z=1):
return x * y / z


def rpush(key, value, append_worker_name=False, sleep=0):
"""Push a value into a list in Redis. Useful for detecting the order in
which jobs were executed."""
if sleep:
time.sleep(sleep)
if append_worker_name:
value += ':' + get_current_job().worker_name
redis = get_current_connection()
redis.rpush(key, value)


def check_dependencies_are_met():
return get_current_job().dependencies_are_met()

Expand Down Expand Up @@ -106,11 +94,6 @@ def launch_process_within_worker_and_store_pid(path, timeout):
p.wait()


def access_self():
assert get_current_connection() is not None
assert get_current_job() is not None


def modify_self(meta):
j = get_current_job()
j.meta.update(meta)
Expand Down Expand Up @@ -156,12 +139,6 @@ def static_method():
return u"I'm a static method"


with Connection():
@job(queue='default')
def decorated_job(x, y):
return x + y


def black_hole(job, *exc_info):
# Don't fall through to default behaviour (moving to failed queue)
return False
Expand Down Expand Up @@ -241,15 +218,6 @@ def start_worker(queue_name, conn_kwargs, worker_name, burst):
w = Worker([queue_name], name=worker_name, connection=Redis(**conn_kwargs))
w.work(burst=burst)

def start_worker_process(queue_name, connection=None, worker_name=None, burst=False):
"""
Use multiprocessing to start a new worker in a separate process.
"""
connection = connection or get_current_connection()
conn_kwargs = connection.connection_pool.connection_kwargs
p = Process(target=start_worker, args=(queue_name, conn_kwargs, worker_name, burst))
p.start()
return p

def burst_two_workers(queue, timeout=2, tries=5, pause=0.1):
"""
Expand Down
17 changes: 8 additions & 9 deletions tests/test_callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from rq import Queue, Worker
from rq.job import Job, JobStatus, UNEVALUATED
from rq.worker import SimpleWorker
from rq.worker import SimpleWorker, Worker


class QueueCallbackTestCase(RQTestCase):
Expand Down Expand Up @@ -51,7 +51,7 @@ class WorkerCallbackTestCase(RQTestCase):
def test_success_callback(self):
"""Test success callback is executed only when job is successful"""
queue = Queue(connection=self.testconn)
worker = SimpleWorker([queue])
worker = SimpleWorker(['default', 'high'], connection=self.testconn)

job = queue.enqueue(say_hello, on_success=save_result)

Expand All @@ -71,7 +71,7 @@ def test_success_callback(self):
def test_erroneous_success_callback(self):
"""Test exception handling when executing success callback"""
queue = Queue(connection=self.testconn)
worker = Worker([queue])
worker = Worker(['default', 'high'], connection=self.testconn)

# If success_callback raises an error, job will is considered as failed
job = queue.enqueue(say_hello, on_success=erroneous_callback)
Expand All @@ -81,15 +81,14 @@ def test_erroneous_success_callback(self):
def test_failure_callback(self):
"""Test failure callback is executed only when job a fails"""
queue = Queue(connection=self.testconn)
worker = SimpleWorker([queue])
worker = Worker(['default', 'high'], connection=self.testconn)

job = queue.enqueue(div_by_zero, on_failure=save_exception)

# Callback is executed when job is successfully executed
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FAILED)
job.refresh()
print(job.exc_info)
self.assertIn('div_by_zero',
self.testconn.get('failure_callback:%s' % job.id).decode())

Expand All @@ -103,7 +102,7 @@ class JobCallbackTestCase(RQTestCase):

def test_job_creation_with_success_callback(self):
"""Ensure callbacks are created and persisted properly"""
job = Job.create(say_hello)
job = Job.create(say_hello, connection=self.testconn)
self.assertIsNone(job._success_callback_name)
# _success_callback starts with UNEVALUATED
self.assertEqual(job._success_callback, UNEVALUATED)
Expand All @@ -112,7 +111,7 @@ def test_job_creation_with_success_callback(self):
self.assertEqual(job._success_callback, None)

# job.success_callback is assigned properly
job = Job.create(say_hello, on_success=print)
job = Job.create(say_hello, on_success=print, connection=self.testconn)
self.assertIsNotNone(job._success_callback_name)
self.assertEqual(job.success_callback, print)
job.save()
Expand All @@ -122,7 +121,7 @@ def test_job_creation_with_success_callback(self):

def test_job_creation_with_failure_callback(self):
"""Ensure failure callbacks are persisted properly"""
job = Job.create(say_hello)
job = Job.create(say_hello, connection=self.testconn)
self.assertIsNone(job._failure_callback_name)
# _failure_callback starts with UNEVALUATED
self.assertEqual(job._failure_callback, UNEVALUATED)
Expand All @@ -131,7 +130,7 @@ def test_job_creation_with_failure_callback(self):
self.assertEqual(job._failure_callback, None)

# job.failure_callback is assigned properly
job = Job.create(say_hello, on_failure=print)
job = Job.create(say_hello, on_failure=print, connection=self.testconn)
self.assertIsNotNone(job._failure_callback_name)
self.assertEqual(job.failure_callback, print)
job.save()
Expand Down
15 changes: 7 additions & 8 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from dateutil.tz import tzlocal
from dateutil.tz import UTC
from rq import Queue
from rq.compat import as_text
from rq.job import Job

from rq_scheduler import Scheduler
Expand All @@ -28,7 +27,7 @@ def say_hello(name=None):


def tl(l):
return [as_text(i) for i in l]
return [i.decode() for i in l]


def simple_addition(x, y, z):
Expand Down Expand Up @@ -382,9 +381,9 @@ def test_enqueue_job(self):
self.assertTrue(job.enqueued_at is not None)
queue = scheduler.get_queue_for_job(job)
self.assertIn(job, queue.jobs)
queue = Queue.from_queue_key('rq:queue:{0}'.format(queue_name))
queue = Queue.from_queue_key('rq:queue:{0}'.format(queue_name), connection=self.testconn)
self.assertIn(job, queue.jobs)
self.assertIn(queue, Queue.all())
self.assertIn(queue, Queue.all(self.testconn))

def test_enqueue_job_with_scheduler_queue(self):
"""
Expand All @@ -399,7 +398,7 @@ def test_enqueue_job_with_scheduler_queue(self):
scheduler.enqueue_job(job)
self.assertTrue(job.enqueued_at is not None)
self.assertIn(job, queue.jobs)
self.assertIn(queue, Queue.all())
self.assertIn(queue, Queue.all(self.testconn))

def test_enqueue_job_with_job_queue_name(self):
"""
Expand All @@ -414,7 +413,7 @@ def test_enqueue_job_with_job_queue_name(self):
scheduler.enqueue_job(job)
self.assertTrue(job.enqueued_at is not None)
self.assertIn(job, job_queue.jobs)
self.assertIn(job_queue, Queue.all())
self.assertIn(job_queue, Queue.all(self.testconn))

def test_enqueue_at_with_job_queue_name(self):
"""
Expand All @@ -429,7 +428,7 @@ def test_enqueue_at_with_job_queue_name(self):
self.scheduler.enqueue_job(job)
self.assertTrue(job.enqueued_at is not None)
self.assertIn(job, job_queue.jobs)
self.assertIn(job_queue, Queue.all())
self.assertIn(job_queue, Queue.all(self.testconn))

def test_job_membership(self):
now = datetime.utcnow()
Expand Down Expand Up @@ -798,7 +797,7 @@ def test_scheduler_w_o_explicit_connection(self):
"""
Ensure instantiating Scheduler w/o explicit connection works.
"""
s = Scheduler()
s = Scheduler(connection=self.testconn)
self.assertEqual(s.connection, self.testconn)

def test_small_float_interval(self):
Expand Down
Loading