_MAX_LOOPS = 128
"""Maximum number of iterations to wait for deferred keys."""


def _extended_lookup(connection, dataset_id, key_pbs,
                     missing=None, deferred=None,
                     eventual=False, transaction_id=None):
    """Repeat lookup until all keys found (unless stop requested).

    Helper method for :func:`get`.

    The lookup is retried with only the keys the backend reported as
    deferred, for at most ``_MAX_LOOPS`` round trips.  If keys are still
    deferred once that budget is used up, they are dropped without notice
    and only the entities collected so far are returned.

    :type connection: :class:`gcloud.datastore.connection.Connection`
    :param connection: The connection used to connect to datastore.

    :type dataset_id: string
    :param dataset_id: The ID of the dataset of which to make the request.

    :type key_pbs: list of :class:`gcloud.datastore._datastore_v1_pb2.Key`
    :param key_pbs: The keys to retrieve from the datastore.

    :type missing: an empty list or None.
    :param missing: If a list is passed, the key-only entity protobufs
                    returned by the backend as "missing" will be copied
                    into it (accumulated over every round trip).
                    Use only as a keyword param.

    :type deferred: an empty list or None.
    :param deferred: If a list is passed, the key protobufs returned
                     by the backend as "deferred" will be copied into it
                     and no retry is attempted.
                     Use only as a keyword param.

    :type eventual: boolean
    :param eventual: If False (the default), request ``STRONG`` read
                     consistency.  If True, request ``EVENTUAL`` read
                     consistency.

    :type transaction_id: string
    :param transaction_id: If passed, make the request in the scope of
                           the given transaction.  Incompatible with
                           ``eventual==True``.

    :rtype: list of :class:`gcloud.datastore._datastore_v1_pb2.Entity`
    :returns: The requested entities.
    :raises: :class:`ValueError` if ``missing`` or ``deferred`` is neither
             ``None`` nor an empty list.
    """
    if missing is not None and missing != []:
        raise ValueError('missing must be None or an empty list')

    if deferred is not None and deferred != []:
        raise ValueError('deferred must be None or an empty list')

    results = []

    # ``_MAX_LOOPS`` is read on every call (tests monkeypatch it); a
    # non-positive value means no lookup is issued at all.
    for _ in range(_MAX_LOOPS):  # loop against possible deferred.
        results_found, missing_found, deferred_found = connection.lookup(
            dataset_id=dataset_id,
            key_pbs=key_pbs,
            eventual=eventual,
            transaction_id=transaction_id,
        )

        results.extend(results_found)

        if missing is not None:
            missing.extend(missing_found)

        if deferred is not None:
            # The caller wants to handle deferred keys: report and stop.
            deferred.extend(deferred_found)
            break

        if not deferred_found:
            break

        # We have deferred keys, and the user didn't ask to know about
        # them, so retry (but only with the deferred ones).
        key_pbs = deferred_found

    return results
def lookup(self, dataset_id, key_pbs,
           eventual=False, transaction_id=None):
    """Lookup keys from a dataset in the Cloud Datastore.

    Issues a single ``lookup`` RPC; keys the backend defers are handed
    back to the caller rather than retried here.

    :param dataset_id: Identifies the dataset; forwarded unchanged to
                       ``self._rpc``.

    :type key_pbs: list of :class:`gcloud.datastore._datastore_v1_pb2.Key`
    :param key_pbs: The keys to retrieve from the datastore.

    :type eventual: boolean
    :param eventual: If False (the default), request ``STRONG`` read
                     consistency.  If True, request ``EVENTUAL`` read
                     consistency.

    :type transaction_id: string
    :param transaction_id: If passed, make the request in the scope of
                           the given transaction.  Incompatible with
                           ``eventual==True``.

    :rtype: tuple
    :returns: A triple of (``results``, ``missing``, ``deferred``) where
              both ``results`` and ``missing`` are lists of
              :class:`gcloud.datastore._datastore_v1_pb2.Entity` and
              ``deferred`` is a list of
              :class:`gcloud.datastore._datastore_v1_pb2.Key`.
    :raises: :class:`ValueError` (from ``_set_read_options``) if
             ``eventual`` is True while ``transaction_id`` is not None.
    """
    request_pb = datastore_pb.LookupRequest()
    _set_read_options(request_pb, eventual, transaction_id)
    helpers._add_keys_to_request(request_pb.key, key_pbs)

    response_pb = self._rpc(dataset_id, 'lookup', request_pb,
                            datastore_pb.LookupResponse)

    # Unwrap the entity carried by each found / missing result.
    found = [item.entity for item in response_pb.found]
    not_found = [item.entity for item in response_pb.missing]
    still_deferred = list(response_pb.deferred)

    return found, not_found, still_deferred
- _copy_deferred_keys(lookup_request, lookup_response) - return results, missing, deferred - def _set_read_options(request, eventual, transaction_id): """Validate rules for read options, and assign to the request. Helper method for ``lookup()`` and ``run_query``. + + :raises: :class:`ValueError` if ``eventual`` is ``True`` and the + ``transaction_id`` is not ``None``. """ if eventual and (transaction_id is not None): raise ValueError('eventual must be False when in a transaction') @@ -420,14 +371,3 @@ def _set_read_options(request, eventual, transaction_id): opts.read_consistency = datastore_pb.ReadOptions.EVENTUAL elif transaction_id: opts.transaction = transaction_id - - -def _copy_deferred_keys(lookup_request, lookup_response): - """Clear requested keys and copy deferred keys back in. - - Helper for ``Connection.lookup()``. - """ - for old_key in list(lookup_request.key): - lookup_request.key.remove(old_key) - for def_key in lookup_response.deferred: - lookup_request.key.add().CopyFrom(def_key) diff --git a/gcloud/datastore/test_api.py b/gcloud/datastore/test_api.py index abf238d796a0..605d6acf6a9f 100644 --- a/gcloud/datastore/test_api.py +++ b/gcloud/datastore/test_api.py @@ -220,6 +220,7 @@ def test_miss_wo_dataset_id(self): 'dataset_id': DATASET_ID, 'key_pbs': [key.to_protobuf()], 'transaction_id': None, + 'eventual': False, } self.assertEqual(connection._called_with, expected) @@ -251,6 +252,30 @@ def test_miss_w_missing(self): self.assertEqual([missed.key.to_protobuf() for missed in missing], [key.to_protobuf()]) + def test_w_missing_non_empty(self): + from gcloud.datastore.key import Key + + DATASET_ID = 'DATASET' + CONNECTION = object() + key = Key('Kind', 1234, dataset_id=DATASET_ID) + + missing = ['this', 'list', 'is', 'not', 'empty'] + self.assertRaises(ValueError, self._callFUT, + [key], connection=CONNECTION, + missing=missing) + + def test_w_deferred_non_empty(self): + from gcloud.datastore.key import Key + + DATASET_ID = 'DATASET' + CONNECTION = 
object() + key = Key('Kind', 1234, dataset_id=DATASET_ID) + + deferred = ['this', 'list', 'is', 'not', 'empty'] + self.assertRaises(ValueError, self._callFUT, + [key], connection=CONNECTION, + deferred=deferred) + def test_miss_w_deferred(self): from gcloud.datastore.key import Key from gcloud.datastore.test_connection import _Connection @@ -269,6 +294,94 @@ def test_miss_w_deferred(self): self.assertEqual([def_key.to_protobuf() for def_key in deferred], [key.to_protobuf()]) + def _verifyProtobufCall(self, called_with, URI, conn): + self.assertEqual(called_with['uri'], URI) + self.assertEqual(called_with['method'], 'POST') + self.assertEqual(called_with['headers']['Content-Type'], + 'application/x-protobuf') + self.assertEqual(called_with['headers']['User-Agent'], + conn.USER_AGENT) + + def test_w_deferred_from_backend_but_not_passed(self): + from gcloud.datastore import _datastore_v1_pb2 as datastore_pb + from gcloud.datastore.connection import Connection + from gcloud.datastore.key import Key + from gcloud.datastore import test_connection + + # Shortening name, import line above was too long. + cmp_key_after_req = test_connection._compare_key_pb_after_request + + DATASET_ID = 'DATASET' + key1 = Key('Kind', dataset_id=DATASET_ID) + key2 = Key('Kind', 2345, dataset_id=DATASET_ID) + key_pb1 = key1.to_protobuf() + key_pb2 = key2.to_protobuf() + + # Build mock first response. + rsp_pb1 = datastore_pb.LookupResponse() + entity1 = datastore_pb.Entity() + entity1.key.CopyFrom(key_pb1) + # Add the entity to the "found" part of the response. + rsp_pb1.found.add(entity=entity1) + # Add the second key to the "deferred" part of the response. + rsp_pb1.deferred.add().CopyFrom(key_pb2) + + # Build mock second response. + rsp_pb2 = datastore_pb.LookupResponse() + # Add in entity that was deferred. 
+ entity2 = datastore_pb.Entity() + entity2.key.CopyFrom(key_pb2) + rsp_pb2.found.add(entity=entity2) + + conn = Connection() + # Add mock http object to connection with response from above. + http = conn._http = _HttpMultiple( + ({'status': '200'}, rsp_pb1.SerializeToString()), + ({'status': '200'}, rsp_pb2.SerializeToString()), + ) + + missing = [] + found = self._callFUT([key1, key2], missing=missing, connection=conn) + self.assertEqual(len(found), 2) + self.assertEqual(len(missing), 0) + + # Check the actual contents on the response. + self.assertEqual(found[0].key.path, key1.path) + self.assertEqual(found[0].key.dataset_id, key1.dataset_id) + self.assertEqual(found[1].key.path, key2.path) + self.assertEqual(found[1].key.dataset_id, key2.dataset_id) + + # Check that our http object was called correctly. + cw = http._called_with + rq_class = datastore_pb.LookupRequest + request = rq_class() + self.assertEqual(len(cw), 2) + + # Make URI to check for requests. + URI = '/'.join([ + conn.API_BASE_URL, + 'datastore', + conn.API_VERSION, + 'datasets', + DATASET_ID, + 'lookup', + ]) + + # Make sure the first called with argument checks out. + self._verifyProtobufCall(cw[0], URI, conn) + request.ParseFromString(cw[0]['body']) + keys = list(request.key) + self.assertEqual(len(keys), 2) + cmp_key_after_req(self, key_pb1, keys[0]) + cmp_key_after_req(self, key_pb2, keys[1]) + + # Make sure the second called with argument checks out. 
+ self._verifyProtobufCall(cw[1], URI, conn) + request.ParseFromString(cw[1]['body']) + keys = list(request.key) + self.assertEqual(len(keys), 1) + cmp_key_after_req(self, key_pb2, keys[0]) + def test_hit(self): from gcloud.datastore.key import Key from gcloud.datastore.test_connection import _Connection @@ -366,6 +479,7 @@ def test_implicit_wo_transaction(self): 'dataset_id': DATASET_ID, 'key_pbs': [key.to_protobuf()], 'transaction_id': None, + 'eventual': False, } self.assertEqual(CUSTOM_CONNECTION._called_with, expected_called_with) @@ -403,6 +517,7 @@ def test_w_transaction(self): 'dataset_id': DATASET_ID, 'key_pbs': [key.to_protobuf()], 'transaction_id': TRANSACTION, + 'eventual': False, } self.assertEqual(CUSTOM_CONNECTION._called_with, expected_called_with) @@ -414,6 +529,37 @@ def test_w_transaction(self): self.assertEqual(list(result), ['foo']) self.assertEqual(result['foo'], 'Foo') + def test_max_loops(self): + from gcloud._testing import _Monkey + from gcloud.datastore import api + from gcloud.datastore.key import Key + from gcloud.datastore.test_connection import _Connection + + DATASET_ID = 'DATASET' + KIND = 'Kind' + ID = 1234 + + # Make a found entity pb to be returned from mock backend. + entity_pb = self._make_entity_pb(DATASET_ID, KIND, ID, + 'foo', 'Foo') + + # Make a connection to return the entity pb. + connection = _Connection(entity_pb) + + key = Key(KIND, ID, dataset_id=DATASET_ID) + deferred = [] + missing = [] + with _Monkey(api, _MAX_LOOPS=-1): + result = self._callFUT([key], missing=missing, deferred=deferred, + connection=connection, + dataset_id=DATASET_ID) + + # Make sure we have no results, even though the connection has been + # set up as in `test_hit` to return a single result. 
class _HttpMultiple(object):
    """Fake HTTP object that replays canned responses in order.

    Each positional argument given to the constructor is returned,
    unchanged, by one call to :meth:`request`, first in, first out.
    """

    def __init__(self, *responses):
        # Keyword arguments of every ``request`` call, in call order.
        self._called_with = []
        # Private copy so the queue can be consumed in place.
        self._responses = list(responses)

    def request(self, **kw):
        """Record the call and return the next canned response.

        :raises: :class:`IndexError` if no canned responses are left; the
                 call is still recorded in ``_called_with``.
        """
        self._called_with.append(kw)
        return self._responses.pop(0)
deferred = conn.lookup(DATASET_ID, [key_pb], + transaction_id=TRANSACTION) self.assertEqual(len(found), 0) + self.assertEqual(len(missing), 0) + self.assertEqual(len(deferred), 0) cw = http._called_with self._verifyProtobufCall(cw, URI, conn) rq_class = datastore_pb.LookupRequest @@ -291,7 +299,9 @@ def test_lookup_single_key_nonempty_response(self): 'lookup', ]) http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString()) - found, = conn.lookup(DATASET_ID, [key_pb]) + (found,), missing, deferred = conn.lookup(DATASET_ID, [key_pb]) + self.assertEqual(len(missing), 0) + self.assertEqual(len(deferred), 0) self.assertEqual(found.key.path_element[0].kind, 'Kind') self.assertEqual(found.key.path_element[0].id, 1234) cw = http._called_with @@ -320,7 +330,10 @@ def test_lookup_multiple_keys_empty_response(self): 'lookup', ]) http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString()) - self.assertEqual(conn.lookup(DATASET_ID, [key_pb1, key_pb2]), []) + found, missing, deferred = conn.lookup(DATASET_ID, [key_pb1, key_pb2]) + self.assertEqual(len(found), 0) + self.assertEqual(len(missing), 0) + self.assertEqual(len(deferred), 0) cw = http._called_with self._verifyProtobufCall(cw, URI, conn) rq_class = datastore_pb.LookupRequest @@ -352,9 +365,9 @@ def test_lookup_multiple_keys_w_missing(self): 'lookup', ]) http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString()) - missing = [] - result = conn.lookup(DATASET_ID, [key_pb1, key_pb2], missing=missing) + result, missing, deferred = conn.lookup(DATASET_ID, [key_pb1, key_pb2]) self.assertEqual(result, []) + self.assertEqual(len(deferred), 0) self.assertEqual([missed.key for missed in missing], [key_pb1, key_pb2]) cw = http._called_with @@ -367,16 +380,6 @@ def test_lookup_multiple_keys_w_missing(self): _compare_key_pb_after_request(self, key_pb1, keys[0]) _compare_key_pb_after_request(self, key_pb2, keys[1]) - def test_lookup_multiple_keys_w_missing_non_empty(self): - DATASET_ID = 'DATASET' - 
key_pb1 = self._make_key_pb(DATASET_ID) - key_pb2 = self._make_key_pb(DATASET_ID, id=2345) - conn = self._makeOne() - missing = ['this', 'list', 'is', 'not', 'empty'] - self.assertRaises( - ValueError, - conn.lookup, DATASET_ID, [key_pb1, key_pb2], missing=missing) - def test_lookup_multiple_keys_w_deferred(self): from gcloud.datastore import _datastore_v1_pb2 as datastore_pb @@ -396,9 +399,9 @@ def test_lookup_multiple_keys_w_deferred(self): 'lookup', ]) http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString()) - deferred = [] - result = conn.lookup(DATASET_ID, [key_pb1, key_pb2], deferred=deferred) + result, missing, deferred = conn.lookup(DATASET_ID, [key_pb1, key_pb2]) self.assertEqual(result, []) + self.assertEqual(len(missing), 0) self.assertEqual([def_key for def_key in deferred], [key_pb1, key_pb2]) cw = http._called_with self._verifyProtobufCall(cw, URI, conn) @@ -415,68 +418,6 @@ def test_lookup_multiple_keys_w_deferred(self): _compare_key_pb_after_request(self, key_pb1, keys[0]) _compare_key_pb_after_request(self, key_pb2, keys[1]) - def test_lookup_multiple_keys_w_deferred_non_empty(self): - DATASET_ID = 'DATASET' - key_pb1 = self._make_key_pb(DATASET_ID) - key_pb2 = self._make_key_pb(DATASET_ID, id=2345) - conn = self._makeOne() - deferred = ['this', 'list', 'is', 'not', 'empty'] - self.assertRaises( - ValueError, - conn.lookup, DATASET_ID, [key_pb1, key_pb2], deferred=deferred) - - def test_lookup_multiple_keys_w_deferred_from_backend_but_not_passed(self): - from gcloud.datastore import _datastore_v1_pb2 as datastore_pb - - DATASET_ID = 'DATASET' - key_pb1 = self._make_key_pb(DATASET_ID) - key_pb2 = self._make_key_pb(DATASET_ID, id=2345) - rsp_pb1 = datastore_pb.LookupResponse() - entity1 = datastore_pb.Entity() - entity1.key.CopyFrom(key_pb1) - rsp_pb1.found.add(entity=entity1) - rsp_pb1.deferred.add().CopyFrom(key_pb2) - rsp_pb2 = datastore_pb.LookupResponse() - entity2 = datastore_pb.Entity() - entity2.key.CopyFrom(key_pb2) - 
rsp_pb2.found.add(entity=entity2) - conn = self._makeOne() - URI = '/'.join([ - conn.API_BASE_URL, - 'datastore', - conn.API_VERSION, - 'datasets', - DATASET_ID, - 'lookup', - ]) - http = conn._http = HttpMultiple( - ({'status': '200'}, rsp_pb1.SerializeToString()), - ({'status': '200'}, rsp_pb2.SerializeToString()), - ) - found = conn.lookup(DATASET_ID, [key_pb1, key_pb2]) - self.assertEqual(len(found), 2) - self.assertEqual(found[0].key.path_element[0].kind, 'Kind') - self.assertEqual(found[0].key.path_element[0].id, 1234) - self.assertEqual(found[1].key.path_element[0].kind, 'Kind') - self.assertEqual(found[1].key.path_element[0].id, 2345) - cw = http._called_with - rq_class = datastore_pb.LookupRequest - request = rq_class() - self.assertEqual(len(cw), 2) - - self._verifyProtobufCall(cw[0], URI, conn) - request.ParseFromString(cw[0]['body']) - keys = list(request.key) - self.assertEqual(len(keys), 2) - _compare_key_pb_after_request(self, key_pb1, keys[0]) - _compare_key_pb_after_request(self, key_pb2, keys[1]) - - self._verifyProtobufCall(cw[1], URI, conn) - request.ParseFromString(cw[1]['body']) - keys = list(request.key) - self.assertEqual(len(keys), 1) - self.assertEqual(keys[0], key_pb2) - def test_run_query_w_eventual_no_transaction(self): from gcloud.datastore import _datastore_v1_pb2 as datastore_pb @@ -855,18 +796,6 @@ def request(self, **kw): return self._response, self._content -class HttpMultiple(object): - - def __init__(self, *responses): - self._called_with = [] - self._responses = list(responses) - - def request(self, **kw): - self._called_with.append(kw) - result, self._responses = self._responses[0], self._responses[1:] - return result - - def _compare_key_pb_after_request(test, key_before, key_after): test.assertFalse(key_after.partition_id.HasField('dataset_id')) test.assertEqual(key_before.partition_id.namespace, @@ -886,13 +815,7 @@ def __init__(self, *result): def lookup(self, **kw): self._called_with = kw - missing = kw.pop('missing', 
None) - if missing is not None: - missing.extend(self._missing) - deferred = kw.pop('deferred', None) - if deferred is not None: - deferred.extend(self._deferred) - return self._result + return self._result, self._missing[:], self._deferred[:] def allocate_ids(self, dataset_id, key_pbs): self._called_dataset_id = dataset_id