From 43e516abfaef617e278236f9c518d1b0361b94ef Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Tue, 23 Sep 2025 11:34:36 +0300 Subject: [PATCH 1/5] Rename relax_timeout to relaxed_timeout --- redis/connection.py | 64 +++---- redis/maintenance_events.py | 54 +++--- tests/test_maintenance_events.py | 68 ++++---- tests/test_maintenance_events_handling.py | 178 ++++++++++---------- tests/test_scenario/conftest.py | 6 +- tests/test_scenario/test_hitless_upgrade.py | 12 +- 6 files changed, 192 insertions(+), 190 deletions(-) diff --git a/redis/connection.py b/redis/connection.py index 226d22539c..9e6d0fe431 100644 --- a/redis/connection.py +++ b/redis/connection.py @@ -289,7 +289,7 @@ def get_resolved_ip(self): pass @abstractmethod - def update_current_socket_timeout(self, relax_timeout: Optional[float] = None): + def update_current_socket_timeout(self, relaxed_timeout: Optional[float] = None): """ Update the timeout for the current socket. """ @@ -299,7 +299,7 @@ def update_current_socket_timeout(self, relax_timeout: Optional[float] = None): def set_tmp_settings( self, tmp_host_address: Optional[str] = None, - tmp_relax_timeout: Optional[float] = None, + tmp_relaxed_timeout: Optional[float] = None, ): """ Updates temporary host address and timeout settings for the connection. @@ -310,7 +310,7 @@ def set_tmp_settings( def reset_tmp_settings( self, reset_host_address: bool = False, - reset_relax_timeout: bool = False, + reset_relaxed_timeout: bool = False, ): """ Resets temporary host address and timeout settings for the connection. @@ -1022,9 +1022,9 @@ def mark_for_reconnect(self): def should_reconnect(self): return self._should_reconnect - def update_current_socket_timeout(self, relax_timeout: Optional[float] = None): + def update_current_socket_timeout(self, relaxed_timeout: Optional[float] = None): if self._sock: - timeout = relax_timeout if relax_timeout != -1 else self.socket_timeout + timeout = relaxed_timeout if relaxed_timeout != -1 else self.socket_timeout self._sock.settimeout(timeout) self.update_parser_buffer_timeout(timeout) @@ -1035,25 +1035,25 @@ def update_parser_buffer_timeout(self, timeout: Optional[float] = None): def set_tmp_settings( self, tmp_host_address: Optional[Union[str, object]] = SENTINEL, - tmp_relax_timeout: Optional[float] = None, + tmp_relaxed_timeout: Optional[float] = None, ): """ The value of SENTINEL is used to indicate that the property should not be updated. """ if tmp_host_address is not SENTINEL: self.host = tmp_host_address - if tmp_relax_timeout != -1: - self.socket_timeout = tmp_relax_timeout - self.socket_connect_timeout = tmp_relax_timeout + if tmp_relaxed_timeout != -1: + self.socket_timeout = tmp_relaxed_timeout + self.socket_connect_timeout = tmp_relaxed_timeout def reset_tmp_settings( self, reset_host_address: bool = False, - reset_relax_timeout: bool = False, + reset_relaxed_timeout: bool = False, ): if reset_host_address: self.host = self.orig_host_address - if reset_relax_timeout: + if reset_relaxed_timeout: self.socket_timeout = self.orig_socket_timeout self.socket_connect_timeout = self.orig_socket_connect_timeout @@ -2081,10 +2081,10 @@ def update_connection_settings( state: Optional["MaintenanceState"] = None, maintenance_event_hash: Optional[int] = None, host_address: Optional[str] = None, - relax_timeout: Optional[float] = None, + relaxed_timeout: Optional[float] = None, update_event_hash: bool = False, reset_host_address: bool = False, - reset_relax_timeout: bool = False, + reset_relaxed_timeout: bool = False, ): """ Update the settings for a single connection. @@ -2099,23 +2099,23 @@ def update_connection_settings( if host_address is not None: conn.set_tmp_settings(tmp_host_address=host_address) - if relax_timeout is not None: - conn.set_tmp_settings(tmp_relax_timeout=relax_timeout) + if relaxed_timeout is not None: + conn.set_tmp_settings(tmp_relaxed_timeout=relaxed_timeout) - if reset_relax_timeout or reset_host_address: + if reset_relaxed_timeout or reset_host_address: conn.reset_tmp_settings( reset_host_address=reset_host_address, - reset_relax_timeout=reset_relax_timeout, + reset_relaxed_timeout=reset_relaxed_timeout, ) - conn.update_current_socket_timeout(relax_timeout) + conn.update_current_socket_timeout(relaxed_timeout) def update_connections_settings( self, state: Optional["MaintenanceState"] = None, maintenance_event_hash: Optional[int] = None, host_address: Optional[str] = None, - relax_timeout: Optional[float] = None, + relaxed_timeout: Optional[float] = None, matching_address: Optional[str] = None, matching_event_hash: Optional[int] = None, matching_pattern: Literal[ @@ -2123,7 +2123,7 @@ def update_connections_settings( ] = "connected_address", update_event_hash: bool = False, reset_host_address: bool = False, - reset_relax_timeout: bool = False, + reset_relaxed_timeout: bool = False, include_free_connections: bool = True, ): """ @@ -2136,13 +2136,13 @@ def update_connections_settings( :param maintenance_event_hash: The hash of the maintenance event to set for the connection. :param host_address: The host address to set for the connection. - :param relax_timeout: The relax timeout to set for the connection. + :param relaxed_timeout: The relaxed timeout to set for the connection. :param matching_address: The address to match for the connection. :param matching_event_hash: The event hash to match for the connection. :param matching_pattern: The pattern to match for the connection. :param update_event_hash: Whether to update the event hash for the connection. :param reset_host_address: Whether to reset the host address to the original address. - :param reset_relax_timeout: Whether to reset the relax timeout to the original timeout. + :param reset_relaxed_timeout: Whether to reset the relaxed timeout to the original timeout. :param include_free_connections: Whether to include free/available connections. """ with self._lock: @@ -2158,10 +2158,10 @@ def update_connections_settings( state=state, maintenance_event_hash=maintenance_event_hash, host_address=host_address, - relax_timeout=relax_timeout, + relaxed_timeout=relaxed_timeout, update_event_hash=update_event_hash, reset_host_address=reset_host_address, - reset_relax_timeout=reset_relax_timeout, + reset_relaxed_timeout=reset_relaxed_timeout, ) if include_free_connections: @@ -2177,10 +2177,10 @@ def update_connections_settings( state=state, maintenance_event_hash=maintenance_event_hash, host_address=host_address, - relax_timeout=relax_timeout, + relaxed_timeout=relaxed_timeout, update_event_hash=update_event_hash, reset_host_address=reset_host_address, - reset_relax_timeout=reset_relax_timeout, + reset_relaxed_timeout=reset_relaxed_timeout, ) def update_connection_kwargs( @@ -2473,7 +2473,7 @@ def update_connections_settings( self, state: Optional["MaintenanceState"] = None, maintenance_event_hash: Optional[int] = None, - relax_timeout: Optional[float] = None, + relaxed_timeout: Optional[float] = None, host_address: Optional[str] = None, matching_address: Optional[str] = None, matching_event_hash: Optional[int] = None, @@ -2482,7 +2482,7 @@ def update_connections_settings( ] = "connected_address", update_event_hash: bool = False, reset_host_address: bool = False, - reset_relax_timeout: bool = False, + reset_relaxed_timeout: bool = False, include_free_connections: bool = True, ): """ @@ -2502,10 +2502,10 @@ def update_connections_settings( state=state, maintenance_event_hash=maintenance_event_hash, host_address=host_address, - relax_timeout=relax_timeout, + relaxed_timeout=relaxed_timeout, update_event_hash=update_event_hash, reset_host_address=reset_host_address, - reset_relax_timeout=reset_relax_timeout, + reset_relaxed_timeout=reset_relaxed_timeout, ) else: connections_in_queue = {conn for conn in self.pool.queue if conn} @@ -2522,10 +2522,10 @@ def update_connections_settings( state=state, maintenance_event_hash=maintenance_event_hash, host_address=host_address, - relax_timeout=relax_timeout, + relaxed_timeout=relaxed_timeout, update_event_hash=update_event_hash, reset_host_address=reset_host_address, - reset_relax_timeout=reset_relax_timeout, + reset_relaxed_timeout=reset_relaxed_timeout, ) def update_active_connections_for_reconnect( diff --git a/redis/maintenance_events.py b/redis/maintenance_events.py index a3636019ed..c037c3309e 100644 --- a/redis/maintenance_events.py +++ b/redis/maintenance_events.py @@ -449,7 +449,7 @@ def __init__( self, enabled: bool = True, proactive_reconnect: bool = True, - relax_timeout: Optional[Number] = 10, + relaxed_timeout: Optional[Number] = 10, endpoint_type: Optional[EndpointType] = None, ): """ @@ -460,8 +460,8 @@ def __init__( Defaults to False. proactive_reconnect (bool): Whether to proactively reconnect when a node is replaced. Defaults to True. - relax_timeout (Number): The relax timeout to use for the connection during maintenance. - If -1 is provided - the relax timeout is disabled. Defaults to 20. + relaxed_timeout (Number): The relaxed timeout to use for the connection during maintenance. + If -1 is provided - the relaxed timeout is disabled. Defaults to 20. endpoint_type (Optional[EndpointType]): Override for the endpoint type to use in CLIENT MAINT_NOTIFICATIONS. If None, the endpoint type will be automatically determined based on the host and TLS configuration. Defaults to None. @@ -470,7 +470,7 @@ def __init__( ValueError: If endpoint_type is provided but is not a valid endpoint type. """ self.enabled = enabled - self.relax_timeout = relax_timeout + self.relaxed_timeout = relaxed_timeout self.proactive_reconnect = proactive_reconnect self.endpoint_type = endpoint_type @@ -479,21 +479,21 @@ def __repr__(self) -> str: f"{self.__class__.__name__}(" f"enabled={self.enabled}, " f"proactive_reconnect={self.proactive_reconnect}, " - f"relax_timeout={self.relax_timeout}, " + f"relaxed_timeout={self.relaxed_timeout}, " f"endpoint_type={self.endpoint_type!r}" f")" ) - def is_relax_timeouts_enabled(self) -> bool: + def is_relaxed_timeouts_enabled(self) -> bool: """ - Check if the relax_timeout is enabled. The '-1' value is used to disable the relax_timeout. - If relax_timeout is set to None, it will make the operation blocking + Check if the relaxed_timeout is enabled. The '-1' value is used to disable the relaxed_timeout. + If relaxed_timeout is set to None, it will make the operation blocking and waiting until any response is received. Returns: - True if the relax_timeout is enabled, False otherwise. + True if the relaxed_timeout is enabled, False otherwise. """ - return self.relax_timeout != -1 + return self.relaxed_timeout != -1 def get_endpoint_type( self, host: str, connection: "ConnectionInterface" @@ -582,7 +582,7 @@ def handle_event(self, notification: MaintenanceEvent): def handle_node_moving_event(self, event: NodeMovingEvent): if ( not self.config.proactive_reconnect - and not self.config.is_relax_timeouts_enabled() + and not self.config.is_relaxed_timeouts_enabled() ): return with self._lock: @@ -595,7 +595,7 @@ def handle_node_moving_event(self, event: NodeMovingEvent): with self.pool._lock: if ( self.config.proactive_reconnect - or self.config.is_relax_timeouts_enabled() + or self.config.is_relaxed_timeouts_enabled() ): # Get the current connected address - if any # This is the address that is being moved @@ -615,7 +615,7 @@ def handle_node_moving_event(self, event: NodeMovingEvent): self.pool.update_connections_settings( state=MaintenanceState.MOVING, maintenance_event_hash=hash(event), - relax_timeout=self.config.relax_timeout, + relaxed_timeout=self.config.relaxed_timeout, host_address=event.new_node_host, matching_address=moving_address_src, matching_pattern="connected_address", @@ -650,11 +650,11 @@ def handle_node_moving_event(self, event: NodeMovingEvent): "host": event.new_node_host, } ) - if self.config.is_relax_timeouts_enabled(): + if self.config.is_relaxed_timeouts_enabled(): kwargs.update( { - "socket_timeout": self.config.relax_timeout, - "socket_connect_timeout": self.config.relax_timeout, + "socket_timeout": self.config.relaxed_timeout, + "socket_connect_timeout": self.config.relaxed_timeout, } ) self.pool.update_connection_kwargs(**kwargs) @@ -715,17 +715,17 @@ def handle_node_moved_event(self, event: NodeMovingEvent): self.pool.update_connection_kwargs(**kwargs) with self.pool._lock: - reset_relax_timeout = self.config.is_relax_timeouts_enabled() + reset_relaxed_timeout = self.config.is_relaxed_timeouts_enabled() reset_host_address = self.config.proactive_reconnect self.pool.update_connections_settings( - relax_timeout=-1, + relaxed_timeout=-1, state=MaintenanceState.NONE, maintenance_event_hash=None, matching_event_hash=event_hash, matching_pattern="event_hash", update_event_hash=True, - reset_relax_timeout=reset_relax_timeout, + reset_relaxed_timeout=reset_relaxed_timeout, reset_host_address=reset_host_address, include_free_connections=True, ) @@ -762,24 +762,26 @@ def handle_event(self, event: MaintenanceEvent): def handle_maintenance_start_event(self, maintenance_state: MaintenanceState): if ( self.connection.maintenance_state == MaintenanceState.MOVING - or not self.config.is_relax_timeouts_enabled() + or not self.config.is_relaxed_timeouts_enabled() ): return self.connection.maintenance_state = maintenance_state - self.connection.set_tmp_settings(tmp_relax_timeout=self.config.relax_timeout) + self.connection.set_tmp_settings( + tmp_relaxed_timeout=self.config.relaxed_timeout + ) # extend the timeout for all created connections - self.connection.update_current_socket_timeout(self.config.relax_timeout) + self.connection.update_current_socket_timeout(self.config.relaxed_timeout) def handle_maintenance_completed_event(self): - # Only reset timeouts if state is not MOVING and relax timeouts are enabled + # Only reset timeouts if state is not MOVING and relaxed timeouts are enabled if ( self.connection.maintenance_state == MaintenanceState.MOVING - or not self.config.is_relax_timeouts_enabled() + or not self.config.is_relaxed_timeouts_enabled() ): return - self.connection.reset_tmp_settings(reset_relax_timeout=True) + self.connection.reset_tmp_settings(reset_relaxed_timeout=True) # Maintenance completed - reset the connection - # timeouts by providing -1 as the relax timeout + # timeouts by providing -1 as the relaxed timeout self.connection.update_current_socket_timeout(-1) self.connection.maintenance_state = MaintenanceState.NONE diff --git a/tests/test_maintenance_events.py b/tests/test_maintenance_events.py index 69bdad2947..6e31816bb5 100644 --- a/tests/test_maintenance_events.py +++ b/tests/test_maintenance_events.py @@ -381,52 +381,52 @@ def test_init_defaults(self): config = MaintenanceEventsConfig() assert config.enabled is True assert config.proactive_reconnect is True - assert config.relax_timeout == 10 + assert config.relaxed_timeout == 10 def test_init_custom_values(self): """Test MaintenanceEventsConfig initialization with custom values.""" config = MaintenanceEventsConfig( - enabled=True, proactive_reconnect=False, relax_timeout=30 + enabled=True, proactive_reconnect=False, relaxed_timeout=30 ) assert config.enabled is True assert config.proactive_reconnect is False - assert config.relax_timeout == 30 + assert config.relaxed_timeout == 30 def test_repr(self): """Test MaintenanceEventsConfig string representation.""" config = MaintenanceEventsConfig( - enabled=True, proactive_reconnect=False, relax_timeout=30 + enabled=True, proactive_reconnect=False, relaxed_timeout=30 ) repr_str = repr(config) assert "MaintenanceEventsConfig" in repr_str assert "enabled=True" in repr_str assert "proactive_reconnect=False" in repr_str - assert "relax_timeout=30" in repr_str + assert "relaxed_timeout=30" in repr_str - def test_is_relax_timeouts_enabled_true(self): - """Test is_relax_timeouts_enabled returns True for positive timeout.""" - config = MaintenanceEventsConfig(relax_timeout=20) - assert config.is_relax_timeouts_enabled() is True + def test_is_relaxed_timeouts_enabled_true(self): + """Test is_relaxed_timeouts_enabled returns True for positive timeout.""" + config = MaintenanceEventsConfig(relaxed_timeout=20) + assert config.is_relaxed_timeouts_enabled() is True - def test_is_relax_timeouts_enabled_false(self): - """Test is_relax_timeouts_enabled returns False for -1 timeout.""" - config = MaintenanceEventsConfig(relax_timeout=-1) - assert config.is_relax_timeouts_enabled() is False + def test_is_relaxed_timeouts_enabled_false(self): + """Test is_relaxed_timeouts_enabled returns False for -1 timeout.""" + config = MaintenanceEventsConfig(relaxed_timeout=-1) + assert config.is_relaxed_timeouts_enabled() is False - def test_is_relax_timeouts_enabled_zero(self): - """Test is_relax_timeouts_enabled returns True for zero timeout.""" - config = MaintenanceEventsConfig(relax_timeout=0) - assert config.is_relax_timeouts_enabled() is True + def test_is_relaxed_timeouts_enabled_zero(self): + """Test is_relaxed_timeouts_enabled returns True for zero timeout.""" + config = MaintenanceEventsConfig(relaxed_timeout=0) + assert config.is_relaxed_timeouts_enabled() is True - def test_is_relax_timeouts_enabled_none(self): - """Test is_relax_timeouts_enabled returns True for None timeout.""" - config = MaintenanceEventsConfig(relax_timeout=None) - assert config.is_relax_timeouts_enabled() is True + def test_is_relaxed_timeouts_enabled_none(self): + """Test is_relaxed_timeouts_enabled returns True for None timeout.""" + config = MaintenanceEventsConfig(relaxed_timeout=None) + assert config.is_relaxed_timeouts_enabled() is True - def test_relax_timeout_none_is_saved_as_none(self): - """Test that None value for relax_timeout is saved as None.""" - config = MaintenanceEventsConfig(relax_timeout=None) - assert config.relax_timeout is None + def test_relaxed_timeout_none_is_saved_as_none(self): + """Test that None value for relaxed_timeout is saved as None.""" + config = MaintenanceEventsConfig(relaxed_timeout=None) + assert config.relaxed_timeout is None class TestMaintenanceEventPoolHandler: @@ -439,7 +439,7 @@ def setup_method(self): self.mock_pool._lock.__enter__.return_value = None self.mock_pool._lock.__exit__.return_value = None self.config = MaintenanceEventsConfig( - enabled=True, proactive_reconnect=True, relax_timeout=20 + enabled=True, proactive_reconnect=True, relaxed_timeout=20 ) self.handler = MaintenanceEventPoolHandler(self.mock_pool, self.config) @@ -493,7 +493,7 @@ def test_handle_event_unknown_type(self): def test_handle_node_moving_event_disabled_config(self): """Test node moving event handling when both features are disabled.""" - config = MaintenanceEventsConfig(proactive_reconnect=False, relax_timeout=-1) + config = MaintenanceEventsConfig(proactive_reconnect=False, relaxed_timeout=-1) handler = MaintenanceEventPoolHandler(self.mock_pool, config) event = NodeMovingEvent( id=1, new_node_host="localhost", new_node_port=6379, ttl=10 @@ -587,7 +587,7 @@ class TestMaintenanceEventConnectionHandler: def setup_method(self): """Set up test fixtures.""" self.mock_connection = Mock() - self.config = MaintenanceEventsConfig(enabled=True, relax_timeout=20) + self.config = MaintenanceEventsConfig(enabled=True, relaxed_timeout=20) self.handler = MaintenanceEventConnectionHandler( self.mock_connection, self.config ) @@ -647,8 +647,8 @@ def test_handle_event_unknown_type(self): assert result is None def test_handle_maintenance_start_event_disabled(self): - """Test maintenance start event handling when relax timeouts are disabled.""" - config = MaintenanceEventsConfig(relax_timeout=-1) + """Test maintenance start event handling when relaxed timeouts are disabled.""" + config = MaintenanceEventsConfig(relaxed_timeout=-1) handler = MaintenanceEventConnectionHandler(self.mock_connection, config) result = handler.handle_maintenance_start_event(MaintenanceState.MAINTENANCE) @@ -675,12 +675,12 @@ def test_handle_maintenance_start_event_success(self): assert self.mock_connection.maintenance_state == MaintenanceState.MAINTENANCE self.mock_connection.update_current_socket_timeout.assert_called_once_with(20) self.mock_connection.set_tmp_settings.assert_called_once_with( - tmp_relax_timeout=20 + tmp_relaxed_timeout=20 ) def test_handle_maintenance_completed_event_disabled(self): - """Test maintenance completed event handling when relax timeouts are disabled.""" - config = MaintenanceEventsConfig(relax_timeout=-1) + """Test maintenance completed event handling when relaxed timeouts are disabled.""" + config = MaintenanceEventsConfig(relaxed_timeout=-1) handler = MaintenanceEventConnectionHandler(self.mock_connection, config) result = handler.handle_maintenance_completed_event() @@ -705,7 +705,7 @@ def test_handle_maintenance_completed_event_success(self): self.mock_connection.update_current_socket_timeout.assert_called_once_with(-1) self.mock_connection.reset_tmp_settings.assert_called_once_with( - reset_relax_timeout=True + reset_relaxed_timeout=True ) diff --git a/tests/test_maintenance_events_handling.py b/tests/test_maintenance_events_handling.py index 06c607b552..56c6270de9 100644 --- a/tests/test_maintenance_events_handling.py +++ b/tests/test_maintenance_events_handling.py @@ -372,7 +372,7 @@ def mock_select(rlist, wlist, xlist, timeout=0): # Create maintenance events config self.config = MaintenanceEventsConfig( - enabled=True, proactive_reconnect=True, relax_timeout=30 + enabled=True, proactive_reconnect=True, relaxed_timeout=30 ) def teardown_method(self): @@ -554,7 +554,7 @@ def test_maint_handler_init_for_existing_connections(self): # Create a new enabled configuration and set up pool handler enabled_config = MaintenanceEventsConfig( - enabled=True, proactive_reconnect=True, relax_timeout=30 + enabled=True, proactive_reconnect=True, relaxed_timeout=30 ) pool_handler = MaintenanceEventPoolHandler( test_redis_client.connection_pool, enabled_config @@ -771,24 +771,24 @@ def test_migration_related_events_handling_integration(self, pool_class): test_redis_client.connection_pool.disconnect() @pytest.mark.parametrize("pool_class", [ConnectionPool, BlockingConnectionPool]) - def test_migrating_event_with_disabled_relax_timeout(self, pool_class): + def test_migrating_event_with_disabled_relaxed_timeout(self, pool_class): """ - Test maintenance events handling when relax timeout is disabled. + Test maintenance events handling when relaxed timeout is disabled. - This test validates that when relax_timeout is disabled (-1): + This test validates that when relaxed_timeout is disabled (-1): 1. MIGRATING, MIGRATED, FAILING_OVER, and FAILED_OVER events are received and processed 2. No timeout updates are applied to connections 3. Socket timeouts remain unchanged during all maintenance events 4. Tests both ConnectionPool and BlockingConnectionPool implementations 5. Tests the complete lifecycle: MIGRATING -> MIGRATED -> FAILING_OVER -> FAILED_OVER """ - # Create config with disabled relax timeout + # Create config with disabled relaxed timeout disabled_config = MaintenanceEventsConfig( enabled=True, - relax_timeout=-1, # This means the relax timeout is Disabled + relaxed_timeout=-1, # This means the relaxed timeout is Disabled ) - # Create a pool and Redis client with disabled relax timeout config + # Create a pool and Redis client with disabled relaxed timeout config test_redis_client = self._get_client( pool_class, max_connections=5, maintenance_events_config=disabled_config ) @@ -810,7 +810,7 @@ def test_migrating_event_with_disabled_relax_timeout(self, pool_class): # Validate Command 2 result assert result2 is True, "Command 2 (SET key_receive_migrating) failed" - # Validate timeout was NOT updated (relax is disabled) + # Validate timeout was NOT updated (relaxed is disabled) # Should remain at default timeout (None), not relaxed to 30s self._validate_current_timeout(None) @@ -831,7 +831,7 @@ def test_migrating_event_with_disabled_relax_timeout(self, pool_class): # Validate Command 4 result assert result4 is True, "Command 4 (SET key_receive_migrated) failed" - # Validate timeout is still NOT updated after MIGRATED (relax is disabled) + # Validate timeout is still NOT updated after MIGRATED (relaxed is disabled) self._validate_current_timeout(None) # Command 5: This SET command will receive FAILING_OVER push message before response @@ -842,7 +842,7 @@ def test_migrating_event_with_disabled_relax_timeout(self, pool_class): # Validate Command 5 result assert result5 is True, "Command 5 (SET key_receive_failing_over) failed" - # Validate timeout is still NOT updated after FAILING_OVER (relax is disabled) + # Validate timeout is still NOT updated after FAILING_OVER (relaxed is disabled) self._validate_current_timeout(None) # Command 6: Another command to verify timeout remains unchanged during failover @@ -862,7 +862,7 @@ def test_migrating_event_with_disabled_relax_timeout(self, pool_class): # Validate Command 7 result assert result7 is True, "Command 7 (SET key_receive_failed_over) failed" - # Validate timeout is still NOT updated after FAILED_OVER (relax is disabled) + # Validate timeout is still NOT updated after FAILED_OVER (relaxed is disabled) self._validate_current_timeout(None) # Command 8: Final command to verify timeout remains unchanged after all events @@ -1024,8 +1024,8 @@ def test_moving_related_events_handling_integration(self, pool_class): expected_maintenance_event_hash=expected_event_hash, expected_host_address=AFTER_MOVING_ADDRESS.split(":")[0], expected_port=int(DEFAULT_ADDRESS.split(":")[1]), - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, @@ -1036,12 +1036,12 @@ def test_moving_related_events_handling_integration(self, pool_class): in_use_connections, expected_state=MaintenanceState.MOVING, expected_host_address=AFTER_MOVING_ADDRESS.split(":")[0], - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, - expected_current_socket_timeout=self.config.relax_timeout, + expected_current_socket_timeout=self.config.relaxed_timeout, expected_current_peername=DEFAULT_ADDRESS.split(":")[ 0 ], # the in use connections reconnect when they complete their current task @@ -1050,8 +1050,8 @@ def test_moving_related_events_handling_integration(self, pool_class): pool=test_redis_client.connection_pool, expected_state=MaintenanceState.MOVING, expected_host_address=AFTER_MOVING_ADDRESS.split(":")[0], - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, @@ -1157,8 +1157,8 @@ def test_moving_none_events_handling_integration(self, pool_class): expected_maintenance_event_hash=hash(MOVING_NONE_EVENT), expected_host_address=DEFAULT_ADDRESS.split(":")[0], expected_port=int(DEFAULT_ADDRESS.split(":")[1]), - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, @@ -1170,12 +1170,12 @@ def test_moving_none_events_handling_integration(self, pool_class): expected_should_reconnect=False, expected_state=MaintenanceState.MOVING, expected_host_address=DEFAULT_ADDRESS.split(":")[0], - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, - expected_current_socket_timeout=self.config.relax_timeout, + expected_current_socket_timeout=self.config.relaxed_timeout, expected_current_peername=DEFAULT_ADDRESS.split(":")[ 0 ], # the in use connections reconnect when they complete their current task @@ -1184,8 +1184,8 @@ def test_moving_none_events_handling_integration(self, pool_class): pool=test_redis_client.connection_pool, expected_state=MaintenanceState.MOVING, expected_host_address=DEFAULT_ADDRESS.split(":")[0], - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, @@ -1199,12 +1199,12 @@ def test_moving_none_events_handling_integration(self, pool_class): expected_should_reconnect=True, expected_state=MaintenanceState.MOVING, expected_host_address=DEFAULT_ADDRESS.split(":")[0], - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, - expected_current_socket_timeout=self.config.relax_timeout, + expected_current_socket_timeout=self.config.relaxed_timeout, expected_current_peername=DEFAULT_ADDRESS.split(":")[ 0 ], # the in use connections reconnect when they complete their current task @@ -1306,8 +1306,8 @@ def test_create_new_conn_while_moving_not_expired(self, pool_class): expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, expected_host_address=AFTER_MOVING_ADDRESS.split(":")[0], - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, ) # Now get several more connections to force creation of new ones @@ -1319,11 +1319,11 @@ def test_create_new_conn_while_moving_not_expired(self, pool_class): new_connection = test_redis_client.connection_pool.get_connection() - # Validate that new connections are created with temporary address and relax timeout + # Validate that new connections are created with temporary address and relaxed timeout # and when connecting those configs are used # get_connection() returns a connection that is already connected assert new_connection.host == AFTER_MOVING_ADDRESS.split(":")[0] - assert new_connection.socket_timeout is self.config.relax_timeout + assert new_connection.socket_timeout is self.config.relaxed_timeout # New connections should be connected to the temporary address assert new_connection._sock is not None assert new_connection._sock.connected is True @@ -1331,7 +1331,7 @@ def test_create_new_conn_while_moving_not_expired(self, pool_class): new_connection._sock.getpeername()[0] == AFTER_MOVING_ADDRESS.split(":")[0] ) - assert new_connection._sock.gettimeout() == self.config.relax_timeout + assert new_connection._sock.gettimeout() == self.config.relaxed_timeout finally: if hasattr(test_redis_client.connection_pool, "disconnect"): @@ -1456,8 +1456,8 @@ def test_receive_migrated_after_moving(self, pool_class): expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, expected_host_address=AFTER_MOVING_ADDRESS.split(":")[0], - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, ) # TODO validate current socket timeout @@ -1484,8 +1484,8 @@ def test_receive_migrated_after_moving(self, pool_class): expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, expected_host_address=AFTER_MOVING_ADDRESS.split(":")[0], - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, ) # Step 4: Create new connections after MIGRATED to verify they still use MOVING settings @@ -1498,7 +1498,7 @@ def test_receive_migrated_after_moving(self, pool_class): # Validate that new connections are created with MOVING settings (still active) for connection in new_connections: assert connection.host == AFTER_MOVING_ADDRESS.split(":")[0] - # Note: New connections may not inherit the exact relax timeout value + # Note: New connections may not inherit the exact relaxed timeout value # but they should have the temporary host address # New connections should be connected if connection._sock is not None: @@ -1554,8 +1554,8 @@ def test_overlapping_moving_events(self, pool_class): expected_maintenance_event_hash=hash(MOVING_EVENT), expected_host_address=AFTER_MOVING_ADDRESS.split(":")[0], expected_port=int(DEFAULT_ADDRESS.split(":")[1]), - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, @@ -1565,12 +1565,12 @@ def test_overlapping_moving_events(self, pool_class): in_use_connections, expected_state=MaintenanceState.MOVING, expected_host_address=AFTER_MOVING_ADDRESS.split(":")[0], - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, - expected_current_socket_timeout=self.config.relax_timeout, + expected_current_socket_timeout=self.config.relaxed_timeout, expected_current_peername=DEFAULT_ADDRESS.split(":")[0], ) Helpers.validate_free_connections_state( @@ -1579,8 +1579,8 @@ def test_overlapping_moving_events(self, pool_class): connected_to_tmp_address=True, expected_state=MaintenanceState.MOVING, expected_host_address=AFTER_MOVING_ADDRESS.split(":")[0], - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, @@ -1613,8 +1613,8 @@ def test_overlapping_moving_events(self, pool_class): expected_maintenance_event_hash=hash(second_moving_event), expected_host_address=second_moving_address.split(":")[0], expected_port=int(DEFAULT_ADDRESS.split(":")[1]), - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, @@ -1624,12 +1624,12 @@ def test_overlapping_moving_events(self, pool_class): in_use_connections, expected_state=MaintenanceState.MOVING, expected_host_address=second_moving_address.split(":")[0], - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, - expected_current_socket_timeout=self.config.relax_timeout, + expected_current_socket_timeout=self.config.relaxed_timeout, expected_current_peername=orig_after_moving.split(":")[0], ) # print(test_redis_client.connection_pool._available_connections) @@ -1640,8 +1640,8 @@ def test_overlapping_moving_events(self, pool_class): tmp_address=second_moving_address.split(":")[0], expected_state=MaintenanceState.MOVING, expected_host_address=second_moving_address.split(":")[0], - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, @@ -1704,8 +1704,8 @@ def worker(idx): expected_maintenance_event_hash=hash(MOVING_EVENT), expected_host_address=AFTER_MOVING_ADDRESS.split(":")[0], expected_port=int(DEFAULT_ADDRESS.split(":")[1]), - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, @@ -1756,12 +1756,12 @@ def test_moving_migrating_migrated_moved_state_transitions(self, pool_class): in_use_connections, expected_state=MaintenanceState.MOVING, expected_host_address=tmp_address, - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, - expected_current_socket_timeout=self.config.relax_timeout, + expected_current_socket_timeout=self.config.relaxed_timeout, expected_current_peername=DEFAULT_ADDRESS.split(":")[0], ) Helpers.validate_free_connections_state( @@ -1770,8 +1770,8 @@ def test_moving_migrating_migrated_moved_state_transitions(self, pool_class): connected_to_tmp_address=False, expected_state=MaintenanceState.MOVING, expected_host_address=tmp_address, - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, @@ -1786,12 +1786,12 @@ def test_moving_migrating_migrated_moved_state_transitions(self, pool_class): in_use_connections, expected_state=MaintenanceState.MOVING, expected_host_address=tmp_address, - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, - expected_current_socket_timeout=self.config.relax_timeout, + expected_current_socket_timeout=self.config.relaxed_timeout, expected_current_peername=DEFAULT_ADDRESS.split(":")[0], ) @@ -1805,12 +1805,12 @@ def test_moving_migrating_migrated_moved_state_transitions(self, pool_class): in_use_connections, expected_state=MaintenanceState.MOVING, expected_host_address=tmp_address, - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, - expected_current_socket_timeout=self.config.relax_timeout, + expected_current_socket_timeout=self.config.relaxed_timeout, expected_current_peername=DEFAULT_ADDRESS.split(":")[0], ) @@ -1824,12 +1824,12 @@ def test_moving_migrating_migrated_moved_state_transitions(self, pool_class): in_use_connections, expected_state=MaintenanceState.MOVING, expected_host_address=tmp_address, - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, - expected_current_socket_timeout=self.config.relax_timeout, + expected_current_socket_timeout=self.config.relaxed_timeout, expected_current_peername=DEFAULT_ADDRESS.split(":")[0], ) @@ -1843,12 +1843,12 @@ def test_moving_migrating_migrated_moved_state_transitions(self, pool_class): in_use_connections, expected_state=MaintenanceState.MOVING, expected_host_address=tmp_address, - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=DEFAULT_ADDRESS.split(":")[0], expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, - expected_current_socket_timeout=self.config.relax_timeout, + expected_current_socket_timeout=self.config.relaxed_timeout, expected_current_peername=DEFAULT_ADDRESS.split(":")[0], ) @@ -1952,7 +1952,7 @@ def mock_socket_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0): # Create maintenance events config self.config = MaintenanceEventsConfig( - enabled=True, proactive_reconnect=True, relax_timeout=30 + enabled=True, proactive_reconnect=True, relaxed_timeout=30 ) def teardown_method(self): @@ -2005,12 +2005,12 @@ def test_migrating_after_moving_multiple_proxies(self, pool_class): in_use_connections[key1], expected_state=MaintenanceState.MOVING, expected_host_address=new_ip, - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=self.orig_host, expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, - expected_current_socket_timeout=self.config.relax_timeout, + expected_current_socket_timeout=self.config.relaxed_timeout, expected_current_peername=key1, ) # validate free connections for ip1 @@ -2024,8 +2024,8 @@ def test_migrating_after_moving_multiple_proxies(self, pool_class): changed_free_connections += 1 assert conn.maintenance_state == MaintenanceState.MOVING assert conn.host == new_ip - assert conn.socket_timeout == self.config.relax_timeout - assert conn.socket_connect_timeout == self.config.relax_timeout + assert conn.socket_timeout == self.config.relaxed_timeout + assert conn.socket_connect_timeout == self.config.relaxed_timeout assert conn.orig_host_address == self.orig_host assert conn.orig_socket_timeout is None assert conn.orig_socket_connect_timeout is None @@ -2053,12 +2053,12 @@ def test_migrating_after_moving_multiple_proxies(self, pool_class): in_use_connections[key2], expected_state=MaintenanceState.MOVING, expected_host_address=new_ip_2, - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=self.orig_host, expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, - expected_current_socket_timeout=self.config.relax_timeout, + expected_current_socket_timeout=self.config.relaxed_timeout, expected_current_peername=key2, ) # validate free connections for ip2 @@ -2072,8 +2072,8 @@ def test_migrating_after_moving_multiple_proxies(self, pool_class): changed_free_connections += 1 assert conn.maintenance_state == MaintenanceState.MOVING assert conn.host == new_ip_2 - assert conn.socket_timeout == self.config.relax_timeout - assert conn.socket_connect_timeout == self.config.relax_timeout + assert conn.socket_timeout == self.config.relaxed_timeout + assert conn.socket_connect_timeout == self.config.relaxed_timeout assert conn.orig_host_address == self.orig_host assert conn.orig_socket_timeout is None assert conn.orig_socket_connect_timeout is None @@ -2090,9 +2090,9 @@ def test_migrating_after_moving_multiple_proxies(self, pool_class): assert conn.maintenance_state == MaintenanceState.MOVING # MIGRATED event conn_event_handler.handle_event(NodeMigratedEvent(id=3)) - # validate connection does not lose its MOVING state and relax timeout + # validate connection does not lose its MOVING state and relaxed timeout assert conn.maintenance_state == MaintenanceState.MOVING - assert conn.socket_timeout == self.config.relax_timeout + assert conn.socket_timeout == self.config.relaxed_timeout # Send Migrating event to con with ip = key3 conn = in_use_connections[key3][0] @@ -2101,7 +2101,7 @@ def test_migrating_after_moving_multiple_proxies(self, pool_class): # validate connection is in MIGRATING state assert conn.maintenance_state == MaintenanceState.MAINTENANCE - assert conn.socket_timeout == self.config.relax_timeout + assert conn.socket_timeout == self.config.relaxed_timeout # Send MIGRATED event to con with ip = key3 conn_event_handler.handle_event(NodeMigratedEvent(id=3)) @@ -2129,12 +2129,12 @@ def test_migrating_after_moving_multiple_proxies(self, pool_class): in_use_connections[key2], expected_state=MaintenanceState.MOVING, expected_host_address=new_ip_2, - expected_socket_timeout=self.config.relax_timeout, - expected_socket_connect_timeout=self.config.relax_timeout, + expected_socket_timeout=self.config.relaxed_timeout, + expected_socket_connect_timeout=self.config.relaxed_timeout, expected_orig_host_address=self.orig_host, expected_orig_socket_timeout=None, expected_orig_socket_connect_timeout=None, - expected_current_socket_timeout=self.config.relax_timeout, + expected_current_socket_timeout=self.config.relaxed_timeout, expected_current_peername=key2, ) Helpers.validate_in_use_connections_state( diff --git a/tests/test_scenario/conftest.py b/tests/test_scenario/conftest.py index 805941bbaf..eb30c117f4 100644 --- a/tests/test_scenario/conftest.py +++ b/tests/test_scenario/conftest.py @@ -11,7 +11,7 @@ from redis.retry import Retry from tests.test_scenario.fault_injector_client import FaultInjectorClient -RELAX_TIMEOUT = 30 +RELAXED_TIMEOUT = 30 CLIENT_TIMEOUT = 5 DEFAULT_ENDPOINT_NAME = "m-standard" @@ -58,7 +58,7 @@ def _get_client_maint_events( protocol: int = 3, enable_maintenance_events: bool = True, endpoint_type: Optional[EndpointType] = None, - enable_relax_timeout: bool = True, + enable_relaxed_timeout: bool = True, enable_proactive_reconnect: bool = True, disable_retries: bool = False, socket_timeout: Optional[float] = None, @@ -88,7 +88,7 @@ def _get_client_maint_events( maintenance_config = MaintenanceEventsConfig( enabled=enable_maintenance_events, proactive_reconnect=enable_proactive_reconnect, - relax_timeout=RELAX_TIMEOUT if enable_relax_timeout else -1, + relaxed_timeout=RELAXED_TIMEOUT if enable_relaxed_timeout else -1, endpoint_type=endpoint_type, ) diff --git a/tests/test_scenario/test_hitless_upgrade.py b/tests/test_scenario/test_hitless_upgrade.py index f23be4d4b2..a2b2496bda 100644 --- a/tests/test_scenario/test_hitless_upgrade.py +++ b/tests/test_scenario/test_hitless_upgrade.py @@ -19,7 +19,7 @@ ) from tests.test_scenario.conftest import ( CLIENT_TIMEOUT, - RELAX_TIMEOUT, + RELAXED_TIMEOUT, _get_client_maint_events, ) from tests.test_scenario.fault_injector_client import ( @@ -227,7 +227,7 @@ def _validate_maintenance_state( for conn in connections: if ( conn._sock is not None - and conn._sock.gettimeout() == RELAX_TIMEOUT + and conn._sock.gettimeout() == RELAXED_TIMEOUT and conn.maintenance_state == MaintenanceState.MAINTENANCE ): matching_conns_count += 1 @@ -261,7 +261,7 @@ def _validate_moving_state( ) if ( conn._sock is not None - and conn._sock.gettimeout() == RELAX_TIMEOUT + and conn._sock.gettimeout() == RELAXED_TIMEOUT and conn.maintenance_state == MaintenanceState.MOVING and endpoint_configured_correctly ): @@ -269,7 +269,7 @@ def _validate_moving_state( elif ( conn._sock is None and conn.maintenance_state == MaintenanceState.MOVING - and conn.socket_timeout == RELAX_TIMEOUT + and conn.socket_timeout == RELAXED_TIMEOUT and endpoint_configured_correctly ): matching_disconnected_conns_count += 1 @@ -405,7 +405,7 @@ def test_receive_migrating_and_moving_push_notification( logging.info("Validating connection migrating state...") conn = client_maint_events.connection_pool.get_connection() assert conn.maintenance_state == MaintenanceState.MAINTENANCE - assert conn._sock.gettimeout() == RELAX_TIMEOUT + assert conn._sock.gettimeout() == RELAXED_TIMEOUT client_maint_events.connection_pool.release(conn) logging.info("Waiting for MIGRATED push notifications...") @@ -438,7 +438,7 @@ def test_receive_migrating_and_moving_push_notification( logging.info("Validating connection states...") conn = client_maint_events.connection_pool.get_connection() assert conn.maintenance_state == MaintenanceState.MOVING - assert conn._sock.gettimeout() == RELAX_TIMEOUT + assert conn._sock.gettimeout() == RELAXED_TIMEOUT logging.info("Waiting for moving ttl to expire") time.sleep(BIND_TIMEOUT) From 3982529bc72618a911990df1abdd2a37a9bd6e82 Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Tue, 23 Sep 2025 14:45:56 +0300 Subject: [PATCH 2/5] Rename to MaintNotificationsConnectionHandler, MaintNotificationsPoolHandler and MaintNotificationsConfig --- redis/client.py | 28 +-- redis/connection.py | 146 ++++++------ redis/maintenance_events.py | 95 ++++---- tests/test_maintenance_events.py | 258 +++++++++++----------- tests/test_maintenance_events_handling.py | 118 +++++----- tests/test_scenario/conftest.py | 12 +- 6 files changed, 344 insertions(+), 313 deletions(-) diff --git a/redis/client.py b/redis/client.py index 26837b673b..9c1407afc0 100755 --- a/redis/client.py +++ b/redis/client.py @@ -57,8 +57,8 @@ ) from redis.lock import Lock from redis.maintenance_events import ( - MaintenanceEventPoolHandler, - MaintenanceEventsConfig, + MaintNotificationsPoolHandler, + MaintNotificationsConfig, ) from redis.retry import Retry from redis.utils import ( @@ -248,7 +248,7 @@ def __init__( cache: Optional[CacheInterface] = None, cache_config: Optional[CacheConfig] = None, event_dispatcher: Optional[EventDispatcher] = None, - maintenance_events_config: Optional[MaintenanceEventsConfig] = None, + maint_notifications_config: Optional[MaintNotificationsConfig] = None, ) -> None: """ Initialize a new Redis client. @@ -373,22 +373,22 @@ def __init__( ]: raise RedisError("Client caching is only supported with RESP version 3") - if maintenance_events_config and self.connection_pool.get_protocol() not in [ + if maint_notifications_config and self.connection_pool.get_protocol() not in [ 3, "3", ]: raise RedisError( "Push handlers on connection are only supported with RESP version 3" ) - if maintenance_events_config and maintenance_events_config.enabled: - self.maintenance_events_pool_handler = MaintenanceEventPoolHandler( - self.connection_pool, maintenance_events_config + if maint_notifications_config and maint_notifications_config.enabled: + self.maint_notifications_pool_handler = MaintNotificationsPoolHandler( + self.connection_pool, maint_notifications_config ) - self.connection_pool.set_maintenance_events_pool_handler( - self.maintenance_events_pool_handler + self.connection_pool.set_maint_notifications_pool_handler( + self.maint_notifications_pool_handler ) else: - self.maintenance_events_pool_handler = None + self.maint_notifications_pool_handler = None self.single_connection_lock = threading.RLock() self.connection = None @@ -587,15 +587,15 @@ def monitor(self): return Monitor(self.connection_pool) def client(self): - maintenance_events_config = ( + maint_notifications_config = ( None - if self.maintenance_events_pool_handler is None - else self.maintenance_events_pool_handler.config + if self.maint_notifications_pool_handler is None + else self.maint_notifications_pool_handler.config ) return self.__class__( connection_pool=self.connection_pool, single_connection_client=True, - maintenance_events_config=maintenance_events_config, + maint_notifications_config=maint_notifications_config, ) def __enter__(self): diff --git a/redis/connection.py b/redis/connection.py index 9e6d0fe431..b95bbab07c 100644 --- a/redis/connection.py +++ b/redis/connection.py @@ -48,9 +48,9 @@ TimeoutError, ) from .maintenance_events import ( - MaintenanceEventConnectionHandler, - MaintenanceEventPoolHandler, - MaintenanceEventsConfig, + MaintNotificationsConnectionHandler, + MaintNotificationsPoolHandler, + MaintNotificationsConfig, MaintenanceState, ) from .retry import Retry @@ -345,8 +345,10 @@ def __init__( protocol: Optional[int] = 2, command_packer: Optional[Callable[[], None]] = None, event_dispatcher: Optional[EventDispatcher] = None, - maintenance_events_pool_handler: Optional[MaintenanceEventPoolHandler] = None, - maintenance_events_config: Optional[MaintenanceEventsConfig] = None, + maint_notifications_pool_handler: Optional[ + MaintNotificationsPoolHandler + ] = None, + maint_notifications_config: Optional[MaintNotificationsConfig] = None, maintenance_state: "MaintenanceState" = MaintenanceState.NONE, maintenance_event_hash: Optional[int] = None, orig_host_address: Optional[str] = None, @@ -428,11 +430,11 @@ def __init__( parser_class = _RESP3Parser self.set_parser(parser_class) - self.maintenance_events_config = maintenance_events_config + self.maint_notifications_config = maint_notifications_config # Set up maintenance events if enabled self._configure_maintenance_events( - maintenance_events_pool_handler, + maint_notifications_pool_handler, orig_host_address, orig_socket_timeout, orig_socket_connect_timeout, @@ -499,31 +501,31 @@ def set_parser(self, parser_class): def _configure_maintenance_events( self, - maintenance_events_pool_handler=None, + maint_notifications_pool_handler=None, orig_host_address=None, orig_socket_timeout=None, orig_socket_connect_timeout=None, ): """Enable maintenance events by setting up handlers and storing original connection parameters.""" if ( - not self.maintenance_events_config - or not self.maintenance_events_config.enabled + not self.maint_notifications_config + or not self.maint_notifications_config.enabled ): - self._maintenance_event_connection_handler = None + self._maint_notifications_connection_handler = None return # Set up pool handler if available - if maintenance_events_pool_handler: + if maint_notifications_pool_handler: self._parser.set_node_moving_push_handler( - maintenance_events_pool_handler.handle_event + maint_notifications_pool_handler.handle_notification ) # Set up connection handler - self._maintenance_event_connection_handler = MaintenanceEventConnectionHandler( - self, self.maintenance_events_config + self._maint_notifications_connection_handler = ( + MaintNotificationsConnectionHandler(self, self.maint_notifications_config) ) self._parser.set_maintenance_push_handler( - self._maintenance_event_connection_handler.handle_event + self._maint_notifications_connection_handler.handle_notification ) # Store original connection parameters @@ -538,26 +540,26 @@ def _configure_maintenance_events( ) def set_maintenance_event_pool_handler( - self, maintenance_event_pool_handler: MaintenanceEventPoolHandler + self, maint_notifications_pool_handler: MaintNotificationsPoolHandler ): - maintenance_event_pool_handler.set_connection(self) + maint_notifications_pool_handler.set_connection(self) self._parser.set_node_moving_push_handler( - maintenance_event_pool_handler.handle_event + maint_notifications_pool_handler.handle_notification ) # Update maintenance event connection handler if it doesn't exist - if not self._maintenance_event_connection_handler: - self._maintenance_event_connection_handler = ( - MaintenanceEventConnectionHandler( - self, maintenance_event_pool_handler.config + if not self._maint_notifications_connection_handler: + self._maint_notifications_connection_handler = ( + MaintNotificationsConnectionHandler( + self, maint_notifications_pool_handler.config ) ) self._parser.set_maintenance_push_handler( - self._maintenance_event_connection_handler.handle_event + self._maint_notifications_connection_handler.handle_notification ) else: - self._maintenance_event_connection_handler.config = ( - maintenance_event_pool_handler.config + self._maint_notifications_connection_handler.config = ( + maint_notifications_pool_handler.config ) def connect(self): @@ -688,13 +690,13 @@ def on_connect_check_health(self, check_health: bool = True): # and we have a host to determine the endpoint type from if ( self.protocol not in [2, "2"] - and self.maintenance_events_config - and self.maintenance_events_config.enabled - and self._maintenance_event_connection_handler + and self.maint_notifications_config + and self.maint_notifications_config.enabled + and self._maint_notifications_connection_handler and hasattr(self, "host") ): try: - endpoint_type = self.maintenance_events_config.get_endpoint_type( + endpoint_type = self.maint_notifications_config.get_endpoint_type( self.host, self ) self.send_command( @@ -1742,20 +1744,20 @@ def __init__( connection_kwargs.pop("cache_config", None) if self.connection_kwargs.get( - "maintenance_events_pool_handler" - ) or self.connection_kwargs.get("maintenance_events_config"): + "maint_notifications_pool_handler" + ) or self.connection_kwargs.get("maint_notifications_config"): if self.connection_kwargs.get("protocol") not in [3, "3"]: raise RedisError( "Push handlers on connection are only supported with RESP version 3" ) - config = self.connection_kwargs.get("maintenance_events_config", None) or ( - self.connection_kwargs.get("maintenance_events_pool_handler").config - if self.connection_kwargs.get("maintenance_events_pool_handler") + config = self.connection_kwargs.get("maint_notifications_config", None) or ( + self.connection_kwargs.get("maint_notifications_pool_handler").config + if self.connection_kwargs.get("maint_notifications_pool_handler") else None ) if config and config.enabled: - self._update_connection_kwargs_for_maintenance_events() + self._update_connection_kwargs_for_maint_notifications() self._event_dispatcher = self.connection_kwargs.get("event_dispatcher", None) if self._event_dispatcher is None: @@ -1791,46 +1793,54 @@ def get_protocol(self): """ return self.connection_kwargs.get("protocol", None) - def maintenance_events_pool_handler_enabled(self): + def maint_notifications_pool_handler_enabled(self): """ Returns: - True if the maintenance events pool handler is enabled, False otherwise. + True if the maintenance notifications pool handler is enabled, False otherwise. """ - maintenance_events_config = self.connection_kwargs.get( - "maintenance_events_config", None + maint_notifications_config = self.connection_kwargs.get( + "maint_notifications_config", None ) - return maintenance_events_config and maintenance_events_config.enabled + return maint_notifications_config and maint_notifications_config.enabled - def set_maintenance_events_pool_handler( - self, maintenance_events_pool_handler: MaintenanceEventPoolHandler + def set_maint_notifications_pool_handler( + self, maint_notifications_pool_handler: MaintNotificationsPoolHandler ): self.connection_kwargs.update( { - "maintenance_events_pool_handler": maintenance_events_pool_handler, - "maintenance_events_config": maintenance_events_pool_handler.config, + "maint_notifications_pool_handler": maint_notifications_pool_handler, + "maint_notifications_config": maint_notifications_pool_handler.config, } ) - self._update_connection_kwargs_for_maintenance_events() + self._update_connection_kwargs_for_maint_notifications() - self._update_maintenance_events_configs_for_connections( - maintenance_events_pool_handler + self._update_maint_notifications_configs_for_connections( + maint_notifications_pool_handler ) - def _update_maintenance_events_configs_for_connections( - self, maintenance_events_pool_handler + def _update_maint_notifications_configs_for_connections( + self, maint_notifications_pool_handler ): - """Update the maintenance events config for all connections in the pool.""" + """Update the maintenance notifications config for all connections in the pool.""" with self._lock: for conn in self._available_connections: - conn.set_maintenance_event_pool_handler(maintenance_events_pool_handler) - conn.maintenance_events_config = maintenance_events_pool_handler.config + conn.set_maintenance_event_pool_handler( + maint_notifications_pool_handler + ) + conn.maint_notifications_config = ( + maint_notifications_pool_handler.config + ) for conn in self._in_use_connections: - conn.set_maintenance_event_pool_handler(maintenance_events_pool_handler) - conn.maintenance_events_config = maintenance_events_pool_handler.config + conn.set_maintenance_event_pool_handler( + maint_notifications_pool_handler + ) + conn.maint_notifications_config = ( + maint_notifications_pool_handler.config + ) - def _update_connection_kwargs_for_maintenance_events(self): - """Store original connection parameters for maintenance events.""" + def _update_connection_kwargs_for_maint_notifications(self): + """Store original connection parameters for maintenance notifications.""" if self.connection_kwargs.get("orig_host_address", None) is None: # If orig_host_address is None it means we haven't # configured the original values yet @@ -1936,7 +1946,7 @@ def get_connection(self, command_name=None, *keys, **options) -> "Connection": if ( connection.can_read() and self.cache is None - and not self.maintenance_events_pool_handler_enabled() + and not self.maint_notifications_pool_handler_enabled() ): raise ConnectionError("Connection has data") except (ConnectionError, TimeoutError, OSError): @@ -2569,20 +2579,24 @@ def disconnect_free_connections( ): conn.disconnect() - def _update_maintenance_events_config_for_connections( - self, maintenance_events_config + def _update_maint_notifications_config_for_connections( + self, maint_notifications_config ): for conn in tuple(self._connections): - conn.maintenance_events_config = maintenance_events_config + conn.maint_notifications_config = maint_notifications_config - def _update_maintenance_events_configs_for_connections( - self, maintenance_events_pool_handler + def _update_maint_notifications_configs_for_connections( + self, maint_notifications_pool_handler ): - """Update the maintenance events config for all connections in the pool.""" + """Update the maintenance notifications config for all connections in the pool.""" with self._lock: for conn in tuple(self._connections): - conn.set_maintenance_event_pool_handler(maintenance_events_pool_handler) - conn.maintenance_events_config = maintenance_events_pool_handler.config + conn.set_maintenance_event_pool_handler( + maint_notifications_pool_handler + ) + conn.maint_notifications_config = ( + maint_notifications_pool_handler.config + ) def set_in_maintenance(self, in_maintenance: bool): """ diff --git a/redis/maintenance_events.py b/redis/maintenance_events.py index c037c3309e..b64067efec 100644 --- a/redis/maintenance_events.py +++ b/redis/maintenance_events.py @@ -435,9 +435,9 @@ def _is_private_fqdn(host: str) -> bool: return False -class MaintenanceEventsConfig: +class MaintNotificationsConfig: """ - Configuration class for maintenance events handling behaviour. Events are received through + Configuration class for maintenance notifications handling behaviour. Notifications are received through push notifications. This class defines how the Redis client should react to different push notifications @@ -453,10 +453,10 @@ def __init__( endpoint_type: Optional[EndpointType] = None, ): """ - Initialize a new MaintenanceEventsConfig. + Initialize a new MaintNotificationsConfig. Args: - enabled (bool): Whether to enable maintenance events handling. + enabled (bool): Whether to enable maintenance notifications handling. Defaults to False. proactive_reconnect (bool): Whether to proactively reconnect when a node is replaced. Defaults to True. @@ -550,15 +550,15 @@ def get_endpoint_type( return EndpointType.INTERNAL_FQDN if is_private else EndpointType.EXTERNAL_FQDN -class MaintenanceEventPoolHandler: +class MaintNotificationsPoolHandler: def __init__( self, pool: Union["ConnectionPool", "BlockingConnectionPool"], - config: MaintenanceEventsConfig, + config: MaintNotificationsConfig, ) -> None: self.pool = pool self.config = config - self._processed_events = set() + self._processed_notifications = set() self._lock = threading.RLock() self.connection = None @@ -567,28 +567,28 @@ def set_connection(self, connection: "ConnectionInterface"): def remove_expired_notifications(self): with self._lock: - for notification in tuple(self._processed_events): + for notification in tuple(self._processed_notifications): if notification.is_expired(): - self._processed_events.remove(notification) + self._processed_notifications.remove(notification) - def handle_event(self, notification: MaintenanceEvent): + def handle_notification(self, notification: MaintenanceEvent): self.remove_expired_notifications() if isinstance(notification, NodeMovingEvent): - return self.handle_node_moving_event(notification) + return self.handle_node_moving_notification(notification) else: logging.error(f"Unhandled notification type: {notification}") - def handle_node_moving_event(self, event: NodeMovingEvent): + def handle_node_moving_notification(self, notification: NodeMovingEvent): if ( not self.config.proactive_reconnect and not self.config.is_relaxed_timeouts_enabled() ): return with self._lock: - if event in self._processed_events: + if notification in self._processed_notifications: # nothing to do in the connection pool handling - # the event has already been handled or is expired + # the notification has already been handled or is expired # just return return @@ -614,9 +614,9 @@ def handle_node_moving_event(self, event: NodeMovingEvent): # connection settings for matching connections self.pool.update_connections_settings( state=MaintenanceState.MOVING, - maintenance_event_hash=hash(event), + maintenance_event_hash=hash(notification), relaxed_timeout=self.config.relaxed_timeout, - host_address=event.new_node_host, + host_address=notification.new_node_host, matching_address=moving_address_src, matching_pattern="connected_address", update_event_hash=True, @@ -624,11 +624,11 @@ def handle_node_moving_event(self, event: NodeMovingEvent): ) if self.config.proactive_reconnect: - if event.new_node_host is not None: + if notification.new_node_host is not None: self.run_proactive_reconnect(moving_address_src) else: threading.Timer( - event.ttl / 2, + notification.ttl / 2, self.run_proactive_reconnect, args=(moving_address_src,), ).start() @@ -639,15 +639,15 @@ def handle_node_moving_event(self, event: NodeMovingEvent): # if relax timeouts are enabled - update timeouts kwargs: dict = { "maintenance_state": MaintenanceState.MOVING, - "maintenance_event_hash": hash(event), + "maintenance_event_hash": hash(notification), } - if event.new_node_host is not None: + if notification.new_node_host is not None: # the host is not updated if the new node host is None # this happens when the MOVING push notification does not contain # the new node host - in this case we only update the timeouts kwargs.update( { - "host": event.new_node_host, + "host": notification.new_node_host, } ) if self.config.is_relaxed_timeouts_enabled(): @@ -663,10 +663,12 @@ def handle_node_moving_event(self, event: NodeMovingEvent): self.pool.set_in_maintenance(False) threading.Timer( - event.ttl, self.handle_node_moved_event, args=(event,) + notification.ttl, + self.handle_node_moved_notification, + args=(notification,), ).start() - self._processed_events.add(event) + self._processed_notifications.add(notification) def run_proactive_reconnect(self, moving_address_src: Optional[str] = None): """ @@ -687,17 +689,20 @@ def run_proactive_reconnect(self, moving_address_src: Optional[str] = None): moving_address_src=moving_address_src, ) - def handle_node_moved_event(self, event: NodeMovingEvent): + def handle_node_moved_notification(self, notification: NodeMovingEvent): """ - Handle the cleanup after a node moving event expires. + Handle the cleanup after a node moving notification expires. """ - event_hash = hash(event) + notification_hash = hash(notification) with self._lock: - # if the current maintenance_event_hash in kwargs is not matching the event - # it means there has been a new moving event after this one + # if the current maintenance_event_hash in kwargs is not matching the notification + # it means there has been a new moving notification after this one # and we don't need to revert the kwargs yet - if self.pool.connection_kwargs.get("maintenance_event_hash") == event_hash: + if ( + self.pool.connection_kwargs.get("maintenance_event_hash") + == notification_hash + ): orig_host = self.pool.connection_kwargs.get("orig_host_address") orig_socket_timeout = self.pool.connection_kwargs.get( "orig_socket_timeout" @@ -722,7 +727,7 @@ def handle_node_moved_event(self, event: NodeMovingEvent): relaxed_timeout=-1, state=MaintenanceState.NONE, maintenance_event_hash=None, - matching_event_hash=event_hash, + matching_event_hash=notification_hash, matching_pattern="event_hash", update_event_hash=True, reset_relaxed_timeout=reset_relaxed_timeout, @@ -731,9 +736,9 @@ def handle_node_moved_event(self, event: NodeMovingEvent): ) -class MaintenanceEventConnectionHandler: - # 1 = "starting maintenance" events, 0 = "completed maintenance" events - _EVENT_TYPES: dict[type["MaintenanceEvent"], int] = { +class MaintNotificationsConnectionHandler: + # 1 = "starting maintenance" notifications, 0 = "completed maintenance" notifications + _NOTIFICATION_TYPES: dict[type["MaintenanceEvent"], int] = { NodeMigratingEvent: 1, NodeFailingOverEvent: 1, NodeMigratedEvent: 0, @@ -741,25 +746,27 @@ class MaintenanceEventConnectionHandler: } def __init__( - self, connection: "ConnectionInterface", config: MaintenanceEventsConfig + self, connection: "ConnectionInterface", config: MaintNotificationsConfig ) -> None: self.connection = connection self.config = config - def handle_event(self, event: MaintenanceEvent): - # get the event type by checking its class in the _EVENT_TYPES dict - event_type = self._EVENT_TYPES.get(event.__class__, None) + def handle_notification(self, notification: MaintenanceEvent): + # get the notification type by checking its class in the _NOTIFICATION_TYPES dict + notification_type = self._NOTIFICATION_TYPES.get(notification.__class__, None) - if event_type is None: - logging.error(f"Unhandled event type: {event}") + if notification_type is None: + logging.error(f"Unhandled notification type: {notification}") return - if event_type: - self.handle_maintenance_start_event(MaintenanceState.MAINTENANCE) + if notification_type: + self.handle_maintenance_start_notification(MaintenanceState.MAINTENANCE) else: - self.handle_maintenance_completed_event() + self.handle_maintenance_completed_notification() - def handle_maintenance_start_event(self, maintenance_state: MaintenanceState): + def handle_maintenance_start_notification( + self, maintenance_state: MaintenanceState + ): if ( self.connection.maintenance_state == MaintenanceState.MOVING or not self.config.is_relaxed_timeouts_enabled() @@ -773,7 +780,7 @@ def handle_maintenance_start_event(self, maintenance_state: MaintenanceState): # extend the timeout for all created connections self.connection.update_current_socket_timeout(self.config.relaxed_timeout) - def handle_maintenance_completed_event(self): + def handle_maintenance_completed_notification(self): # Only reset timeouts if state is not MOVING and relaxed timeouts are enabled if ( self.connection.maintenance_state == MaintenanceState.MOVING diff --git a/tests/test_maintenance_events.py b/tests/test_maintenance_events.py index 6e31816bb5..081121092b 100644 --- a/tests/test_maintenance_events.py +++ b/tests/test_maintenance_events.py @@ -11,9 +11,9 @@ NodeMigratedEvent, NodeFailingOverEvent, NodeFailedOverEvent, - MaintenanceEventsConfig, - MaintenanceEventPoolHandler, - MaintenanceEventConnectionHandler, + MaintNotificationsConfig, + MaintNotificationsPoolHandler, + MaintNotificationsConnectionHandler, MaintenanceState, EndpointType, ) @@ -373,19 +373,19 @@ def test_equality_and_hash(self): assert hash(event1) != hash(event3) -class TestMaintenanceEventsConfig: - """Test the MaintenanceEventsConfig class.""" +class TestMaintNotificationsConfig: + """Test the MaintNotificationsConfig class.""" def test_init_defaults(self): - """Test MaintenanceEventsConfig initialization with defaults.""" - config = MaintenanceEventsConfig() + """Test MaintNotificationsConfig initialization with defaults.""" + config = MaintNotificationsConfig() assert config.enabled is True assert config.proactive_reconnect is True assert config.relaxed_timeout == 10 def test_init_custom_values(self): - """Test MaintenanceEventsConfig initialization with custom values.""" - config = MaintenanceEventsConfig( + """Test MaintNotificationsConfig initialization with custom values.""" + config = MaintNotificationsConfig( enabled=True, proactive_reconnect=False, relaxed_timeout=30 ) assert config.enabled is True @@ -393,44 +393,44 @@ def test_init_custom_values(self): assert config.relaxed_timeout == 30 def test_repr(self): - """Test MaintenanceEventsConfig string representation.""" - config = MaintenanceEventsConfig( + """Test MaintNotificationsConfig string representation.""" + config = MaintNotificationsConfig( enabled=True, proactive_reconnect=False, relaxed_timeout=30 ) repr_str = repr(config) - assert "MaintenanceEventsConfig" in repr_str + assert "MaintNotificationsConfig" in repr_str assert "enabled=True" in repr_str assert "proactive_reconnect=False" in repr_str assert "relaxed_timeout=30" in repr_str def test_is_relaxed_timeouts_enabled_true(self): """Test is_relaxed_timeouts_enabled returns True for positive timeout.""" - config = MaintenanceEventsConfig(relaxed_timeout=20) + config = MaintNotificationsConfig(relaxed_timeout=20) assert config.is_relaxed_timeouts_enabled() is True def test_is_relaxed_timeouts_enabled_false(self): """Test is_relaxed_timeouts_enabled returns False for -1 timeout.""" - config = MaintenanceEventsConfig(relaxed_timeout=-1) + config = MaintNotificationsConfig(relaxed_timeout=-1) assert config.is_relaxed_timeouts_enabled() is False def test_is_relaxed_timeouts_enabled_zero(self): """Test is_relaxed_timeouts_enabled returns True for zero timeout.""" - config = MaintenanceEventsConfig(relaxed_timeout=0) + config = MaintNotificationsConfig(relaxed_timeout=0) assert config.is_relaxed_timeouts_enabled() is True def test_is_relaxed_timeouts_enabled_none(self): """Test is_relaxed_timeouts_enabled returns True for None timeout.""" - config = MaintenanceEventsConfig(relaxed_timeout=None) + config = MaintNotificationsConfig(relaxed_timeout=None) assert config.is_relaxed_timeouts_enabled() is True def test_relaxed_timeout_none_is_saved_as_none(self): """Test that None value for relaxed_timeout is saved as None.""" - config = MaintenanceEventsConfig(relaxed_timeout=None) + config = MaintNotificationsConfig(relaxed_timeout=None) assert config.relaxed_timeout is None -class TestMaintenanceEventPoolHandler: - """Test the MaintenanceEventPoolHandler class.""" +class TestMaintNotificationsPoolHandler: + """Test the MaintNotificationsPoolHandler class.""" def setup_method(self): """Set up test fixtures.""" @@ -438,16 +438,16 @@ def setup_method(self): self.mock_pool._lock = MagicMock() self.mock_pool._lock.__enter__.return_value = None self.mock_pool._lock.__exit__.return_value = None - self.config = MaintenanceEventsConfig( + self.config = MaintNotificationsConfig( enabled=True, proactive_reconnect=True, relaxed_timeout=20 ) - self.handler = MaintenanceEventPoolHandler(self.mock_pool, self.config) + self.handler = MaintNotificationsPoolHandler(self.mock_pool, self.config) def test_init(self): - """Test MaintenanceEventPoolHandler initialization.""" + """Test MaintNotificationsPoolHandler initialization.""" assert self.handler.pool == self.mock_pool assert self.handler.config == self.config - assert isinstance(self.handler._processed_events, set) + assert isinstance(self.handler._processed_notifications, set) assert isinstance(self.handler._lock, type(threading.RLock())) def test_remove_expired_notifications(self): @@ -459,63 +459,65 @@ def test_remove_expired_notifications(self): event2 = NodeMovingEvent( id=2, new_node_host="host2", new_node_port=6380, ttl=5 ) - self.handler._processed_events.add(event1) - self.handler._processed_events.add(event2) + self.handler._processed_notifications.add(event1) + self.handler._processed_notifications.add(event2) # Move time forward but not enough to expire event2 (expires at 1005) with patch("time.monotonic", return_value=1003): self.handler.remove_expired_notifications() - assert event1 in self.handler._processed_events - assert event2 in self.handler._processed_events # Not expired yet + assert event1 in self.handler._processed_notifications + assert event2 in self.handler._processed_notifications # Not expired yet # Move time forward to expire event2 but not event1 with patch("time.monotonic", return_value=1006): self.handler.remove_expired_notifications() - assert event1 in self.handler._processed_events - assert event2 not in self.handler._processed_events # Now expired + assert event1 in self.handler._processed_notifications + assert event2 not in self.handler._processed_notifications # Now expired - def test_handle_event_node_moving(self): + def test_handle_notification_node_moving(self): """Test handling of NodeMovingEvent.""" - event = NodeMovingEvent( + notification = NodeMovingEvent( id=1, new_node_host="localhost", new_node_port=6379, ttl=10 ) - with patch.object(self.handler, "handle_node_moving_event") as mock_handle: - self.handler.handle_event(event) - mock_handle.assert_called_once_with(event) + with patch.object( + self.handler, "handle_node_moving_notification" + ) as mock_handle: + self.handler.handle_notification(notification) + mock_handle.assert_called_once_with(notification) - def test_handle_event_unknown_type(self): - """Test handling of unknown event type.""" - event = NodeMigratingEvent(id=1, ttl=5) # Not handled by pool handler + def test_handle_notification_unknown_type(self): + """Test handling of unknown notification type.""" + notification = NodeMigratingEvent(id=1, ttl=5) # Not handled by pool handler - result = self.handler.handle_event(event) + result = self.handler.handle_notification(notification) assert result is None - def test_handle_node_moving_event_disabled_config(self): - """Test node moving event handling when both features are disabled.""" - config = MaintenanceEventsConfig(proactive_reconnect=False, relaxed_timeout=-1) - handler = MaintenanceEventPoolHandler(self.mock_pool, config) - event = NodeMovingEvent( + def test_handle_node_moving_notification_disabled_config(self): + """Test node moving notification handling when both features are disabled.""" + config = MaintNotificationsConfig(proactive_reconnect=False, relaxed_timeout=-1) + handler = MaintNotificationsPoolHandler(self.mock_pool, config) + notification = NodeMovingEvent( id=1, new_node_host="localhost", new_node_port=6379, ttl=10 ) - result = handler.handle_node_moving_event(event) + result = handler.handle_node_moving_notification(notification) assert result is None - assert event not in handler._processed_events + assert notification not in handler._processed_notifications - def test_handle_node_moving_event_already_processed(self): - """Test node moving event handling when event already processed.""" - event = NodeMovingEvent( + def test_handle_node_moving_notification_already_processed(self): + """Test node moving notification handling when notification already processed.""" + notification = NodeMovingEvent( id=1, new_node_host="localhost", new_node_port=6379, ttl=10 ) - self.handler._processed_events.add(event) + self.handler._processed_notifications.add(notification) - result = self.handler.handle_node_moving_event(event) + result = self.handler.handle_node_moving_notification(notification) assert result is None - def test_handle_node_moving_event_success(self): - """Test successful node moving event handling.""" - event = NodeMovingEvent( + def test_handle_node_moving_notification_success(self): + """Test successful node moving notification handling.""" + notification = NodeMovingEvent( id=1, new_node_host="localhost", new_node_port=6379, ttl=10 ) @@ -523,48 +525,54 @@ def test_handle_node_moving_event_success(self): patch("threading.Timer") as mock_timer, patch("time.monotonic", return_value=1000), ): - self.handler.handle_node_moving_event(event) + self.handler.handle_node_moving_notification(notification) # Verify timer was started mock_timer.assert_called_once_with( - event.ttl, self.handler.handle_node_moved_event, args=(event,) + notification.ttl, + self.handler.handle_node_moved_notification, + args=(notification,), ) mock_timer.return_value.start.assert_called_once() - # Verify event was added to processed set - assert event in self.handler._processed_events + # Verify notification was added to processed set + assert notification in self.handler._processed_notifications # Verify pool methods were called self.mock_pool.update_connections_settings.assert_called_once() - def test_handle_node_moving_event_with_no_host_and_port(self): - """Test successful node moving event handling.""" - event = NodeMovingEvent(id=1, new_node_host=None, new_node_port=None, ttl=2) + def test_handle_node_moving_notification_with_no_host_and_port(self): + """Test successful node moving notification handling.""" + notification = NodeMovingEvent( + id=1, new_node_host=None, new_node_port=None, ttl=2 + ) with ( patch("threading.Timer") as mock_timer, patch("time.monotonic", return_value=1000), ): - self.handler.handle_node_moving_event(event) + self.handler.handle_node_moving_notification(notification) # Verify timer was started mock_timer.assert_has_calls( [ call( - event.ttl / 2, + notification.ttl / 2, self.handler.run_proactive_reconnect, args=(None,), ), call().start(), call( - event.ttl, self.handler.handle_node_moved_event, args=(event,) + notification.ttl, + self.handler.handle_node_moved_notification, + args=(notification,), ), call().start(), ] ) - # Verify event was added to processed set - assert event in self.handler._processed_events + # Verify notification was added to processed set + assert notification in self.handler._processed_notifications # Verify pool methods were called self.mock_pool.update_connections_settings.assert_called_once() @@ -575,102 +583,104 @@ def test_handle_node_moved_event(self): id=1, new_node_host="localhost", new_node_port=6379, ttl=10 ) self.mock_pool.connection_kwargs = {"host": "localhost"} - self.handler.handle_node_moved_event(event) + self.handler.handle_node_moved_notification(event) # Verify cleanup methods were called self.mock_pool.update_connections_settings.assert_called_once() -class TestMaintenanceEventConnectionHandler: - """Test the MaintenanceEventConnectionHandler class.""" +class TestMaintNotificationsConnectionHandler: + """Test the MaintNotificationsConnectionHandler class.""" def setup_method(self): """Set up test fixtures.""" self.mock_connection = Mock() - self.config = MaintenanceEventsConfig(enabled=True, relaxed_timeout=20) - self.handler = MaintenanceEventConnectionHandler( + self.config = MaintNotificationsConfig(enabled=True, relaxed_timeout=20) + self.handler = MaintNotificationsConnectionHandler( self.mock_connection, self.config ) def test_init(self): - """Test MaintenanceEventConnectionHandler initialization.""" + """Test MaintNotificationsConnectionHandler initialization.""" assert self.handler.connection == self.mock_connection assert self.handler.config == self.config - def test_handle_event_migrating(self): + def test_handle_notification_migrating(self): """Test handling of NodeMigratingEvent.""" - event = NodeMigratingEvent(id=1, ttl=5) + notification = NodeMigratingEvent(id=1, ttl=5) with patch.object( - self.handler, "handle_maintenance_start_event" + self.handler, "handle_maintenance_start_notification" ) as mock_handle: - self.handler.handle_event(event) + self.handler.handle_notification(notification) mock_handle.assert_called_once_with(MaintenanceState.MAINTENANCE) - def test_handle_event_migrated(self): + def test_handle_notification_migrated(self): """Test handling of NodeMigratedEvent.""" - event = NodeMigratedEvent(id=1) + notification = NodeMigratedEvent(id=1) with patch.object( - self.handler, "handle_maintenance_completed_event" + self.handler, "handle_maintenance_completed_notification" ) as mock_handle: - self.handler.handle_event(event) + self.handler.handle_notification(notification) mock_handle.assert_called_once_with() - def test_handle_event_failing_over(self): + def test_handle_notification_failing_over(self): """Test handling of NodeFailingOverEvent.""" - event = NodeFailingOverEvent(id=1, ttl=5) + notification = NodeFailingOverEvent(id=1, ttl=5) with patch.object( - self.handler, "handle_maintenance_start_event" + self.handler, "handle_maintenance_start_notification" ) as mock_handle: - self.handler.handle_event(event) + self.handler.handle_notification(notification) mock_handle.assert_called_once_with(MaintenanceState.MAINTENANCE) - def test_handle_event_failed_over(self): + def test_handle_notification_failed_over(self): """Test handling of NodeFailedOverEvent.""" - event = NodeFailedOverEvent(id=1) + notification = NodeFailedOverEvent(id=1) with patch.object( - self.handler, "handle_maintenance_completed_event" + self.handler, "handle_maintenance_completed_notification" ) as mock_handle: - self.handler.handle_event(event) + self.handler.handle_notification(notification) mock_handle.assert_called_once_with() - def test_handle_event_unknown_type(self): - """Test handling of unknown event type.""" - event = NodeMovingEvent( + def test_handle_notification_unknown_type(self): + """Test handling of unknown notification type.""" + notification = NodeMovingEvent( id=1, new_node_host="localhost", new_node_port=6379, ttl=10 ) - result = self.handler.handle_event(event) + result = self.handler.handle_notification(notification) assert result is None - def test_handle_maintenance_start_event_disabled(self): - """Test maintenance start event handling when relaxed timeouts are disabled.""" - config = MaintenanceEventsConfig(relaxed_timeout=-1) - handler = MaintenanceEventConnectionHandler(self.mock_connection, config) + def test_handle_maintenance_start_notification_disabled(self): + """Test maintenance start notification handling when relaxed timeouts are disabled.""" + config = MaintNotificationsConfig(relaxed_timeout=-1) + handler = MaintNotificationsConnectionHandler(self.mock_connection, config) - result = handler.handle_maintenance_start_event(MaintenanceState.MAINTENANCE) + result = handler.handle_maintenance_start_notification( + MaintenanceState.MAINTENANCE + ) assert result is None self.mock_connection.update_current_socket_timeout.assert_not_called() - def test_handle_maintenance_start_event_moving_state(self): - """Test maintenance start event handling when connection is in MOVING state.""" + def test_handle_maintenance_start_notification_moving_state(self): + """Test maintenance start notification handling when connection is in MOVING state.""" self.mock_connection.maintenance_state = MaintenanceState.MOVING - result = self.handler.handle_maintenance_start_event( + result = self.handler.handle_maintenance_start_notification( MaintenanceState.MAINTENANCE ) assert result is None self.mock_connection.update_current_socket_timeout.assert_not_called() - def test_handle_maintenance_start_event_success(self): - """Test successful maintenance start event handling for migrating.""" + def test_handle_maintenance_start_notification_success(self): + """Test successful maintenance start notification handling for migrating.""" self.mock_connection.maintenance_state = MaintenanceState.NONE - self.handler.handle_maintenance_start_event(MaintenanceState.MAINTENANCE) + self.handler.handle_maintenance_start_notification(MaintenanceState.MAINTENANCE) assert self.mock_connection.maintenance_state == MaintenanceState.MAINTENANCE self.mock_connection.update_current_socket_timeout.assert_called_once_with(20) @@ -678,28 +688,28 @@ def test_handle_maintenance_start_event_success(self): tmp_relaxed_timeout=20 ) - def test_handle_maintenance_completed_event_disabled(self): - """Test maintenance completed event handling when relaxed timeouts are disabled.""" - config = MaintenanceEventsConfig(relaxed_timeout=-1) - handler = MaintenanceEventConnectionHandler(self.mock_connection, config) + def test_handle_maintenance_completed_notification_disabled(self): + """Test maintenance completed notification handling when relaxed timeouts are disabled.""" + config = MaintNotificationsConfig(relaxed_timeout=-1) + handler = MaintNotificationsConnectionHandler(self.mock_connection, config) - result = handler.handle_maintenance_completed_event() + result = handler.handle_maintenance_completed_notification() assert result is None self.mock_connection.update_current_socket_timeout.assert_not_called() - def test_handle_maintenance_completed_event_moving_state(self): - """Test maintenance completed event handling when connection is in MOVING state.""" + def test_handle_maintenance_completed_notification_moving_state(self): + """Test maintenance completed notification handling when connection is in MOVING state.""" self.mock_connection.maintenance_state = MaintenanceState.MOVING - result = self.handler.handle_maintenance_completed_event() + result = self.handler.handle_maintenance_completed_notification() assert result is None self.mock_connection.update_current_socket_timeout.assert_not_called() - def test_handle_maintenance_completed_event_success(self): - """Test successful maintenance completed event handling.""" + def test_handle_maintenance_completed_notification_success(self): + """Test successful maintenance completed notification handling.""" self.mock_connection.maintenance_state = MaintenanceState.MAINTENANCE - self.handler.handle_maintenance_completed_event() + self.handler.handle_maintenance_completed_notification() assert self.mock_connection.maintenance_state == MaintenanceState.NONE @@ -721,8 +731,8 @@ def test_endpoint_type_constants(self): assert EndpointType.NONE.value == "none" -class TestMaintenanceEventsConfigEndpointType: - """Test MaintenanceEventsConfig endpoint type functionality.""" +class TestMaintNotificationsConfigEndpointType: + """Test MaintNotificationsConfig endpoint type functionality.""" def setup_method(self): """Set up common mock classes for all tests.""" @@ -751,19 +761,19 @@ def get_resolved_ip(self): self.MockConnection = MockConnection def test_config_validation_valid_endpoint_types(self): - """Test that MaintenanceEventsConfig accepts valid endpoint types.""" + """Test that MaintNotificationsConfig accepts valid endpoint types.""" for endpoint_type in EndpointType: - config = MaintenanceEventsConfig(endpoint_type=endpoint_type) + config = MaintNotificationsConfig(endpoint_type=endpoint_type) assert config.endpoint_type == endpoint_type def test_config_validation_none_endpoint_type(self): - """Test that MaintenanceEventsConfig accepts None as endpoint type.""" - config = MaintenanceEventsConfig(endpoint_type=None) + """Test that MaintNotificationsConfig accepts None as endpoint type.""" + config = MaintNotificationsConfig(endpoint_type=None) assert config.endpoint_type is None def test_endpoint_type_detection_ip_addresses(self): """Test endpoint type detection for IP addresses.""" - config = MaintenanceEventsConfig() + config = MaintNotificationsConfig() # Test private IPv4 addresses conn1 = self.MockConnection("192.168.1.1", resolved_ip="192.168.1.1") @@ -788,7 +798,7 @@ def test_endpoint_type_detection_ip_addresses(self): def test_endpoint_type_detection_fqdn_with_resolved_ip(self): """Test endpoint type detection for FQDNs with resolved IP addresses.""" - config = MaintenanceEventsConfig() + config = MaintNotificationsConfig() # Test FQDN resolving to private IP conn1 = self.MockConnection( @@ -834,7 +844,7 @@ def test_endpoint_type_detection_fqdn_with_resolved_ip(self): def test_endpoint_type_detection_fqdn_heuristics(self): """Test endpoint type detection using FQDN heuristics when no resolved IP is available.""" - config = MaintenanceEventsConfig() + config = MaintNotificationsConfig() # Test localhost (should be internal) conn1 = self.MockConnection("localhost") @@ -859,11 +869,11 @@ def test_endpoint_type_override(self): """Test that configured endpoint_type overrides detection.""" # Test with endpoint_type set to NONE - config = MaintenanceEventsConfig(endpoint_type=EndpointType.NONE) + config = MaintNotificationsConfig(endpoint_type=EndpointType.NONE) conn = self.MockConnection("localhost") assert config.get_endpoint_type("localhost", conn) == EndpointType.NONE # Test with endpoint_type set to EXTERNAL_IP - config = MaintenanceEventsConfig(endpoint_type=EndpointType.EXTERNAL_IP) + config = MaintNotificationsConfig(endpoint_type=EndpointType.EXTERNAL_IP) assert config.get_endpoint_type("localhost", conn) == EndpointType.EXTERNAL_IP diff --git a/tests/test_maintenance_events_handling.py b/tests/test_maintenance_events_handling.py index 56c6270de9..821cf20b19 100644 --- a/tests/test_maintenance_events_handling.py +++ b/tests/test_maintenance_events_handling.py @@ -14,12 +14,12 @@ MaintenanceState, ) from redis.maintenance_events import ( - MaintenanceEventsConfig, + MaintNotificationsConfig, NodeMigratingEvent, NodeMigratedEvent, NodeFailingOverEvent, NodeFailedOverEvent, - MaintenanceEventPoolHandler, + MaintNotificationsPoolHandler, NodeMovingEvent, ) @@ -371,7 +371,7 @@ def mock_select(rlist, wlist, xlist, timeout=0): self.select_patcher.start() # Create maintenance events config - self.config = MaintenanceEventsConfig( + self.config = MaintNotificationsConfig( enabled=True, proactive_reconnect=True, relaxed_timeout=30 ) @@ -384,7 +384,7 @@ def _get_client( self, pool_class, max_connections=10, - maintenance_events_config=None, + maint_notifications_config=None, setup_pool_handler=False, ): """Helper method to create a pool and Redis client with maintenance events configuration. @@ -392,7 +392,7 @@ def _get_client( Args: pool_class: The connection pool class (ConnectionPool or BlockingConnectionPool) max_connections: Maximum number of connections in the pool (default: 10) - maintenance_events_config: Optional MaintenanceEventsConfig to use. If not provided, + maint_notifications_config: Optional MaintNotificationsConfig to use. If not provided, uses self.config from setup_method (default: None) setup_pool_handler: Whether to set up pool handler for moving events (default: False) @@ -400,8 +400,8 @@ def _get_client( tuple: (test_pool, test_redis_client) """ config = ( - maintenance_events_config - if maintenance_events_config is not None + maint_notifications_config + if maint_notifications_config is not None else self.config ) @@ -410,16 +410,16 @@ def _get_client( port=int(DEFAULT_ADDRESS.split(":")[1]), max_connections=max_connections, protocol=3, # Required for maintenance events - maintenance_events_config=config, + maint_notifications_config=config, ) test_redis_client = Redis(connection_pool=test_pool) # Set up pool handler for moving events if requested if setup_pool_handler: - pool_handler = MaintenanceEventPoolHandler( + pool_handler = MaintNotificationsPoolHandler( test_redis_client.connection_pool, config ) - test_redis_client.connection_pool.set_maintenance_events_pool_handler( + test_redis_client.connection_pool.set_maint_notifications_pool_handler( pool_handler ) @@ -433,7 +433,7 @@ def _validate_connection_handlers(self, conn, pool_handler, config): assert hasattr(parser_handler, "__self__") assert hasattr(parser_handler, "__func__") assert parser_handler.__self__ is pool_handler - assert parser_handler.__func__ is pool_handler.handle_event.__func__ + assert parser_handler.__func__ is pool_handler.handle_notification.__func__ # Test that the maintenance handler function is correctly set maintenance_handler = conn._parser.maintenance_push_handler_func @@ -443,15 +443,15 @@ def _validate_connection_handlers(self, conn, pool_handler, config): # The maintenance handler should be bound to the connection's # maintenance event connection handler assert ( - maintenance_handler.__self__ is conn._maintenance_event_connection_handler + maintenance_handler.__self__ is conn._maint_notifications_connection_handler ) assert ( maintenance_handler.__func__ - is conn._maintenance_event_connection_handler.handle_event.__func__ + is conn._maint_notifications_connection_handler.handle_notification.__func__ ) # Validate that the connection's maintenance handler has the same config object - assert conn._maintenance_event_connection_handler.config is config + assert conn._maint_notifications_connection_handler.config is config def _validate_current_timeout(self, expected_timeout, error_msg=None): """Helper method to validate the current timeout for the calling thread.""" @@ -492,11 +492,11 @@ def test_client_initialization(self): test_redis_client = Redis( protocol=3, # Required for maintenance events - maintenance_events_config=self.config, + maint_notifications_config=self.config, ) pool_handler = test_redis_client.connection_pool.connection_kwargs.get( - "maintenance_events_pool_handler" + "maint_notifications_pool_handler" ) assert pool_handler is not None assert pool_handler.config == self.config @@ -513,7 +513,7 @@ def test_client_initialization(self): assert hasattr(parser_handler, "__self__") assert hasattr(parser_handler, "__func__") assert parser_handler.__self__ is pool_handler - assert parser_handler.__func__ is pool_handler.handle_event.__func__ + assert parser_handler.__func__ is pool_handler.handle_notification.__func__ # Test that the maintenance handler function is correctly set maintenance_handler = conn._parser.maintenance_push_handler_func @@ -523,25 +523,25 @@ def test_client_initialization(self): # The maintenance handler should be bound to the connection's # maintenance event connection handler assert ( - maintenance_handler.__self__ is conn._maintenance_event_connection_handler + maintenance_handler.__self__ is conn._maint_notifications_connection_handler ) assert ( maintenance_handler.__func__ - is conn._maintenance_event_connection_handler.handle_event.__func__ + is conn._maint_notifications_connection_handler.handle_notification.__func__ ) # Validate that the connection's maintenance handler has the same config object - assert conn._maintenance_event_connection_handler.config is self.config + assert conn._maint_notifications_connection_handler.config is self.config def test_maint_handler_init_for_existing_connections(self): """Test that maintenance event handlers are properly set on existing and new connections when configuration is enabled after client creation.""" # Create a Redis client with disabled maintenance events configuration - disabled_config = MaintenanceEventsConfig(enabled=False) + disabled_config = MaintNotificationsConfig(enabled=False) test_redis_client = Redis( protocol=3, # Required for maintenance events - maintenance_events_config=disabled_config, + maint_notifications_config=disabled_config, ) # Extract an existing connection before enabling maintenance events @@ -549,17 +549,17 @@ def test_maint_handler_init_for_existing_connections(self): # Verify that maintenance events are initially disabled assert existing_conn._parser.node_moving_push_handler_func is None - assert existing_conn._maintenance_event_connection_handler is None + assert existing_conn._maint_notifications_connection_handler is None assert existing_conn._parser.maintenance_push_handler_func is None # Create a new enabled configuration and set up pool handler - enabled_config = MaintenanceEventsConfig( + enabled_config = MaintNotificationsConfig( enabled=True, proactive_reconnect=True, relaxed_timeout=30 ) - pool_handler = MaintenanceEventPoolHandler( + pool_handler = MaintNotificationsPoolHandler( test_redis_client.connection_pool, enabled_config ) - test_redis_client.connection_pool.set_maintenance_events_pool_handler( + test_redis_client.connection_pool.set_maint_notifications_pool_handler( pool_handler ) @@ -587,23 +587,23 @@ def test_connection_pool_creation_with_maintenance_events(self, pool_class): try: assert ( - test_pool.connection_kwargs.get("maintenance_events_config") + test_pool.connection_kwargs.get("maint_notifications_config") == self.config ) # Pool should have maintenance events enabled - assert test_pool.maintenance_events_pool_handler_enabled() is True + assert test_pool.maint_notifications_pool_handler_enabled() is True # Create and set a pool handler - pool_handler = MaintenanceEventPoolHandler(test_pool, self.config) - test_pool.set_maintenance_events_pool_handler(pool_handler) + pool_handler = MaintNotificationsPoolHandler(test_pool, self.config) + test_pool.set_maint_notifications_pool_handler(pool_handler) # Validate that the handler is properly set on the pool assert ( - test_pool.connection_kwargs.get("maintenance_events_pool_handler") + test_pool.connection_kwargs.get("maint_notifications_pool_handler") == pool_handler ) assert ( - test_pool.connection_kwargs.get("maintenance_events_config") + test_pool.connection_kwargs.get("maint_notifications_config") == pool_handler.config ) @@ -640,7 +640,7 @@ def test_redis_operations_with_mock_sockets(self, pool_class): # Verify that the connection has maintenance event handler connection = test_redis_client.connection_pool.get_connection() - assert hasattr(connection, "_maintenance_event_connection_handler") + assert hasattr(connection, "_maint_notifications_connection_handler") test_redis_client.connection_pool.release(connection) finally: @@ -655,7 +655,7 @@ def test_pool_handler_with_migrating_event(self): try: # Create and set a pool handler - pool_handler = MaintenanceEventPoolHandler(test_pool, self.config) + pool_handler = MaintNotificationsPoolHandler(test_pool, self.config) # Create a migrating event (not handled by pool handler) migrating_event = NodeMigratingEvent(id=1, ttl=5) @@ -666,17 +666,17 @@ def test_pool_handler_with_migrating_event(self): pool_handler, "remove_expired_notifications" ) as mock_remove_expired, patch.object( - pool_handler, "handle_node_moving_event" + pool_handler, "handle_node_moving_notification" ) as mock_handle_moving, patch("redis.maintenance_events.logging.error") as mock_logging_error, ): # Pool handler should return None for migrating events (not its responsibility) - pool_handler.handle_event(migrating_event) + pool_handler.handle_notification(migrating_event) # Validate that remove_expired_notifications has been called once mock_remove_expired.assert_called_once() - # Validate that handle_node_moving_event hasn't been called + # Validate that handle_node_moving_notification hasn't been called mock_handle_moving.assert_not_called() # Validate that logging.error has been called once @@ -783,14 +783,14 @@ def test_migrating_event_with_disabled_relaxed_timeout(self, pool_class): 5. Tests the complete lifecycle: MIGRATING -> MIGRATED -> FAILING_OVER -> FAILED_OVER """ # Create config with disabled relaxed timeout - disabled_config = MaintenanceEventsConfig( + disabled_config = MaintNotificationsConfig( enabled=True, relaxed_timeout=-1, # This means the relaxed timeout is Disabled ) # Create a pool and Redis client with disabled relaxed timeout config test_redis_client = self._get_client( - pool_class, max_connections=5, maintenance_events_config=disabled_config + pool_class, max_connections=5, maint_notifications_config=disabled_config ) try: @@ -1727,7 +1727,7 @@ def test_moving_migrating_migrated_moved_state_transitions(self, pool_class): pool_class, max_connections=5, setup_pool_handler=True ) pool = test_redis_client.connection_pool - pool_handler = pool.connection_kwargs["maintenance_events_pool_handler"] + pool_handler = pool.connection_kwargs["maint_notifications_pool_handler"] # Create and release some connections in_use_connections = [] @@ -1750,7 +1750,7 @@ def test_moving_migrating_migrated_moved_state_transitions(self, pool_class): moving_event = NodeMovingEvent( id=1, new_node_host=tmp_address, new_node_port=6379, ttl=1 ) - pool_handler.handle_event(moving_event) + pool_handler.handle_notification(moving_event) Helpers.validate_in_use_connections_state( in_use_connections, @@ -1779,7 +1779,7 @@ def test_moving_migrating_migrated_moved_state_transitions(self, pool_class): # 2. MIGRATING event (simulate direct connection handler call) for conn in in_use_connections: - conn._maintenance_event_connection_handler.handle_event( + conn._maint_notifications_connection_handler.handle_notification( NodeMigratingEvent(id=2, ttl=1) ) Helpers.validate_in_use_connections_state( @@ -1797,7 +1797,7 @@ def test_moving_migrating_migrated_moved_state_transitions(self, pool_class): # 3. MIGRATED event (simulate direct connection handler call) for conn in in_use_connections: - conn._maintenance_event_connection_handler.handle_event( + conn._maint_notifications_connection_handler.handle_notification( NodeMigratedEvent(id=2) ) # State should not change for connections that are in MOVING state @@ -1816,7 +1816,7 @@ def test_moving_migrating_migrated_moved_state_transitions(self, pool_class): # 4. FAILING_OVER event (simulate direct connection handler call) for conn in in_use_connections: - conn._maintenance_event_connection_handler.handle_event( + conn._maint_notifications_connection_handler.handle_notification( NodeFailingOverEvent(id=3, ttl=1) ) # State should not change for connections that are in MOVING state @@ -1835,7 +1835,7 @@ def test_moving_migrating_migrated_moved_state_transitions(self, pool_class): # 5. FAILED_OVER event (simulate direct connection handler call) for conn in in_use_connections: - conn._maintenance_event_connection_handler.handle_event( + conn._maint_notifications_connection_handler.handle_notification( NodeFailedOverEvent(id=3) ) # State should not change for connections that are in MOVING state @@ -1853,7 +1853,7 @@ def test_moving_migrating_migrated_moved_state_transitions(self, pool_class): ) # 6. MOVED event (simulate timer expiry) - pool_handler.handle_node_moved_event(moving_event) + pool_handler.handle_node_moved_notification(moving_event) Helpers.validate_in_use_connections_state( in_use_connections, expected_state=MaintenanceState.NONE, @@ -1951,7 +1951,7 @@ def mock_socket_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0): self.getaddrinfo_patcher.start() # Create maintenance events config - self.config = MaintenanceEventsConfig( + self.config = MaintNotificationsConfig( enabled=True, proactive_reconnect=True, relaxed_timeout=30 ) @@ -1971,12 +1971,12 @@ def test_migrating_after_moving_multiple_proxies(self, pool_class): port=12345, max_connections=10, protocol=3, # Required for maintenance events - maintenance_events_config=self.config, + maint_notifications_config=self.config, ) - pool.set_maintenance_events_pool_handler( - MaintenanceEventPoolHandler(pool, self.config) + pool.set_maint_notifications_pool_handler( + MaintNotificationsPoolHandler(pool, self.config) ) - pool_handler = pool.connection_kwargs["maintenance_events_pool_handler"] + pool_handler = pool.connection_kwargs["maint_notifications_pool_handler"] # Create and release some connections key1 = "1.2.3.4" @@ -1996,7 +1996,7 @@ def test_migrating_after_moving_multiple_proxies(self, pool_class): conn = in_use_connections[key1][0] pool_handler.set_connection(conn) new_ip = "13.14.15.16" - pool_handler.handle_event( + pool_handler.handle_notification( NodeMovingEvent(id=1, new_node_host=new_ip, new_node_port=6379, ttl=1) ) @@ -2044,7 +2044,7 @@ def test_migrating_after_moving_multiple_proxies(self, pool_class): conn = in_use_connections[key2][0] pool_handler.set_connection(conn) new_ip_2 = "17.18.19.20" - pool_handler.handle_event( + pool_handler.handle_notification( NodeMovingEvent(id=2, new_node_host=new_ip_2, new_node_port=6379, ttl=2) ) @@ -2084,27 +2084,27 @@ def test_migrating_after_moving_multiple_proxies(self, pool_class): # MIGRATING event on connection that has already been marked as MOVING conn = in_use_connections[key2][0] - conn_event_handler = conn._maintenance_event_connection_handler - conn_event_handler.handle_event(NodeMigratingEvent(id=3, ttl=1)) + conn_event_handler = conn._maint_notifications_connection_handler + conn_event_handler.handle_notification(NodeMigratingEvent(id=3, ttl=1)) # validate connection does not lose its MOVING state assert conn.maintenance_state == MaintenanceState.MOVING # MIGRATED event - conn_event_handler.handle_event(NodeMigratedEvent(id=3)) + conn_event_handler.handle_notification(NodeMigratedEvent(id=3)) # validate connection does not lose its MOVING state and relaxed timeout assert conn.maintenance_state == MaintenanceState.MOVING assert conn.socket_timeout == self.config.relaxed_timeout # Send Migrating event to con with ip = key3 conn = in_use_connections[key3][0] - conn_event_handler = conn._maintenance_event_connection_handler - conn_event_handler.handle_event(NodeMigratingEvent(id=3, ttl=1)) + conn_event_handler = conn._maint_notifications_connection_handler + conn_event_handler.handle_notification(NodeMigratingEvent(id=3, ttl=1)) # validate connection is in MIGRATING state assert conn.maintenance_state == MaintenanceState.MAINTENANCE assert conn.socket_timeout == self.config.relaxed_timeout # Send MIGRATED event to con with ip = key3 - conn_event_handler.handle_event(NodeMigratedEvent(id=3)) + conn_event_handler.handle_notification(NodeMigratedEvent(id=3)) # validate connection is in MOVING state assert conn.maintenance_state == MaintenanceState.NONE assert conn.socket_timeout is None diff --git a/tests/test_scenario/conftest.py b/tests/test_scenario/conftest.py index eb30c117f4..52fd99437c 100644 --- a/tests/test_scenario/conftest.py +++ b/tests/test_scenario/conftest.py @@ -7,7 +7,7 @@ from redis.backoff import ExponentialWithJitterBackoff, NoBackoff from redis.client import Redis -from redis.maintenance_events import EndpointType, MaintenanceEventsConfig +from redis.maintenance_events import EndpointType, MaintNotificationsConfig from redis.retry import Retry from tests.test_scenario.fault_injector_client import FaultInjectorClient @@ -85,15 +85,15 @@ def _get_client_maint_events( logging.info(f"Connecting to Redis Enterprise: {host}:{port} with user: {username}") # Configure maintenance events - maintenance_config = MaintenanceEventsConfig( + maintenance_config = MaintNotificationsConfig( enabled=enable_maintenance_events, proactive_reconnect=enable_proactive_reconnect, relaxed_timeout=RELAXED_TIMEOUT if enable_relaxed_timeout else -1, endpoint_type=endpoint_type, ) - # Create Redis client with maintenance events config - # This will automatically create the MaintenanceEventPoolHandler + # Create Redis client with maintenance notifications config + # This will automatically create the MaintNotificationsPoolHandler if disable_retries: retry = Retry(NoBackoff(), 0) else: @@ -112,12 +112,12 @@ def _get_client_maint_events( ssl_cert_reqs="none", ssl_check_hostname=False, protocol=protocol, # RESP3 required for push notifications - maintenance_events_config=maintenance_config, + maint_notifications_config=maintenance_config, retry=retry, ) logging.info("Redis client created with maintenance events enabled") logging.info(f"Client uses Protocol: {client.connection_pool.get_protocol()}") - maintenance_handler_exists = client.maintenance_events_pool_handler is not None + maintenance_handler_exists = client.maint_notifications_pool_handler is not None logging.info(f"Maintenance events pool handler: {maintenance_handler_exists}") return client From 5d3724810a77dd3c3a22bc597c5cc6f19ebf0912 Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Wed, 24 Sep 2025 10:04:27 +0300 Subject: [PATCH 3/5] Completing renames - all tests are included as well as the rest of the class and object/field/args names --- redis/_parsers/base.py | 56 ++- redis/client.py | 2 +- redis/connection.py | 104 ++--- redis/maintenance_events.py | 154 ++++---- tests/test_maintenance_events.py | 402 ++++++++++---------- tests/test_maintenance_events_handling.py | 318 ++++++++-------- tests/test_scenario/conftest.py | 20 +- tests/test_scenario/test_hitless_upgrade.py | 54 +-- 8 files changed, 576 insertions(+), 534 deletions(-) diff --git a/redis/_parsers/base.py b/redis/_parsers/base.py index be7b5cf1d0..928ec0c201 100644 --- a/redis/_parsers/base.py +++ b/redis/_parsers/base.py @@ -5,12 +5,12 @@ from typing import Awaitable, Callable, List, Optional, Protocol, Union from redis.maintenance_events import ( - MaintenanceEvent, - NodeFailedOverEvent, - NodeFailingOverEvent, - NodeMigratedEvent, - NodeMigratingEvent, - NodeMovingEvent, + MaintenanceNotification, + NodeFailedOverNotification, + NodeFailingOverNotification, + NodeMigratedNotification, + NodeMigratingNotification, + NodeMovingNotification, ) if sys.version_info.major >= 3 and sys.version_info.minor >= 11: @@ -175,14 +175,14 @@ class MaintenanceNotificationsParser: @staticmethod def parse_maintenance_start_msg(response, notification_type): - # Expected message format is: