Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
a47452e
sync PPAF
simorenoh Jun 15, 2025
b8228e7
async changes
simorenoh Jun 16, 2025
151a2fa
Update test_per_partition_automatic_failover_async.py
simorenoh Jun 16, 2025
b9e0a08
CI fixes
simorenoh Jun 16, 2025
e4d7046
changelog
simorenoh Jun 16, 2025
09e7163
broken link
simorenoh Jun 16, 2025
4e28f66
Update test_location_cache.py
simorenoh Jun 16, 2025
c5319e8
change PPAF detection logic
simorenoh Jun 16, 2025
eba6093
Update _global_partition_endpoint_manager_circuit_breaker_core.py
simorenoh Jun 16, 2025
2ec5c5d
Update _global_partition_endpoint_manager_circuit_breaker_core.py
simorenoh Jun 17, 2025
62d7be0
fix tests and remove environment variable
tvaron3 Jun 18, 2025
b57949d
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
tvaron3 Jun 18, 2025
24b8415
fix tests
tvaron3 Jun 23, 2025
9595327
revert excluded locations change
tvaron3 Jul 2, 2025
8911ef5
fix analyze
tvaron3 Jul 3, 2025
25dbeb3
test excluded locations
tvaron3 Jul 7, 2025
d61a9a9
Add different error handling for 503 and 408s, update README
tvaron3 Jul 8, 2025
3f8ac23
Merge branch 'main' into cosmos-ppaf
simorenoh Jul 30, 2025
f1c69ed
mypy, cspell, pylint
simorenoh Jul 31, 2025
9306d15
remove tag from tests since config is service based
simorenoh Jul 31, 2025
bd07d83
add threshold-based retries for 408, 5xx errors
simorenoh Aug 7, 2025
80cc824
Merge branch 'main' into cosmos-ppaf
simorenoh Aug 8, 2025
2e5838c
update constant use, rollback session token PR change
simorenoh Aug 8, 2025
8b7d181
threshold based retries
simorenoh Aug 18, 2025
f25b660
Merge branch 'main' into cosmos-ppaf
simorenoh Aug 18, 2025
d8ed980
Update _base.py
simorenoh Aug 19, 2025
fcd5c60
cspell, test fixes
simorenoh Aug 19, 2025
93c76ad
Merge branch 'main' into cosmos-ppaf
simorenoh Aug 19, 2025
467a95d
Update _service_unavailable_retry_policy.py
simorenoh Aug 19, 2025
b9aa01c
mypy, pylint
simorenoh Aug 19, 2025
64f95e3
503 behavior change, use regional contexts
simorenoh Aug 21, 2025
d05fc5e
mypy, pylint, tests
simorenoh Aug 21, 2025
85b2007
special-casing 503s
simorenoh Aug 21, 2025
f8fa70a
small fix
simorenoh Aug 21, 2025
e5c5ac5
exclude region tests
simorenoh Aug 21, 2025
ccd9def
session retry tests
simorenoh Aug 22, 2025
1dccc5d
pylint, cspell
simorenoh Aug 22, 2025
ebf0b0d
Merge branch 'main' into cosmos-ppaf
simorenoh Aug 22, 2025
c2bb93a
change errors since 503 is now retried directly
simorenoh Aug 25, 2025
c3879d8
Update sdk/cosmos/azure-cosmos/README.md
simorenoh Aug 26, 2025
1d57bf2
address comments
simorenoh Aug 26, 2025
eec77e7
Update _service_unavailable_retry_policy.py
simorenoh Aug 26, 2025
4c2bf32
small test updates for 503 behavior
simorenoh Aug 26, 2025
05654a9
further comments
simorenoh Aug 27, 2025
f982d21
Update test_per_partition_circuit_breaker_sm_mrr.py
simorenoh Aug 27, 2025
d9ca7a4
test fixes
simorenoh Aug 27, 2025
f1dce5d
Update test_excluded_locations.py
simorenoh Aug 27, 2025
1582cf3
small improvement to region-finding
simorenoh Aug 29, 2025
8f7ec0c
pylint
simorenoh Aug 29, 2025
1c10349
Merge branch 'main' into cosmos-ppaf
simorenoh Aug 29, 2025
effb6d1
Update _global_partition_endpoint_manager_per_partition_automatic_fai…
simorenoh Aug 29, 2025
1e773f5
address comments, add threshold lock
simorenoh Aug 29, 2025
24a44d9
add more comments
simorenoh Aug 29, 2025
d07610a
Merge branch 'main' into cosmos-ppaf
simorenoh Sep 2, 2025
f984204
Merge branch 'main' into cosmos-ppaf
simorenoh Sep 4, 2025
c772092
edge cases
simorenoh Sep 19, 2025
143cf17
Merge branch 'main' into cosmos-ppaf
simorenoh Sep 19, 2025
ef9f73a
Merge branch 'main' into cosmos-ppaf
simorenoh Oct 2, 2025
3acda24
changes from testing
simorenoh Oct 7, 2025
9a6b17b
pylint
simorenoh Oct 7, 2025
c3e0035
Merge branch 'main' into cosmos-ppaf
simorenoh Oct 8, 2025
8f75444
fixes pylint/mypy
simorenoh Oct 8, 2025
0ccd9bf
mypy complaining about assigning str to none
simorenoh Oct 8, 2025
f4e4d65
testing changes - will roll back later
simorenoh Oct 8, 2025
4e276e1
Merge branch 'cosmos-ppaf' of https://github.com/Azure/azure-sdk-for-…
simorenoh Oct 8, 2025
8f87b13
Update _endpoint_discovery_retry_policy.py
simorenoh Oct 9, 2025
3e1f6be
Update _asynchronous_request.py
simorenoh Oct 17, 2025
42817fc
add user agent feature flags
simorenoh Oct 17, 2025
23f3b0d
Merge branch 'main' into cosmos-ppaf
simorenoh Oct 20, 2025
65f9e01
Update test_per_partition_automatic_failover_async.py
simorenoh Oct 20, 2025
e15e43d
move user agent logic
simorenoh Oct 24, 2025
0d7e887
sync and async match, remove print statements
simorenoh Oct 29, 2025
aa3b641
leftover timer
simorenoh Oct 29, 2025
799f6de
Update _retry_utility.py
simorenoh Oct 30, 2025
36249b4
use constants
simorenoh Oct 30, 2025
f5cd24b
Merge branch 'main' into cosmos-ppaf
simorenoh Oct 31, 2025
0495c7b
pylint
simorenoh Oct 31, 2025
335e10e
Merge branch 'main' into cosmos-ppaf
simorenoh Nov 17, 2025
2f004b7
Merge branch 'main' into cosmos-ppaf
simorenoh Nov 17, 2025
8639093
Update CHANGELOG.md
simorenoh Nov 17, 2025
5b3815f
react to comments
simorenoh Nov 19, 2025
e31d674
Update _retry_utility.py
simorenoh Nov 19, 2025
e55871c
mypy pylint
simorenoh Nov 19, 2025
0463a3f
test fixes
simorenoh Nov 20, 2025
cdfdc01
add lock to failure additions
simorenoh Nov 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
### 4.14.3 (Unreleased)

#### Features Added
* Added support for Per Partition Automatic Failover. To enable this feature, you must follow the guide [here](https://learn.microsoft.com/azure/cosmos-db/how-to-configure-per-partition-automatic-failover). See [PR 41588](https://github.com/Azure/azure-sdk-for-python/pull/41588).

#### Breaking Changes

#### Bugs Fixed

#### Other Changes
* Added cross-regional retries for 503 (Service Unavailable) errors. See [PR 41588](https://github.com/Azure/azure-sdk-for-python/pull/41588).

### 4.14.2 (2025-11-14)

Expand Down
5 changes: 5 additions & 0 deletions sdk/cosmos/azure-cosmos/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,11 @@ requests to another region:
- `AZURE_COSMOS_FAILURE_PERCENTAGE_TOLERATED`: Default is a `90` percent failure rate.
- After a partition reaches a 90 percent failure rate for all requests, the SDK will send requests routed to that partition to another region.

### Per Partition Automatic Failover (Public Preview)
Per partition automatic failover enables the SDK to automatically redirect write requests at the partition level to another region based on service-side signals. This feature is available
only for single write region accounts that have at least one read-only region. When per partition automatic failover is enabled, per partition circuit breaker and cross-region hedging is enabled by default, meaning
all its configurable options also apply to per partition automatic failover. To enable this feature, follow the guide [here](https://learn.microsoft.com/azure/cosmos-db/how-to-configure-per-partition-automatic-failover).

## Troubleshooting

### General
Expand Down
4 changes: 4 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@
if TYPE_CHECKING:
from ._cosmos_client_connection import CosmosClientConnection
from .aio._cosmos_client_connection_async import CosmosClientConnection as AsyncClientConnection
from ._global_partition_endpoint_manager_per_partition_automatic_failover import (
_GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover)
from ._request_object import RequestObject
from ._routing.routing_range import PartitionKeyRangeWrapper

# pylint: disable=protected-access
#cspell:ignore PPAF, ppaf

_COMMON_OPTIONS = {
'initial_headers': 'initialHeaders',
Expand Down
27 changes: 26 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
"""Class for defining internal constants in the Azure Cosmos database service.
"""


from enum import IntEnum
from typing_extensions import Literal
# cspell:ignore PPAF

# cspell:ignore reranker

Expand All @@ -40,6 +41,7 @@ class _Constants:
Name: Literal["name"] = "name"
DatabaseAccountEndpoint: Literal["databaseAccountEndpoint"] = "databaseAccountEndpoint"
DefaultEndpointsRefreshTime: int = 5 * 60 * 1000 # milliseconds
EnablePerPartitionFailoverBehavior: Literal["enablePerPartitionFailoverBehavior"] = "enablePerPartitionFailoverBehavior" #pylint: disable=line-too-long

# ServiceDocument Resource
EnableMultipleWritableLocations: Literal["enableMultipleWriteLocations"] = "enableMultipleWriteLocations"
Expand Down Expand Up @@ -74,6 +76,10 @@ class _Constants:
FAILURE_PERCENTAGE_TOLERATED = "AZURE_COSMOS_FAILURE_PERCENTAGE_TOLERATED"
FAILURE_PERCENTAGE_TOLERATED_DEFAULT: int = 90
# -------------------------------------------------------------------------
# Only applicable when per partition automatic failover is enabled --------
TIMEOUT_ERROR_THRESHOLD_PPAF = "AZURE_COSMOS_TIMEOUT_ERROR_THRESHOLD_FOR_PPAF"
TIMEOUT_ERROR_THRESHOLD_PPAF_DEFAULT: int = 10
# -------------------------------------------------------------------------

# Error code translations
ERROR_TRANSLATIONS: dict[int, str] = {
Expand All @@ -99,3 +105,22 @@ class Kwargs:
"""Whether to retry write operations if they fail. Used either at client level or request level."""

EXCLUDED_LOCATIONS: Literal["excludedLocations"] = "excludedLocations"

class UserAgentFeatureFlags(IntEnum):
"""
User agent feature flags.
Each flag represents a bit in a number to encode what features are enabled. Therefore, the first feature flag
will be 1, the second 2, the third 4, etc. When constructing the user agent suffix, the feature flags will be
used to encode a unique number representing the features enabled. This number will be converted into a hex
string following the prefix "F" to save space in the user agent as it is limited and appended to the user agent
suffix. This number will then be used to determine what features are enabled by decoding the hex string back
to a number and checking what bits are set.

Features being developed should align with the .NET SDK as a source of truth for feature flag assignments:
https://github.com/Azure/azure-cosmos-dotnet-v3/blob/master/Microsoft.Azure.Cosmos/src/Diagnostics/UserAgentFeatureFlags.cs

Example:
If the user agent suffix has "F3", this means that flags 1 and 2.
"""
PER_PARTITION_AUTOMATIC_FAILOVER = 1
PER_PARTITION_CIRCUIT_BREAKER = 2
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
HttpResponse # pylint: disable=no-legacy-azure-core-http-response-import

from . import _base as base
from ._global_partition_endpoint_manager_circuit_breaker import _GlobalPartitionEndpointManagerForCircuitBreaker
from ._global_partition_endpoint_manager_per_partition_automatic_failover import _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover # pylint: disable=line-too-long
from . import _query_iterable as query_iterable
from . import _runtime_constants as runtime_constants
from . import _session
Expand Down Expand Up @@ -176,7 +176,7 @@ def __init__( # pylint: disable=too-many-statements
self.last_response_headers: CaseInsensitiveDict = CaseInsensitiveDict()

self.UseMultipleWriteLocations = False
self._global_endpoint_manager = _GlobalPartitionEndpointManagerForCircuitBreaker(self)
self._global_endpoint_manager = _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover(self)

retry_policy = None
if isinstance(self.connection_policy.ConnectionRetryConfiguration, HTTPPolicy):
Expand Down Expand Up @@ -2688,12 +2688,15 @@ def GetDatabaseAccount(
database_account._ReadableLocations = result[Constants.ReadableLocations]
if Constants.EnableMultipleWritableLocations in result:
database_account._EnableMultipleWritableLocations = result[
Constants.EnableMultipleWritableLocations
]
Constants.EnableMultipleWritableLocations]

self.UseMultipleWriteLocations = (
self.connection_policy.UseMultipleWriteLocations and database_account._EnableMultipleWritableLocations
)

if Constants.EnablePerPartitionFailoverBehavior in result:
database_account._EnablePerPartitionFailoverBehavior = result[Constants.EnablePerPartitionFailoverBehavior]

if response_hook:
response_hook(last_response_headers, result)
return database_account
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def _get_client_settings(global_endpoint_manager: Optional[_GlobalEndpointManage
gem_client = global_endpoint_manager.client
if gem_client and gem_client.connection_policy:
connection_policy: ConnectionPolicy = gem_client.connection_policy
client_preferred_regions = connection_policy.PreferredLocations
client_preferred_regions = global_endpoint_manager.location_cache.effective_preferred_locations
client_excluded_regions = connection_policy.ExcludedLocations

if global_endpoint_manager.location_cache:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,9 @@
Azure Cosmos database service.
"""

import logging
from azure.cosmos.documents import _OperationType

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
log_formatter = logging.Formatter("%(levelname)s:%(message)s")
log_handler = logging.StreamHandler()
log_handler.setFormatter(log_formatter)
logger.addHandler(log_handler)
# cspell:ignore PPAF

from azure.cosmos.documents import _OperationType

class EndpointDiscoveryRetryPolicy(object):
"""The endpoint discovery retry policy class used for geo-replicated database accounts
Expand All @@ -43,8 +36,9 @@ class EndpointDiscoveryRetryPolicy(object):
Max_retry_attempt_count = 120
Retry_after_in_milliseconds = 1000

def __init__(self, connection_policy, global_endpoint_manager, *args):
def __init__(self, connection_policy, global_endpoint_manager, pk_range_wrapper, *args):
self.global_endpoint_manager = global_endpoint_manager
self.pk_range_wrapper = pk_range_wrapper
self._max_retry_attempt_count = EndpointDiscoveryRetryPolicy.Max_retry_attempt_count
self.failover_retry_count = 0
self.retry_after_in_milliseconds = EndpointDiscoveryRetryPolicy.Retry_after_in_milliseconds
Expand All @@ -70,6 +64,22 @@ def ShouldRetry(self, exception): # pylint: disable=unused-argument

self.failover_retry_count += 1

# set the refresh_needed flag to ensure that endpoint list is
# refreshed with new writable and readable locations
self.global_endpoint_manager.refresh_needed = True

# If per partition automatic failover is applicable, we mark the current endpoint as unavailable
# and resolve the service endpoint for the partition range - otherwise, continue the default retry logic
if self.global_endpoint_manager.is_per_partition_automatic_failover_applicable(self.request):
partition_level_info = self.global_endpoint_manager.partition_range_to_failover_info[self.pk_range_wrapper]
location = self.global_endpoint_manager.location_cache.get_location_from_endpoint(
str(self.request.location_endpoint_to_route))
regional_endpoint = (self.global_endpoint_manager.location_cache.
account_read_regional_routing_contexts_by_location.get(location))
partition_level_info.unavailable_regional_endpoints[location] = regional_endpoint
self.global_endpoint_manager.resolve_service_endpoint_for_partition(self.request, self.pk_range_wrapper)
return True

if self.request.location_endpoint_to_route:
context = self.__class__.__name__
if _OperationType.IsReadOnlyOperation(self.request.operation_type):
Expand All @@ -82,16 +92,11 @@ def ShouldRetry(self, exception): # pylint: disable=unused-argument
self.request.location_endpoint_to_route,
True, context)

# set the refresh_needed flag to ensure that endpoint list is
# refreshed with new writable and readable locations
self.global_endpoint_manager.refresh_needed = True

# clear previous location-based routing directive
self.request.clear_route_to_location()

# set location-based routing directive based on retry count
# simulating single master writes by ensuring usePreferredLocations
# is set to false
# simulating single master writes by ensuring usePreferredLocations is set to false
# reasoning being that 403.3 is only expected for write region failover in single writer account
# and we must rely on account locations as they are the source of truth
self.request.route_to_location_with_preferred_location_flag(self.failover_retry_count, False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
if TYPE_CHECKING:
from azure.cosmos._cosmos_client_connection import CosmosClientConnection

#cspell:ignore ppcb

class _GlobalPartitionEndpointManagerForCircuitBreaker(_GlobalEndpointManager):
"""
This internal class implements the logic for partition endpoint management for
Expand Down Expand Up @@ -93,16 +95,17 @@ def create_pk_range_wrapper(self, request: RequestObject) -> Optional[PartitionK

return PartitionKeyRangeWrapper(partition_range, container_rid)

def record_failure(
def record_ppcb_failure(
self,
request: RequestObject
) -> None:
request: RequestObject,
pk_range_wrapper: Optional[PartitionKeyRangeWrapper] = None)-> None:
if self.is_circuit_breaker_applicable(request):
pk_range_wrapper = self.create_pk_range_wrapper(request)
if pk_range_wrapper is None:
pk_range_wrapper = self.create_pk_range_wrapper(request)
if pk_range_wrapper:
self.global_partition_endpoint_manager_core.record_failure(request, pk_range_wrapper)

def resolve_service_endpoint_for_partition(
def _resolve_service_endpoint_for_partition_circuit_breaker(
self,
request: RequestObject,
pk_range_wrapper: Optional[PartitionKeyRangeWrapper]
Expand All @@ -113,11 +116,12 @@ def resolve_service_endpoint_for_partition(
pk_range_wrapper)
return self._resolve_service_endpoint(request)

def record_success(
def record_ppcb_success(
self,
request: RequestObject
) -> None:
if self.global_partition_endpoint_manager_core.is_circuit_breaker_applicable(request):
pk_range_wrapper = self.create_pk_range_wrapper(request)
request: RequestObject,
pk_range_wrapper: Optional[PartitionKeyRangeWrapper] = None) -> None:
if self.is_circuit_breaker_applicable(request):
if pk_range_wrapper is None:
pk_range_wrapper = self.create_pk_range_wrapper(request)
if pk_range_wrapper:
self.global_partition_endpoint_manager_core.record_success(request, pk_range_wrapper)
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

# pylint: disable=protected-access

"""Internal class for global endpoint manager for circuit breaker.
"""
import logging
Expand Down Expand Up @@ -60,7 +62,10 @@ def is_circuit_breaker_applicable(self, request: RequestObject) -> bool:
return False

circuit_breaker_enabled = os.environ.get(Constants.CIRCUIT_BREAKER_ENABLED_CONFIG,
Constants.CIRCUIT_BREAKER_ENABLED_CONFIG_DEFAULT) == "True"
Constants.CIRCUIT_BREAKER_ENABLED_CONFIG_DEFAULT).lower() == "true"
if not circuit_breaker_enabled and self.client._global_endpoint_manager is not None:
if self.client._global_endpoint_manager._database_account_cache is not None:
circuit_breaker_enabled = self.client._global_endpoint_manager._database_account_cache._EnablePerPartitionFailoverBehavior is True # pylint: disable=line-too-long
if not circuit_breaker_enabled:
return False

Expand Down
Loading
Loading