Merged
Commits
41 commits
d35a0cc
Update `SessionOptions` to support `GOOGLE_CLOUD_SPANNER_FORCE_DISABL…
currantw May 29, 2025
34baadf
feat: Multiplexed sessions - Remove handling of `MethodNotImplemented…
currantw May 29, 2025
998f23f
feat: Multiplexed sessions - Update `Connection` to use multiplexed s…
currantw May 30, 2025
ec19f2d
cleanup: Rename `beforeNextRetry` to `before_next_retry`.
currantw May 30, 2025
25d0943
cleanup: Fix a few unrelated typos.
currantw May 30, 2025
fca6f06
feat: Multiplexed sessions - Add ingest of precommit tokens to `_Snap…
currantw May 30, 2025
56001b9
feat: Multiplexed sessions - Deprecate `StreamedResultSet._source` (r…
currantw May 30, 2025
b4eadca
feat: Multiplexed sessions - Move `_session_options` from `Database` …
currantw May 30, 2025
68e9b67
feat: Multiplexed sessions - Deprecate `SessionCheckout` and update `…
currantw May 30, 2025
6ca0d3f
feat: Multiplexed sessions - Deprecate `Database.session()` and minor…
currantw May 31, 2025
9057a64
feat: Multiplexed sessions - Update `BatchSnapshot` to use database s…
currantw May 31, 2025
c9dd818
feat: Multiplexed sessions - Move `Batch` and `Transaction` attribute…
currantw Jun 2, 2025
599939a
feat: Multiplexed sessions - Update pools so they don't use deprecate…
currantw Jun 2, 2025
2065e52
feat: Multiplexed sessions - Update session to remove class attribute…
currantw Jun 2, 2025
7b925b3
feat: Multiplexed sessions - Move begin transaction logic from `Snaps…
currantw Jun 3, 2025
9246dd2
feat: Multiplexed sessions - Remove begin transaction logic from `Tra…
currantw Jun 3, 2025
f1b3fdb
feat: Multiplexed sessions - Add logic for beginning mutations-only t…
currantw Jun 3, 2025
98c477d
feat: Multiplexed sessions - Cleanup and improve consistency of state…
currantw Jun 3, 2025
052f3e1
feat: Multiplexed sessions - Cleanup documentation for `Batch.commit`…
currantw Jun 3, 2025
2b9f212
feat: Multiplexed sessions - Add logic for retrying commits if precom…
currantw Jun 3, 2025
a77cc2b
feat: Multiplexed sessions - Remove `GOOGLE_CLOUD_SPANNER_FORCE_DISAB…
currantw Jun 4, 2025
5615f2c
feat: Multiplexed sessions - Cleanup `TestDatabaseSessionManager` so …
currantw Jun 4, 2025
00059f9
feat: Multiplexed sessions - Add type hints for `SessionOptions` and …
currantw Jun 4, 2025
083d6bc
feat: Multiplexed sessions - Fix `test_observability_options`
currantw Jun 4, 2025
6e33b1d
feat: Multiplexed sessions - Update `_builders` to use mock scoped cr…
currantw Jun 4, 2025
65042ab
feat: Multiplexed sessions - Add helpers for mock scoped credentials …
currantw Jun 4, 2025
9df088d
feat: Multiplexed sessions - Fix failing `test_batch_insert_then_read`.
currantw Jun 4, 2025
607df64
feat: Multiplexed sessions - Fix failing `test_transaction_read_and_i…
currantw Jun 4, 2025
0b6f5df
feat: Multiplexed sessions - Add test helper for multiplexed env vars.
currantw Jun 4, 2025
36c9775
feat: Multiplexed sessions - Add unit tests for begin transaction bas…
currantw Jun 4, 2025
418edda
feat: Multiplexed sessions - Attempt to fix `test_transaction_read_an…
currantw Jun 4, 2025
da226c1
feat: Multiplexed sessions - Add test for log when new session create…
currantw Jun 4, 2025
c6c130e
feat: Multiplexed sessions - Add additional multiplexed unit tests fo…
currantw Jun 4, 2025
1018f4c
feat: Multiplexed sessions - Cleanup `Transaction` by extracting some…
currantw Jun 4, 2025
b761e85
feat: Multiplexed sessions - Add additional `Transaction` tests for n…
currantw Jun 5, 2025
665547b
feat: Multiplexed sessions - Fix linter
currantw Jun 5, 2025
0bd5fd1
feat: Multiplexed sessions - Remove unnecessary TODO
currantw Jun 5, 2025
4cb1f05
feat: Multiplexed sessions - Remove unnecessary constants.
currantw Jun 5, 2025
17f3c5f
feat: Multiplexed sessions - Remove support for disabling the use of …
currantw Jun 5, 2025
9c3db74
feat: Multiplexed sessions - Make deprecation comments a bit more clear.
currantw Jun 9, 2025
17a2f47
feat: Multiplexed sessions - Add some more type hints.
currantw Jun 9, 2025
17 changes: 14 additions & 3 deletions google/cloud/spanner_dbapi/connection.py
@@ -28,6 +28,7 @@
from google.cloud.spanner_dbapi.transaction_helper import TransactionRetryHelper
from google.cloud.spanner_dbapi.cursor import Cursor
from google.cloud.spanner_v1 import RequestOptions, TransactionOptions
from google.cloud.spanner_v1.database_sessions_manager import TransactionType
from google.cloud.spanner_v1.snapshot import Snapshot

from google.cloud.spanner_dbapi.exceptions import (
@@ -356,8 +357,16 @@ def _session_checkout(self):
"""
if self.database is None:
raise ValueError("Database needs to be passed for this operation")

if not self._session:
self._session = self.database._pool.get()
transaction_type = (
TransactionType.READ_ONLY
if self.read_only
else TransactionType.READ_WRITE
)
self._session = self.database._sessions_manager.get_session(
transaction_type
)

return self._session

@@ -368,9 +377,11 @@ def _release_session(self):
"""
if self._session is None:
return

if self.database is None:
raise ValueError("Database needs to be passed for this operation")
self.database._pool.put(self._session)

self.database._sessions_manager.put_session(self._session)
self._session = None

def transaction_checkout(self):
@@ -432,7 +443,7 @@ def close(self):
self._transaction.rollback()

if self._own_pool and self.database:
self.database._pool.clear()
self.database._sessions_manager._pool.clear()

self.is_closed = True

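For illustration, a minimal sketch (not part of the diff) of the checkout/release pattern that `_session_checkout` and `_release_session` now route through the database's sessions manager: request a session for the appropriate `TransactionType`, and always hand it back afterwards. The `run_with_session` helper and its `database`/`read_only`/`work` arguments are hypothetical.

```python
from google.cloud.spanner_v1.database_sessions_manager import TransactionType


def run_with_session(database, read_only, work):
    """Sketch: check out a session for the right transaction type, run `work`, then return it."""
    transaction_type = (
        TransactionType.READ_ONLY if read_only else TransactionType.READ_WRITE
    )
    session = database._sessions_manager.get_session(transaction_type)
    try:
        return work(session)
    finally:
        # Mirror _release_session: always hand the session back to the manager.
        database._sessions_manager.put_session(session)
```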
6 changes: 3 additions & 3 deletions google/cloud/spanner_v1/_helpers.py
@@ -535,7 +535,7 @@ def _retry(
retry_count=5,
delay=2,
allowed_exceptions=None,
beforeNextRetry=None,
before_next_retry=None,
):
"""
Retry a function with a specified number of retries, delay between retries, and list of allowed exceptions.
@@ -552,8 +552,8 @@
"""
retries = 0
while retries <= retry_count:
if retries > 0 and beforeNextRetry:
beforeNextRetry(retries, delay)
if retries > 0 and before_next_retry:
before_next_retry(retries, delay)

try:
return func()
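For context, a hedged usage sketch of the renamed keyword: the loop above calls `before_next_retry(retries, delay)` before each retry after the first attempt, and the callable being retried is assumed to be `_retry`'s first positional argument (as `return func()` in the body implies). The flaky operation is contrived, and the no-op handler assumes entries in `allowed_exceptions` are invoked with the raised exception, mirroring the `_check_rst_stream_error` usage later in this diff.

```python
import logging

from google.api_core.exceptions import Aborted
from google.cloud.spanner_v1._helpers import _retry

_attempts = {"count": 0}


def flaky_commit():
    # Contrived operation: raises Aborted twice, then succeeds.
    _attempts["count"] += 1
    if _attempts["count"] < 3:
        raise Aborted("transaction aborted, please retry")
    return "commit-timestamp"


def log_before_retry(retries, delay):
    # Same (retries, delay) signature the retry loop passes to the callback.
    logging.info("retry #%d after a failure; current delay ~%s s", retries, delay)


result = _retry(
    flaky_commit,
    retry_count=5,
    delay=2,
    # Assumption: handlers in `allowed_exceptions` receive the raised exception,
    # mirroring how `_check_rst_stream_error` is passed for InternalServerError.
    allowed_exceptions={Aborted: lambda exc: None},
    before_next_retry=log_before_retry,
)
```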
156 changes: 71 additions & 85 deletions google/cloud/spanner_v1/batch.py
@@ -14,8 +14,9 @@

"""Context manager for Cloud Spanner batched writes."""
import functools
from typing import List, Optional

from google.cloud.spanner_v1 import CommitRequest
from google.cloud.spanner_v1 import CommitRequest, CommitResponse
from google.cloud.spanner_v1 import Mutation
from google.cloud.spanner_v1 import TransactionOptions
from google.cloud.spanner_v1 import BatchWriteRequest
@@ -47,22 +48,15 @@ class _BatchBase(_SessionWrapper):
:param session: the session used to perform the commit
"""

transaction_tag = None
_read_only = False

def __init__(self, session):
super(_BatchBase, self).__init__(session)
self._mutations = []

def _check_state(self):
"""Helper for :meth:`commit` et al.

Subclasses must override
self._mutations: List[Mutation] = []
self.transaction_tag: Optional[str] = None

:raises: :exc:`ValueError` if the object's state is invalid for making
API requests.
"""
raise NotImplementedError
self.committed = None
"""Timestamp at which the batch was successfully committed."""
self.commit_stats: Optional[CommitResponse.CommitStats] = None

def insert(self, table, columns, values):
"""Insert one or more new table rows.
@@ -148,29 +142,15 @@ def delete(self, table, keyset):
class Batch(_BatchBase):
"""Accumulate mutations for transmission during :meth:`commit`."""

committed = None
commit_stats = None
"""Timestamp at which the batch was successfully committed."""

def _check_state(self):
"""Helper for :meth:`commit` et al.

Subclasses must override

:raises: :exc:`ValueError` if the object's state is invalid for making
API requests.
"""
if self.committed is not None:
raise ValueError("Batch already committed")

def commit(
self,
return_commit_stats=False,
request_options=None,
max_commit_delay=None,
exclude_txn_from_change_streams=False,
isolation_level=TransactionOptions.IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED,
**kwargs,
timeout_secs=DEFAULT_RETRY_TIMEOUT_SECS,
default_retry_delay=None,
):
"""Commit mutations to the database.

@@ -202,12 +182,26 @@ def commit(
:param isolation_level:
(Optional) Sets isolation level for the transaction.

:type timeout_secs: int
:param timeout_secs: (Optional) The maximum time in seconds to wait for the commit to complete.

:type default_retry_delay: int
:param default_retry_delay: (Optional) The default time in seconds to wait before retrying the commit.

:rtype: datetime
:returns: timestamp of the committed changes.

:raises: ValueError: if the transaction is not ready to commit.
"""
self._check_state()
database = self._session._database

if self.committed is not None:
raise ValueError("Transaction already committed.")

mutations = self._mutations
session = self._session
database = session._database
api = database.spanner_api

metadata = _metadata_with_prefix(database.name)
if database._route_to_leader_enabled:
metadata.append(
@@ -223,7 +217,6 @@
database.default_transaction_options.default_read_write_transaction_options,
txn_options,
)
trace_attributes = {"num_mutations": len(self._mutations)}

if request_options is None:
request_options = RequestOptions()
@@ -234,27 +227,26 @@
# Request tags are not supported for commit requests.
request_options.request_tag = None

request = CommitRequest(
session=self._session.name,
mutations=self._mutations,
single_use_transaction=txn_options,
return_commit_stats=return_commit_stats,
max_commit_delay=max_commit_delay,
request_options=request_options,
)
observability_options = getattr(database, "observability_options", None)
with trace_call(
f"CloudSpanner.{type(self).__name__}.commit",
self._session,
trace_attributes,
observability_options=observability_options,
name=f"CloudSpanner.{type(self).__name__}.commit",
session=session,
extra_attributes={"num_mutations": len(mutations)},
observability_options=getattr(database, "observability_options", None),
metadata=metadata,
) as span, MetricsCapture():

def wrapped_method(*args, **kwargs):
method = functools.partial(
def wrapped_method():
commit_request = CommitRequest(
session=session.name,
mutations=mutations,
single_use_transaction=txn_options,
return_commit_stats=return_commit_stats,
max_commit_delay=max_commit_delay,
request_options=request_options,
)
commit_method = functools.partial(
api.commit,
request=request,
request=commit_request,
metadata=database.metadata_with_request_id(
# This code is retried due to ABORTED, hence nth_request
# should be increased. attempt can only be increased if
@@ -265,24 +257,23 @@ def wrapped_method(*args, **kwargs):
span,
),
)
return method(*args, **kwargs)
return commit_method()

deadline = time.time() + kwargs.get(
"timeout_secs", DEFAULT_RETRY_TIMEOUT_SECS
)
default_retry_delay = kwargs.get("default_retry_delay", None)
response = _retry_on_aborted_exception(
wrapped_method,
deadline=deadline,
deadline=time.time() + timeout_secs,
default_retry_delay=default_retry_delay,
)

self.committed = response.commit_timestamp
self.commit_stats = response.commit_stats

return self.committed

def __enter__(self):
"""Begin ``with`` block."""
self._check_state()
if self.committed is not None:
raise ValueError("Transaction already committed")

return self
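
A hedged end-to-end sketch of the reworked `commit`: take a read-write session from the sessions manager, commit a mutations-only `Batch` with the new retry keywords, and return the session. The client, instance, database, table, and values are placeholders, not part of the change.

```python
from google.cloud import spanner
from google.cloud.spanner_v1.batch import Batch
from google.cloud.spanner_v1.database_sessions_manager import TransactionType

# Placeholder client, instance, and database identifiers.
database = spanner.Client().instance("my-instance").database("my-database")

session = database._sessions_manager.get_session(TransactionType.READ_WRITE)
try:
    batch = Batch(session)
    batch.insert(
        table="Singers",                        # illustrative table and row
        columns=("SingerId", "FirstName"),
        values=[(1, "Marc")],
    )
    commit_timestamp = batch.commit(
        return_commit_stats=True,
        timeout_secs=60,         # overall deadline for retrying aborted commits
        default_retry_delay=2,   # base delay between retries
    )
    print(commit_timestamp, batch.commit_stats)
finally:
    database._sessions_manager.put_session(session)
```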

@@ -317,20 +308,10 @@ class MutationGroups(_SessionWrapper):
:param session: the session used to perform the commit
"""

committed = None

def __init__(self, session):
super(MutationGroups, self).__init__(session)
self._mutation_groups = []

def _check_state(self):
"""Checks if the object's state is valid for making API requests.

:raises: :exc:`ValueError` if the object's state is invalid for making
API requests.
"""
if self.committed is not None:
raise ValueError("MutationGroups already committed")
self._mutation_groups: List[MutationGroup] = []
self.committed: bool = False

def group(self):
"""Returns a new `MutationGroup` to which mutations can be added."""
@@ -358,57 +339,62 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=False):
:rtype: :class:`Iterable[google.cloud.spanner_v1.types.BatchWriteResponse]`
:returns: a sequence of responses for each batch.
"""
self._check_state()

database = self._session._database
if self.committed:
raise ValueError("MutationGroups already committed")

mutation_groups = self._mutation_groups
session = self._session
database = session._database
api = database.spanner_api

metadata = _metadata_with_prefix(database.name)
if database._route_to_leader_enabled:
metadata.append(
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
)
trace_attributes = {"num_mutation_groups": len(self._mutation_groups)}

if request_options is None:
request_options = RequestOptions()
elif type(request_options) is dict:
request_options = RequestOptions(request_options)

request = BatchWriteRequest(
session=self._session.name,
mutation_groups=self._mutation_groups,
request_options=request_options,
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
)
observability_options = getattr(database, "observability_options", None)
with trace_call(
"CloudSpanner.batch_write",
self._session,
trace_attributes,
observability_options=observability_options,
name="CloudSpanner.batch_write",
session=session,
extra_attributes={"num_mutation_groups": len(mutation_groups)},
observability_options=getattr(database, "observability_options", None),
metadata=metadata,
) as span, MetricsCapture():
attempt = AtomicCounter(0)
nth_request = getattr(database, "_next_nth_request", 0)

def wrapped_method(*args, **kwargs):
method = functools.partial(
def wrapped_method():
batch_write_request = BatchWriteRequest(
session=session.name,
mutation_groups=mutation_groups,
request_options=request_options,
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
)
batch_write_method = functools.partial(
api.batch_write,
request=request,
request=batch_write_request,
metadata=database.metadata_with_request_id(
nth_request,
attempt.increment(),
metadata,
span,
),
)
return method(*args, **kwargs)
return batch_write_method()

response = _retry(
wrapped_method,
allowed_exceptions={
InternalServerError: _check_rst_stream_error,
},
)

self.committed = True
return response
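
Similarly, a hedged sketch of the `batch_write` path above. It assumes the `MutationGroup` returned by `group()` exposes the same `insert`-style mutation methods as `_BatchBase`; the client, instance, database, and row data are placeholders.

```python
from google.cloud import spanner
from google.cloud.spanner_v1.batch import MutationGroups
from google.cloud.spanner_v1.database_sessions_manager import TransactionType

# Placeholder client, instance, and database identifiers.
database = spanner.Client().instance("my-instance").database("my-database")

session = database._sessions_manager.get_session(TransactionType.READ_WRITE)
try:
    groups = MutationGroups(session)
    group = groups.group()
    group.insert(
        table="Singers",                        # illustrative table and row
        columns=("SingerId", "FirstName"),
        values=[(2, "Catalina")],
    )
    # batch_write yields one BatchWriteResponse per batch of mutation groups.
    for response in groups.batch_write():
        print(response)
finally:
    database._sessions_manager.put_session(session)
```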
