Skip to content
Merged
3 changes: 2 additions & 1 deletion google/cloud/spanner_v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
from .types.type import TypeAnnotationCode
from .types.type import TypeCode
from .data_types import JsonObject
from .transaction import BatchTransactionId
from .transaction import BatchTransactionId, DefaultTransactionOptions

from google.cloud.spanner_v1 import param_types
from google.cloud.spanner_v1.client import Client
Expand Down Expand Up @@ -149,4 +149,5 @@
"SpannerClient",
"SpannerAsyncClient",
"BatchTransactionId",
"DefaultTransactionOptions",
)
36 changes: 36 additions & 0 deletions google/cloud/spanner_v1/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from google.cloud.spanner_v1 import TypeCode
from google.cloud.spanner_v1 import ExecuteSqlRequest
from google.cloud.spanner_v1 import JsonObject
from google.cloud.spanner_v1 import TransactionOptions
from google.cloud.spanner_v1.request_id_header import with_request_id
from google.rpc.error_details_pb2 import RetryInfo

Expand Down Expand Up @@ -690,3 +691,38 @@ def __radd__(self, n):

def _metadata_with_request_id(*args, **kwargs):
return with_request_id(*args, **kwargs)


def _merge_Transaction_Options(
defaultTransactionOptions: TransactionOptions,
mergeTransactionOptions: TransactionOptions,
) -> TransactionOptions:
"""Merges two TransactionOptions objects.

- Values from `mergeTransactionOptions` take precedence if set.
- Values from `defaultTransactionOptions` are used only if missing.

Args:
defaultTransactionOptions (TransactionOptions): The default transaction options (fallback values).
mergeTransactionOptions (TransactionOptions): The main transaction options (overrides when set).

Returns:
TransactionOptions: A merged TransactionOptions object.
"""

if defaultTransactionOptions is None:
return mergeTransactionOptions

if mergeTransactionOptions is None:
return defaultTransactionOptions

merged_pb = TransactionOptions()._pb # Create a new protobuf object

# Merge defaultTransactionOptions first
merged_pb.MergeFrom(defaultTransactionOptions._pb)

# Merge transactionOptions, ensuring it overrides default values
merged_pb.MergeFrom(mergeTransactionOptions._pb)

# Convert protobuf object back into a TransactionOptions instance
return TransactionOptions(merged_pb)
20 changes: 20 additions & 0 deletions google/cloud/spanner_v1/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from google.cloud.spanner_v1._helpers import (
_metadata_with_prefix,
_metadata_with_leader_aware_routing,
_merge_Transaction_Options,
)
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
from google.cloud.spanner_v1 import RequestOptions
Expand Down Expand Up @@ -167,6 +168,7 @@ def commit(
request_options=None,
max_commit_delay=None,
exclude_txn_from_change_streams=False,
isolation_level=TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
**kwargs,
):
"""Commit mutations to the database.
Expand All @@ -187,6 +189,18 @@ def commit(
(Optional) The amount of latency this request is willing to incur
in order to improve throughput.

:type exclude_txn_from_change_streams: bool
:param exclude_txn_from_change_streams:
(Optional) If true, instructs the transaction to be excluded from being recorded in change streams
with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from
being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or
unset.

:type isolation_level:
:class:`google.cloud.spanner_v1.types.TransactionOptions.IsolationLevel`
:param isolation_level:
(Optional) Sets isolation level for the transaction.

:rtype: datetime
:returns: timestamp of the committed changes.
"""
Expand All @@ -201,6 +215,12 @@ def commit(
txn_options = TransactionOptions(
read_write=TransactionOptions.ReadWrite(),
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
isolation_level=isolation_level,
)

txn_options = _merge_Transaction_Options(
database.default_transaction_options.default_read_write_transaction_options,
txn_options,
)
trace_attributes = {"num_mutations": len(self._mutations)}

Expand Down
44 changes: 44 additions & 0 deletions google/cloud/spanner_v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from google.auth.credentials import AnonymousCredentials
import google.api_core.client_options
from google.cloud.client import ClientWithProject
from typing import Optional


from google.cloud.spanner_admin_database_v1 import DatabaseAdminClient
Expand All @@ -45,6 +46,7 @@
from google.cloud.spanner_admin_instance_v1 import ListInstancesRequest
from google.cloud.spanner_v1 import __version__
from google.cloud.spanner_v1 import ExecuteSqlRequest
from google.cloud.spanner_v1 import DefaultTransactionOptions
from google.cloud.spanner_v1._helpers import _merge_query_options
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
from google.cloud.spanner_v1.instance import Instance
Expand Down Expand Up @@ -165,6 +167,10 @@ class Client(ClientWithProject):
or you can use the environment variable `SPANNER_ENABLE_END_TO_END_TRACING=<boolean>`
to control it.

:type default_transaction_options: :class:`~google.cloud.spanner_v1.DefaultTransactionOptions`
or :class:`dict`
:param default_transaction_options: (Optional) Default options to use for all transactions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we clearly call out that this is only for r/w transactions?

Suggested change
:param default_transaction_options: (Optional) Default options to use for all transactions.
:param default_transaction_options: (Optional) Default options to use for all read/write transactions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to keep it flexible for all types of transaction options. For example, if we add exclude_txn_from_change_streams to the default transaction options, it should apply to both RW and PDML. To handle this, we can introduce another private property _defaultPDMTransactionOptions within the DefaultTransactionOptions class, where exclude_txn_from_change_streams will be set.


:raises: :class:`ValueError <exceptions.ValueError>` if both ``read_only``
and ``admin`` are :data:`True`
"""
Expand All @@ -186,6 +192,7 @@ def __init__(
route_to_leader_enabled=True,
directed_read_options=None,
observability_options=None,
default_transaction_options: Optional[DefaultTransactionOptions] = None,
):
self._emulator_host = _get_spanner_emulator_host()

Expand Down Expand Up @@ -247,6 +254,14 @@ def __init__(
self._route_to_leader_enabled = route_to_leader_enabled
self._directed_read_options = directed_read_options
self._observability_options = observability_options
if default_transaction_options is None:
default_transaction_options = DefaultTransactionOptions()
elif not isinstance(default_transaction_options, DefaultTransactionOptions):
raise TypeError(
"default_transaction_options must be an instance of DefaultTransactionOptions"
)

self._default_transaction_options = default_transaction_options

@property
def credentials(self):
Expand Down Expand Up @@ -337,6 +352,17 @@ def observability_options(self):
"""
return self._observability_options

@property
def default_transaction_options(self):
"""Getter for default_transaction_options.

:rtype:
:class:`~google.cloud.spanner_v1.DefaultTransactionOptions`
or :class:`dict`
:returns: The default transaction options that are used by this client for all transactions.
"""
return self._default_transaction_options

@property
def directed_read_options(self):
"""Getter for directed_read_options.
Expand Down Expand Up @@ -482,3 +508,21 @@ def directed_read_options(self, directed_read_options):
or regions should be used for non-transactional reads or queries.
"""
self._directed_read_options = directed_read_options

@default_transaction_options.setter
def default_transaction_options(
self, default_transaction_options: DefaultTransactionOptions
):
"""Sets default_transaction_options for the client
:type default_transaction_options: :class:`~google.cloud.spanner_v1.DefaultTransactionOptions`
or :class:`dict`
:param default_transaction_options: Default options to use for transactions.
"""
if default_transaction_options is None:
default_transaction_options = DefaultTransactionOptions()
elif not isinstance(default_transaction_options, DefaultTransactionOptions):
raise TypeError(
"default_transaction_options must be an instance of DefaultTransactionOptions"
)

self._default_transaction_options = default_transaction_options
16 changes: 16 additions & 0 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from google.cloud.spanner_v1 import TypeCode
from google.cloud.spanner_v1 import TransactionSelector
from google.cloud.spanner_v1 import TransactionOptions
from google.cloud.spanner_v1 import DefaultTransactionOptions
from google.cloud.spanner_v1 import RequestOptions
from google.cloud.spanner_v1 import SpannerClient
from google.cloud.spanner_v1._helpers import _merge_query_options
Expand Down Expand Up @@ -183,6 +184,9 @@ def __init__(
self._enable_drop_protection = enable_drop_protection
self._reconciling = False
self._directed_read_options = self._instance._client.directed_read_options
self.default_transaction_options: DefaultTransactionOptions = (
self._instance._client.default_transaction_options
)
self._proto_descriptors = proto_descriptors

if pool is None:
Expand Down Expand Up @@ -782,6 +786,7 @@ def batch(
request_options=None,
max_commit_delay=None,
exclude_txn_from_change_streams=False,
isolation_level=TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
**kw,
):
"""Return an object which wraps a batch.
Expand Down Expand Up @@ -809,14 +814,21 @@ def batch(
being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or
unset.

:type isolation_level:
:class:`google.cloud.spanner_v1.types.TransactionOptions.IsolationLevel`
:param isolation_level:
(Optional) Sets the isolation level for this transaction. This overrides any default isolation level set for the client.

:rtype: :class:`~google.cloud.spanner_v1.database.BatchCheckout`
:returns: new wrapper
"""

return BatchCheckout(
self,
request_options,
max_commit_delay,
exclude_txn_from_change_streams,
isolation_level,
**kw,
)

Expand Down Expand Up @@ -888,6 +900,7 @@ def run_in_transaction(self, func, *args, **kw):
from being recorded in change streams with the DDL option `allow_txn_exclusion=true`.
This does not exclude the transaction from being recorded in the change streams with
the DDL option `allow_txn_exclusion` being false or unset.
"isolation_level" sets the isolation level for transaction.

:rtype: Any
:returns: The return value of ``func``.
Expand Down Expand Up @@ -1178,6 +1191,7 @@ def __init__(
request_options=None,
max_commit_delay=None,
exclude_txn_from_change_streams=False,
isolation_level=TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
**kw,
):
self._database = database
Expand All @@ -1190,6 +1204,7 @@ def __init__(
self._request_options = request_options
self._max_commit_delay = max_commit_delay
self._exclude_txn_from_change_streams = exclude_txn_from_change_streams
self._isolation_level = isolation_level
self._kw = kw

def __enter__(self):
Expand All @@ -1211,6 +1226,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
request_options=self._request_options,
max_commit_delay=self._max_commit_delay,
exclude_txn_from_change_streams=self._exclude_txn_from_change_streams,
isolation_level=self._isolation_level,
**self._kw,
)
finally:
Expand Down
5 changes: 4 additions & 1 deletion google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
from google.cloud.spanner_v1.batch import Batch
from google.cloud.spanner_v1.snapshot import Snapshot
from google.cloud.spanner_v1.transaction import Transaction

from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture


Expand Down Expand Up @@ -453,6 +452,7 @@ def run_in_transaction(self, func, *args, **kw):
from being recorded in change streams with the DDL option `allow_txn_exclusion=true`.
This does not exclude the transaction from being recorded in the change streams with
the DDL option `allow_txn_exclusion` being false or unset.
"isolation_level" sets the isolation level for transaction.

:rtype: Any
:returns: The return value of ``func``.
Expand All @@ -467,6 +467,8 @@ def run_in_transaction(self, func, *args, **kw):
exclude_txn_from_change_streams = kw.pop(
"exclude_txn_from_change_streams", None
)
isolation_level = kw.pop("isolation_level", None)

attempts = 0

observability_options = getattr(self._database, "observability_options", None)
Expand All @@ -482,6 +484,7 @@ def run_in_transaction(self, func, *args, **kw):
txn.exclude_txn_from_change_streams = (
exclude_txn_from_change_streams
)
txn.isolation_level = isolation_level
else:
txn = self._transaction

Expand Down
Loading
Loading