-
Notifications
You must be signed in to change notification settings - Fork 3.2k
[Cosmos] Per-Partition Automatic Failover #41588
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 4 commits
Commits
Show all changes
85 commits
Select commit
Hold shift + click to select a range
a47452e
sync PPAF
simorenoh b8228e7
async changes
simorenoh 151a2fa
Update test_per_partition_automatic_failover_async.py
simorenoh b9e0a08
CI fixes
simorenoh e4d7046
changelog
simorenoh 09e7163
broken link
simorenoh 4e28f66
Update test_location_cache.py
simorenoh c5319e8
change PPAF detection logic
simorenoh eba6093
Update _global_partition_endpoint_manager_circuit_breaker_core.py
simorenoh 2ec5c5d
Update _global_partition_endpoint_manager_circuit_breaker_core.py
simorenoh 62d7be0
fix tests and remove environment variable
tvaron3 b57949d
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
tvaron3 24b8415
fix tests
tvaron3 9595327
revert excluded locations change
tvaron3 8911ef5
fix analyze
tvaron3 25dbeb3
test excluded locations
tvaron3 d61a9a9
Add different error handling for 503 and 408s, update README
tvaron3 3f8ac23
Merge branch 'main' into cosmos-ppaf
simorenoh f1c69ed
mypy, cspell, pylint
simorenoh 9306d15
remove tag from tests since config is service based
simorenoh bd07d83
add threshold-based retries for 408, 5xx errors
simorenoh 80cc824
Merge branch 'main' into cosmos-ppaf
simorenoh 2e5838c
update constant use, rollback session token PR change
simorenoh 8b7d181
threshold based retries
simorenoh f25b660
Merge branch 'main' into cosmos-ppaf
simorenoh d8ed980
Update _base.py
simorenoh fcd5c60
cspell, test fixes
simorenoh 93c76ad
Merge branch 'main' into cosmos-ppaf
simorenoh 467a95d
Update _service_unavailable_retry_policy.py
simorenoh b9aa01c
mypy, pylint
simorenoh 64f95e3
503 behavior change, use regional contexts
simorenoh d05fc5e
mypy, pylint, tests
simorenoh 85b2007
special-casing 503s
simorenoh f8fa70a
small fix
simorenoh e5c5ac5
exclude region tests
simorenoh ccd9def
session retry tests
simorenoh 1dccc5d
pylint, cspell
simorenoh ebf0b0d
Merge branch 'main' into cosmos-ppaf
simorenoh c2bb93a
change errors since 503 is now retried directly
simorenoh c3879d8
Update sdk/cosmos/azure-cosmos/README.md
simorenoh 1d57bf2
address comments
simorenoh eec77e7
Update _service_unavailable_retry_policy.py
simorenoh 4c2bf32
small test updates for 503 behavior
simorenoh 05654a9
further comments
simorenoh f982d21
Update test_per_partition_circuit_breaker_sm_mrr.py
simorenoh d9ca7a4
test fixes
simorenoh f1dce5d
Update test_excluded_locations.py
simorenoh 1582cf3
small improvement to region-finding
simorenoh 8f7ec0c
pylint
simorenoh 1c10349
Merge branch 'main' into cosmos-ppaf
simorenoh effb6d1
Update _global_partition_endpoint_manager_per_partition_automatic_fai…
simorenoh 1e773f5
address comments, add threshold lock
simorenoh 24a44d9
add more comments
simorenoh d07610a
Merge branch 'main' into cosmos-ppaf
simorenoh f984204
Merge branch 'main' into cosmos-ppaf
simorenoh c772092
edge cases
simorenoh 143cf17
Merge branch 'main' into cosmos-ppaf
simorenoh ef9f73a
Merge branch 'main' into cosmos-ppaf
simorenoh 3acda24
changes from testing
simorenoh 9a6b17b
pylint
simorenoh c3e0035
Merge branch 'main' into cosmos-ppaf
simorenoh 8f75444
fixes pylint/mypy
simorenoh 0ccd9bf
mypy complaining about assigning str to none
simorenoh f4e4d65
testing changes - will roll back later
simorenoh 4e276e1
Merge branch 'cosmos-ppaf' of https://github.com/Azure/azure-sdk-for-…
simorenoh 8f87b13
Update _endpoint_discovery_retry_policy.py
simorenoh 3e1f6be
Update _asynchronous_request.py
simorenoh 42817fc
add user agent feature flags
simorenoh 23f3b0d
Merge branch 'main' into cosmos-ppaf
simorenoh 65f9e01
Update test_per_partition_automatic_failover_async.py
simorenoh e15e43d
move user agent logic
simorenoh 0d7e887
sync and async match, remove print statements
simorenoh aa3b641
leftover timer
simorenoh 799f6de
Update _retry_utility.py
simorenoh 36249b4
use constants
simorenoh f5cd24b
Merge branch 'main' into cosmos-ppaf
simorenoh 0495c7b
pylint
simorenoh 335e10e
Merge branch 'main' into cosmos-ppaf
simorenoh 2f004b7
Merge branch 'main' into cosmos-ppaf
simorenoh 8639093
Update CHANGELOG.md
simorenoh 5b3815f
react to comments
simorenoh e31d674
Update _retry_utility.py
simorenoh e55871c
mypy pylint
simorenoh 0463a3f
test fixes
simorenoh cdfdc01
add lock to failure additions
simorenoh File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
148 changes: 148 additions & 0 deletions
148
...osmos/azure/cosmos/_global_partition_endpoint_manager_per_partition_automatic_failover.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,148 @@ | ||
| # The MIT License (MIT) | ||
| # Copyright (c) 2025 Microsoft Corporation | ||
|
|
||
| """Class for global endpoint manager for per partition automatic failover. This class inherits the circuit breaker | ||
| endpoint manager, since enabling per partition automatic failover also enables the circuit breaker logic. | ||
| """ | ||
| import logging | ||
| import os | ||
| import threading | ||
|
|
||
| from typing import Dict, Set, TYPE_CHECKING, Optional | ||
|
|
||
| from azure.cosmos.http_constants import ResourceType | ||
| from azure.cosmos._constants import _Constants as Constants | ||
| from azure.cosmos._global_partition_endpoint_manager_circuit_breaker import \ | ||
| _GlobalPartitionEndpointManagerForCircuitBreaker | ||
| from azure.cosmos.documents import _OperationType | ||
|
|
||
| from azure.cosmos._request_object import RequestObject | ||
| from azure.cosmos._routing.routing_range import PartitionKeyRangeWrapper | ||
|
|
||
| if TYPE_CHECKING: | ||
| from azure.cosmos.aio._cosmos_client_connection_async import CosmosClientConnection | ||
|
|
||
| logger = logging.getLogger("azure.cosmos._GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover") | ||
|
|
||
| # pylint: disable=name-too-long, protected-access | ||
|
|
||
| class PartitionLevelFailoverInfo: | ||
| """ | ||
| Holds information about the partition level regional failover. | ||
| Used to track the partition key range and the regions where it is available. | ||
| """ | ||
| def __init__(self): | ||
| self.unavailable_regional_endpoints = set() | ||
| self.current_regional_endpoint = None | ||
| self._lock = threading.Lock() | ||
|
|
||
| def try_move_to_next_location(self, available_account_regional_endpoints: Set[str], request: RequestObject) -> bool: | ||
| with self._lock: | ||
| failed_regional_endpoint = request.location_endpoint_to_route | ||
| if failed_regional_endpoint != self.current_regional_endpoint: | ||
| logger.info("Moving to next available regional endpoint: %s", self.current_regional_endpoint) | ||
| request.route_to_location(self.current_regional_endpoint) | ||
| return True | ||
|
|
||
| for regional_endpoint in available_account_regional_endpoints: | ||
| if regional_endpoint == self.current_regional_endpoint: | ||
| continue | ||
|
|
||
| if regional_endpoint in self.unavailable_regional_endpoints: | ||
| continue | ||
|
|
||
| self.current_regional_endpoint = regional_endpoint | ||
| logger.info("Moving to next available regional endpoint: %s", self.current_regional_endpoint) | ||
| request.route_to_location(self.current_regional_endpoint) | ||
| return True | ||
|
|
||
| return False | ||
|
|
||
| class _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover(_GlobalPartitionEndpointManagerForCircuitBreaker): | ||
| """ | ||
| This internal class implements the logic for partition endpoint management for | ||
| geo-replicated database accounts. | ||
| """ | ||
| def __init__(self, client: "CosmosClientConnection"): | ||
| super(_GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover, self).__init__(client) | ||
| self.partition_range_to_failover_info: Dict[PartitionKeyRangeWrapper, PartitionLevelFailoverInfo] = {} | ||
simorenoh marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| def is_per_partition_automatic_failover_applicable(self, request: RequestObject) -> bool: | ||
| if not request: | ||
| return False | ||
|
|
||
| if (self.location_cache.can_use_multiple_write_locations_for_request(request) | ||
| or _OperationType.IsReadOnlyOperation(request.operation_type)): | ||
| return False | ||
|
|
||
| per_partition_automatic_failover_config_enabled = ( | ||
| os.environ.get(Constants.PER_PARTITION_AUTOMATIC_FAILOVER_ENABLED_CONFIG, | ||
| Constants.PER_PARTITION_AUTOMATIC_FAILOVER_ENABLED_CONFIG_DEFAULT).lower() == "true") | ||
|
|
||
| # TODO: This check here needs to be verified once we test against a live account with the config enabled. | ||
| if (not per_partition_automatic_failover_config_enabled or | ||
| not self._database_account_cache._EnablePerPartitionFailoverBehavior): | ||
| return False | ||
|
|
||
| # if we have at most one region available in the account, we cannot do per partition automatic failover | ||
| available_regions = self.compute_available_preferred_regions(request) | ||
| if len(available_regions) <= 1: | ||
| return False | ||
|
|
||
| # if the request is not for a document or if the request is not executing a stored procedure, return False | ||
| if (request.resource_type != ResourceType.Document and | ||
simorenoh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| request.operation_type != _OperationType.ExecuteJavaScript): | ||
| return False | ||
|
|
||
| return True | ||
|
|
||
| def resolve_service_endpoint_for_partition( | ||
dibahlfi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| self, | ||
| request: RequestObject, | ||
| pk_range_wrapper: Optional[PartitionKeyRangeWrapper] | ||
| ) -> str: | ||
simorenoh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if self.is_per_partition_automatic_failover_applicable(request) and pk_range_wrapper: | ||
| # If per partition automatic failover is applicable, we check partition unavailability | ||
| if pk_range_wrapper in self.partition_range_to_failover_info: | ||
| logger.info("Resolving service endpoint for partition with per partition automatic failover enabled.") | ||
simorenoh marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| partition_failover_info = self.partition_range_to_failover_info[pk_range_wrapper] | ||
| if request.location_endpoint_to_route is not None: | ||
| if request.location_endpoint_to_route in partition_failover_info.unavailable_regional_endpoints: | ||
| # If the current region is unavailable, we try to move to the next available region | ||
| if not partition_failover_info.try_move_to_next_location( | ||
| self.compute_available_preferred_regions(request), | ||
| request): | ||
| logger.info("All available regions for partition are unavailable. Refreshing cache.") | ||
simorenoh marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| # If no other region is available, we invalidate the cache and start once again from our | ||
| # main write region in the account configurations | ||
| self.partition_range_to_failover_info[pk_range_wrapper] = PartitionLevelFailoverInfo() | ||
| request.clear_route_to_location() | ||
| return self._resolve_service_endpoint(request) | ||
| else: | ||
| # Update the current regional endpoint to whatever the request is routing to | ||
| partition_failover_info.current_regional_endpoint = request.location_endpoint_to_route | ||
| else: | ||
| partition_failover_info = PartitionLevelFailoverInfo() | ||
| partition_failover_info.current_regional_endpoint = request.location_endpoint_to_route | ||
| self.partition_range_to_failover_info[pk_range_wrapper] = partition_failover_info | ||
| return self._resolve_service_endpoint(request) | ||
| return self._resolve_service_endpoint_for_partition_circuit_breaker(request, pk_range_wrapper) | ||
|
|
||
| def compute_available_preferred_regions( | ||
jeet1995 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| self, | ||
| request: RequestObject | ||
| ) -> Set[str]: | ||
| """ | ||
| Computes the available regional endpoints for the request based on customer-set preferred and excluded regions. | ||
| :param RequestObject request: The request object containing the routing context. | ||
| :return: A set of available regional endpoints. | ||
| :rtype: Set[str] | ||
| """ | ||
| excluded_locations = request.excluded_locations + self.location_cache.connection_policy.ExcludedLocations | ||
| preferred_locations = self.PreferredLocations | ||
| available_regions = [item for item in preferred_locations if item not in excluded_locations] | ||
| available_regional_endpoints = { | ||
| self.location_cache.account_read_regional_routing_contexts_by_location[region].primary_endpoint | ||
| for region in available_regions | ||
| } | ||
| return available_regional_endpoints | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.