diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index fed3081db795..d9b36f4dab5c 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -3,12 +3,14 @@ ### 4.14.3 (Unreleased) #### Features Added +* Added support for Per Partition Automatic Failover. To enable this feature, you must follow the guide [here](https://learn.microsoft.com/azure/cosmos-db/how-to-configure-per-partition-automatic-failover). See [PR 41588](https://github.com/Azure/azure-sdk-for-python/pull/41588). #### Breaking Changes #### Bugs Fixed #### Other Changes +* Added cross-regional retries for 503 (Service Unavailable) errors. See [PR 41588](https://github.com/Azure/azure-sdk-for-python/pull/41588). ### 4.14.2 (2025-11-14) diff --git a/sdk/cosmos/azure-cosmos/README.md b/sdk/cosmos/azure-cosmos/README.md index 013a4bd02a63..ca76ad346ce9 100644 --- a/sdk/cosmos/azure-cosmos/README.md +++ b/sdk/cosmos/azure-cosmos/README.md @@ -940,6 +940,11 @@ requests to another region: - `AZURE_COSMOS_FAILURE_PERCENTAGE_TOLERATED`: Default is a `90` percent failure rate. - After a partition reaches a 90 percent failure rate for all requests, the SDK will send requests routed to that partition to another region. +### Per Partition Automatic Failover (Public Preview) +Per partition automatic failover enables the SDK to automatically redirect write requests at the partition level to another region based on service-side signals. This feature is available +only for single write region accounts that have at least one read-only region. When per partition automatic failover is enabled, per partition circuit breaker and cross-region hedging is enabled by default, meaning +all its configurable options also apply to per partition automatic failover. To enable this feature, follow the guide [here](https://learn.microsoft.com/azure/cosmos-db/how-to-configure-per-partition-automatic-failover). + ## Troubleshooting ### General diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py index d066135500d1..4cf500a129bd 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py @@ -46,9 +46,13 @@ if TYPE_CHECKING: from ._cosmos_client_connection import CosmosClientConnection from .aio._cosmos_client_connection_async import CosmosClientConnection as AsyncClientConnection + from ._global_partition_endpoint_manager_per_partition_automatic_failover import ( + _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover) from ._request_object import RequestObject + from ._routing.routing_range import PartitionKeyRangeWrapper # pylint: disable=protected-access +#cspell:ignore PPAF, ppaf _COMMON_OPTIONS = { 'initial_headers': 'initialHeaders', diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py index d6a23050c226..0a5e961f7aa9 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py @@ -22,8 +22,9 @@ """Class for defining internal constants in the Azure Cosmos database service. """ - +from enum import IntEnum from typing_extensions import Literal +# cspell:ignore PPAF # cspell:ignore reranker @@ -40,6 +41,7 @@ class _Constants: Name: Literal["name"] = "name" DatabaseAccountEndpoint: Literal["databaseAccountEndpoint"] = "databaseAccountEndpoint" DefaultEndpointsRefreshTime: int = 5 * 60 * 1000 # milliseconds + EnablePerPartitionFailoverBehavior: Literal["enablePerPartitionFailoverBehavior"] = "enablePerPartitionFailoverBehavior" #pylint: disable=line-too-long # ServiceDocument Resource EnableMultipleWritableLocations: Literal["enableMultipleWriteLocations"] = "enableMultipleWriteLocations" @@ -74,6 +76,10 @@ class _Constants: FAILURE_PERCENTAGE_TOLERATED = "AZURE_COSMOS_FAILURE_PERCENTAGE_TOLERATED" FAILURE_PERCENTAGE_TOLERATED_DEFAULT: int = 90 # ------------------------------------------------------------------------- + # Only applicable when per partition automatic failover is enabled -------- + TIMEOUT_ERROR_THRESHOLD_PPAF = "AZURE_COSMOS_TIMEOUT_ERROR_THRESHOLD_FOR_PPAF" + TIMEOUT_ERROR_THRESHOLD_PPAF_DEFAULT: int = 10 + # ------------------------------------------------------------------------- # Error code translations ERROR_TRANSLATIONS: dict[int, str] = { @@ -99,3 +105,22 @@ class Kwargs: """Whether to retry write operations if they fail. Used either at client level or request level.""" EXCLUDED_LOCATIONS: Literal["excludedLocations"] = "excludedLocations" + + class UserAgentFeatureFlags(IntEnum): + """ + User agent feature flags. + Each flag represents a bit in a number to encode what features are enabled. Therefore, the first feature flag + will be 1, the second 2, the third 4, etc. When constructing the user agent suffix, the feature flags will be + used to encode a unique number representing the features enabled. This number will be converted into a hex + string following the prefix "F" to save space in the user agent as it is limited and appended to the user agent + suffix. This number will then be used to determine what features are enabled by decoding the hex string back + to a number and checking what bits are set. + + Features being developed should align with the .NET SDK as a source of truth for feature flag assignments: + https://github.com/Azure/azure-cosmos-dotnet-v3/blob/master/Microsoft.Azure.Cosmos/src/Diagnostics/UserAgentFeatureFlags.cs + + Example: + If the user agent suffix has "F3", this means that flags 1 and 2. + """ + PER_PARTITION_AUTOMATIC_FAILOVER = 1 + PER_PARTITION_CIRCUIT_BREAKER = 2 diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py index 4e08365e2c47..c80c76864e98 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py @@ -49,7 +49,7 @@ HttpResponse # pylint: disable=no-legacy-azure-core-http-response-import from . import _base as base -from ._global_partition_endpoint_manager_circuit_breaker import _GlobalPartitionEndpointManagerForCircuitBreaker +from ._global_partition_endpoint_manager_per_partition_automatic_failover import _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover # pylint: disable=line-too-long from . import _query_iterable as query_iterable from . import _runtime_constants as runtime_constants from . import _session @@ -176,7 +176,7 @@ def __init__( # pylint: disable=too-many-statements self.last_response_headers: CaseInsensitiveDict = CaseInsensitiveDict() self.UseMultipleWriteLocations = False - self._global_endpoint_manager = _GlobalPartitionEndpointManagerForCircuitBreaker(self) + self._global_endpoint_manager = _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover(self) retry_policy = None if isinstance(self.connection_policy.ConnectionRetryConfiguration, HTTPPolicy): @@ -2688,12 +2688,15 @@ def GetDatabaseAccount( database_account._ReadableLocations = result[Constants.ReadableLocations] if Constants.EnableMultipleWritableLocations in result: database_account._EnableMultipleWritableLocations = result[ - Constants.EnableMultipleWritableLocations - ] + Constants.EnableMultipleWritableLocations] self.UseMultipleWriteLocations = ( self.connection_policy.UseMultipleWriteLocations and database_account._EnableMultipleWritableLocations ) + + if Constants.EnablePerPartitionFailoverBehavior in result: + database_account._EnablePerPartitionFailoverBehavior = result[Constants.EnablePerPartitionFailoverBehavior] + if response_hook: response_hook(last_response_headers, result) return database_account diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py index 0741ab6c97be..aa9707a9a441 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_http_logging_policy.py @@ -180,7 +180,7 @@ def _get_client_settings(global_endpoint_manager: Optional[_GlobalEndpointManage gem_client = global_endpoint_manager.client if gem_client and gem_client.connection_policy: connection_policy: ConnectionPolicy = gem_client.connection_policy - client_preferred_regions = connection_policy.PreferredLocations + client_preferred_regions = global_endpoint_manager.location_cache.effective_preferred_locations client_excluded_regions = connection_policy.ExcludedLocations if global_endpoint_manager.location_cache: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_endpoint_discovery_retry_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_endpoint_discovery_retry_policy.py index f113efaafc42..3357c097c63a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_endpoint_discovery_retry_policy.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_endpoint_discovery_retry_policy.py @@ -23,16 +23,9 @@ Azure Cosmos database service. """ -import logging -from azure.cosmos.documents import _OperationType - -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) -log_formatter = logging.Formatter("%(levelname)s:%(message)s") -log_handler = logging.StreamHandler() -log_handler.setFormatter(log_formatter) -logger.addHandler(log_handler) +# cspell:ignore PPAF +from azure.cosmos.documents import _OperationType class EndpointDiscoveryRetryPolicy(object): """The endpoint discovery retry policy class used for geo-replicated database accounts @@ -43,8 +36,9 @@ class EndpointDiscoveryRetryPolicy(object): Max_retry_attempt_count = 120 Retry_after_in_milliseconds = 1000 - def __init__(self, connection_policy, global_endpoint_manager, *args): + def __init__(self, connection_policy, global_endpoint_manager, pk_range_wrapper, *args): self.global_endpoint_manager = global_endpoint_manager + self.pk_range_wrapper = pk_range_wrapper self._max_retry_attempt_count = EndpointDiscoveryRetryPolicy.Max_retry_attempt_count self.failover_retry_count = 0 self.retry_after_in_milliseconds = EndpointDiscoveryRetryPolicy.Retry_after_in_milliseconds @@ -70,6 +64,22 @@ def ShouldRetry(self, exception): # pylint: disable=unused-argument self.failover_retry_count += 1 + # set the refresh_needed flag to ensure that endpoint list is + # refreshed with new writable and readable locations + self.global_endpoint_manager.refresh_needed = True + + # If per partition automatic failover is applicable, we mark the current endpoint as unavailable + # and resolve the service endpoint for the partition range - otherwise, continue the default retry logic + if self.global_endpoint_manager.is_per_partition_automatic_failover_applicable(self.request): + partition_level_info = self.global_endpoint_manager.partition_range_to_failover_info[self.pk_range_wrapper] + location = self.global_endpoint_manager.location_cache.get_location_from_endpoint( + str(self.request.location_endpoint_to_route)) + regional_endpoint = (self.global_endpoint_manager.location_cache. + account_read_regional_routing_contexts_by_location.get(location)) + partition_level_info.unavailable_regional_endpoints[location] = regional_endpoint + self.global_endpoint_manager.resolve_service_endpoint_for_partition(self.request, self.pk_range_wrapper) + return True + if self.request.location_endpoint_to_route: context = self.__class__.__name__ if _OperationType.IsReadOnlyOperation(self.request.operation_type): @@ -82,16 +92,11 @@ def ShouldRetry(self, exception): # pylint: disable=unused-argument self.request.location_endpoint_to_route, True, context) - # set the refresh_needed flag to ensure that endpoint list is - # refreshed with new writable and readable locations - self.global_endpoint_manager.refresh_needed = True - # clear previous location-based routing directive self.request.clear_route_to_location() # set location-based routing directive based on retry count - # simulating single master writes by ensuring usePreferredLocations - # is set to false + # simulating single master writes by ensuring usePreferredLocations is set to false # reasoning being that 403.3 is only expected for write region failover in single writer account # and we must rely on account locations as they are the source of truth self.request.route_to_location_with_preferred_location_flag(self.failover_retry_count, False) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_circuit_breaker.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_circuit_breaker.py index 34d27d4b7907..e301e4c4d49f 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_circuit_breaker.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_circuit_breaker.py @@ -36,6 +36,8 @@ if TYPE_CHECKING: from azure.cosmos._cosmos_client_connection import CosmosClientConnection +#cspell:ignore ppcb + class _GlobalPartitionEndpointManagerForCircuitBreaker(_GlobalEndpointManager): """ This internal class implements the logic for partition endpoint management for @@ -93,16 +95,17 @@ def create_pk_range_wrapper(self, request: RequestObject) -> Optional[PartitionK return PartitionKeyRangeWrapper(partition_range, container_rid) - def record_failure( + def record_ppcb_failure( self, - request: RequestObject - ) -> None: + request: RequestObject, + pk_range_wrapper: Optional[PartitionKeyRangeWrapper] = None)-> None: if self.is_circuit_breaker_applicable(request): - pk_range_wrapper = self.create_pk_range_wrapper(request) + if pk_range_wrapper is None: + pk_range_wrapper = self.create_pk_range_wrapper(request) if pk_range_wrapper: self.global_partition_endpoint_manager_core.record_failure(request, pk_range_wrapper) - def resolve_service_endpoint_for_partition( + def _resolve_service_endpoint_for_partition_circuit_breaker( self, request: RequestObject, pk_range_wrapper: Optional[PartitionKeyRangeWrapper] @@ -113,11 +116,12 @@ def resolve_service_endpoint_for_partition( pk_range_wrapper) return self._resolve_service_endpoint(request) - def record_success( + def record_ppcb_success( self, - request: RequestObject - ) -> None: - if self.global_partition_endpoint_manager_core.is_circuit_breaker_applicable(request): - pk_range_wrapper = self.create_pk_range_wrapper(request) + request: RequestObject, + pk_range_wrapper: Optional[PartitionKeyRangeWrapper] = None) -> None: + if self.is_circuit_breaker_applicable(request): + if pk_range_wrapper is None: + pk_range_wrapper = self.create_pk_range_wrapper(request) if pk_range_wrapper: self.global_partition_endpoint_manager_core.record_success(request, pk_range_wrapper) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_circuit_breaker_core.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_circuit_breaker_core.py index 93faf9b7a8c5..91fd67805a18 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_circuit_breaker_core.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_circuit_breaker_core.py @@ -19,6 +19,8 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +# pylint: disable=protected-access + """Internal class for global endpoint manager for circuit breaker. """ import logging @@ -60,7 +62,10 @@ def is_circuit_breaker_applicable(self, request: RequestObject) -> bool: return False circuit_breaker_enabled = os.environ.get(Constants.CIRCUIT_BREAKER_ENABLED_CONFIG, - Constants.CIRCUIT_BREAKER_ENABLED_CONFIG_DEFAULT) == "True" + Constants.CIRCUIT_BREAKER_ENABLED_CONFIG_DEFAULT).lower() == "true" + if not circuit_breaker_enabled and self.client._global_endpoint_manager is not None: + if self.client._global_endpoint_manager._database_account_cache is not None: + circuit_breaker_enabled = self.client._global_endpoint_manager._database_account_cache._EnablePerPartitionFailoverBehavior is True # pylint: disable=line-too-long if not circuit_breaker_enabled: return False diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_per_partition_automatic_failover.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_per_partition_automatic_failover.py new file mode 100644 index 000000000000..0547cb41df32 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_global_partition_endpoint_manager_per_partition_automatic_failover.py @@ -0,0 +1,241 @@ +# The MIT License (MIT) +# Copyright (c) 2025 Microsoft Corporation + +"""Class for global endpoint manager for per partition automatic failover. This class inherits the circuit breaker +endpoint manager, since enabling per partition automatic failover also enables the circuit breaker logic. +""" +import logging +import threading +import os + +from typing import TYPE_CHECKING, Optional + +from azure.cosmos.http_constants import ResourceType +from azure.cosmos._constants import _Constants as Constants +from azure.cosmos._global_partition_endpoint_manager_circuit_breaker import \ + _GlobalPartitionEndpointManagerForCircuitBreaker +from azure.cosmos._partition_health_tracker import _PPAFPartitionThresholdsTracker +from azure.cosmos.documents import _OperationType +from azure.cosmos._request_object import RequestObject +from azure.cosmos._routing.routing_range import PartitionKeyRangeWrapper + +if TYPE_CHECKING: + from azure.cosmos._cosmos_client_connection import CosmosClientConnection + from azure.cosmos._location_cache import RegionalRoutingContext + +logger = logging.getLogger("azure.cosmos._GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover") + +# pylint: disable=name-too-long, protected-access, too-many-nested-blocks +#cspell:ignore PPAF, ppaf, ppcb + +class PartitionLevelFailoverInfo: + """ + Holds information about the partition level regional failover. + Used to track the partition key range and the regions where it is available. + """ + def __init__(self) -> None: + self.unavailable_regional_endpoints: dict[str, "RegionalRoutingContext"] = {} + self._lock = threading.Lock() + self.current_region: Optional[str] = None + + def try_move_to_next_location( + self, + available_account_regional_endpoints: dict[str, "RegionalRoutingContext"], + endpoint_region: str, + request: RequestObject) -> bool: + """ + Tries to move to the next available regional endpoint for the partition key range. + :param dict[str, RegionalRoutingContext] available_account_regional_endpoints: The available regional endpoints + :param str endpoint_region: The current regional endpoint + :param RequestObject request: The request object containing the routing context. + :return: True if the move was successful, False otherwise. + :rtype: bool + """ + with self._lock: + if endpoint_region != self.current_region and self.current_region is not None: + regional_endpoint = available_account_regional_endpoints[self.current_region].primary_endpoint + request.route_to_location(regional_endpoint) + return True + + for regional_endpoint in available_account_regional_endpoints: + if regional_endpoint == self.current_region: + continue + + if regional_endpoint in self.unavailable_regional_endpoints: + continue + + self.current_region = regional_endpoint + logger.warning("PPAF - Moving to next available regional endpoint: %s", self.current_region) + regional_endpoint = available_account_regional_endpoints[self.current_region].primary_endpoint + request.route_to_location(regional_endpoint) + return True + + return False + +class _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover(_GlobalPartitionEndpointManagerForCircuitBreaker): + """ + This internal class implements the logic for partition endpoint management for + geo-replicated database accounts. + """ + def __init__(self, client: "CosmosClientConnection") -> None: + super(_GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover, self).__init__(client) + self.partition_range_to_failover_info: dict[PartitionKeyRangeWrapper, PartitionLevelFailoverInfo] = {} + self.ppaf_thresholds_tracker = _PPAFPartitionThresholdsTracker() + self._threshold_lock = threading.Lock() + + def is_per_partition_automatic_failover_enabled(self) -> bool: + if not self._database_account_cache or not self._database_account_cache._EnablePerPartitionFailoverBehavior: + return False + return True + + def is_per_partition_automatic_failover_applicable(self, request: RequestObject) -> bool: + if not self.is_per_partition_automatic_failover_enabled(): + return False + + if not request: + return False + + if (self.location_cache.can_use_multiple_write_locations_for_request(request) + or _OperationType.IsReadOnlyOperation(request.operation_type)): + return False + + # if we have at most one region available in the account, we cannot do per partition automatic failover + available_regions = self.location_cache.account_read_regional_routing_contexts_by_location + if len(available_regions) <= 1: + return False + + # if the request is not a non-query plan document request + # or if the request is not executing a stored procedure, return False + if (request.resource_type != ResourceType.Document and + request.operation_type != _OperationType.ExecuteJavaScript): + return False + + return True + + def try_ppaf_failover_threshold( + self, + pk_range_wrapper: "PartitionKeyRangeWrapper", + request: "RequestObject"): + """Verifies whether the per-partition failover threshold has been reached for consecutive errors. If so, + it marks the current region as unavailable for the given partition key range, and moves to the next available + region for the request. + + :param PartitionKeyRangeWrapper pk_range_wrapper: The wrapper containing the partition key range information + for the request. + :param RequestObject request: The request object containing the routing context. + :returns: None + """ + # If PPAF is enabled, we track consecutive failures for certain exceptions, and only fail over at a partition + # level after the threshold is reached + if request and self.is_per_partition_automatic_failover_applicable(request): + if (self.ppaf_thresholds_tracker.get_pk_failures(pk_range_wrapper) + >= int(os.environ.get(Constants.TIMEOUT_ERROR_THRESHOLD_PPAF, + Constants.TIMEOUT_ERROR_THRESHOLD_PPAF_DEFAULT))): + # If the PPAF threshold is reached, we reset the count and mark the endpoint unavailable + # Once we mark the endpoint unavailable, the PPAF endpoint manager will try to move to the next + # available region for the partition key range + with self._threshold_lock: + # Check for count again, since a previous request may have now reset the count + if (self.ppaf_thresholds_tracker.get_pk_failures(pk_range_wrapper) + >= int(os.environ.get(Constants.TIMEOUT_ERROR_THRESHOLD_PPAF, + Constants.TIMEOUT_ERROR_THRESHOLD_PPAF_DEFAULT))): + self.ppaf_thresholds_tracker.clear_pk_failures(pk_range_wrapper) + partition_level_info = self.partition_range_to_failover_info[pk_range_wrapper] + location = self.location_cache.get_location_from_endpoint( + str(request.location_endpoint_to_route)) + logger.warning("PPAF - Failover threshold reached for partition key range: %s for region: %s", #pylint: disable=line-too-long + pk_range_wrapper, location) + regional_context = (self.location_cache. + account_read_regional_routing_contexts_by_location. + get(location).primary_endpoint) + partition_level_info.unavailable_regional_endpoints[location] = regional_context + + def resolve_service_endpoint_for_partition( + self, + request: RequestObject, + pk_range_wrapper: Optional[PartitionKeyRangeWrapper] + ) -> str: + """Resolves the endpoint to be used for the request. In a PPAF-enabled account, this method checks whether + the partition key range has any unavailable regions, and if so, it tries to move to the next available region. + If all regions are unavailable, it invalidates the cache and starts once again from the main write region in the + account configurations. + + :param PartitionKeyRangeWrapper pk_range_wrapper: The wrapper containing the partition key range information + for the request. + :param RequestObject request: The request object containing the routing context. + :returns: The regional endpoint to be used for the request. + :rtype: str + """ + if self.is_per_partition_automatic_failover_applicable(request) and pk_range_wrapper: + # If per partition automatic failover is applicable, we check partition unavailability + if pk_range_wrapper in self.partition_range_to_failover_info: + partition_failover_info = self.partition_range_to_failover_info[pk_range_wrapper] + if request.location_endpoint_to_route is not None: + endpoint_region = self.location_cache.get_location_from_endpoint(request.location_endpoint_to_route) + if endpoint_region in partition_failover_info.unavailable_regional_endpoints: + available_account_regional_endpoints = self.location_cache.account_read_regional_routing_contexts_by_location #pylint: disable=line-too-long + if (partition_failover_info.current_region is not None and + endpoint_region != partition_failover_info.current_region): + # this request has not yet seen there's an available region being used for this partition + regional_endpoint = available_account_regional_endpoints[ + partition_failover_info.current_region].primary_endpoint + request.route_to_location(regional_endpoint) + else: + if (len(self.location_cache.account_read_regional_routing_contexts_by_location) + == len(partition_failover_info.unavailable_regional_endpoints)): + # If no other region is available, we invalidate the cache and start once again + # from our main write region in the account configurations + logger.warning("PPAF - All available regions for partition %s are unavailable." + " Refreshing cache.", pk_range_wrapper) + self.partition_range_to_failover_info[pk_range_wrapper] = PartitionLevelFailoverInfo() + request.clear_route_to_location() + else: + # If the current region is unavailable, we try to move to the next available region + partition_failover_info.try_move_to_next_location( + self.location_cache.account_read_regional_routing_contexts_by_location, + endpoint_region, + request) + else: + # Update the current regional endpoint to whatever the request is routing to + partition_failover_info.current_region = endpoint_region + else: + partition_failover_info = PartitionLevelFailoverInfo() + endpoint_region = self.location_cache.get_location_from_endpoint( + request.location_endpoint_to_route) + partition_failover_info.current_region = endpoint_region + self.partition_range_to_failover_info[pk_range_wrapper] = partition_failover_info + return self._resolve_service_endpoint_for_partition_circuit_breaker(request, pk_range_wrapper) + + def record_failure(self, + request: RequestObject, + pk_range_wrapper: Optional[PartitionKeyRangeWrapper] = None) -> None: + """Records a failure for the given partition key range and request. + :param RequestObject request: The request object containing the routing context. + :param PartitionKeyRangeWrapper pk_range_wrapper: The wrapper containing the partition key range information + for the request. + :return: None + """ + if self.is_per_partition_automatic_failover_applicable(request): + if pk_range_wrapper is None: + pk_range_wrapper = self.create_pk_range_wrapper(request) + if pk_range_wrapper: + self.ppaf_thresholds_tracker.add_failure(pk_range_wrapper) + else: + self.record_ppcb_failure(request, pk_range_wrapper) + + def record_success(self, + request: RequestObject, + pk_range_wrapper: Optional[PartitionKeyRangeWrapper] = None) -> None: + """Records a success for the given partition key range and request, effectively clearing the failure count. + :param RequestObject request: The request object containing the routing context. + :param PartitionKeyRangeWrapper pk_range_wrapper: The wrapper containing the partition key range information + for the request. + :return: None + """ + if self.is_per_partition_automatic_failover_applicable(request): + if pk_range_wrapper is None: + pk_range_wrapper = self.create_pk_range_wrapper(request) + if pk_range_wrapper: + self.ppaf_thresholds_tracker.clear_pk_failures(pk_range_wrapper) + else: + self.record_ppcb_success(request, pk_range_wrapper) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py index cf8239488712..40b46cf0f42f 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py @@ -488,7 +488,7 @@ def update_location_cache(self, write_locations=None, read_locations=None, enabl ) def get_preferred_regional_routing_contexts( - self, endpoints_by_location, orderedLocations, expected_available_operation, fallback_endpoint + self, endpoints_by_location, ordered_locations, expected_available_operation, fallback_endpoint ): regional_endpoints = [] # if enableEndpointDiscovery is false, we always use the defaultEndpoint that @@ -522,7 +522,7 @@ def get_preferred_regional_routing_contexts( if not regional_endpoints: regional_endpoints.append(fallback_endpoint) else: - for location in orderedLocations: + for location in ordered_locations: if location and location in endpoints_by_location: # location is empty during manual failover regional_endpoint = endpoints_by_location[location] diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_partition_health_tracker.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_partition_health_tracker.py index 378540b89119..50f4c79bceb4 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_partition_health_tracker.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_partition_health_tracker.py @@ -44,6 +44,8 @@ LAST_UNAVAILABILITY_CHECK_TIME_STAMP = "lastUnavailabilityCheckTimeStamp" HEALTH_STATUS = "healthStatus" +#cspell:ignore PPAF + class _PartitionHealthInfo(object): """ This internal class keeps the health and statistics for a partition. @@ -299,3 +301,28 @@ def _reset_partition_health_tracker_stats(self) -> None: for locations in self.pk_range_wrapper_to_health_info.values(): for health_info in locations.values(): health_info.reset_failure_rate_health_stats() + +class _PPAFPartitionThresholdsTracker(object): + """ + This internal class implements the logic for tracking consecutive failure thresholds for a partition + in the context for per-partition automatic failover. This tracker is only used in the context of 408, 5xx and + ServiceResponseError errors as a defensive measure to avoid failing over too early without confirmation + from the service. + """ + + def __init__(self) -> None: + self.pk_range_wrapper_to_failure_count: dict[PartitionKeyRangeWrapper, int] = {} + self._failure_lock = threading.Lock() + + def add_failure(self, pk_range_wrapper: PartitionKeyRangeWrapper) -> None: + with self._failure_lock: + if pk_range_wrapper not in self.pk_range_wrapper_to_failure_count: + self.pk_range_wrapper_to_failure_count[pk_range_wrapper] = 0 + self.pk_range_wrapper_to_failure_count[pk_range_wrapper] += 1 + + def clear_pk_failures(self, pk_range_wrapper: PartitionKeyRangeWrapper) -> None: + if pk_range_wrapper in self.pk_range_wrapper_to_failure_count: + del self.pk_range_wrapper_to_failure_count[pk_range_wrapper] + + def get_pk_failures(self, pk_range_wrapper: PartitionKeyRangeWrapper) -> int: + return self.pk_range_wrapper_to_failure_count.get(pk_range_wrapper, 0) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py index 9b9153308db2..b52a957c4be0 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py @@ -30,7 +30,7 @@ from azure.core.pipeline import PipelineRequest from azure.core.pipeline.policies import RetryPolicy -from . import _container_recreate_retry_policy, _health_check_retry_policy +from . import _container_recreate_retry_policy, _health_check_retry_policy, _service_unavailable_retry_policy from . import _default_retry_policy from . import _endpoint_discovery_retry_policy from . import _gone_retry_policy @@ -46,6 +46,7 @@ # pylint: disable=protected-access, disable=too-many-lines, disable=too-many-statements, disable=too-many-branches +# cspell:ignore PPAF,ppaf,ppcb # args [0] is the request object # args [1] is the connection policy @@ -65,11 +66,12 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylin :rtype: tuple of (dict, dict) """ pk_range_wrapper = None - if args and global_endpoint_manager.is_circuit_breaker_applicable(args[0]): + if args and (global_endpoint_manager.is_per_partition_automatic_failover_applicable(args[0]) or + global_endpoint_manager.is_circuit_breaker_applicable(args[0])): pk_range_wrapper = global_endpoint_manager.create_pk_range_wrapper(args[0]) # instantiate all retry policies here to be applied for each request execution endpointDiscovery_retry_policy = _endpoint_discovery_retry_policy.EndpointDiscoveryRetryPolicy( - client.connection_policy, global_endpoint_manager, *args + client.connection_policy, global_endpoint_manager, pk_range_wrapper, *args ) health_check_retry_policy = _health_check_retry_policy.HealthCheckRetryPolicy( client.connection_policy, *args @@ -96,8 +98,11 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylin service_request_retry_policy = _service_request_retry_policy.ServiceRequestRetryPolicy( client.connection_policy, global_endpoint_manager, pk_range_wrapper, *args, ) + service_unavailable_retry_policy = _service_unavailable_retry_policy._ServiceUnavailableRetryPolicy( + client.connection_policy, global_endpoint_manager, pk_range_wrapper, *args) # Get logger logger = kwargs.get("logger", logging.getLogger("azure.cosmos._retry_utility")) + # HttpRequest we would need to modify for Container Recreate Retry Policy request = None if args and len(args) > 3: @@ -115,7 +120,7 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylin try: if args: result = ExecuteFunction(function, global_endpoint_manager, *args, **kwargs) - global_endpoint_manager.record_success(args[0]) + global_endpoint_manager.record_success(args[0], pk_range_wrapper) else: result = ExecuteFunction(function, *args, **kwargs) if not client.last_response_headers: @@ -202,10 +207,15 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylin retry_policy.container_rid = cached_container["_rid"] request.headers[retry_policy._intended_headers] = retry_policy.container_rid - elif e.status_code == StatusCodes.REQUEST_TIMEOUT or e.status_code >= StatusCodes.INTERNAL_SERVER_ERROR: + elif e.status_code == StatusCodes.SERVICE_UNAVAILABLE: if args: # record the failure for circuit breaker tracking - global_endpoint_manager.record_failure(args[0]) + global_endpoint_manager.record_ppcb_failure(args[0], pk_range_wrapper) + retry_policy = service_unavailable_retry_policy + elif e.status_code == StatusCodes.REQUEST_TIMEOUT or e.status_code >= StatusCodes.INTERNAL_SERVER_ERROR: + if args: + # record the failure for ppaf/circuit breaker tracking + global_endpoint_manager.record_failure(args[0], pk_range_wrapper) retry_policy = timeout_failover_retry_policy else: retry_policy = defaultRetry_policy @@ -238,6 +248,8 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylin if not health_check_retry_policy.ShouldRetry(e): raise e else: + if args: + global_endpoint_manager.record_failure(args[0], pk_range_wrapper) _handle_service_request_retries(client, service_request_retry_policy, e, *args) except ServiceResponseError as e: @@ -246,7 +258,7 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylin raise e else: if args: - global_endpoint_manager.record_failure(args[0]) + global_endpoint_manager.record_failure(args[0], pk_range_wrapper) _handle_service_response_retries(request, client, service_response_retry_policy, e, *args) def ExecuteFunction(function, *args, **kwargs): @@ -283,7 +295,8 @@ def _handle_service_request_retries( raise exception def _handle_service_response_retries(request, client, response_retry_policy, exception, *args): - if request and (_has_read_retryable_headers(request.headers) or (args and is_write_retryable(args[0], client))): + if request and (_has_read_retryable_headers(request.headers) or (args and (is_write_retryable(args[0], client) or + client._global_endpoint_manager.is_per_partition_automatic_failover_applicable(args[0])))): # we resolve the request endpoint to the next preferred region # once we are out of preferred regions we stop retrying retry_policy = response_retry_policy diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_service_response_retry_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_service_response_retry_policy.py index 1192e14351cf..bafa9f1a2777 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_service_response_retry_policy.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_service_response_retry_policy.py @@ -6,6 +6,7 @@ from the service, and as such we do not know what the output of the operation was. As such, we only do cross regional retries for read operations. """ +#cspell:ignore PPAF, ppaf from azure.cosmos.documents import _OperationType @@ -42,7 +43,9 @@ def ShouldRetry(self): return False if self.request: - + # We track consecutive failures for per partition automatic failover, and only fail over at a partition + # level after the threshold is reached + self.global_endpoint_manager.try_ppaf_failover_threshold(self.pk_range_wrapper, self.request) if not _OperationType.IsReadOnlyOperation(self.request.operation_type) and not self.request.retry_write > 0: return False if self.request.retry_write > 0 and self.failover_retry_count + 1 >= self.max_write_retry_count: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_service_unavailable_retry_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_service_unavailable_retry_policy.py new file mode 100644 index 000000000000..a210f9348f89 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_service_unavailable_retry_policy.py @@ -0,0 +1,79 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. + +"""Internal class for service unavailable errors implementation in the Azure Cosmos database service. + +Service unavailable errors can occur when a request does not make it to the service, or when there is an issue with +the service. In either case, we know the request did not get processed successfully, so service unavailable errors are + retried in the next available preferred region. +""" +from azure.cosmos.documents import _OperationType +from azure.cosmos.exceptions import CosmosHttpResponseError + +#cspell:ignore ppaf + +class _ServiceUnavailableRetryPolicy(object): + def __init__( + self, + connection_policy, + global_endpoint_manager, + pk_range_wrapper, + *args): + self.retry_after_in_milliseconds = 500 + self.global_endpoint_manager = global_endpoint_manager + self.pk_range_wrapper = pk_range_wrapper + self.retry_count = 0 + self.connection_policy = connection_policy + self.request = args[0] if args else None + # If an account only has 1 region, then we still want to retry once on the same region + self._max_retry_attempt_count = len(self.global_endpoint_manager. + location_cache.read_regional_routing_contexts) + 1 + if self.request and _OperationType.IsWriteOperation(self.request.operation_type): + self._max_retry_attempt_count = len(self.global_endpoint_manager.location_cache. + write_regional_routing_contexts) + 1 + + def ShouldRetry(self, _exception: CosmosHttpResponseError): + """Returns true if the request should retry based on the passed-in exception. + + :param exceptions.CosmosHttpResponseError _exception: + :returns: a boolean stating whether the request should be retried + :rtype: bool + """ + # writes are retried for 503s + if not self.connection_policy.EnableEndpointDiscovery: + return False + + self.retry_count += 1 + # Check if the next retry about to be done is safe + if self.retry_count >= self._max_retry_attempt_count: + return False + + if self.request: + # If per partition automatic failover is applicable, we mark the current endpoint as unavailable + # and resolve the service endpoint for the partition range - otherwise, continue the default retry logic + if self.global_endpoint_manager.is_per_partition_automatic_failover_applicable(self.request): + partition_level_info = self.global_endpoint_manager.partition_range_to_failover_info[ + self.pk_range_wrapper] + location = self.global_endpoint_manager.location_cache.get_location_from_endpoint( + str(self.request.location_endpoint_to_route)) + regional_context = (self.global_endpoint_manager.location_cache. + account_read_regional_routing_contexts_by_location.get(location)) + partition_level_info.unavailable_regional_endpoints[location] = regional_context + self.global_endpoint_manager.resolve_service_endpoint_for_partition(self.request, self.pk_range_wrapper) + return True + location_endpoint = self.resolve_next_region_service_endpoint() + self.request.route_to_location(location_endpoint) + return True + + # This function prepares the request to go to the next region + def resolve_next_region_service_endpoint(self): + # clear previous location-based routing directive + self.request.clear_route_to_location() + # clear the last routed endpoint within same region since we are going to a new region now + self.request.last_routed_location_endpoint_within_region = None + # set location-based routing directive based on retry count + # ensuring usePreferredLocations is set to True for retry + self.request.route_to_location_with_preferred_location_flag(self.retry_count, True) + # Resolve the endpoint for the request and pin the resolution to the resolved endpoint + # This enables marking the endpoint unavailability on endpoint failover/unreachability + return self.global_endpoint_manager.resolve_service_endpoint_for_partition(self.request, self.pk_range_wrapper) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_session_retry_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_session_retry_policy.py index f10366ac4c7f..e11cb4838047 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_session_retry_policy.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_session_retry_policy.py @@ -22,7 +22,7 @@ """Internal class for session read/write unavailable retry policy implementation in the Azure Cosmos database service. """ - +# cspell:disable from azure.cosmos.documents import _OperationType class _SessionRetryPolicy(object): @@ -60,16 +60,12 @@ def ShouldRetry(self, _exception): :returns: a boolean stating whether the request should be retried :rtype: bool """ - if not self.request: + if not self.request or not self.endpoint_discovery_enable: return False self.session_token_retry_count += 1 # clear previous location-based routing directive self.request.clear_route_to_location() - if not self.endpoint_discovery_enable: - # if endpoint discovery is disabled, the request cannot be retried anywhere else - return False - if self.can_use_multiple_write_locations: if _OperationType.IsReadOnlyOperation(self.request.operation_type): locations = self.global_endpoint_manager.get_ordered_read_locations() @@ -105,6 +101,22 @@ def ShouldRetry(self, _exception): self.request.route_to_location_with_preferred_location_flag(self.session_token_retry_count - 1, False) self.request.should_clear_session_token_on_session_read_failure = True + # For PPAF, the retry should happen to whatever the relevant write region is for the affected partition. + if self.global_endpoint_manager.is_per_partition_automatic_failover_enabled(): + pk_failover_info = self.global_endpoint_manager.partition_range_to_failover_info.get(self.pk_range_wrapper) + if pk_failover_info is not None: + location = self.global_endpoint_manager.location_cache.get_location_from_endpoint( + str(self.request.location_endpoint_to_route)) + if location in pk_failover_info.unavailable_regional_endpoints: + # If the request endpoint is unavailable, we need to resolve the endpoint for the request using the + # partition-level failover info + if pk_failover_info.current_region is not None: + location_endpoint = (self.global_endpoint_manager.location_cache. + account_read_regional_routing_contexts_by_location. + get(pk_failover_info.current_region).primary_endpoint) + self.request.route_to_location(location_endpoint) + return True + # Resolve the endpoint for the request and pin the resolution to the resolved endpoint # This enables marking the endpoint unavailability on endpoint failover/unreachability self.location_endpoint = (self.global_endpoint_manager diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py index 1b2290981307..55f9ac40a00c 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py @@ -28,9 +28,8 @@ from urllib.parse import urlparse from azure.core.exceptions import DecodeError # type: ignore -from . import exceptions -from . import http_constants -from . import _retry_utility +from . import exceptions, http_constants, _retry_utility +from ._utils import get_user_agent_features def _is_readable_stream(obj): @@ -80,7 +79,7 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin :rtype: tuple of (dict, dict) """ - # pylint: disable=protected-access + # pylint: disable=protected-access, too-many-branches connection_timeout = connection_policy.RequestTimeout connection_timeout = kwargs.pop("connection_timeout", connection_timeout) @@ -111,8 +110,9 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin base_url = request_params.endpoint_override else: pk_range_wrapper = None - if global_endpoint_manager.is_circuit_breaker_applicable(request_params): - # Circuit breaker is applicable, so we need to use the endpoint from the request + if (global_endpoint_manager.is_circuit_breaker_applicable(request_params) or + global_endpoint_manager.is_per_partition_automatic_failover_applicable(request_params)): + # Circuit breaker or per-partition failover are applicable, so we need to use the endpoint from the request pk_range_wrapper = global_endpoint_manager.create_pk_range_wrapper(request_params) base_url = global_endpoint_manager.resolve_service_endpoint_for_partition(request_params, pk_range_wrapper) if not request.url.startswith(base_url): @@ -120,6 +120,15 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin parse_result = urlparse(request.url) + # Add relevant enabled features to user agent for debugging + if request.headers[http_constants.HttpHeaders.ThinClientProxyResourceType] == http_constants.ResourceType.Document: + user_agent_features = get_user_agent_features(global_endpoint_manager) + if len(user_agent_features) > 0: + user_agent = kwargs.pop("user_agent", global_endpoint_manager.client._user_agent) + user_agent = "{} {}".format(user_agent, user_agent_features) + kwargs.update({"user_agent": user_agent}) + kwargs.update({"user_agent_overwrite": True}) + # The requests library now expects header values to be strings only starting 2.11, # and will raise an error on validation if they are not, so casting all header values to strings. request.headers.update({header: str(value) for header, value in request.headers.items()}) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_timeout_failover_retry_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_timeout_failover_retry_policy.py index 4fa82e83e2b1..801dd350e121 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_timeout_failover_retry_policy.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_timeout_failover_retry_policy.py @@ -6,6 +6,7 @@ """ from azure.cosmos.documents import _OperationType +# cspell:ignore PPAF, ppaf class _TimeoutFailoverRetryPolicy(object): @@ -19,13 +20,14 @@ def __init__(self, connection_policy, global_endpoint_manager, pk_range_wrapper, # If an account only has 1 region, then we still want to retry once on the same region # We want this to be the default retry attempts as paging through a query means there are requests without # a request object - self._max_retry_attempt_count = len(self.global_endpoint_manager.location_cache - .read_regional_routing_contexts) + 1 + self._max_retry_attempt_count = len(self.global_endpoint_manager. + location_cache.read_regional_routing_contexts) + 1 # If the request is a write operation, we only want to retry as many times as retry_write if self.request and _OperationType.IsWriteOperation(self.request.operation_type): self._max_retry_attempt_count = self.request.retry_write self.retry_count = 0 self.connection_policy = connection_policy + self.request = args[0] if args else None def ShouldRetry(self, _exception): """Returns true if the request should retry based on the passed-in exception. @@ -34,6 +36,8 @@ def ShouldRetry(self, _exception): :returns: a boolean stating whether the request should be retried :rtype: bool """ + self.global_endpoint_manager.try_ppaf_failover_threshold(self.pk_range_wrapper, self.request) + # we retry only if the request is a read operation or if it is a write operation with retry enabled if self.request and not self.is_operation_retryable(): return False @@ -48,7 +52,7 @@ def ShouldRetry(self, _exception): # second check here ensures we only do cross-regional retries for read requests # non-idempotent write retries should only be retried once, using preferred locations if available (MM) - if self.request and (_OperationType.IsReadOnlyOperation(self.request.operation_type) + if self.request and (self.is_operation_retryable() or self.global_endpoint_manager.can_use_multiple_write_locations(self.request)): location_endpoint = self.resolve_next_region_service_endpoint() self.request.route_to_location(location_endpoint) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_utils.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_utils.py index 9144afca613d..0587556f198e 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_utils.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_utils.py @@ -27,10 +27,13 @@ import base64 import json import time +import os from typing import Any, Optional, Tuple - +from ._constants import _Constants from ._version import VERSION +# cspell:ignore ppcb +# pylint: disable=protected-access def get_user_agent(suffix: Optional[str] = None) -> str: os_name = safe_user_agent_header(platform.platform()) @@ -146,3 +149,26 @@ def valid_key_value_exist( :rtype: bool """ return key in kwargs and kwargs[key] is not invalid_value + + +def get_user_agent_features(global_endpoint_manager: Any) -> str: + """ + Check the account and client configurations in order to add feature flags + to the user agent using bitmask logic and hex encoding (matching .NET/Java). + + :param Any global_endpoint_manager: The GlobalEndpointManager instance. + :return: A string representing the user agent feature flags. + :rtype: str + """ + feature_flag = 0 + # Bitwise OR for feature flags + if global_endpoint_manager._database_account_cache is not None: + if global_endpoint_manager._database_account_cache._EnablePerPartitionFailoverBehavior is True: + feature_flag |= _Constants.UserAgentFeatureFlags.PER_PARTITION_AUTOMATIC_FAILOVER + ppcb_check = os.environ.get( + _Constants.CIRCUIT_BREAKER_ENABLED_CONFIG, + _Constants.CIRCUIT_BREAKER_ENABLED_CONFIG_DEFAULT + ).lower() + if ppcb_check == "true" or feature_flag > 0: + feature_flag |= _Constants.UserAgentFeatureFlags.PER_PARTITION_CIRCUIT_BREAKER + return f"| F{feature_flag:X}" if feature_flag > 0 else "" diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py index c19d9b494abb..2d8c7e313a62 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py @@ -32,6 +32,7 @@ from .. import http_constants from . import _retry_utility_async from .._synchronized_request import _request_body_from_data, _replace_url_prefix +from .._utils import get_user_agent_features async def _Request(global_endpoint_manager, request_params, connection_policy, pipeline_client, request, **kwargs): # pylint: disable=too-many-statements @@ -49,7 +50,7 @@ async def _Request(global_endpoint_manager, request_params, connection_policy, p :rtype: tuple of (dict, dict) """ - # pylint: disable=protected-access + # pylint: disable=protected-access, too-many-branches connection_timeout = connection_policy.RequestTimeout read_timeout = connection_policy.ReadTimeout @@ -80,8 +81,9 @@ async def _Request(global_endpoint_manager, request_params, connection_policy, p base_url = request_params.endpoint_override else: pk_range_wrapper = None - if global_endpoint_manager.is_circuit_breaker_applicable(request_params): - # Circuit breaker is applicable, so we need to use the endpoint from the request + if (global_endpoint_manager.is_circuit_breaker_applicable(request_params) or + global_endpoint_manager.is_per_partition_automatic_failover_applicable(request_params)): + # Circuit breaker or per-partition failover are applicable, so we need to use the endpoint from the request pk_range_wrapper = await global_endpoint_manager.create_pk_range_wrapper(request_params) base_url = global_endpoint_manager.resolve_service_endpoint_for_partition(request_params, pk_range_wrapper) if not request.url.startswith(base_url): @@ -89,6 +91,15 @@ async def _Request(global_endpoint_manager, request_params, connection_policy, p parse_result = urlparse(request.url) + # Add relevant enabled features to user agent for debugging + if request.headers[http_constants.HttpHeaders.ThinClientProxyResourceType] == http_constants.ResourceType.Document: + user_agent_features = get_user_agent_features(global_endpoint_manager) + if len(user_agent_features) > 0: + user_agent = kwargs.pop("user_agent", global_endpoint_manager.client._user_agent) + user_agent = "{} {}".format(user_agent, user_agent_features) + kwargs.update({"user_agent": user_agent}) + kwargs.update({"user_agent_overwrite": True}) + # The requests library now expects header values to be strings only starting 2.11, # and will raise an error on validation if they are not, so casting all header values to strings. request.headers.update({header: str(value) for header, value in request.headers.items()}) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py index 67d5d4efa3e9..a33f16f0df6d 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py @@ -44,10 +44,8 @@ DistributedTracingPolicy, ProxyPolicy) from azure.core.utils import CaseInsensitiveDict -from azure.cosmos.aio._global_partition_endpoint_manager_circuit_breaker_async import ( - _GlobalPartitionEndpointManagerForCircuitBreakerAsync) -from ._read_items_helper_async import ReadItemsHelperAsync - +from azure.cosmos.aio._global_partition_endpoint_manager_per_partition_automatic_failover_async import ( + _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailoverAsync) from .. import _base as base from .._base import _build_properties_cache from .. import documents @@ -79,6 +77,7 @@ from ._auth_policy_async import AsyncCosmosBearerTokenCredentialPolicy from .._cosmos_http_logging_policy import CosmosHttpLoggingPolicy from .._range_partition_resolver import RangePartitionResolver +from ._read_items_helper_async import ReadItemsHelperAsync @@ -180,7 +179,7 @@ def __init__( # pylint: disable=too-many-statements # Keeps the latest response headers from the server. self.last_response_headers: CaseInsensitiveDict = CaseInsensitiveDict() self.UseMultipleWriteLocations = False - self._global_endpoint_manager = _GlobalPartitionEndpointManagerForCircuitBreakerAsync(self) + self._global_endpoint_manager = _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailoverAsync(self) retry_policy = None if isinstance(self.connection_policy.ConnectionRetryConfiguration, AsyncHTTPPolicy): @@ -472,6 +471,10 @@ async def GetDatabaseAccount( self.UseMultipleWriteLocations = ( self.connection_policy.UseMultipleWriteLocations and database_account._EnableMultipleWritableLocations ) + + if Constants.EnablePerPartitionFailoverBehavior in result: + database_account._EnablePerPartitionFailoverBehavior = result[Constants.EnablePerPartitionFailoverBehavior] + return database_account async def health_check( diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_partition_endpoint_manager_circuit_breaker_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_partition_endpoint_manager_circuit_breaker_async.py index 02e88fc242ea..954a324cc13f 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_partition_endpoint_manager_circuit_breaker_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_partition_endpoint_manager_circuit_breaker_async.py @@ -36,7 +36,7 @@ if TYPE_CHECKING: from azure.cosmos.aio._cosmos_client_connection_async import CosmosClientConnection - +# cspell:ignore ppcb # pylint: disable=protected-access class _GlobalPartitionEndpointManagerForCircuitBreakerAsync(_GlobalEndpointManager): """ @@ -94,16 +94,17 @@ async def create_pk_range_wrapper(self, request: RequestObject) -> Optional[Part def is_circuit_breaker_applicable(self, request: RequestObject) -> bool: return self.global_partition_endpoint_manager_core.is_circuit_breaker_applicable(request) - async def record_failure( + async def record_ppcb_failure( self, - request: RequestObject - ) -> None: + request: RequestObject, + pk_range_wrapper: Optional[PartitionKeyRangeWrapper] = None) -> None: if self.is_circuit_breaker_applicable(request): - pk_range_wrapper = await self.create_pk_range_wrapper(request) + if pk_range_wrapper is None: + pk_range_wrapper = await self.create_pk_range_wrapper(request) if pk_range_wrapper: self.global_partition_endpoint_manager_core.record_failure(request, pk_range_wrapper) - def resolve_service_endpoint_for_partition( + def _resolve_service_endpoint_for_partition_circuit_breaker( self, request: RequestObject, pk_range_wrapper: Optional[PartitionKeyRangeWrapper] @@ -114,11 +115,12 @@ def resolve_service_endpoint_for_partition( pk_range_wrapper) return self._resolve_service_endpoint(request) - async def record_success( + async def record_ppcb_success( self, - request: RequestObject - ) -> None: + request: RequestObject, + pk_range_wrapper: Optional[PartitionKeyRangeWrapper] = None) -> None: if self.is_circuit_breaker_applicable(request): - pk_range_wrapper = await self.create_pk_range_wrapper(request) + if pk_range_wrapper is None: + pk_range_wrapper = await self.create_pk_range_wrapper(request) if pk_range_wrapper: self.global_partition_endpoint_manager_core.record_success(request, pk_range_wrapper) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_partition_endpoint_manager_per_partition_automatic_failover_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_partition_endpoint_manager_per_partition_automatic_failover_async.py new file mode 100644 index 000000000000..c96b46ca46b3 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_partition_endpoint_manager_per_partition_automatic_failover_async.py @@ -0,0 +1,242 @@ +# The MIT License (MIT) +# Copyright (c) 2025 Microsoft Corporation + +"""Class for global endpoint manager for per partition automatic failover. This class inherits the circuit breaker +endpoint manager, since enabling per partition automatic failover also enables the circuit breaker logic. +""" +import logging +import threading +import os + +from typing import TYPE_CHECKING, Optional + +from azure.cosmos.http_constants import ResourceType +from azure.cosmos._constants import _Constants as Constants +from azure.cosmos.aio._global_partition_endpoint_manager_circuit_breaker_async import \ + _GlobalPartitionEndpointManagerForCircuitBreakerAsync +from azure.cosmos.documents import _OperationType +from azure.cosmos._partition_health_tracker import _PPAFPartitionThresholdsTracker +from azure.cosmos._request_object import RequestObject +from azure.cosmos._routing.routing_range import PartitionKeyRangeWrapper + +if TYPE_CHECKING: + from azure.cosmos.aio._cosmos_client_connection_async import CosmosClientConnection + from azure.cosmos._location_cache import RegionalRoutingContext + +logger = logging.getLogger("azure.cosmos._GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover") + +# pylint: disable=name-too-long, protected-access, too-many-nested-blocks +#cspell:ignore PPAF, ppaf, ppcb + +class PartitionLevelFailoverInfo: + """ + Holds information about the partition level regional failover. + Used to track the partition key range and the regions where it is available. + """ + def __init__(self) -> None: + self.unavailable_regional_endpoints: dict[str, "RegionalRoutingContext"] = {} + self._lock = threading.Lock() + self.current_region: Optional[str] = None + + def try_move_to_next_location( + self, + available_account_regional_endpoints: dict[str, "RegionalRoutingContext"], + endpoint_region: str, + request: RequestObject) -> bool: + """ + Tries to move to the next available regional endpoint for the partition key range. + :param Dict[str, RegionalRoutingContext] available_account_regional_endpoints: The available regional endpoints + :param str endpoint_region: The current regional endpoint + :param RequestObject request: The request object containing the routing context. + :return: True if the move was successful, False otherwise. + :rtype: bool + """ + with self._lock: + if endpoint_region != self.current_region and self.current_region is not None: + regional_endpoint = available_account_regional_endpoints[self.current_region].primary_endpoint + request.route_to_location(regional_endpoint) + return True + + for regional_endpoint in available_account_regional_endpoints: + if regional_endpoint == self.current_region: + continue + + if regional_endpoint in self.unavailable_regional_endpoints: + continue + + self.current_region = regional_endpoint + logger.warning("PPAF - Moving to next available regional endpoint: %s", self.current_region) + regional_endpoint = available_account_regional_endpoints[self.current_region].primary_endpoint + request.route_to_location(regional_endpoint) + return True + + return False + +class _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailoverAsync( + _GlobalPartitionEndpointManagerForCircuitBreakerAsync): + """ + This internal class implements the logic for partition endpoint management for + geo-replicated database accounts. + """ + def __init__(self, client: "CosmosClientConnection") -> None: + super(_GlobalPartitionEndpointManagerForPerPartitionAutomaticFailoverAsync, self).__init__(client) + self.partition_range_to_failover_info: dict[PartitionKeyRangeWrapper, PartitionLevelFailoverInfo] = {} + self.ppaf_thresholds_tracker = _PPAFPartitionThresholdsTracker() + self._threshold_lock = threading.Lock() + + def is_per_partition_automatic_failover_enabled(self) -> bool: + if not self._database_account_cache or not self._database_account_cache._EnablePerPartitionFailoverBehavior: + return False + return True + + def is_per_partition_automatic_failover_applicable(self, request: RequestObject) -> bool: + if not self.is_per_partition_automatic_failover_enabled(): + return False + + if not request: + return False + + if (self.location_cache.can_use_multiple_write_locations_for_request(request) + or _OperationType.IsReadOnlyOperation(request.operation_type)): + return False + + # if we have at most one region available in the account, we cannot do per partition automatic failover + available_regions = self.location_cache.account_read_regional_routing_contexts_by_location + if len(available_regions) <= 1: + return False + + # if the request is not a non-query plan document request + # or if the request is not executing a stored procedure, return False + if (request.resource_type != ResourceType.Document and + request.operation_type != _OperationType.ExecuteJavaScript): + return False + + return True + + def try_ppaf_failover_threshold( + self, + pk_range_wrapper: "PartitionKeyRangeWrapper", + request: "RequestObject"): + """Verifies whether the per-partition failover threshold has been reached for consecutive errors. If so, + it marks the current region as unavailable for the given partition key range, and moves to the next available + region for the request. + + :param PartitionKeyRangeWrapper pk_range_wrapper: The wrapper containing the partition key range information + for the request. + :param RequestObject request: The request object containing the routing context. + :returns: None + """ + # If PPAF is enabled, we track consecutive failures for certain exceptions, and only fail over at a partition + # level after the threshold is reached + if request and self.is_per_partition_automatic_failover_applicable(request): + if (self.ppaf_thresholds_tracker.get_pk_failures(pk_range_wrapper) + >= int(os.environ.get(Constants.TIMEOUT_ERROR_THRESHOLD_PPAF, + Constants.TIMEOUT_ERROR_THRESHOLD_PPAF_DEFAULT))): + # If the PPAF threshold is reached, we reset the count and mark the endpoint unavailable + # Once we mark the endpoint unavailable, the PPAF endpoint manager will try to move to the next + # available region for the partition key range + with self._threshold_lock: + # Check for count again, since a previous request may have now reset the count + if (self.ppaf_thresholds_tracker.get_pk_failures(pk_range_wrapper) + >= int(os.environ.get(Constants.TIMEOUT_ERROR_THRESHOLD_PPAF, + Constants.TIMEOUT_ERROR_THRESHOLD_PPAF_DEFAULT))): + self.ppaf_thresholds_tracker.clear_pk_failures(pk_range_wrapper) + partition_level_info = self.partition_range_to_failover_info[pk_range_wrapper] + location = self.location_cache.get_location_from_endpoint( + str(request.location_endpoint_to_route)) + logger.warning("PPAF - Failover threshold reached for partition key range: %s for region: %s", #pylint: disable=line-too-long + pk_range_wrapper, location) + regional_context = (self.location_cache. + account_read_regional_routing_contexts_by_location. + get(location).primary_endpoint) + partition_level_info.unavailable_regional_endpoints[location] = regional_context + + def resolve_service_endpoint_for_partition( + self, + request: RequestObject, + pk_range_wrapper: Optional[PartitionKeyRangeWrapper] + ) -> str: + """Resolves the endpoint to be used for the request. In a PPAF-enabled account, this method checks whether + the partition key range has any unavailable regions, and if so, it tries to move to the next available region. + If all regions are unavailable, it invalidates the cache and starts once again from the main write region in the + account configurations. + + :param PartitionKeyRangeWrapper pk_range_wrapper: The wrapper containing the partition key range information + for the request. + :param RequestObject request: The request object containing the routing context. + :returns: The regional endpoint to be used for the request. + :rtype: str + """ + if self.is_per_partition_automatic_failover_applicable(request) and pk_range_wrapper: + # If per partition automatic failover is applicable, we check partition unavailability + if pk_range_wrapper in self.partition_range_to_failover_info: + partition_failover_info = self.partition_range_to_failover_info[pk_range_wrapper] + if request.location_endpoint_to_route is not None: + endpoint_region = self.location_cache.get_location_from_endpoint(request.location_endpoint_to_route) + if endpoint_region in partition_failover_info.unavailable_regional_endpoints: + available_account_regional_endpoints = self.location_cache.account_read_regional_routing_contexts_by_location #pylint: disable=line-too-long + if (partition_failover_info.current_region is not None and + endpoint_region != partition_failover_info.current_region): + # this request has not yet seen there's an available region being used for this partition + regional_endpoint = available_account_regional_endpoints[ + partition_failover_info.current_region].primary_endpoint + request.route_to_location(regional_endpoint) + else: + if (len(self.location_cache.account_read_regional_routing_contexts_by_location) == + len(partition_failover_info.unavailable_regional_endpoints)): + # If no other region is available, we invalidate the cache and start once again + # from our main write region in the account configurations + logger.warning("All available regions for partition %s are unavailable." + " Refreshing cache.", pk_range_wrapper) + self.partition_range_to_failover_info[pk_range_wrapper] = PartitionLevelFailoverInfo() + request.clear_route_to_location() + else: + # If the current region is unavailable, we try to move to the next available region + partition_failover_info.try_move_to_next_location( + self.location_cache.account_read_regional_routing_contexts_by_location, + endpoint_region, + request) + else: + # Update the current regional endpoint to whatever the request is routing to + partition_failover_info.current_region = endpoint_region + else: + partition_failover_info = PartitionLevelFailoverInfo() + endpoint_region = self.location_cache.get_location_from_endpoint( + request.location_endpoint_to_route) + partition_failover_info.current_region = endpoint_region + self.partition_range_to_failover_info[pk_range_wrapper] = partition_failover_info + return self._resolve_service_endpoint_for_partition_circuit_breaker(request, pk_range_wrapper) + + async def record_failure(self, + request: RequestObject, + pk_range_wrapper: Optional[PartitionKeyRangeWrapper] = None) -> None: + """Records a failure for the given partition key range and request. + :param RequestObject request: The request object containing the routing context. + :param PartitionKeyRangeWrapper pk_range_wrapper: The wrapper containing the partition key range information + for the request. + :return: None + """ + if self.is_per_partition_automatic_failover_applicable(request): + if pk_range_wrapper is None: + pk_range_wrapper = await self.create_pk_range_wrapper(request) + if pk_range_wrapper: + self.ppaf_thresholds_tracker.add_failure(pk_range_wrapper) + else: + await self.record_ppcb_failure(request, pk_range_wrapper) + + async def record_success(self, + request: RequestObject, + pk_range_wrapper: Optional[PartitionKeyRangeWrapper] = None) -> None: + """Records a success for the given partition key range and request, effectively clearing the failure count. + :param RequestObject request: The request object containing the routing context. + :param PartitionKeyRangeWrapper pk_range_wrapper: The wrapper containing the partition key range information + for the request. + :return: None + """ + if self.is_per_partition_automatic_failover_applicable(request): + if pk_range_wrapper is None: + pk_range_wrapper = await self.create_pk_range_wrapper(request) + if pk_range_wrapper: + self.ppaf_thresholds_tracker.clear_pk_failures(pk_range_wrapper) + else: + await self.record_ppcb_success(request, pk_range_wrapper) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py index be19eedc36b7..9f58a1b8d8aa 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py @@ -29,7 +29,7 @@ from azure.core.exceptions import AzureError, ClientAuthenticationError, ServiceRequestError, ServiceResponseError from azure.core.pipeline.policies import AsyncRetryPolicy -from .. import _default_retry_policy, _health_check_retry_policy +from .. import _default_retry_policy, _health_check_retry_policy, _service_unavailable_retry_policy from .. import _endpoint_discovery_retry_policy from .. import _gone_retry_policy from .. import _resource_throttle_retry_policy @@ -47,6 +47,7 @@ # pylint: disable=protected-access, disable=too-many-lines, disable=too-many-statements, disable=too-many-branches +# cspell:ignore ppaf, ppcb # args [0] is the request object # args [1] is the connection policy @@ -66,11 +67,12 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg :rtype: tuple of (dict, dict) """ pk_range_wrapper = None - if args and global_endpoint_manager.is_circuit_breaker_applicable(args[0]): + if args and (global_endpoint_manager.is_per_partition_automatic_failover_applicable(args[0]) or + global_endpoint_manager.is_circuit_breaker_applicable(args[0])): pk_range_wrapper = await global_endpoint_manager.create_pk_range_wrapper(args[0]) # instantiate all retry policies here to be applied for each request execution endpointDiscovery_retry_policy = _endpoint_discovery_retry_policy.EndpointDiscoveryRetryPolicy( - client.connection_policy, global_endpoint_manager, *args + client.connection_policy, global_endpoint_manager, pk_range_wrapper, *args ) health_check_retry_policy = _health_check_retry_policy.HealthCheckRetryPolicy( client.connection_policy, @@ -96,8 +98,11 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg service_request_retry_policy = _service_request_retry_policy.ServiceRequestRetryPolicy( client.connection_policy, global_endpoint_manager, pk_range_wrapper, *args, ) + service_unavailable_retry_policy = _service_unavailable_retry_policy._ServiceUnavailableRetryPolicy( + client.connection_policy, global_endpoint_manager, pk_range_wrapper, *args) # Get Logger logger = kwargs.get("logger", logging.getLogger("azure.cosmos._retry_utility_async")) + # HttpRequest we would need to modify for Container Recreate Retry Policy request = None if args and len(args) > 3: @@ -115,7 +120,7 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg try: if args: result = await ExecuteFunctionAsync(function, global_endpoint_manager, *args, **kwargs) - await global_endpoint_manager.record_success(args[0]) + await global_endpoint_manager.record_success(args[0], pk_range_wrapper) else: result = await ExecuteFunctionAsync(function, *args, **kwargs) if not client.last_response_headers: @@ -198,13 +203,17 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg if retry_policy.should_update_throughput_link(request.body, cached_container): new_body = retry_policy._update_throughput_link(request.body) request.body = new_body - retry_policy.container_rid = cached_container["_rid"] request.headers[retry_policy._intended_headers] = retry_policy.container_rid + elif e.status_code == StatusCodes.SERVICE_UNAVAILABLE: + if args: + # record the failure for circuit breaker tracking + await global_endpoint_manager.record_ppcb_failure(args[0], pk_range_wrapper) + retry_policy = service_unavailable_retry_policy elif e.status_code == StatusCodes.REQUEST_TIMEOUT or e.status_code >= StatusCodes.INTERNAL_SERVER_ERROR: - # record the failure for circuit breaker tracking if args: - await global_endpoint_manager.record_failure(args[0]) + # record the failure for ppaf/circuit breaker tracking + await global_endpoint_manager.record_failure(args[0], pk_range_wrapper) retry_policy = timeout_failover_retry_policy else: retry_policy = defaultRetry_policy @@ -252,12 +261,12 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg _handle_service_request_retries(client, service_request_retry_policy, e, *args) else: if args: - await global_endpoint_manager.record_failure(args[0]) + await global_endpoint_manager.record_failure(args[0], pk_range_wrapper) _handle_service_response_retries(request, client, service_response_retry_policy, e, *args) # in case customer is not using aiohttp except ImportError: if args: - await global_endpoint_manager.record_failure(args[0]) + await global_endpoint_manager.record_failure(args[0], pk_range_wrapper) _handle_service_response_retries(request, client, service_response_retry_policy, e, *args) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py index 35a48b5f0229..74576e29b6a1 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py @@ -95,7 +95,9 @@ def _build_connection_policy(kwargs: dict[str, Any]) -> ConnectionPolicy: policy.EnableEndpointDiscovery = kwargs.pop('enable_endpoint_discovery', policy.EnableEndpointDiscovery) policy.PreferredLocations = kwargs.pop('preferred_locations', policy.PreferredLocations) # TODO: Consider storing callback method instead, such as 'Supplier' in JAVA SDK - policy.ExcludedLocations = kwargs.pop('excluded_locations', policy.ExcludedLocations) + excluded_locations = kwargs.pop('excluded_locations', policy.ExcludedLocations) + if excluded_locations: + policy.ExcludedLocations = excluded_locations policy.UseMultipleWriteLocations = kwargs.pop('multiple_write_locations', policy.UseMultipleWriteLocations) # SSL config diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/documents.py b/sdk/cosmos/azure-cosmos/azure/cosmos/documents.py index d3f28233a6fa..e4edfbb77e85 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/documents.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/documents.py @@ -79,6 +79,7 @@ def __init__(self) -> None: self._WritableLocations: list[dict[str, str]] = [] self._ReadableLocations: list[dict[str, str]] = [] self._EnableMultipleWritableLocations = False + self._EnablePerPartitionFailoverBehavior = False @property def WritableLocations(self) -> list[dict[str, str]]: diff --git a/sdk/cosmos/azure-cosmos/docs/ErrorCodesAndRetries.md b/sdk/cosmos/azure-cosmos/docs/ErrorCodesAndRetries.md index 9018f3592e67..adea6378a21f 100644 --- a/sdk/cosmos/azure-cosmos/docs/ErrorCodesAndRetries.md +++ b/sdk/cosmos/azure-cosmos/docs/ErrorCodesAndRetries.md @@ -2,20 +2,20 @@ The Cosmos DB Python SDK has several default policies that will deal with retrying certain errors and exceptions. More information on these can be found below. -| Status code | Cause of exception and retry behavior | -| :--- |:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 400 | For all operations:
| -| 401 | For all operations:
| -| 403 |