Merged
26 commits
4c4b078 make consistent_hash_ring private (bryevdv, Jul 11, 2019)
038db50 make default_retry_policy private (bryevdv, Jul 11, 2019)
51d4466 make endpoint_discovery_retry_policy private (bryevdv, Jul 11, 2019)
b1719fe make hash_partition_resolver private (bryevdv, Jul 11, 2019)
46e7fed make location_cache private (bryevdv, Jul 11, 2019)
bff6cc9 make murmur_hash private (bryevdv, Jul 11, 2019)
0e100af make range private (bryevdv, Jul 11, 2019)
3d2e65c make range_partition_resolver private (bryevdv, Jul 11, 2019)
3e43f69 make vector_session_token private (bryevdv, Jul 16, 2019)
60ad7fb make resource_throttle_retry_policy private (bryevdv, Jul 16, 2019)
68fd7a9 make retry_utility private (bryevdv, Jul 16, 2019)
8e9274c make utils private (bryevdv, Jul 16, 2019)
6dad678 make routing private (bryevdv, Jul 17, 2019)
6b1a641 make execution_context private (bryevdv, Jul 18, 2019)
ce36df2 make cosmos_client_connection private (bryevdv, Jul 18, 2019)
2063355 make retry_options private (bryevdv, Jul 18, 2019)
8e84029 make query_iterable private (bryevdv, Jul 18, 2019)
eedb532 make constants private (bryevdv, Jul 19, 2019)
56c074a make synchronized_request private (bryevdv, Jul 19, 2019)
109e496 make session_retry_policy private (bryevdv, Jul 19, 2019)
c94dc6d make partition private (bryevdv, Jul 19, 2019)
4502740 make global_endpoint_manager private (bryevdv, Jul 19, 2019)
0a60acf make runtime_constants private (bryevdv, Jul 19, 2019)
4fb54ac make session private (bryevdv, Jul 19, 2019)
2a41dfc make request_object private (bryevdv, Jul 19, 2019)
444d5d5 make base private (bryevdv, Jul 23, 2019)
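
Taken together, the commits apply one convention: privacy moves from the class name to the module name. A module such as consistent_hash_ring.py is renamed to _consistent_hash_ring.py, and the class inside drops its leading underscore (_ConsistentHashRing becomes ConsistentHashRing). A minimal sketch of the convention, with hypothetical file and class names:

# _internal_thing.py -- the leading underscore on the *module* marks it private
class InternalThing(object):
    """Plain class name; callers are warned off by the module name instead."""
    def do_work(self):
        return 42

# Elsewhere in the package, internal code imports it explicitly:
#   from . import _internal_thing
#   worker = _internal_thing.InternalThing()

The diffs below show this rename applied module by module, plus the matching import and test updates.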
@@ -29,7 +29,7 @@
 from . import partition


-class _ConsistentHashRing(object):
+class ConsistentHashRing(object):
     """The ConsistentHashRing class implements a consistent hash ring using the
     hash generator specified.
     """
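
For background on the class above: a consistent hash ring assigns each collection several virtual nodes on a ring and routes a key to the first node at or after the key's hash, wrapping around. A toy sketch of the idea, using hashlib in place of the SDK's MurmurHash; this is not the SDK implementation:

import bisect
import hashlib

class SimpleHashRing(object):
    """Toy consistent hash ring, for illustration only."""
    def __init__(self, nodes, replicas=128):
        self._ring = []  # list of (hash, node) pairs, sorted by hash
        for node in nodes:
            for i in range(replicas):
                self._ring.append((self._hash('%s:%d' % (node, i)), node))
        self._ring.sort()
        self._keys = [h for h, _ in self._ring]

    @staticmethod
    def _hash(value):
        return int(hashlib.md5(value.encode('utf-8')).hexdigest(), 16)

    def get_node(self, key):
        # first virtual node clockwise from the key's hash (wraps via modulo)
        idx = bisect.bisect(self._keys, self._hash(key)) % len(self._ring)
        return self._ring[idx][1]

ring = SimpleHashRing(['coll1', 'coll2', 'coll3'])
print(ring.get_node('document-42'))  # deterministic placement of a key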
@@ -23,7 +23,7 @@
 """
 from . import http_constants

-class _DefaultRetryPolicy(object):
+class DefaultRetryPolicy(object):

     error_codes = http_constants._ErrorCodes
     CONNECTION_ERROR_CODES = [
@@ -52,7 +52,7 @@ def __init__(self, *args):
         self.args = args

     def needsRetry(self, error_code):
-        if error_code in _DefaultRetryPolicy.CONNECTION_ERROR_CODES:
+        if error_code in DefaultRetryPolicy.CONNECTION_ERROR_CODES:
             if (len(self.args) > 0):
                 if (self.args[4]['method'] == 'GET') or (http_constants.HttpHeaders.IsQuery in self.args[4]['headers']):
                     return True
@@ -33,7 +33,7 @@
 logger.addHandler(log_handler)


-class _EndpointDiscoveryRetryPolicy(object):
+class EndpointDiscoveryRetryPolicy(object):
     """The endpoint discovery retry policy class used for geo-replicated database accounts
     to handle the write forbidden exceptions due to writable/readable location changes
     (say, after a failover).
@@ -44,9 +44,9 @@ class _EndpointDiscoveryRetryPolicy(object):

     def __init__(self, connection_policy, global_endpoint_manager, *args):
         self.global_endpoint_manager = global_endpoint_manager
-        self._max_retry_attempt_count = _EndpointDiscoveryRetryPolicy.Max_retry_attempt_count
+        self._max_retry_attempt_count = EndpointDiscoveryRetryPolicy.Max_retry_attempt_count
         self.failover_retry_count = 0
-        self.retry_after_in_milliseconds = _EndpointDiscoveryRetryPolicy.Retry_after_in_milliseconds
+        self.retry_after_in_milliseconds = EndpointDiscoveryRetryPolicy.Retry_after_in_milliseconds
         self.connection_policy = connection_policy
         self.request = args[0] if args else None
         # clear previous location-based routing directive
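
The constants referenced in __init__ (Max_retry_attempt_count, Retry_after_in_milliseconds) are defined just above the shown hunk. A hedged sketch of the behavior the docstring describes; the method names, status-code check, and constant values here are assumptions for illustration, not the SDK's code:

class EndpointDiscoveryRetrySketch(object):
    """Illustration only: retry writes after a failover by refreshing endpoints."""
    Max_retry_attempt_count = 120         # the constant names appear in the diff;
    Retry_after_in_milliseconds = 1000    # these particular values are assumptions

    def __init__(self, global_endpoint_manager):
        self.global_endpoint_manager = global_endpoint_manager
        self.failover_retry_count = 0

    def should_retry(self, http_error):
        # only the write-forbidden case (HTTP 403) triggers endpoint rediscovery
        if getattr(http_error, 'status_code', None) != 403:
            return False
        if self.failover_retry_count >= self.Max_retry_attempt_count:
            return False
        self.failover_retry_count += 1
        # ask the endpoint manager to re-resolve writable/readable locations
        self.global_endpoint_manager.refresh_endpoint_list()  # assumed hook
        return True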
@@ -22,8 +22,8 @@
 """Hash partition resolver implementation in the Azure Cosmos database service.
 """

-from . import murmur_hash
-from . import consistent_hash_ring
+from . import _murmur_hash
+from . import _consistent_hash_ring

 class HashPartitionResolver(object):
     """HashPartitionResolver implements partitioning based on the value of a hash function, allowing you to evenly
@@ -51,9 +51,9 @@ def __init__(self, partition_key_extractor, collection_links, default_number_of_
         self.collection_links = collection_links

         if hash_generator is None:
-            hash_generator = murmur_hash._MurmurHash()
+            hash_generator = _murmur_hash.MurmurHash()

-        self.consistent_hash_ring = consistent_hash_ring._ConsistentHashRing(self.collection_links, default_number_of_virtual_nodes_per_collection, hash_generator)
+        self.consistent_hash_ring = _consistent_hash_ring.ConsistentHashRing(self.collection_links, default_number_of_virtual_nodes_per_collection, hash_generator)

     def ResolveForCreate(self, document):
         """Resolves the collection for creating the document based on the partition key.
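
A hedged usage sketch for the resolver, built only from the constructor and method visible in this hunk; the extractor, collection links, and document are illustrative assumptions:

# Illustrative wiring: spread documents across two collections by hashed id.
resolver = HashPartitionResolver(
    partition_key_extractor=lambda document: document['id'],  # assumed key shape
    collection_links=['dbs/db/colls/c0', 'dbs/db/colls/c1'],  # hypothetical links
    default_number_of_virtual_nodes_per_collection=128)

# ResolveForCreate hashes the extracted key and picks the owning collection
collection_link = resolver.ResolveForCreate({'id': 'user-42'})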
@@ -43,7 +43,7 @@

 https://pypi.python.org/pypi/mmh3/2.0
 '''
-class _MurmurHash(object):
+class MurmurHash(object):
     """ The 32 bit x86 version of MurmurHash3 implementation.
     """
     def ComputeHash(self, key):
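
The module docstring above credits the mmh3 package (linked) for the same algorithm. A quick cross-check against that package, assuming it is installed; mmh3.hash returns a signed 32-bit integer, so mask it before comparing with unsigned expected values like those in the tests further down:

import mmh3

# MurmurHash3 x86 32-bit with the default seed of 0; mask to get the unsigned value
unsigned_hash = mmh3.hash('afdgdd', 0) & 0xFFFFFFFF
print(unsigned_hash)  # expected to agree with the SDK's ComputeHash for this input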
@@ -24,7 +24,7 @@

 from . import http_constants

-class _ResourceThrottleRetryPolicy(object):
+class ResourceThrottleRetryPolicy(object):

     def __init__(self, max_retry_attempt_count, fixed_retry_interval_in_milliseconds, max_wait_time_in_seconds):
         self._max_retry_attempt_count = max_retry_attempt_count
@@ -25,13 +25,13 @@
 import time

 from . import errors
-from . import endpoint_discovery_retry_policy
-from . import resource_throttle_retry_policy
-from . import default_retry_policy
+from . import _endpoint_discovery_retry_policy
+from . import _resource_throttle_retry_policy
+from . import _default_retry_policy
 from . import session_retry_policy
 from .http_constants import HttpHeaders, StatusCodes, SubStatusCodes

-def _Execute(client, global_endpoint_manager, function, *args, **kwargs):
+def Execute(client, global_endpoint_manager, function, *args, **kwargs):
     """Executes the function with passed parameters, applying all retry policies.

     :param object client:
@@ -45,20 +45,20 @@ def _Execute(client, global_endpoint_manager, function, *args, **kwargs):

     """
     # instantiate all retry policies here to be applied for each request execution
-    endpointDiscovery_retry_policy = endpoint_discovery_retry_policy._EndpointDiscoveryRetryPolicy(client.connection_policy, global_endpoint_manager, *args)
+    endpointDiscovery_retry_policy = _endpoint_discovery_retry_policy.EndpointDiscoveryRetryPolicy(client.connection_policy, global_endpoint_manager, *args)

-    resourceThrottle_retry_policy = resource_throttle_retry_policy._ResourceThrottleRetryPolicy(client.connection_policy.RetryOptions.MaxRetryAttemptCount,
+    resourceThrottle_retry_policy = _resource_throttle_retry_policy.ResourceThrottleRetryPolicy(client.connection_policy.RetryOptions.MaxRetryAttemptCount,
         client.connection_policy.RetryOptions.FixedRetryIntervalInMilliseconds,
         client.connection_policy.RetryOptions.MaxWaitTimeInSeconds)
-    defaultRetry_policy = default_retry_policy._DefaultRetryPolicy(*args)
+    defaultRetry_policy = _default_retry_policy.DefaultRetryPolicy(*args)

     sessionRetry_policy = session_retry_policy._SessionRetryPolicy(client.connection_policy.EnableEndpointDiscovery, global_endpoint_manager, *args)
     while True:
         try:
             if args:
-                result = _ExecuteFunction(function, global_endpoint_manager, *args, **kwargs)
+                result = ExecuteFunction(function, global_endpoint_manager, *args, **kwargs)
             else:
-                result = _ExecuteFunction(function, *args, **kwargs)
+                result = ExecuteFunction(function, *args, **kwargs)
             if not client.last_response_headers:
                 client.last_response_headers = {}

@@ -94,7 +94,7 @@ def _Execute(client, global_endpoint_manager, function, *args, **kwargs):
     # Wait for retry_after_in_milliseconds time before the next retry
     time.sleep(retry_policy.retry_after_in_milliseconds / 1000.0)

-def _ExecuteFunction(function, *args, **kwargs):
+def ExecuteFunction(function, *args, **kwargs):
     """ Stub method so that it can be used for mocking purposes as well.
     """
     return function(*args, **kwargs)
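
Execute instantiates every retry policy up front and loops, while ExecuteFunction stays a trivial stub precisely so tests can replace it. A condensed sketch of the loop's shape; the per-policy interface here is an assumption for illustration, not the SDK's:

import time

def execute_with_retries(function, policies, *args, **kwargs):
    """Illustrative retry driver: keep calling until success or every policy gives up."""
    while True:
        try:
            return function(*args, **kwargs)
        except Exception as exc:  # the real code branches on specific HTTP failures
            retry_policy = None
            for policy in policies:
                if policy.handles(exc) and policy.should_retry(exc):  # assumed methods
                    retry_policy = policy
                    break
            if retry_policy is None:
                raise  # no policy claims this error, so surface it to the caller
            # back off for the interval the chosen policy requests
            time.sleep(retry_policy.retry_after_in_milliseconds / 1000.0)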
@@ -23,7 +23,7 @@
 """

 from collections import deque
-from .. import retry_utility
+from .. import _retry_utility
 from .. import http_constants
 from .. import base

@@ -140,7 +140,7 @@ def _fetch_items_helper_with_retries(self, fetch_function):
         def callback():
             return self._fetch_items_helper_no_retries(fetch_function)

-        return retry_utility._Execute(self._client, self._client._global_endpoint_manager, callback)
+        return _retry_utility.Execute(self._client, self._client._global_endpoint_manager, callback)


 class _DefaultQueryExecutionContext(_QueryExecutionContextBase):
@@ -26,7 +26,7 @@
 import threading
 from . import constants
 from . import errors
-from .location_cache import LocationCache
+from ._location_cache import LocationCache

 class _GlobalEndpointManager(object):
     """
sdk/cosmos/azure-cosmos/azure/cosmos/session.py (1 addition, 1 deletion)

@@ -27,7 +27,7 @@

 from . import base
 from . import http_constants
-from .vector_session_token import VectorSessionToken
+from ._vector_session_token import VectorSessionToken
 from .errors import HTTPFailure

 class SessionContainer(object):
sdk/cosmos/azure-cosmos/azure/cosmos/synchronized_request.py (2 additions, 2 deletions)

@@ -30,7 +30,7 @@
 from . import documents
 from . import errors
 from . import http_constants
-from . import retry_utility
+from . import _retry_utility

 def _IsReadableStream(obj):
     """Checks whether obj is a file-like readable stream.
@@ -224,5 +224,5 @@ def SynchronizedRequest(client,
         request_options['headers'][http_constants.HttpHeaders.ContentLength] = 0

     # Pass the _Request function with its parameters to retry_utility's Execute method, which wraps the call with retries
-    return retry_utility._Execute(client, global_endpoint_manager, _Request, request, connection_policy, requests_session, path, request_options, request_body)
+    return _retry_utility.Execute(client, global_endpoint_manager, _Request, request, connection_policy, requests_session, path, request_options, request_body)

sdk/cosmos/azure-cosmos/test/crud_tests.py (25 additions, 25 deletions)

@@ -41,18 +41,18 @@
 import urllib.parse as urllib
 import uuid
 import pytest
-import azure.cosmos.consistent_hash_ring as consistent_hash_ring
+from azure.cosmos import _consistent_hash_ring
 import azure.cosmos.documents as documents
 import azure.cosmos.errors as errors
 from azure.cosmos.http_constants import HttpHeaders, StatusCodes, SubStatusCodes
-import azure.cosmos.murmur_hash as murmur_hash
+import azure.cosmos._murmur_hash as _murmur_hash
 import test_config
 import azure.cosmos.base as base
 import azure.cosmos.cosmos_client as cosmos_client
 from azure.cosmos.diagnostics import RecordDiagnostics
 from azure.cosmos.partition_key import PartitionKey
 import conftest
-import azure.cosmos.retry_utility as retry_utility
+from azure.cosmos import _retry_utility

 pytestmark = pytest.mark.cosmosEmulator

@@ -298,11 +298,11 @@ def test_partitioned_collection_partition_key_extraction(self):
             }
         }

-        self.OriginalExecuteFunction = retry_utility._ExecuteFunction
-        retry_utility._ExecuteFunction = self._MockExecuteFunction
+        self.OriginalExecuteFunction = _retry_utility.ExecuteFunction
+        _retry_utility.ExecuteFunction = self._MockExecuteFunction
         # create document without partition key being specified
         created_document = created_collection.create_item(body=document_definition)
-        retry_utility._ExecuteFunction = self.OriginalExecuteFunction
+        _retry_utility.ExecuteFunction = self.OriginalExecuteFunction
         self.assertEquals(self.last_headers[1], '["WA"]')
         del self.last_headers[:]

@@ -315,11 +315,11 @@ def test_partitioned_collection_partition_key_extraction(self):
             partition_key=PartitionKey(path='/address', kind=documents.PartitionKind.Hash)
         )

-        self.OriginalExecuteFunction = retry_utility._ExecuteFunction
-        retry_utility._ExecuteFunction = self._MockExecuteFunction
+        self.OriginalExecuteFunction = _retry_utility.ExecuteFunction
+        _retry_utility.ExecuteFunction = self._MockExecuteFunction
         # Create document with partitionkey not present as a leaf level property but a dict
         created_document = created_collection1.create_item(document_definition)
-        retry_utility._ExecuteFunction = self.OriginalExecuteFunction
+        _retry_utility.ExecuteFunction = self.OriginalExecuteFunction
         self.assertEquals(self.last_headers[1], [{}])
         del self.last_headers[:]

@@ -331,11 +331,11 @@ def test_partitioned_collection_partition_key_extraction(self):
             partition_key=PartitionKey(path='/address/state/city', kind=documents.PartitionKind.Hash)
         )

-        self.OriginalExecuteFunction = retry_utility._ExecuteFunction
-        retry_utility._ExecuteFunction = self._MockExecuteFunction
+        self.OriginalExecuteFunction = _retry_utility.ExecuteFunction
+        _retry_utility.ExecuteFunction = self._MockExecuteFunction
         # Create document with partitionkey not present in the document
         created_document = created_collection2.create_item(document_definition)
-        retry_utility._ExecuteFunction = self.OriginalExecuteFunction
+        _retry_utility.ExecuteFunction = self.OriginalExecuteFunction
         self.assertEquals(self.last_headers[1], [{}])
         del self.last_headers[:]

@@ -358,10 +358,10 @@ def test_partitioned_collection_partition_key_extraction_special_chars(self):
             "level' 1*()": {"le/vel2": 'val1'}
         }

-        self.OriginalExecuteFunction = retry_utility._ExecuteFunction
-        retry_utility._ExecuteFunction = self._MockExecuteFunction
+        self.OriginalExecuteFunction = _retry_utility.ExecuteFunction
+        _retry_utility.ExecuteFunction = self._MockExecuteFunction
         created_document = created_collection1.create_item(body=document_definition)
-        retry_utility._ExecuteFunction = self.OriginalExecuteFunction
+        _retry_utility.ExecuteFunction = self.OriginalExecuteFunction
         self.assertEquals(self.last_headers[1], '["val1"]')
         del self.last_headers[:]

@@ -385,11 +385,11 @@ def test_partitioned_collection_partition_key_extraction_special_chars(self):
             'level\" 1*()': {'le/vel2': 'val2'}
         }

-        self.OriginalExecuteFunction = retry_utility._ExecuteFunction
-        retry_utility._ExecuteFunction = self._MockExecuteFunction
+        self.OriginalExecuteFunction = _retry_utility.ExecuteFunction
+        _retry_utility.ExecuteFunction = self._MockExecuteFunction
         # create document without partition key being specified
         created_document = created_collection2.create_item(body=document_definition)
-        retry_utility._ExecuteFunction = self.OriginalExecuteFunction
+        _retry_utility.ExecuteFunction = self.OriginalExecuteFunction
         self.assertEquals(self.last_headers[1], '["val2"]')
         del self.last_headers[:]

@@ -851,13 +851,13 @@ def test_murmur_hash(self):
         str = 'afdgdd'
         bytes = bytearray(str, encoding='utf-8')

-        hash_value = murmur_hash._MurmurHash._ComputeHash(bytes)
+        hash_value = _murmur_hash.MurmurHash._ComputeHash(bytes)
         self.assertEqual(1099701186, hash_value)

         num = 374.0
         bytes = bytearray(pack('d', num))

-        hash_value = murmur_hash._MurmurHash._ComputeHash(bytes)
+        hash_value = _murmur_hash.MurmurHash._ComputeHash(bytes)
         self.assertEqual(3717946798, hash_value)

         self._validate_bytes("", 0x1B873593, bytearray(b'\xEE\xA8\xA2\x67'), 1738713326)
@@ -878,25 +878,25 @@ def test_murmur_hash(self):
                              3381504877)

     def _validate_bytes(self, str, seed, expected_hash_bytes, expected_value):
-        hash_value = murmur_hash._MurmurHash._ComputeHash(bytearray(str, encoding='utf-8'), seed)
+        hash_value = _murmur_hash.MurmurHash._ComputeHash(bytearray(str, encoding='utf-8'), seed)
         bytes = bytearray(pack('I', hash_value))
         self.assertEqual(expected_value, hash_value)
         self.assertEqual(expected_hash_bytes, bytes)

     def test_get_bytes(self):
-        actual_bytes = consistent_hash_ring._ConsistentHashRing._GetBytes("documentdb")
+        actual_bytes = _consistent_hash_ring.ConsistentHashRing._GetBytes("documentdb")
         expected_bytes = bytearray(b'\x64\x6F\x63\x75\x6D\x65\x6E\x74\x64\x62')
         self.assertEqual(expected_bytes, actual_bytes)

-        actual_bytes = consistent_hash_ring._ConsistentHashRing._GetBytes("azure")
+        actual_bytes = _consistent_hash_ring.ConsistentHashRing._GetBytes("azure")
         expected_bytes = bytearray(b'\x61\x7A\x75\x72\x65')
         self.assertEqual(expected_bytes, actual_bytes)

-        actual_bytes = consistent_hash_ring._ConsistentHashRing._GetBytes("json")
+        actual_bytes = _consistent_hash_ring.ConsistentHashRing._GetBytes("json")
         expected_bytes = bytearray(b'\x6A\x73\x6F\x6E')
         self.assertEqual(expected_bytes, actual_bytes)

-        actual_bytes = consistent_hash_ring._ConsistentHashRing._GetBytes("nosql")
+        actual_bytes = _consistent_hash_ring.ConsistentHashRing._GetBytes("nosql")
         expected_bytes = bytearray(b'\x6E\x6F\x73\x71\x6C')
         self.assertEqual(expected_bytes, actual_bytes)

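The tests above repeat a save/patch/restore dance around _retry_utility.ExecuteFunction. A small helper showing the same monkeypatch pattern with an exception-safe restore; the helper is an editorial sketch, not part of the test suite:

from azure.cosmos import _retry_utility

def run_with_mocked_execute(mock_fn, action):
    """Temporarily replace the module-level ExecuteFunction hook, then restore it."""
    original = _retry_utility.ExecuteFunction
    _retry_utility.ExecuteFunction = mock_fn
    try:
        return action()
    finally:
        # restore even if the action raises, so later tests see the real hook
        _retry_utility.ExecuteFunction = original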
sdk/cosmos/azure-cosmos/test/globaldb_mock_tests.py (6 additions, 6 deletions)

@@ -29,7 +29,7 @@
 import azure.cosmos.constants as constants
 from azure.cosmos.http_constants import StatusCodes
 import azure.cosmos.global_endpoint_manager as global_endpoint_manager
-import azure.cosmos.retry_utility as retry_utility
+from azure.cosmos import _retry_utility
 import test_config

 pytestmark = pytest.mark.cosmosEmulator
@@ -132,7 +132,7 @@ def setUp(self):
         # Copying the original objects and functions before assigning the mock versions of them
         self.OriginalGetDatabaseAccountStub = global_endpoint_manager._GlobalEndpointManager._GetDatabaseAccountStub
         self.OriginalGlobalEndpointManager = global_endpoint_manager._GlobalEndpointManager
-        self.OriginalExecuteFunction = retry_utility._ExecuteFunction
+        self.OriginalExecuteFunction = _retry_utility.ExecuteFunction

         # Make azure-cosmos use the MockGlobalEndpointManager
         global_endpoint_manager._GlobalEndpointManager = MockGlobalEndpointManager
@@ -141,13 +141,13 @@ def tearDown(self):
         # Restoring the original objects and functions
         global_endpoint_manager._GlobalEndpointManager = self.OriginalGlobalEndpointManager
         global_endpoint_manager._GlobalEndpointManager._GetDatabaseAccountStub = self.OriginalGetDatabaseAccountStub
-        retry_utility._ExecuteFunction = self.OriginalExecuteFunction
+        _retry_utility.ExecuteFunction = self.OriginalExecuteFunction

     def MockExecuteFunction(self, function, *args, **kwargs):
         global location_changed

         if self.endpoint_discovery_retry_count == 2:
-            retry_utility._ExecuteFunction = self.OriginalExecuteFunction
+            _retry_utility.ExecuteFunction = self.OriginalExecuteFunction
             return (json.dumps([{ 'id': 'mock database' }]), None)
         else:
             self.endpoint_discovery_retry_count += 1
@@ -158,8 +158,8 @@ def MockGetDatabaseAccountStub(self, endpoint):
         raise errors.HTTPFailure(StatusCodes.SERVICE_UNAVAILABLE, "Service unavailable")

     def MockCreateDatabase(self, client, database):
-        self.OriginalExecuteFunction = retry_utility._ExecuteFunction
-        retry_utility._ExecuteFunction = self.MockExecuteFunction
+        self.OriginalExecuteFunction = _retry_utility.ExecuteFunction
+        _retry_utility.ExecuteFunction = self.MockExecuteFunction
         client.CreateDatabase(database)

     def test_globaldb_endpoint_discovery_retry_policy(self):