Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 85 additions & 1 deletion gcloud/datastore/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
from gcloud.datastore import helpers


_MAX_LOOPS = 128
"""Maximum number of iterations to wait for deferred keys."""


def _require_dataset_id(dataset_id=None, first_key=None):
"""Infer a dataset ID from the environment, if not passed explicitly.

Expand Down Expand Up @@ -80,6 +84,85 @@ def _require_connection(connection=None):
return connection


def _extended_lookup(connection, dataset_id, key_pbs,
missing=None, deferred=None,
eventual=False, transaction_id=None):
"""Repeat lookup until all keys found (unless stop requested).

Helper method for :func:`get`.

:type connection: :class:`gcloud.datastore.connection.Connection`
:param connection: The connection used to connect to datastore.

:type dataset_id: string
:param dataset_id: The ID of the dataset of which to make the request.

:type key_pbs: list of :class:`gcloud.datastore._datastore_v1_pb2.Key`
:param key_pbs: The keys to retrieve from the datastore.

:type missing: an empty list or None.
:param missing: If a list is passed, the key-only entity protobufs
returned by the backend as "missing" will be copied
into it. Use only as a keyword param.

:type deferred: an empty list or None.
:param deferred: If a list is passed, the key protobufs returned
by the backend as "deferred" will be copied into it.
Use only as a keyword param.

:type eventual: boolean
:param eventual: If False (the default), request ``STRONG`` read
consistency. If True, request ``EVENTUAL`` read
consistency.

:type transaction_id: string
:param transaction_id: If passed, make the request in the scope of
the given transaction. Incompatible with
``eventual==True``.

:rtype: list of :class:`gcloud.datastore._datastore_v1_pb2.Entity`
:returns: The requested entities.
:raises: :class:`ValueError` if missing / deferred are not null or
empty list.
"""
if missing is not None and missing != []:
raise ValueError('missing must be None or an empty list')

if deferred is not None and deferred != []:
raise ValueError('deferred must be None or an empty list')

results = []

loop_num = 0
while loop_num < _MAX_LOOPS: # loop against possible deferred.
loop_num += 1

results_found, missing_found, deferred_found = connection.lookup(
dataset_id=dataset_id,
key_pbs=key_pbs,
eventual=eventual,
transaction_id=transaction_id,
)

results.extend(results_found)

if missing is not None:
missing.extend(missing_found)

if deferred is not None:
deferred.extend(deferred_found)
break

if len(deferred_found) == 0:
break

# We have deferred keys, and the user didn't ask to know about
# them, so retry (but only with the deferred ones).
key_pbs = deferred_found

return results


def get(keys, missing=None, deferred=None, connection=None, dataset_id=None):
"""Retrieves entities, along with their attributes.

Expand Down Expand Up @@ -122,7 +205,8 @@ def get(keys, missing=None, deferred=None, connection=None, dataset_id=None):

transaction = Transaction.current()

entity_pbs = connection.lookup(
entity_pbs = _extended_lookup(
connection,
dataset_id=dataset_id,
key_pbs=[k.to_protobuf() for k in keys],
missing=missing,
Expand Down
88 changes: 14 additions & 74 deletions gcloud/datastore/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ def build_api_url(cls, dataset_id, method, base_url=None,
dataset_id=dataset_id, method=method)

def lookup(self, dataset_id, key_pbs,
missing=None, deferred=None,
eventual=False, transaction_id=None):
"""Lookup keys from a dataset in the Cloud Datastore.

Expand Down Expand Up @@ -150,16 +149,6 @@ def lookup(self, dataset_id, key_pbs,
:type key_pbs: list of :class:`gcloud.datastore._datastore_v1_pb2.Key`
:param key_pbs: The keys to retrieve from the datastore.

:type missing: an empty list or None.
:param missing: If a list is passed, the key-only entity protobufs
returned by the backend as "missing" will be copied
into it. Use only as a keyword param.

:type deferred: an empty list or None.
:param deferred: If a list is passed, the key protobufs returned
by the backend as "deferred" will be copied into it.
Use only as a keyword param.

:type eventual: boolean
:param eventual: If False (the default), request ``STRONG`` read
consistency. If True, request ``EVENTUAL`` read
Expand All @@ -170,35 +159,24 @@ def lookup(self, dataset_id, key_pbs,
the given transaction. Incompatible with
``eventual==True``.

:rtype: list of :class:`gcloud.datastore._datastore_v1_pb2.Entity`
(or a single Entity)
:returns: The entities corresponding to the keys provided.
If a single key was provided and no results matched,
this will return None.
If multiple keys were provided and no results matched,
this will return an empty list.
:raises: ValueError if ``eventual`` is True
:rtype: tuple
:returns: A triple of (``results``, ``missing``, ``deferred``) where
both ``results`` and ``missing`` are lists of
:class:`gcloud.datastore._datastore_v1_pb2.Entity` and
``deferred`` is a list of
:class:`gcloud.datastore._datastore_v1_pb2.Key`.
"""
if missing is not None and missing != []:
raise ValueError('missing must be None or an empty list')

if deferred is not None and deferred != []:
raise ValueError('deferred must be None or an empty list')

lookup_request = datastore_pb.LookupRequest()
_set_read_options(lookup_request, eventual, transaction_id)
helpers._add_keys_to_request(lookup_request.key, key_pbs)

results, missing_found, deferred_found = self._lookup(
lookup_request, dataset_id, deferred is not None)

if missing is not None:
missing.extend(missing_found)
lookup_response = self._rpc(dataset_id, 'lookup', lookup_request,
datastore_pb.LookupResponse)

if deferred is not None:
deferred.extend(deferred_found)
results = [result.entity for result in lookup_response.found]
missing = [result.entity for result in lookup_response.missing]

return results
return results, missing, list(lookup_response.deferred)

def run_query(self, dataset_id, query_pb, namespace=None,
eventual=False, transaction_id=None):
Expand Down Expand Up @@ -376,41 +354,14 @@ def allocate_ids(self, dataset_id, key_pbs):
datastore_pb.AllocateIdsResponse)
return list(response.key)

def _lookup(self, lookup_request, dataset_id, stop_on_deferred):
"""Repeat lookup until all keys found (unless stop requested).

Helper method for ``lookup()``.
"""
results = []
missing = []
deferred = []
while True: # loop against possible deferred.
lookup_response = self._rpc(dataset_id, 'lookup', lookup_request,
datastore_pb.LookupResponse)

results.extend(
[result.entity for result in lookup_response.found])

missing.extend(
[result.entity for result in lookup_response.missing])

if stop_on_deferred:
deferred.extend([key for key in lookup_response.deferred])
break

if not lookup_response.deferred:
break

# We have deferred keys, and the user didn't ask to know about
# them, so retry (but only with the deferred ones).
_copy_deferred_keys(lookup_request, lookup_response)
return results, missing, deferred


def _set_read_options(request, eventual, transaction_id):
"""Validate rules for read options, and assign to the request.

Helper method for ``lookup()`` and ``run_query``.

:raises: :class:`ValueError` if ``eventual`` is ``True`` and the
``transaction_id`` is not ``None``.
"""
if eventual and (transaction_id is not None):
raise ValueError('eventual must be False when in a transaction')
Expand All @@ -420,14 +371,3 @@ def _set_read_options(request, eventual, transaction_id):
opts.read_consistency = datastore_pb.ReadOptions.EVENTUAL
elif transaction_id:
opts.transaction = transaction_id


def _copy_deferred_keys(lookup_request, lookup_response):
"""Clear requested keys and copy deferred keys back in.

Helper for ``Connection.lookup()``.
"""
for old_key in list(lookup_request.key):
lookup_request.key.remove(old_key)
for def_key in lookup_response.deferred:
lookup_request.key.add().CopyFrom(def_key)
Loading