From bc4541768471dd68a04a141e80b272d1536db672 Mon Sep 17 00:00:00 2001 From: Name Date: Fri, 10 Nov 2023 19:03:06 +0300 Subject: [PATCH 1/4] implement redis cluster broker --- taskiq_redis/__init__.py | 2 + taskiq_redis/redis_cluster_broker.py | 78 ++++++++++++++++++++++++++++ tests/test_broker.py | 33 +++++++++++- 3 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 taskiq_redis/redis_cluster_broker.py diff --git a/taskiq_redis/__init__.py b/taskiq_redis/__init__.py index 36cec84..5a17567 100644 --- a/taskiq_redis/__init__.py +++ b/taskiq_redis/__init__.py @@ -4,6 +4,7 @@ RedisAsyncResultBackend, ) from taskiq_redis.redis_broker import ListQueueBroker, PubSubBroker +from taskiq_redis.redis_cluster_broker import ListQueueClusterBroker from taskiq_redis.schedule_source import RedisScheduleSource __all__ = [ @@ -11,5 +12,6 @@ "RedisAsyncResultBackend", "ListQueueBroker", "PubSubBroker", + "ListQueueClusterBroker", "RedisScheduleSource", ] diff --git a/taskiq_redis/redis_cluster_broker.py b/taskiq_redis/redis_cluster_broker.py new file mode 100644 index 0000000..24a2de3 --- /dev/null +++ b/taskiq_redis/redis_cluster_broker.py @@ -0,0 +1,78 @@ +from typing import Any, AsyncGenerator, Callable, Optional, TypeVar + +from redis.asyncio import RedisCluster +from taskiq.abc.broker import AsyncBroker +from taskiq.abc.result_backend import AsyncResultBackend +from taskiq.message import BrokerMessage + +_T = TypeVar("_T") + + +class BaseRedisClusterBroker(AsyncBroker): + """Base broker that works with Redis Cluster.""" + + def __init__( + self, + url: str, + task_id_generator: Optional[Callable[[], str]] = None, + result_backend: Optional[AsyncResultBackend[_T]] = None, + queue_name: str = "taskiq", + max_connection_pool_size: int = 2**31, + **connection_kwargs: Any, + ) -> None: + """ + Constructs a new broker. + + :param url: url to redis. + :param task_id_generator: custom task_id generator. + :param result_backend: custom result backend. + :param queue_name: name for a list in redis. + :param max_connection_pool_size: maximum number of connections in pool. + :param connection_kwargs: additional arguments for aio-redis ConnectionPool. + """ + super().__init__( + result_backend=result_backend, + task_id_generator=task_id_generator, + ) + + self.redis: RedisCluster[bytes] = RedisCluster.from_url( + url=url, + max_connections=max_connection_pool_size, + **connection_kwargs, + ) + + self.queue_name = queue_name + + async def shutdown(self) -> None: + """Closes redis connection pool.""" + await self.redis.aclose() # type: ignore[attr-defined] + await super().shutdown() + + +class ListQueueClusterBroker(BaseRedisClusterBroker): + """Broker that works with Redis Cluster and distributes tasks between workers.""" + + async def kick(self, message: BrokerMessage) -> None: + """ + Put a message in a list. + + This method appends a message to the list of all messages. + + :param message: message to append. + """ + await self.redis.lpush(self.queue_name, message.message) + + async def listen(self) -> AsyncGenerator[bytes, None]: + """ + Listen redis queue for new messages. + + This function listens to the queue + and yields new messages if they have BrokerMessage type. + + :yields: broker messages. + """ + redis_brpop_data_position = 1 + while True: + yield (await self.redis.brpop([self.queue_name]))[ + redis_brpop_data_position + ] diff --git a/tests/test_broker.py b/tests/test_broker.py index f664a96..ad20797 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -5,7 +5,7 @@ import pytest from taskiq import AckableMessage, AsyncBroker, BrokerMessage -from taskiq_redis import ListQueueBroker, PubSubBroker +from taskiq_redis import ListQueueBroker, PubSubBroker, ListQueueClusterBroker def test_no_url_should_raise_typeerror() -> None: @@ -96,3 +96,34 @@ async def test_list_queue_broker( worker1_task.cancel() worker2_task.cancel() await broker.shutdown() + + +@pytest.mark.anyio +async def test_list_queue_cluster_broker( + valid_broker_message: BrokerMessage, + redis_cluster_url: str, +) -> None: + """ + Test that messages are published and read correctly by ListQueueClusterBroker. + + We create two workers that listen and send a message to them. + Expect only one worker to receive the same message we sent. + """ + + print(f"redis_cluster_url: {redis_cluster_url}") + broker = ListQueueClusterBroker( + url=redis_cluster_url, queue_name=uuid.uuid4().hex + ) + worker1_task = asyncio.create_task(get_message(broker)) + worker2_task = asyncio.create_task(get_message(broker)) + await asyncio.sleep(0.3) + + await broker.kick(valid_broker_message) + await asyncio.sleep(0.3) + + assert worker1_task.done() != worker2_task.done() + message = worker1_task.result() if worker1_task.done() else worker2_task.result() + assert message == valid_broker_message.message + worker1_task.cancel() + worker2_task.cancel() + await broker.shutdown() From f90c0fe84a8e9c6957ecb38180c8dde61d3ebead Mon Sep 17 00:00:00 2001 From: Name Date: Mon, 13 Nov 2023 12:09:38 +0300 Subject: [PATCH 2/4] fix linters + test --- taskiq_redis/redis_cluster_broker.py | 7 +++---- tests/test_broker.py | 18 +++++++----------- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/taskiq_redis/redis_cluster_broker.py b/taskiq_redis/redis_cluster_broker.py index 24a2de3..6558726 100644 --- a/taskiq_redis/redis_cluster_broker.py +++ b/taskiq_redis/redis_cluster_broker.py @@ -60,7 +60,7 @@ async def kick(self, message: BrokerMessage) -> None: :param message: message to append. """ - await self.redis.lpush(self.queue_name, message.message) + await self.redis.lpush(self.queue_name, message.message) # type: ignore[attr-defined] async def listen(self) -> AsyncGenerator[bytes, None]: """ @@ -73,6 +73,5 @@ async def listen(self) -> AsyncGenerator[bytes, None]: """ redis_brpop_data_position = 1 while True: - yield (await self.redis.brpop([self.queue_name]))[ - redis_brpop_data_position - ] + value = await self.redis.brpop([self.queue_name]) # type: ignore[attr-defined] + yield value[redis_brpop_data_position] diff --git a/tests/test_broker.py b/tests/test_broker.py index ad20797..813e72e 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -5,7 +5,7 @@ import pytest from taskiq import AckableMessage, AsyncBroker, BrokerMessage -from taskiq_redis import ListQueueBroker, PubSubBroker, ListQueueClusterBroker +from taskiq_redis import ListQueueBroker, ListQueueClusterBroker, PubSubBroker def test_no_url_should_raise_typeerror() -> None: @@ -109,21 +109,17 @@ async def test_list_queue_cluster_broker( We create two workers that listen and send a message to them. Expect only one worker to receive the same message we sent. """ - - print(f"redis_cluster_url: {redis_cluster_url}") broker = ListQueueClusterBroker( - url=redis_cluster_url, queue_name=uuid.uuid4().hex + url=redis_cluster_url, + queue_name=uuid.uuid4().hex, ) - worker1_task = asyncio.create_task(get_message(broker)) - worker2_task = asyncio.create_task(get_message(broker)) + worker_task = asyncio.create_task(get_message(broker)) await asyncio.sleep(0.3) await broker.kick(valid_broker_message) await asyncio.sleep(0.3) - assert worker1_task.done() != worker2_task.done() - message = worker1_task.result() if worker1_task.done() else worker2_task.result() - assert message == valid_broker_message.message - worker1_task.cancel() - worker2_task.cancel() + assert worker_task.done() + assert worker_task.result() == valid_broker_message.message + worker_task.cancel() await broker.shutdown() From 5c3080dec46805b26dcc98557d6229edd27ab93f Mon Sep 17 00:00:00 2001 From: "zazymkin.g" Date: Mon, 13 Nov 2023 12:44:42 +0300 Subject: [PATCH 3/4] remove task_id_generator from BaseRedisClusterBroker --- taskiq_redis/redis_cluster_broker.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/taskiq_redis/redis_cluster_broker.py b/taskiq_redis/redis_cluster_broker.py index 6558726..7fa0a6a 100644 --- a/taskiq_redis/redis_cluster_broker.py +++ b/taskiq_redis/redis_cluster_broker.py @@ -1,4 +1,4 @@ -from typing import Any, AsyncGenerator, Callable, Optional, TypeVar +from typing import Any, AsyncGenerator, Optional, TypeVar from redis.asyncio import RedisCluster from taskiq.abc.broker import AsyncBroker @@ -14,7 +14,6 @@ class BaseRedisClusterBroker(AsyncBroker): def __init__( self, url: str, - task_id_generator: Optional[Callable[[], str]] = None, result_backend: Optional[AsyncResultBackend[_T]] = None, queue_name: str = "taskiq", max_connection_pool_size: int = 2**31, @@ -24,16 +23,12 @@ def __init__( Constructs a new broker. :param url: url to redis. - :param task_id_generator: custom task_id generator. :param result_backend: custom result backend. :param queue_name: name for a list in redis. :param max_connection_pool_size: maximum number of connections in pool. :param connection_kwargs: additional arguments for aio-redis ConnectionPool. """ - super().__init__( - result_backend=result_backend, - task_id_generator=task_id_generator, - ) + super().__init__(result_backend=result_backend) self.redis: RedisCluster[bytes] = RedisCluster.from_url( url=url, From d4189c6328e543943b78e65a59f631b7ebcdd4e4 Mon Sep 17 00:00:00 2001 From: Name Date: Mon, 13 Nov 2023 13:55:40 +0300 Subject: [PATCH 4/4] remove result_backend from BaseRedisClusterBroker --- taskiq_redis/redis_cluster_broker.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/taskiq_redis/redis_cluster_broker.py b/taskiq_redis/redis_cluster_broker.py index 7fa0a6a..af2d30f 100644 --- a/taskiq_redis/redis_cluster_broker.py +++ b/taskiq_redis/redis_cluster_broker.py @@ -1,12 +1,9 @@ -from typing import Any, AsyncGenerator, Optional, TypeVar +from typing import Any, AsyncGenerator from redis.asyncio import RedisCluster from taskiq.abc.broker import AsyncBroker -from taskiq.abc.result_backend import AsyncResultBackend from taskiq.message import BrokerMessage -_T = TypeVar("_T") - class BaseRedisClusterBroker(AsyncBroker): """Base broker that works with Redis Cluster.""" @@ -14,7 +11,6 @@ class BaseRedisClusterBroker(AsyncBroker): def __init__( self, url: str, - result_backend: Optional[AsyncResultBackend[_T]] = None, queue_name: str = "taskiq", max_connection_pool_size: int = 2**31, **connection_kwargs: Any, @@ -23,12 +19,11 @@ def __init__( Constructs a new broker. :param url: url to redis. - :param result_backend: custom result backend. :param queue_name: name for a list in redis. :param max_connection_pool_size: maximum number of connections in pool. :param connection_kwargs: additional arguments for aio-redis ConnectionPool. """ - super().__init__(result_backend=result_backend) + super().__init__() self.redis: RedisCluster[bytes] = RedisCluster.from_url( url=url,