69 changes: 67 additions & 2 deletions gcloud/datastore/batch.py
@@ -13,11 +13,55 @@
# limitations under the License.

"""Create / interact with a batch of updates / deletes."""
try:
    from threading import local as Local
except ImportError:  # pragma: NO COVER (who doesn't have it?)
    class Local(object):
        """Placeholder for non-threaded applications."""

from gcloud.datastore import _implicit_environ
from gcloud.datastore import datastore_v1_pb2 as datastore_pb


class _Batches(Local):
    """Manage a thread-local LIFO stack of active batches / transactions.

    Intended for use only in :meth:`gcloud.datastore.batch.Batch.__enter__`
    and :meth:`gcloud.datastore.batch.Batch.__exit__`.
    """

    def __init__(self):
        super(_Batches, self).__init__()
        self._stack = []

    @property
    def stack(self):
        """Return a copy of our stack."""
        return self._stack[:]

    def _push_batch(self, batch):
        """Push a batch onto our stack.

        Intended for use only in :meth:`gcloud.datastore.batch.Batch.__enter__`

        :type batch: :class:`gcloud.datastore.batch.Batch` or
                     :class:`gcloud.datastore.transaction.Transaction`
        """
        self._stack.append(batch)

    def _pop_batch(self):
        """Pop a batch off our stack.

        Intended for use only in :meth:`gcloud.datastore.batch.Batch.__exit__`

        :rtype: :class:`gcloud.datastore.batch.Batch` or
                :class:`gcloud.datastore.transaction.Transaction`
        :raises: IndexError if the stack is empty.
        """
        return self._stack.pop()


_BATCHES = _Batches()
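Reviewer aside: the reason `_Batches` derives from `threading.local` is that each thread then gets its own independent `_stack`, so batches opened concurrently in different threads cannot interleave. A minimal, self-contained sketch of that property (illustrative only; `_PerThreadStack`, `_worker`, and `seen` are made-up names, not part of this diff):

import threading

class _PerThreadStack(threading.local):
    """Toy stand-in for _Batches: one independent list per thread."""
    def __init__(self):
        super(_PerThreadStack, self).__init__()  # re-run in each new thread
        self.items = []

_stack = _PerThreadStack()
seen = {}

def _worker(name):
    _stack.items.append(name)        # mutation is visible only in this thread
    seen[name] = list(_stack.items)

threads = [threading.Thread(target=_worker, args=('t%d' % i,)) for i in range(2)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

assert seen == {'t0': ['t0'], 't1': ['t1']}  # no cross-thread leakage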


class Batch(object):
    """An abstraction representing a collected group of updates / deletes.

@@ -180,6 +224,13 @@ def delete(self, key):
        self.connection.delete_entities(
            self.dataset_id, [key_pb], mutation=self.mutation)

    def begin(self):
        """No-op

        Overridden by :class:`gcloud.datastore.transaction.Transaction`.
        """
        pass

    def commit(self):
        """Commits the batch.

@@ -197,9 +248,23 @@ def commit(self):
            new_id = new_key_pb.path_element[-1].id
            entity.key = entity.key.completed_key(new_id)

    def rollback(self):
        """No-op

        Overridden by :class:`gcloud.datastore.transaction.Transaction`.
        """
        pass

    def __enter__(self):
        _BATCHES._push_batch(self)
        self.begin()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_type is None:
                self.commit()
            else:
                self.rollback()
        finally:
            _BATCHES._pop_batch()
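Reviewer aside: the new try/finally guarantees the stack is popped however the block exits. A runnable toy (not the library API; `_ToyBatch` and `_toy_stack` are invented for illustration) demonstrating commit-on-success, rollback-on-error, and a balanced stack either way:

_toy_stack = []

class _ToyBatch(object):
    def __init__(self):
        self.calls = []

    def __enter__(self):
        _toy_stack.append(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_type is None:
                self.calls.append('commit')    # clean exit
            else:
                self.calls.append('rollback')  # body raised
        finally:
            _toy_stack.pop()                   # always rebalance the stack

with _ToyBatch() as ok:
    pass
assert ok.calls == ['commit'] and _toy_stack == []

try:
    with _ToyBatch() as bad:
        raise ValueError('testing')
except ValueError:
    pass
assert bad.calls == ['rollback'] and _toy_stack == []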
76 changes: 62 additions & 14 deletions gcloud/datastore/test_batch.py
@@ -106,7 +106,7 @@ def test_put_entity_w_partial_key(self):

        self.assertEqual(
            connection._saved,
            [(_DATASET, key._key, _PROPERTIES, (), batch.mutation)])
        self.assertEqual(batch._auto_id_entities, [entity])

    def test_put_entity_w_completed_key(self):
@@ -121,7 +121,7 @@ def test_put_entity_w_completed_key(self):

        self.assertEqual(
            connection._saved,
            [(_DATASET, key._key, _PROPERTIES, (), batch.mutation)])

    def test_delete_w_partial_key(self):
        _DATASET = 'DATASET'
@@ -142,7 +142,7 @@ def test_delete_w_completed_key(self):

        self.assertEqual(
            connection._deleted,
            [(_DATASET, [key._key], batch.mutation)])

    def test_commit(self):
        _DATASET = 'DATASET'
@@ -151,7 +151,7 @@ def test_commit(self):

        batch.commit()

        self.assertEqual(connection._committed, [(_DATASET, batch.mutation)])

    def test_commit_w_auto_id_entities(self):
        _DATASET = 'DATASET'
@@ -165,45 +165,91 @@

        batch.commit()

        self.assertEqual(connection._committed, [(_DATASET, batch.mutation)])
        self.assertFalse(key._partial)
        self.assertEqual(key._id, _NEW_ID)

    def test_as_context_mgr_wo_error(self):
        from gcloud.datastore.batch import _BATCHES
        _DATASET = 'DATASET'
        _PROPERTIES = {'foo': 'bar'}
        connection = _Connection()
        entity = _Entity(_PROPERTIES)
        key = entity.key = _Key(_DATASET)

        self.assertEqual(_BATCHES.stack, [])

        with self._makeOne(dataset_id=_DATASET,
                           connection=connection) as batch:
            self.assertEqual(_BATCHES.stack, [batch])
            batch.put(entity)

        self.assertEqual(_BATCHES.stack, [])

        self.assertEqual(
            connection._saved,
            [(_DATASET, key._key, _PROPERTIES, (), batch.mutation)])
        self.assertEqual(connection._committed, [(_DATASET, batch.mutation)])

    def test_as_context_mgr_nested(self):
        from gcloud.datastore.batch import _BATCHES
        _DATASET = 'DATASET'
        _PROPERTIES = {'foo': 'bar'}
        connection = _Connection()
        entity1 = _Entity(_PROPERTIES)
        key = entity1.key = _Key(_DATASET)
        entity2 = _Entity(_PROPERTIES)
        key = entity2.key = _Key(_DATASET)

        self.assertEqual(_BATCHES.stack, [])

        with self._makeOne(dataset_id=_DATASET,
                           connection=connection) as batch1:
            self.assertEqual(_BATCHES.stack, [batch1])
            batch1.put(entity1)
            with self._makeOne(dataset_id=_DATASET,
                               connection=connection) as batch2:
                self.assertEqual(_BATCHES.stack, [batch1, batch2])
                batch2.put(entity2)

            self.assertEqual(_BATCHES.stack, [batch1])

        self.assertEqual(_BATCHES.stack, [])

        self.assertEqual(
            connection._saved,
            [(_DATASET, key._key, _PROPERTIES, (), batch1.mutation),
             (_DATASET, key._key, _PROPERTIES, (), batch2.mutation)])
        self.assertEqual(connection._committed,
                         [(_DATASET, batch2.mutation),
                          (_DATASET, batch1.mutation)])

    def test_as_context_mgr_w_error(self):
        from gcloud.datastore.batch import _BATCHES
        _DATASET = 'DATASET'
        _PROPERTIES = {'foo': 'bar'}
        connection = _Connection()
        entity = _Entity(_PROPERTIES)
        key = entity.key = _Key(_DATASET)

        self.assertEqual(_BATCHES.stack, [])

        try:
            with self._makeOne(dataset_id=_DATASET,
                               connection=connection) as batch:
                self.assertEqual(_BATCHES.stack, [batch])
                batch.put(entity)
                raise ValueError("testing")
        except ValueError:
            pass

        self.assertEqual(_BATCHES.stack, [])

        self.assertEqual(
            connection._saved,
            [(_DATASET, key._key, _PROPERTIES, (), batch.mutation)])
        self.assertEqual(connection._committed, [])


class _CommitResult(object):
@@ -226,23 +272,25 @@ def __init__(self, id):


class _Connection(object):
    _marker = object()
    _save_result = (False, None)

    def __init__(self, *new_keys):
        self._commit_result = _CommitResult(*new_keys)
        self._committed = []
        self._saved = []
        self._deleted = []

    def save_entity(self, dataset_id, key_pb, properties,
                    exclude_from_indexes=(), mutation=None):
        self._saved.append((dataset_id, key_pb, properties,
                            tuple(exclude_from_indexes), mutation))
        return self._save_result

    def delete_entities(self, dataset_id, key_pbs, mutation=None):
        self._deleted.append((dataset_id, key_pbs, mutation))

    def commit(self, dataset_id, mutation):
        self._committed.append((dataset_id, mutation))
        return self._commit_result

12 changes: 0 additions & 12 deletions gcloud/datastore/transaction.py
@@ -186,15 +186,3 @@ def commit(self):

        # Clear our own ID in case this gets accidentally reused.
        self._id = None

    def __enter__(self):
        # Don't call super() -- we have different semantics.
        self.begin()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Don't call super() -- we have different semantics.
        if exc_type is None:
            self.commit()
        else:
            self.rollback()
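Reviewer aside: with `begin()` / `rollback()` now defined as no-op hooks on `Batch`, `Transaction` can drop its own `__enter__` / `__exit__` entirely (the 12 deletions above) and inherit the shared protocol. A simplified sketch of the resulting shape (class bodies invented for illustration; the real `__enter__` / `__exit__` also push and pop `_BATCHES`):

class Batch(object):
    def begin(self):         # no-op hook, see batch.py above
        pass

    def rollback(self):      # no-op hook
        pass

    def commit(self):
        print('commit')

    def __enter__(self):
        self.begin()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self.commit()
        else:
            self.rollback()

class Transaction(Batch):
    def begin(self):
        print('begin transaction')

    def rollback(self):
        print('rollback transaction')

with Transaction():
    pass  # prints 'begin transaction' then 'commit'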