Skip to content

[Cosmos] Port None handling fix for database_account parameter#44987

Merged
simorenoh merged 4 commits into
mainfrom
copilot/port-none-handling-changes
Feb 5, 2026
Merged

[Cosmos] Port None handling fix for database_account parameter#44987
simorenoh merged 4 commits into
mainfrom
copilot/port-none-handling-changes

Apply suggestion from @simorenoh

7e582d3
Select commit
Loading
Failed to load commit list.
Azure Pipelines / python - cosmos - tests succeeded Feb 5, 2026 in 32m 24s

Build #20260205.3 had test failures

Details

Tests

  • Failed: 4 (0.06%)
  • Passed: 6,929 (98.86%)
  • Other: 76 (1.08%)
  • Total: 7,009
Code coverage

  • 7654 of 13581 lines covered (56.36%)

Annotations

Check failure on line 1 in test_default_availability_strategy_with_ppaf_enabled[query]

See this annotation in the file changed.

@azure-pipelines azure-pipelines / python - cosmos - tests

test_default_availability_strategy_with_ppaf_enabled[query]

azure.cosmos.exceptions.CosmosHttpResponseError: Status code: 400
Injected Error
Raw output
self = <test_availability_strategy.TestAvailabilityStrategy object at 0x0000026707CE8880>
operation = 'query'

    @pytest.mark.parametrize("operation", [READ, QUERY, QUERY_PK, READ_ALL, CHANGE_FEED, CREATE, UPSERT, REPLACE, DELETE, PATCH, BATCH])
    def test_default_availability_strategy_with_ppaf_enabled(self, operation):
        """Test availability strategy is enabled when ppaf is enabled, operations failover to second preferred location on errors"""
        uri_down = _location_cache.LocationCache.GetLocationalEndpoint(self.host, self.REGION_1)
        failed_over_uri = _location_cache.LocationCache.GetLocationalEndpoint(self.host, self.REGION_2)
    
        predicate = lambda r: (FaultInjectionTransport.predicate_is_document_operation(r) and
                               FaultInjectionTransport.predicate_is_operation_type(r,
                                                                                   _get_operation_type(operation)) and
                               FaultInjectionTransport.predicate_targets_region(r, uri_down))
    
        error_lambda = lambda r: FaultInjectionTransport.error_after_delay(
            1000,  # Add delay to trigger hedging
            CosmosHttpResponseError(status_code=400, message="Injected Error")
        )
        custom_transport = self._get_custom_transport_with_fault_injection(predicate, error_lambda)
        # enable ppaf
        is_get_account_predicate = lambda r: FaultInjectionTransport.predicate_is_database_account_call(r)
        # Set the database account response to have PPAF enabled
        ppaf_enabled_database_account = \
            lambda r, inner: FaultInjectionTransport.transform_topology_ppaf_enabled(inner=inner)
        custom_transport.add_response_transformation(
            is_get_account_predicate,
            ppaf_enabled_database_account)
    
        setup = self._setup_method_with_custom_transport(custom_transport,multiple_write_locations=True)
    
        setup_without_fault = self._setup_method_with_custom_transport(None)
        doc = _create_doc()
        setup_without_fault['col'].create_item(body=doc)
    
        expected_uris = [uri_down, failed_over_uri]
        # Test operation with fault injection
    
        if operation in [READ, QUERY, QUERY_PK, READ_ALL, CHANGE_FEED]:
>           _perform_read_operation(
                operation,
                setup['col'],
                doc,
                expected_uris,
                [])

tests\test_availability_strategy.py:875: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests\test_availability_strategy.py:91: in _perform_read_operation
    response = list(container.query_items(
.tox\whl\lib\site-packages\azure\core\paging.py:136: in __next__
    return next(self._page_iterator)
.tox\whl\lib\site-packages\azure\core\paging.py:82: in __next__
    self._response = self._get_next(self.continuation_token)
.tox\whl\lib\site-packages\azure\cosmos\_query_iterable.py:116: in _fetch_next
    block = self._ex_context.fetch_next_block()
.tox\whl\lib\site-packages\azure\cosmos\_execution_context\execution_dispatcher.py:147: in fetch_next_block
    raise e
.tox\whl\lib\site-packages\azure\cosmos\_execution_context\execution_dispatcher.py:142: in fetch_next_block
    return self._execution_context.fetch_next_block()
.tox\whl\lib\site-packages\azure\cosmos\_execution_context\base_execution_context.py:81: in fetch_next_block
    self._ensure()
.tox\whl\lib\site-packages\azure\cosmos\_execution_context\base_execution_context.py:66: in _ensure
    results = self._fetch_next_block()
.tox\whl\lib\site-packages\azure\cosmos\_execution_context\base_execution_context.py:224: in _fetch_next_block
    return self._fetch_items_helper_with_retries(self._fetch_function)
.tox\whl\lib\site-packages\azure\cosmos\_execution_context\base_execution_context.py:163: in _fetch_items_helper_with_retries
    return execute_fetch()
.tox\whl\lib\site-packages\azure\cosmos\_execution_context

Check failure on line 1 in test_default_availability_strategy_with_ppaf_enabled[batch]

See this annotation in the file changed.

@azure-pipelines azure-pipelines / python - cosmos - tests

test_default_availability_strategy_with_ppaf_enabled[batch]

AssertionError: No matching request URLs found in mock handler messages
assert {'https://ta6...ure.com:443/'} == {'https://ta6...ure.com:443/'}
  
  Extra items in the right set:
  'https://ta6a7f3e79c854eb7-westus3.documents.azure.com:443/'
  
  Full diff:
    {
        'https://ta6a7f3e79c854eb7-westcentralus.documents.azure.com:443/',
  -     'https://ta6a7f3e79c854eb7-westus3.documents.azure.com:443/',
    }
Raw output
self = <test_availability_strategy_async.TestAsyncAvailabilityStrategy object at 0x0000027B5BBD0CD0>
operation = 'batch'
setup = {'client_without_fault': <CosmosClient [https://ta6a7f3e79c854eb7.documents.azure.com:443/]>, 'read_locations': ['West US 3', 'West Central US'], 'region_1': 'West US 3', 'region_2': 'West Central US', ...}

    @pytest.mark.asyncio
    @pytest.mark.parametrize("operation",[READ, QUERY, QUERY_PK, READ_ALL, CHANGE_FEED, CREATE, UPSERT, REPLACE, DELETE, PATCH, BATCH])
    async def test_default_availability_strategy_with_ppaf_enabled(
            self,
            operation,
            setup):
        """Test availability strategy is enabled when ppaf is enabled, operations failover to second preferred location on errors"""
        uri_down = _location_cache.LocationCache.GetLocationalEndpoint(self.host, setup['region_1'])
        failed_over_uri = _location_cache.LocationCache.GetLocationalEndpoint(self.host, setup['region_2'])
    
        predicate = lambda r: (FaultInjectionTransportAsync.predicate_is_document_operation(r) and
                               FaultInjectionTransportAsync.predicate_is_operation_type(r, _get_operation_type(
                                   operation)) and
                               FaultInjectionTransportAsync.predicate_targets_region(r, uri_down))
    
        error_lambda = lambda r: FaultInjectionTransportAsync.error_after_delay(
            1000,  # Add delay to trigger hedging
            CosmosHttpResponseError(status_code=400, message="Injected Error")
        )
        custom_transport = self._get_custom_transport_with_fault_injection(predicate, error_lambda)
        #enable ppaf
        is_get_account_predicate = lambda r: FaultInjectionTransportAsync.predicate_is_database_account_call(r)
        # Set the database account response to have PPAF enabled
        ppaf_enabled_database_account = \
            lambda r, inner: FaultInjectionTransportAsync.transform_topology_ppaf_enabled(inner=inner)
        custom_transport.add_response_transformation(
            is_get_account_predicate,
            ppaf_enabled_database_account)
    
        setup_with_transport = await self._setup_method_with_custom_transport(
            setup['write_locations'],
            setup['read_locations'],
            custom_transport,
            multiple_write_locations=True)
        setup_without_fault = await self._setup_method_with_custom_transport(
            setup['write_locations'],
            setup['read_locations'],
            None)
    
        doc = _create_doc()
        await setup_without_fault['col'].create_item(doc)
    
        # Test operation with fault injection
        if operation in [READ, QUERY, QUERY_PK, READ_ALL, CHANGE_FEED]:
            await _perform_read_operation(
                operation,
                setup_with_transport['col'],
                doc,
                [uri_down, failed_over_uri],
                [])
        else:
>           await _perform_write_operation(
                operation,
                setup_with_transport['col'],
                doc,
                [uri_down, failed_over_uri],
                [],
                retry_write=True)

tests\test_availability_strategy_async.py:1029: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests\test_availability_strategy_async.py:222: in _perform_write_operation
    _validate_response_uris(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

expected_location_uris = ['https://ta6a7f3e79c854eb7-westus3.documents.azure.com:443/', 'https://ta6a7f3e79c854eb7-westcentralus.documents.azure.com:443/']
excluded_location_uris = [], operation_type = 'Batch', resource_type = 'docs'

    def _validate_response_uris(expected_location_uris, excluded_location_uris, operation_type=None, resource_type=None):
        """Validate that response came from expected region and not from excluded regions"""
        # Get Request URLs fr

Check failure on line 1 in test_per_partition_circular_breaker_with_cancelled_first_future[replace]

See this annotation in the file changed.

@azure-pipelines azure-pipelines / python - cosmos - tests

test_per_partition_circular_breaker_with_cancelled_first_future[replace]

AssertionError: No matching request URLs found in mock handler messages
assert {'https://t41...ure.com:443/'} == {'https://t41...ure.com:443/'}
  
  Extra items in the right set:
  'https://t4126c5d05be94bcd-westus3.documents.azure.com:443/'
  
  Full diff:
    {
        'https://t4126c5d05be94bcd-westcentralus.documents.azure.com:443/',
  -     'https://t4126c5d05be94bcd-westus3.documents.azure.com:443/',
    }
Raw output
self = <test_availability_strategy_async.TestAsyncAvailabilityStrategy object at 0x0000013F3501C640>
operation = 'replace'
setup = {'client_without_fault': <CosmosClient [https://t4126c5d05be94bcd.documents.azure.com:443/]>, 'read_locations': ['West US 3', 'West Central US'], 'region_1': 'West US 3', 'region_2': 'West Central US', ...}

    @pytest.mark.asyncio
    @pytest.mark.parametrize("operation", [READ, QUERY_PK, CHANGE_FEED, CREATE, UPSERT, REPLACE, DELETE, PATCH, BATCH])
    async def test_per_partition_circular_breaker_with_cancelled_first_future(self, operation, setup):
        # QUERY, READ_ALL are not included because currently they are not targeting to a specific pkRange
        os.environ["AZURE_COSMOS_ENABLE_CIRCUIT_BREAKER"] = "True"
        os.environ["AZURE_COSMOS_CONSECUTIVE_ERROR_COUNT_TOLERATED_FOR_WRITE"] = "5"
        os.environ["AZURE_COSMOS_CONSECUTIVE_ERROR_COUNT_TOLERATED_FOR_READ"] = "5"
    
        try:
            """Test that when per partition circular breaker is enabled and after hitting the threshold, subsequent requests go directly to second region.
            This test verifies the logic of recording failure of cancelled first_future."""
    
            # Setup fault injection for first region
            uri_down = _location_cache.LocationCache.GetLocationalEndpoint(self.host, setup['region_1'])
            failed_over_uri = _location_cache.LocationCache.GetLocationalEndpoint(self.host, setup['region_2'])
    
            predicate = lambda r: (FaultInjectionTransportAsync.predicate_is_document_operation(r) and
                                   FaultInjectionTransportAsync.predicate_is_operation_type(r, _get_operation_type(operation)) and
                                   FaultInjectionTransportAsync.predicate_targets_region(r, uri_down))
    
            error_lambda = lambda r: FaultInjectionTransportAsync.error_after_delay(
                1000,  # Add delay to trigger hedging
                CosmosHttpResponseError(status_code=503, message="Injected Error")
            )
    
            custom_transport = self._get_custom_transport_with_fault_injection(predicate, error_lambda)
    
            strategy = {'threshold_ms':100, 'threshold_steps_ms':50}
    
            setup_with_fault_injection = await self._setup_method_with_custom_transport(
                setup['write_locations'],
                setup['read_locations'],
                custom_transport,
                multiple_write_locations=True,
                container_id=self.TEST_SINGLE_PARTITION_CONTAINER_ID
            )
            setup_without_fault = await self._setup_method_with_custom_transport(
                setup['write_locations'],
                setup['read_locations'],
                None,
                container_id=self.TEST_SINGLE_PARTITION_CONTAINER_ID)
    
            # First operation will attempt first region, fail, and then succeed in second region
            expected_uris = [uri_down, failed_over_uri]
    
            for _ in range(5):
                doc = _create_doc()
                await setup_without_fault['col'].create_item(body=doc)
                if operation in [READ, QUERY, QUERY_PK, READ_ALL, CHANGE_FEED]:
                    await _perform_read_operation(
                        operation,
                        setup_with_fault_injection['col'],
                        doc,
                        expected_uris,
                        [],
                        availability_strategy_config=strategy)
                else:
>                   await _perform_write_operation(
                        operation,
                        setup_with_fault_injection['col'],
                        doc,
                        expected_uris,
                        [],
                        retry_write=True,
                        availability_strategy_config=strategy)

tests\test_availability_strategy_async.py:891: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

Check failure on line 1 in test_availability_strategy_in_steady_state[client_availability_strategy1-request_availability_strategy1-read]

See this annotation in the file changed.

@azure-pipelines azure-pipelines / python - cosmos - tests

test_availability_strategy_in_steady_state[client_availability_strategy1-request_availability_strategy1-read]

AssertionError: No matching request URLs found in mock handler messages
assert {'https://td9...ure.com:443/'} == {'https://td9...ure.com:443/'}
  
  Extra items in the left set:
  'https://td9b1d7ca61a647ff-westcentralus.documents.azure.com:443/'
  
  Full diff:
    {
  +     'https://td9b1d7ca61a647ff-westcentralus.documents.azure.com:443/',
        'https://td9b1d7ca61a647ff-westus3.documents.azure.com:443/',
    }
Raw output
self = <test_availability_strategy_async.TestAsyncAvailabilityStrategy object at 0x00000219BB6A8310>
operation = 'read'
client_availability_strategy = {'threshold_ms': 150, 'threshold_steps_ms': 50}
request_availability_strategy = <object object at 0x00000219BB4C1870>
setup = {'client_without_fault': <CosmosClient [https://td9b1d7ca61a647ff.documents.azure.com:443/]>, 'read_locations': ['West US 3', 'West Central US'], 'region_1': 'West US 3', 'region_2': 'West Central US', ...}

    @pytest.mark.asyncio
    @pytest.mark.parametrize("operation", [READ, QUERY, QUERY_PK, READ_ALL, CHANGE_FEED, CREATE, UPSERT, REPLACE, DELETE, PATCH, BATCH])
    @pytest.mark.parametrize("client_availability_strategy, request_availability_strategy", [
        (None, {'threshold_ms':150, 'threshold_steps_ms':50}),
        ({'threshold_ms':150, 'threshold_steps_ms':50}, _Unset),
        ({'threshold_ms':150, 'threshold_steps_ms':50},
         {'threshold_ms':150, 'threshold_steps_ms':50})
    ])
    async def test_availability_strategy_in_steady_state(
            self,
            operation,
            client_availability_strategy,
            request_availability_strategy,
            setup):
        """Test for steady state, operations go to first preferred location even with availability strategy enabled"""
        # Setup client with availability strategy
        setup_with_transport = await self._setup_method_with_custom_transport(
            setup['write_locations'],
            setup['read_locations'],
            None,
            availability_strategy_config=client_availability_strategy)
        doc = _create_doc()
        await setup_with_transport['col'].create_item(doc)
        await asyncio.sleep(0.5)
    
        container = setup_with_transport['col']
        expected_uris = [_location_cache.LocationCache.GetLocationalEndpoint(self.host, setup['region_1'])]
        excluded_uris = [_location_cache.LocationCache.GetLocationalEndpoint(self.host, setup['region_2'])]
    
        # Test operation
        if operation in [READ, QUERY, QUERY_PK, READ_ALL, CHANGE_FEED]:
>           await _perform_read_operation(
                operation,
                container,
                doc,
                expected_uris,
                excluded_uris,
                availability_strategy_config=request_availability_strategy)

tests\test_availability_strategy_async.py:397: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests\test_availability_strategy_async.py:151: in _perform_read_operation
    _validate_response_uris(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

expected_location_uris = ['https://td9b1d7ca61a647ff-westus3.documents.azure.com:443/']
excluded_location_uris = ['https://td9b1d7ca61a647ff-westcentralus.documents.azure.com:443/']
operation_type = 'Read', resource_type = 'docs'

    def _validate_response_uris(expected_location_uris, excluded_location_uris, operation_type=None, resource_type=None):
        """Validate that response came from expected region and not from excluded regions"""
        # Get Request URLs from mock handler messages
        req_urls = []
        for msg in TestAsyncAvailabilityStrategy.MOCK_HANDLER.messages:
            if 'Request URL:' not in msg:
                continue
    
            # If operation_type and resource_type specified, filter messages
            if operation_type and resource_type:
                req_resource_type = re.search(r"'x-ms-thinclient-proxy-resource-type':\s*'([^']+)'", msg)
                req_operation_type = re.search(r"'x-ms-thinclient-proxy-operation-type':\s*'([^']+)'", msg)
    
                if not (req_resource_type and req_operation_type):
                    continue
    
                if resource_type != req_resource_type.group(1) or operation_type != req_operation_type.group(1):
                    continue
    
            scheme, rest = msg.split()[2].strip("'").split("//", 1)