Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 35 additions & 21 deletions redis/_parsers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
from typing import Awaitable, Callable, List, Optional, Protocol, Union

from redis.maintenance_events import (
MaintenanceEvent,
NodeFailedOverEvent,
NodeFailingOverEvent,
NodeMigratedEvent,
NodeMigratingEvent,
NodeMovingEvent,
MaintenanceNotification,
NodeFailedOverNotification,
NodeFailingOverNotification,
NodeMigratedNotification,
NodeMigratingNotification,
NodeMovingNotification,
)

if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
Expand Down Expand Up @@ -175,14 +175,14 @@ class MaintenanceNotificationsParser:

@staticmethod
def parse_maintenance_start_msg(response, notification_type):
# Expected message format is: <event_type> <seq_number> <time>
# Expected message format is: <notification_type> <seq_number> <time>
id = response[1]
ttl = response[2]
return notification_type(id, ttl)

@staticmethod
def parse_maintenance_completed_msg(response, notification_type):
# Expected message format is: <event_type> <seq_number>
# Expected message format is: <notification_type> <seq_number>
id = response[1]
return notification_type(id)

Expand All @@ -200,7 +200,7 @@ def parse_moving_msg(response):
host, port = value.split(":")
port = int(port) if port is not None else None

return NodeMovingEvent(id, host, port, ttl)
return NodeMovingNotification(id, host, port, ttl)


_INVALIDATION_MESSAGE = "invalidate"
Expand All @@ -217,25 +217,27 @@ def parse_moving_msg(response):
_FAILED_OVER_MESSAGE,
)

MSG_TYPE_TO_EVENT_PARSER_MAPPING: dict[str, tuple[type[MaintenanceEvent], Callable]] = {
MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING: dict[
str, tuple[type[MaintenanceNotification], Callable]
] = {
_MIGRATING_MESSAGE: (
NodeMigratingEvent,
NodeMigratingNotification,
MaintenanceNotificationsParser.parse_maintenance_start_msg,
),
_MIGRATED_MESSAGE: (
NodeMigratedEvent,
NodeMigratedNotification,
MaintenanceNotificationsParser.parse_maintenance_completed_msg,
),
_FAILING_OVER_MESSAGE: (
NodeFailingOverEvent,
NodeFailingOverNotification,
MaintenanceNotificationsParser.parse_maintenance_start_msg,
),
_FAILED_OVER_MESSAGE: (
NodeFailedOverEvent,
NodeFailedOverNotification,
MaintenanceNotificationsParser.parse_maintenance_completed_msg,
),
_MOVING_MESSAGE: (
NodeMovingEvent,
NodeMovingNotification,
MaintenanceNotificationsParser.parse_moving_msg,
),
}
Expand Down Expand Up @@ -273,14 +275,20 @@ def handle_push_response(self, response, **kwargs):
return self.invalidation_push_handler_func(response)

if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func:
parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1]
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
msg_type
][1]

notification = parser_function(response)
return self.node_moving_push_handler_func(notification)

if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1]
notification_type = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][0]
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
msg_type
][1]
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
msg_type
][0]
notification = parser_function(response, notification_type)

if notification is not None:
Expand Down Expand Up @@ -342,13 +350,19 @@ async def handle_push_response(self, response, **kwargs):
msg_type = msg_type.decode()

if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func:
parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1]
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
msg_type
][1]
notification = parser_function(response)
return await self.node_moving_push_handler_func(notification)

if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1]
notification_type = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][0]
parser_function = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
msg_type
][1]
notification_type = MSG_TYPE_TO_MAINT_NOTIFICATION_PARSER_MAPPING[
msg_type
][0]
notification = parser_function(response, notification_type)

if notification is not None:
Expand Down
28 changes: 14 additions & 14 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@
)
from redis.lock import Lock
from redis.maintenance_events import (
MaintenanceEventPoolHandler,
MaintenanceEventsConfig,
MaintNotificationsConfig,
MaintNotificationsPoolHandler,
)
from redis.retry import Retry
from redis.utils import (
Expand Down Expand Up @@ -248,7 +248,7 @@ def __init__(
cache: Optional[CacheInterface] = None,
cache_config: Optional[CacheConfig] = None,
event_dispatcher: Optional[EventDispatcher] = None,
maintenance_events_config: Optional[MaintenanceEventsConfig] = None,
maint_notifications_config: Optional[MaintNotificationsConfig] = None,
) -> None:
"""
Initialize a new Redis client.
Expand Down Expand Up @@ -373,22 +373,22 @@ def __init__(
]:
raise RedisError("Client caching is only supported with RESP version 3")

if maintenance_events_config and self.connection_pool.get_protocol() not in [
if maint_notifications_config and self.connection_pool.get_protocol() not in [
3,
"3",
]:
raise RedisError(
"Push handlers on connection are only supported with RESP version 3"
)
if maintenance_events_config and maintenance_events_config.enabled:
self.maintenance_events_pool_handler = MaintenanceEventPoolHandler(
self.connection_pool, maintenance_events_config
if maint_notifications_config and maint_notifications_config.enabled:
self.maint_notifications_pool_handler = MaintNotificationsPoolHandler(
self.connection_pool, maint_notifications_config
)
self.connection_pool.set_maintenance_events_pool_handler(
self.maintenance_events_pool_handler
self.connection_pool.set_maint_notifications_pool_handler(
self.maint_notifications_pool_handler
)
else:
self.maintenance_events_pool_handler = None
self.maint_notifications_pool_handler = None

self.single_connection_lock = threading.RLock()
self.connection = None
Expand Down Expand Up @@ -587,15 +587,15 @@ def monitor(self):
return Monitor(self.connection_pool)

def client(self):
maintenance_events_config = (
maint_notifications_config = (
None
if self.maintenance_events_pool_handler is None
else self.maintenance_events_pool_handler.config
if self.maint_notifications_pool_handler is None
else self.maint_notifications_pool_handler.config
)
return self.__class__(
connection_pool=self.connection_pool,
single_connection_client=True,
maintenance_events_config=maintenance_events_config,
maint_notifications_config=maint_notifications_config,
)

def __enter__(self):
Expand Down
Loading