Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
from azure.core import PipelineClient # type: ignore
from azure.core.pipeline.transport import RequestsTransport
from azure.core.pipeline.policies import ( # type: ignore
HTTPPolicy,
ContentDecodePolicy,
HeadersPolicy,
UserAgentPolicy,
NetworkTraceLoggingPolicy,
CustomHookPolicy,
RetryPolicy,
ProxyPolicy)
from azure.core.pipeline.policies.distributed_tracing import DistributedTracingPolicy # type: ignore

Expand Down Expand Up @@ -151,15 +153,22 @@ def __init__(
self._useMultipleWriteLocations = False
self._global_endpoint_manager = global_endpoint_manager._GlobalEndpointManager(self)

# creating a requests session used for connection pooling and re-used by all requests
requests_session = requests.Session()

transport = None
if self.connection_policy.ConnectionRetryConfiguration is not None:
adapter = HTTPAdapter(max_retries=self.connection_policy.ConnectionRetryConfiguration)
requests_session.mount('http://', adapter)
requests_session.mount('https://', adapter)
transport = RequestsTransport(session=requests_session)
retry_policy = None
if isinstance(self.connection_policy.ConnectionRetryConfiguration, HTTPPolicy):
retry_policy = self.connection_policy.ConnectionRetryConfiguration
elif isinstance(self.connection_policy.ConnectionRetryConfiguration, int):
retry_policy = RetryPolicy(total=self.connection_policy.ConnectionRetryConfiguration)
elif self.connection_policy.ConnectionRetryConfiguration is not None:
# Convert a urllib3 retry policy to a Pipeline policy
retry_policy = RetryPolicy(
retry_total=self.connection_policy.ConnectionRetryConfiguration.total,
retry_connect=self.connection_policy.ConnectionRetryConfiguration.connect,
retry_read=self.connection_policy.ConnectionRetryConfiguration.read,
retry_status=self.connection_policy.ConnectionRetryConfiguration.status,
retry_backoff_max=self.connection_policy.ConnectionRetryConfiguration.BACKOFF_MAX,
retry_on_status_codes=list(self.connection_policy.ConnectionRetryConfiguration.status_forcelist),
retry_backoff_factor=self.connection_policy.ConnectionRetryConfiguration.backoff_factor
)
Comment thread
bryevdv marked this conversation as resolved.

proxies = kwargs.pop('proxies', {})
if self.connection_policy.ProxyConfiguration and self.connection_policy.ProxyConfiguration.Host:
Expand All @@ -173,11 +182,13 @@ def __init__(
ProxyPolicy(proxies=proxies),
UserAgentPolicy(base_user_agent=_utils.get_user_agent(), **kwargs),
ContentDecodePolicy(),
retry_policy,
CustomHookPolicy(**kwargs),
DistributedTracingPolicy(),
NetworkTraceLoggingPolicy(**kwargs),
]

transport = kwargs.pop("transport", None)
self.pipeline_client = PipelineClient(url_connection, "empty-config", transport=transport, policies=policies)

# Query compatibility mode.
Expand Down
24 changes: 22 additions & 2 deletions sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import six
from azure.core.tracing.decorator import distributed_trace # type: ignore
from azure.core.pipeline.policies import RetryPolicy

from ._cosmos_client_connection import CosmosClientConnection
from ._base import build_options
Expand Down Expand Up @@ -96,11 +97,25 @@ def _build_connection_policy(kwargs):

# Retry config
retry = kwargs.pop('retry_options', None) or policy.RetryOptions
retry._max_retry_attempt_count = kwargs.pop('retry_total', None) or retry._max_retry_attempt_count
total_retries = kwargs.pop('retry_total', None)
retry._max_retry_attempt_count = total_retries or retry._max_retry_attempt_count
retry._fixed_retry_interval_in_milliseconds = kwargs.pop('retry_fixed_interval', None) or \
retry._fixed_retry_interval_in_milliseconds
retry._max_wait_time_in_seconds = kwargs.pop('retry_backoff_max', None) or retry._max_wait_time_in_seconds
max_backoff = kwargs.pop('retry_backoff_max', None)
retry._max_wait_time_in_seconds = max_backoff or retry._max_wait_time_in_seconds
policy.RetryOptions = retry
connection_retry = kwargs.pop('connection_retry_policy', None) or policy.ConnectionRetryConfiguration
if not connection_retry:
connection_retry = RetryPolicy(
retry_total=total_retries,
retry_connect=kwargs.pop('retry_connect', None),
retry_read=kwargs.pop('retry_read', None),
retry_status=kwargs.pop('retry_status', None),
retry_backoff_max=max_backoff,
retry_on_status_codes=kwargs.pop('retry_on_status_codes', []),
retry_backoff_factor=kwargs.pop('retry_backoff_factor', 0.8),
)
policy.ConnectionRetryConfiguration = connection_retry

return policy

Expand Down Expand Up @@ -130,6 +145,11 @@ class CosmosClient(object):
*retry_total* - Maximum retry attempts.
*retry_backoff_max* - Maximum retry wait time in seconds.
*retry_fixed_interval* - Fixed retry interval in milliseconds.
*retry_read* - Maximum number of socket read retry attempts.
*retry_connect* - Maximum number of connection error retry attempts.
*retry_status* - Maximum number of retry attempts on error status codes.
*retry_on_status_codes* - A list of specific status codes to retry on.
*retry_backoff_factor* - Factor to calculate wait time between retry attempts.
*enable_endpoint_discovery* - Enable endpoint discovery for geo-replicated database accounts. Default is True.
*preferred_locations* - The preferred locations for geo-replicated database accounts.
When `enable_endpoint_discovery` is true and `preferred_locations` is non-empty,
Expand Down
4 changes: 3 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,10 @@ class ConnectionPolicy(object): # pylint: disable=too-many-instance-attributes
:ivar boolean UseMultipleWriteLocations:
Flag to enable writes on any locations (regions) for geo-replicated database accounts
in the azure Cosmos service.
:ivar (int or requests.packages.urllib3.util.retry) ConnectionRetryConfiguration:
:ivar ConnectionRetryConfiguration:
Retry Configuration to be used for urllib3 connection retries.
:vartype ConnectionRetryConfiguration:
int or requests.packages.urllib3.util.retry or azure.core.pipeline.policies.HTTPPolicy
"""

__defaultRequestTimeout = 60000 # milliseconds
Expand Down
55 changes: 39 additions & 16 deletions sdk/cosmos/azure-cosmos/test/crud_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import urllib.parse as urllib
import uuid
import pytest
from azure.core.exceptions import ServiceRequestError
from azure.cosmos import _consistent_hash_ring
import azure.cosmos.documents as documents
import azure.cosmos.errors as errors
Expand Down Expand Up @@ -1977,44 +1978,66 @@ def __get_first(array):
def test_client_request_timeout(self):
connection_policy = documents.ConnectionPolicy()
# making timeout 0 ms to make sure it will throw
connection_policy.RequestTimeout = 0
connection_policy.RequestTimeout = 0.000000000001
with self.assertRaises(Exception):
# client does a getDatabaseAccount on initialization, which will time out
cosmos_client.CosmosClient(CRUDTests.host, CRUDTests.masterKey, "Session", connection_policy=connection_policy)

def test_client_request_timeout_when_connection_retry_configuration_specified(self):
connection_policy = documents.ConnectionPolicy()
# making timeout 0 ms to make sure it will throw
connection_policy.RequestTimeout = 0
connection_policy.RequestTimeout = 0.000000000001
connection_policy.ConnectionRetryConfiguration = Retry(
total=3,
read=3,
connect=3,
backoff_factor=0.3,
status_forcelist=(500, 502, 504)
)
with self.assertRaises(Exception):
with self.assertRaises(ServiceRequestError):
# client does a getDatabaseAccount on initialization, which will time out
cosmos_client.CosmosClient(CRUDTests.host, CRUDTests.masterKey, "Session", connection_policy=connection_policy)

def test_client_connection_retry_configuration(self):
total_time_for_two_retries = self.initialize_client_with_connection_retry_config(2)
total_time_for_three_retries = self.initialize_client_with_connection_retry_config(3)
total_time_for_two_retries = self.initialize_client_with_connection_urllib_retry_config(2)
total_time_for_three_retries = self.initialize_client_with_connection_urllib_retry_config(3)
self.assertGreater(total_time_for_three_retries, total_time_for_two_retries)

def initialize_client_with_connection_retry_config(self, retries):
from azure.core.exceptions import ServiceRequestError
connection_policy = documents.ConnectionPolicy()
connection_policy.ConnectionRetryConfiguration = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=0.3,
status_forcelist=(500, 502, 504)
)
total_time_for_two_retries = self.initialize_client_with_connection_core_retry_config(2)
total_time_for_three_retries = self.initialize_client_with_connection_core_retry_config(3)
self.assertGreater(total_time_for_three_retries, total_time_for_two_retries)

def initialize_client_with_connection_urllib_retry_config(self, retries):
retry_policy = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=0.3,
status_forcelist=(500, 502, 504)
)
start_time = time.time()
try:
cosmos_client.CosmosClient(
"https://localhost:9999",
CRUDTests.masterKey,
"Session",
connection_retry_policy=retry_policy)
self.fail()
except ServiceRequestError as e:
end_time = time.time()
return end_time - start_time

def initialize_client_with_connection_core_retry_config(self, retries):
start_time = time.time()
try:
cosmos_client.CosmosClient("https://localhost:9999", CRUDTests.masterKey, "Session", connection_policy=connection_policy)
cosmos_client.CosmosClient(
"https://localhost:9999",
CRUDTests.masterKey,
"Session",
retry_total=retries,
retry_read=retries,
retry_connect=retries,
retry_status=retries)
self.fail()
except ServiceRequestError as e:
end_time = time.time()
Expand Down