Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,9 @@ allow-magic-value-types = ["int", "str", "float"]

[tool.ruff.lint.flake8-bugbear]
extend-immutable-calls = ["taskiq_dependencies.Depends", "taskiq.TaskiqDepends"]

[tool.pytest.ini_options]
filterwarnings = [
# about deprecated RedisScheduleSource usage - delete after removing RedisScheduleSource
'ignore:RedisScheduleSource is deprecated:DeprecationWarning',
]
14 changes: 7 additions & 7 deletions taskiq_redis/list_schedule_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ async def _get_previous_time_schedules(self) -> list[bytes]:
if key_time and key_time <= minute_before:
time_keys.append(key.decode())
for key in time_keys:
schedules.extend(await redis.lrange(key, 0, -1))
schedules.extend(await redis.lrange(key, 0, -1)) # type: ignore[misc]

return schedules

Expand All @@ -146,10 +146,10 @@ async def delete_schedule(self, schedule_id: str) -> None:
)
# We need to remove the schedule from the cron or time list.
if schedule.cron is not None:
await redis.lrem(self._get_cron_key(), 0, schedule_id)
await redis.lrem(self._get_cron_key(), 0, schedule_id) # type: ignore[misc]
elif schedule.time is not None:
time_key = self._get_time_key(schedule.time)
await redis.lrem(time_key, 0, schedule_id)
await redis.lrem(time_key, 0, schedule_id) # type: ignore[misc]

async def add_schedule(self, schedule: "ScheduledTask") -> None:
"""Add a schedule to the source."""
Expand All @@ -163,9 +163,9 @@ async def add_schedule(self, schedule: "ScheduledTask") -> None:
# This is an optimization, so we can get all the schedules
# for the current time much faster.
if schedule.cron is not None:
await redis.rpush(self._get_cron_key(), schedule.schedule_id)
await redis.rpush(self._get_cron_key(), schedule.schedule_id) # type: ignore[misc]
elif schedule.time is not None:
await redis.rpush(
await redis.rpush( # type: ignore[misc]
self._get_time_key(schedule.time),
schedule.schedule_id,
)
Expand Down Expand Up @@ -195,11 +195,11 @@ async def get_schedules(self) -> List["ScheduledTask"]:
self._is_first_run = False
async with Redis(connection_pool=self._connection_pool) as redis:
buffer = []
crons = await redis.lrange(self._get_cron_key(), 0, -1)
crons = await redis.lrange(self._get_cron_key(), 0, -1) # type: ignore[misc]
logger.debug("Got %d cron schedules", len(crons))
if crons:
buffer.extend(crons)
timed.extend(await redis.lrange(self._get_time_key(current_time), 0, -1))
timed.extend(await redis.lrange(self._get_time_key(current_time), 0, -1)) # type: ignore[misc]
logger.debug("Got %d timed schedules", len(timed))
if timed:
buffer.extend(timed)
Expand Down
121 changes: 55 additions & 66 deletions taskiq_redis/redis_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
TYPE_CHECKING,
Any,
AsyncIterator,
Dict,
List,
Optional,
Tuple,
Expand Down Expand Up @@ -121,17 +120,15 @@ async def set_result(
:param task_id: ID of the task.
:param result: TaskiqResult instance.
"""
redis_set_params: Dict[str, Union[str, int, bytes]] = {
"name": self._task_name(task_id),
"value": self.serializer.dumpb(model_dump(result)),
}
if self.result_ex_time:
redis_set_params["ex"] = self.result_ex_time
elif self.result_px_time:
redis_set_params["px"] = self.result_px_time

name = self._task_name(task_id)
value = self.serializer.dumpb(model_dump(result))
async with Redis(connection_pool=self.redis_pool) as redis:
await redis.set(**redis_set_params)
if self.result_ex_time:
await redis.set(name=name, value=value, ex=self.result_ex_time)
elif self.result_px_time:
await redis.set(name=name, value=value, px=self.result_px_time)
else:
await redis.set(name=name, value=value)

async def is_result_ready(self, task_id: str) -> bool:
"""
Expand Down Expand Up @@ -195,17 +192,15 @@ async def set_progress(
:param task_id: ID of the task.
:param result: task's TaskProgress instance.
"""
redis_set_params: Dict[str, Union[str, int, bytes]] = {
"name": self._task_name(task_id) + PROGRESS_KEY_SUFFIX,
"value": self.serializer.dumpb(model_dump(progress)),
}
if self.result_ex_time:
redis_set_params["ex"] = self.result_ex_time
elif self.result_px_time:
redis_set_params["px"] = self.result_px_time

name = self._task_name(task_id) + PROGRESS_KEY_SUFFIX
value = self.serializer.dumpb(model_dump(progress))
async with Redis(connection_pool=self.redis_pool) as redis:
await redis.set(**redis_set_params)
if self.result_ex_time:
await redis.set(name=name, value=value, ex=self.result_ex_time)
elif self.result_px_time:
await redis.set(name=name, value=value, px=self.result_px_time)
else:
await redis.set(name=name, value=value)

async def get_progress(
self,
Expand Down Expand Up @@ -296,24 +291,23 @@ async def set_result(
result: TaskiqResult[_ReturnType],
) -> None:
"""
Sets task result in redis.
Sets task result in redis cluster.

Dumps TaskiqResult instance into the bytes and writes
it to redis.
it to redis cluster.

:param task_id: ID of the task.
:param result: TaskiqResult instance.
"""
redis_set_params: Dict[str, Union[str, bytes, int]] = {
"name": self._task_name(task_id),
"value": self.serializer.dumpb(model_dump(result)),
}
if self.result_ex_time:
redis_set_params["ex"] = self.result_ex_time
elif self.result_px_time:
redis_set_params["px"] = self.result_px_time

await self.redis.set(**redis_set_params) # type: ignore
name = self._task_name(task_id)
value = self.serializer.dumpb(model_dump(result))
async with self.redis as redis:
if self.result_ex_time:
await redis.set(name=name, value=value, ex=self.result_ex_time)
elif self.result_px_time:
await redis.set(name=name, value=value, px=self.result_px_time)
else:
await redis.set(name=name, value=value)

async def is_result_ready(self, task_id: str) -> bool:
"""
Expand Down Expand Up @@ -367,24 +361,23 @@ async def set_progress(
progress: TaskProgress[_ReturnType],
) -> None:
"""
Sets task progress in redis.
Sets task progress in redis cluster.

Dumps TaskProgress instance into the bytes and writes
it to redis with a standard suffix on the task_id as the key
it to redis cluster with a standard suffix on the task_id as the key

:param task_id: ID of the task.
:param result: task's TaskProgress instance.
"""
redis_set_params: Dict[str, Union[str, int, bytes]] = {
"name": self._task_name(task_id) + PROGRESS_KEY_SUFFIX,
"value": self.serializer.dumpb(model_dump(progress)),
}
if self.result_ex_time:
redis_set_params["ex"] = self.result_ex_time
elif self.result_px_time:
redis_set_params["px"] = self.result_px_time

await self.redis.set(**redis_set_params) # type: ignore
name = self._task_name(task_id) + PROGRESS_KEY_SUFFIX
value = self.serializer.dumpb(model_dump(progress))
async with self.redis as redis:
if self.result_ex_time:
await redis.set(name=name, value=value, ex=self.result_ex_time)
elif self.result_px_time:
await redis.set(name=name, value=value, px=self.result_px_time)
else:
await redis.set(name=name, value=value)

async def get_progress(
self,
Expand Down Expand Up @@ -490,17 +483,15 @@ async def set_result(
:param task_id: ID of the task.
:param result: TaskiqResult instance.
"""
redis_set_params: Dict[str, Union[str, bytes, int]] = {
"name": self._task_name(task_id),
"value": self.serializer.dumpb(model_dump(result)),
}
if self.result_ex_time:
redis_set_params["ex"] = self.result_ex_time
elif self.result_px_time:
redis_set_params["px"] = self.result_px_time

name = self._task_name(task_id)
value = self.serializer.dumpb(model_dump(result))
async with self._acquire_master_conn() as redis:
await redis.set(**redis_set_params) # type: ignore
if self.result_ex_time:
await redis.set(name=name, value=value, ex=self.result_ex_time)
elif self.result_px_time:
await redis.set(name=name, value=value, px=self.result_px_time)
else:
await redis.set(name=name, value=value)

async def is_result_ready(self, task_id: str) -> bool:
"""
Expand Down Expand Up @@ -559,22 +550,20 @@ async def set_progress(
Sets task progress in redis.

Dumps TaskProgress instance into the bytes and writes
it to redis with a standard suffix on the task_id as the key
it to redis via sentinel with a standard suffix on the task_id as the key

:param task_id: ID of the task.
:param result: task's TaskProgress instance.
"""
redis_set_params: Dict[str, Union[str, int, bytes]] = {
"name": self._task_name(task_id) + PROGRESS_KEY_SUFFIX,
"value": self.serializer.dumpb(model_dump(progress)),
}
if self.result_ex_time:
redis_set_params["ex"] = self.result_ex_time
elif self.result_px_time:
redis_set_params["px"] = self.result_px_time

name = self._task_name(task_id) + PROGRESS_KEY_SUFFIX
value = self.serializer.dumpb(model_dump(progress))
async with self._acquire_master_conn() as redis:
await redis.set(**redis_set_params) # type: ignore
if self.result_ex_time:
await redis.set(name=name, value=value, ex=self.result_ex_time)
elif self.result_px_time:
await redis.set(name=name, value=value, px=self.result_px_time)
else:
await redis.set(name=name, value=value)

async def get_progress(
self,
Expand Down
9 changes: 5 additions & 4 deletions taskiq_redis/redis_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Dict,
Optional,
TypeVar,
Union,
)

from redis.asyncio import BlockingConnectionPool, Connection, Redis, ResponseError
Expand Down Expand Up @@ -122,7 +123,7 @@ async def kick(self, message: BrokerMessage) -> None:
"""
queue_name = message.labels.get("queue_name") or self.queue_name
async with Redis(connection_pool=self.connection_pool) as redis_conn:
await redis_conn.lpush(queue_name, message.message)
await redis_conn.lpush(queue_name, message.message) # type: ignore

async def listen(self) -> AsyncGenerator[bytes, None]:
"""
Expand All @@ -137,7 +138,7 @@ async def listen(self) -> AsyncGenerator[bytes, None]:
while True:
try:
async with Redis(connection_pool=self.connection_pool) as redis_conn:
yield (await redis_conn.brpop(self.queue_name))[
yield (await redis_conn.brpop(self.queue_name))[ # type: ignore
redis_brpop_data_position
]
except ConnectionError as exc:
Expand Down Expand Up @@ -170,7 +171,7 @@ def __init__(
idle_timeout: int = 600000, # 10 minutes
unacknowledged_batch_size: int = 100,
xread_count: Optional[int] = 100,
additional_streams: Optional[Dict[str, str]] = None,
additional_streams: Optional[Dict[str, Union[str, int]]] = None,
**connection_kwargs: Any,
) -> None:
"""
Expand Down Expand Up @@ -281,7 +282,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
self.consumer_name,
{
self.queue_name: ">",
**self.additional_streams,
**self.additional_streams, # type: ignore[dict-item]
},
block=self.block,
noack=False,
Expand Down