Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion google/cloud/spanner_v1/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def commit(
max_commit_delay=None,
exclude_txn_from_change_streams=False,
isolation_level=TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.READ_LOCK_MODE_UNSPECIFIED,
timeout_secs=DEFAULT_RETRY_TIMEOUT_SECS,
default_retry_delay=None,
):
Expand Down Expand Up @@ -182,6 +183,11 @@ def commit(
:param isolation_level:
(Optional) Sets isolation level for the transaction.

:type read_lock_mode:
:class:`google.cloud.spanner_v1.types.TransactionOptions.ReadWrite.ReadLockMode`
:param read_lock_mode:
(Optional) Sets the read lock mode for this transaction.

:type timeout_secs: int
:param timeout_secs: (Optional) The maximum time in seconds to wait for the commit to complete.

Expand All @@ -208,7 +214,9 @@ def commit(
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
)
txn_options = TransactionOptions(
read_write=TransactionOptions.ReadWrite(),
read_write=TransactionOptions.ReadWrite(
read_lock_mode=read_lock_mode,
),
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
isolation_level=isolation_level,
)
Expand Down
11 changes: 11 additions & 0 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,7 @@ def batch(
max_commit_delay=None,
exclude_txn_from_change_streams=False,
isolation_level=TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.READ_LOCK_MODE_UNSPECIFIED,
**kw,
):
"""Return an object which wraps a batch.
Expand Down Expand Up @@ -914,6 +915,11 @@ def batch(
:param isolation_level:
(Optional) Sets the isolation level for this transaction. This overrides any default isolation level set for the client.

:type read_lock_mode:
:class:`google.cloud.spanner_v1.types.TransactionOptions.ReadWrite.ReadLockMode`
:param read_lock_mode:
(Optional) Sets the read lock mode for this transaction. This overrides any default read lock mode set for the client.

:rtype: :class:`~google.cloud.spanner_v1.database.BatchCheckout`
:returns: new wrapper
"""
Expand All @@ -924,6 +930,7 @@ def batch(
max_commit_delay,
exclude_txn_from_change_streams,
isolation_level,
read_lock_mode,
**kw,
)

Expand Down Expand Up @@ -996,6 +1003,7 @@ def run_in_transaction(self, func, *args, **kw):
This does not exclude the transaction from being recorded in the change streams with
the DDL option `allow_txn_exclusion` being false or unset.
"isolation_level" sets the isolation level for the transaction.
"read_lock_mode" sets the read lock mode for the transaction.

:rtype: Any
:returns: The return value of ``func``.
Expand Down Expand Up @@ -1310,6 +1318,7 @@ def __init__(
max_commit_delay=None,
exclude_txn_from_change_streams=False,
isolation_level=TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.READ_LOCK_MODE_UNSPECIFIED,
**kw,
):
self._database: Database = database
Expand All @@ -1325,6 +1334,7 @@ def __init__(
self._max_commit_delay = max_commit_delay
self._exclude_txn_from_change_streams = exclude_txn_from_change_streams
self._isolation_level = isolation_level
self._read_lock_mode = read_lock_mode
self._kw = kw

def __enter__(self):
Expand Down Expand Up @@ -1357,6 +1367,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
max_commit_delay=self._max_commit_delay,
exclude_txn_from_change_streams=self._exclude_txn_from_change_streams,
isolation_level=self._isolation_level,
read_lock_mode=self._read_lock_mode,
**self._kw,
)
finally:
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ def run_in_transaction(self, func, *args, **kw):
This does not exclude the transaction from being recorded in the change streams with
the DDL option `allow_txn_exclusion` being false or unset.
"isolation_level" sets the isolation level for the transaction.
"read_lock_mode" sets the read lock mode for the transaction.

:rtype: Any
:returns: The return value of ``func``.
Expand All @@ -525,6 +526,7 @@ def run_in_transaction(self, func, *args, **kw):
"exclude_txn_from_change_streams", None
)
isolation_level = kw.pop("isolation_level", None)
read_lock_mode = kw.pop("read_lock_mode", None)

database = self._database
log_commit_stats = database.log_commit_stats
Expand All @@ -549,6 +551,7 @@ def run_in_transaction(self, func, *args, **kw):
txn.transaction_tag = transaction_tag
txn.exclude_txn_from_change_streams = exclude_txn_from_change_streams
txn.isolation_level = isolation_level
txn.read_lock_mode = read_lock_mode

if self.is_multiplexed:
txn._multiplexed_session_previous_transaction_id = (
Expand Down
14 changes: 12 additions & 2 deletions google/cloud/spanner_v1/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class Transaction(_SnapshotBase, _BatchBase):
isolation_level: TransactionOptions.IsolationLevel = (
TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED
)
read_lock_mode: TransactionOptions.ReadWrite.ReadLockMode = (
TransactionOptions.ReadWrite.ReadLockMode.READ_LOCK_MODE_UNSPECIFIED
)

# Override defaults from _SnapshotBase.
_multi_use: bool = True
Expand Down Expand Up @@ -89,7 +92,8 @@ def _build_transaction_options_pb(self) -> TransactionOptions:

merge_transaction_options = TransactionOptions(
read_write=TransactionOptions.ReadWrite(
multiplexed_session_previous_transaction_id=self._multiplexed_session_previous_transaction_id
multiplexed_session_previous_transaction_id=self._multiplexed_session_previous_transaction_id,
read_lock_mode=self.read_lock_mode,
),
exclude_txn_from_change_streams=self.exclude_txn_from_change_streams,
isolation_level=self.isolation_level,
Expand Down Expand Up @@ -784,14 +788,20 @@ class BatchTransactionId:
@dataclass
class DefaultTransactionOptions:
isolation_level: str = TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED
read_lock_mode: str = (
TransactionOptions.ReadWrite.ReadLockMode.READ_LOCK_MODE_UNSPECIFIED
)
_defaultReadWriteTransactionOptions: Optional[TransactionOptions] = field(
init=False, repr=False
)

def __post_init__(self):
"""Initialize _defaultReadWriteTransactionOptions automatically"""
self._defaultReadWriteTransactionOptions = TransactionOptions(
isolation_level=self.isolation_level
read_write=TransactionOptions.ReadWrite(
read_lock_mode=self.read_lock_mode,
),
isolation_level=self.isolation_level,
)

@property
Expand Down
71 changes: 69 additions & 2 deletions tests/unit/test__helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,10 @@ def test_default_none_and_merge_none(self):

def test_default_options_and_merge_none(self):
default = TransactionOptions(
isolation_level=TransactionOptions.IsolationLevel.REPEATABLE_READ
isolation_level=TransactionOptions.IsolationLevel.REPEATABLE_READ,
read_write=TransactionOptions.ReadWrite(
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.PESSIMISTIC,
),
)
merge = None
result = self._callFUT(default, merge)
Expand All @@ -988,7 +991,10 @@ def test_default_options_and_merge_none(self):
def test_default_none_and_merge_options(self):
default = None
merge = TransactionOptions(
isolation_level=TransactionOptions.IsolationLevel.SERIALIZABLE
isolation_level=TransactionOptions.IsolationLevel.SERIALIZABLE,
read_write=TransactionOptions.ReadWrite(
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC,
),
)
expected = merge
result = self._callFUT(default, merge)
Expand Down Expand Up @@ -1044,6 +1050,67 @@ def test_default_isolation_and_merge_options_isolation_unspecified(self):
result = self._callFUT(default, merge)
self.assertEqual(result, expected)

def test_default_and_merge_read_lock_mode_options(self):
default = TransactionOptions(
read_write=TransactionOptions.ReadWrite(
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.PESSIMISTIC,
),
)
merge = TransactionOptions(
read_write=TransactionOptions.ReadWrite(
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC,
),
exclude_txn_from_change_streams=True,
)
expected = TransactionOptions(
read_write=TransactionOptions.ReadWrite(
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC,
),
exclude_txn_from_change_streams=True,
)
result = self._callFUT(default, merge)
self.assertEqual(result, expected)

def test_default_read_lock_mode_and_merge_options(self):
default = TransactionOptions(
read_write=TransactionOptions.ReadWrite(
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC,
),
)
merge = TransactionOptions(
read_write=TransactionOptions.ReadWrite(),
exclude_txn_from_change_streams=True,
)
expected = TransactionOptions(
read_write=TransactionOptions.ReadWrite(
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC,
),
exclude_txn_from_change_streams=True,
)
result = self._callFUT(default, merge)
self.assertEqual(result, expected)

def test_default_read_lock_mode_and_merge_options_isolation_unspecified(self):
default = TransactionOptions(
read_write=TransactionOptions.ReadWrite(
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC,
),
)
merge = TransactionOptions(
read_write=TransactionOptions.ReadWrite(
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.READ_LOCK_MODE_UNSPECIFIED,
),
exclude_txn_from_change_streams=True,
)
expected = TransactionOptions(
read_write=TransactionOptions.ReadWrite(
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC,
),
exclude_txn_from_change_streams=True,
)
result = self._callFUT(default, merge)
self.assertEqual(result, expected)


class Test_interval(unittest.TestCase):
from google.protobuf.struct_pb2 import Value
Expand Down
25 changes: 25 additions & 0 deletions tests/unit/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ def _test_commit_with_options(
max_commit_delay_in=None,
exclude_txn_from_change_streams=False,
isolation_level=TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.READ_LOCK_MODE_UNSPECIFIED,
):
now = datetime.datetime.utcnow().replace(tzinfo=UTC)
now_pb = _datetime_to_pb_timestamp(now)
Expand All @@ -315,6 +316,7 @@ def _test_commit_with_options(
max_commit_delay=max_commit_delay_in,
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
isolation_level=isolation_level,
read_lock_mode=read_lock_mode,
)

self.assertEqual(committed, now)
Expand Down Expand Up @@ -347,6 +349,10 @@ def _test_commit_with_options(
single_use_txn.isolation_level,
isolation_level,
)
self.assertEqual(
single_use_txn.read_write.read_lock_mode,
read_lock_mode,
)
req_id = f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.{database._channel_id}.1.1"
self.assertEqual(
metadata,
Expand Down Expand Up @@ -424,6 +430,25 @@ def test_commit_w_isolation_level(self):
isolation_level=TransactionOptions.IsolationLevel.REPEATABLE_READ,
)

def test_commit_w_read_lock_mode(self):
request_options = RequestOptions(
request_tag="tag-1",
)
self._test_commit_with_options(
request_options=request_options,
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC,
)

def test_commit_w_isolation_level_and_read_lock_mode(self):
request_options = RequestOptions(
request_tag="tag-1",
)
self._test_commit_with_options(
request_options=request_options,
isolation_level=TransactionOptions.IsolationLevel.REPEATABLE_READ,
read_lock_mode=TransactionOptions.ReadWrite.ReadLockMode.PESSIMISTIC,
)

def test_context_mgr_already_committed(self):
now = datetime.datetime.utcnow().replace(tzinfo=UTC)
database = _Database()
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class TestClient(unittest.TestCase):
},
}
DEFAULT_TRANSACTION_OPTIONS = DefaultTransactionOptions(
isolation_level="SERIALIZABLE"
isolation_level="SERIALIZABLE",
read_lock_mode="PESSIMISTIC",
)

def _get_target_class(self):
Expand Down
Loading