Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ced5d4b
feat: enable multiplex session for all operations unless explicitly s…
rahul2393 Jul 16, 2025
4878ff3
fix tests
rahul2393 Jul 16, 2025
15767c9
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 16, 2025
7225b88
rename job name
rahul2393 Jul 16, 2025
f224acb
fux emulator systest
rahul2393 Jul 16, 2025
311451a
update python version for emulator tests
rahul2393 Jul 16, 2025
be1c9d8
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 16, 2025
e5993ff
fix test
rahul2393 Jul 16, 2025
73f6145
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 16, 2025
b940b69
fix test
rahul2393 Jul 16, 2025
679f9be
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 16, 2025
3ad9ce9
fix systests
rahul2393 Jul 17, 2025
8e8449d
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 17, 2025
7036734
skip dbapi test which depends on session delete
rahul2393 Jul 17, 2025
8741633
revert timestamp changes
rahul2393 Jul 17, 2025
67ea777
revert timestamp changes
rahul2393 Jul 17, 2025
6f4add1
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 17, 2025
8976025
Merge branch 'multiplex-default-enable' of https://github.com/googlea…
gcf-owl-bot[bot] Jul 17, 2025
63bac62
more fixes
rahul2393 Jul 17, 2025
739a78e
fix regular session systests
rahul2393 Jul 24, 2025
ccf299f
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 24, 2025
f8d0345
expect precommit token only when session is multiplexed.
rahul2393 Jul 24, 2025
c942b18
pin emulator version to make multiplex session with emulator pass
rahul2393 Jul 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ on:
branches:
- main
pull_request:
name: Run Spanner integration tests against emulator with multiplexed sessions
name: Run Spanner integration tests against emulator with regular sessions
jobs:
system-tests:
runs-on: ubuntu-latest
Expand All @@ -21,7 +21,7 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: 3.8
python-version: 3.12
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is because in followup PR we will be removing 3.8 runtime support
#1395

- name: Install nox
run: python -m pip install nox
- name: Run system tests
Expand All @@ -30,5 +30,6 @@ jobs:
SPANNER_EMULATOR_HOST: localhost:9010
GOOGLE_CLOUD_PROJECT: emulator-test-project
GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE: true
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: true
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS: true
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: false
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS: false
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW: false
4 changes: 2 additions & 2 deletions .github/workflows/integration-tests-against-emulator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:

services:
emulator:
image: gcr.io/cloud-spanner-emulator/emulator:latest
image: gcr.io/cloud-spanner-emulator/emulator:1.5.37
ports:
- 9010:9010
- 9020:9020
Expand All @@ -21,7 +21,7 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: 3.8
python-version: 3.12
- name: Install nox
run: python -m pip install nox
- name: Run system tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@ env_vars: {

env_vars: {
key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"
value: "true"
value: "false"
}

env_vars: {
key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"
value: "true"
value: "false"
}

env_vars: {
key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW"
value: "false"
}
9 changes: 8 additions & 1 deletion google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,14 @@ def session(self, labels=None, database_role=None):
# If role is specified in param, then that role is used
# instead.
role = database_role or self._database_role
return Session(self, labels=labels, database_role=role)
is_multiplexed = False
if self.sessions_manager._use_multiplexed(
transaction_type=TransactionType.READ_ONLY
):
is_multiplexed = True
return Session(
self, labels=labels, database_role=role, is_multiplexed=is_multiplexed
)

def snapshot(self, **kw):
"""Return an object which wraps a snapshot.
Expand Down
26 changes: 10 additions & 16 deletions google/cloud/spanner_v1/database_sessions_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,15 +230,13 @@ def _use_multiplexed(cls, transaction_type: TransactionType) -> bool:
"""Returns whether to use multiplexed sessions for the given transaction type.

Multiplexed sessions are enabled for read-only transactions if:
* _ENV_VAR_MULTIPLEXED is set to true.
* _ENV_VAR_MULTIPLEXED != 'false'.

Multiplexed sessions are enabled for partitioned transactions if:
* _ENV_VAR_MULTIPLEXED is set to true; and
* _ENV_VAR_MULTIPLEXED_PARTITIONED is set to true.
* _ENV_VAR_MULTIPLEXED_PARTITIONED != 'false'.

Multiplexed sessions are enabled for read/write transactions if:
* _ENV_VAR_MULTIPLEXED is set to true; and
* _ENV_VAR_MULTIPLEXED_READ_WRITE is set to true.
* _ENV_VAR_MULTIPLEXED_READ_WRITE != 'false'.

:type transaction_type: :class:`TransactionType`
:param transaction_type: the type of transaction
Expand All @@ -254,30 +252,26 @@ def _use_multiplexed(cls, transaction_type: TransactionType) -> bool:
return cls._getenv(cls._ENV_VAR_MULTIPLEXED)

elif transaction_type is TransactionType.PARTITIONED:
return cls._getenv(cls._ENV_VAR_MULTIPLEXED) and cls._getenv(
cls._ENV_VAR_MULTIPLEXED_PARTITIONED
)
return cls._getenv(cls._ENV_VAR_MULTIPLEXED_PARTITIONED)

elif transaction_type is TransactionType.READ_WRITE:
return cls._getenv(cls._ENV_VAR_MULTIPLEXED) and cls._getenv(
cls._ENV_VAR_MULTIPLEXED_READ_WRITE
)
return cls._getenv(cls._ENV_VAR_MULTIPLEXED_READ_WRITE)

raise ValueError(f"Transaction type {transaction_type} is not supported.")

@classmethod
def _getenv(cls, env_var_name: str) -> bool:
"""Returns the value of the given environment variable as a boolean.

True values are '1' and 'true' (case-insensitive).
All other values are considered false.
True unless explicitly 'false' (case-insensitive).
All other values (including unset) are considered true.

:type env_var_name: str
:param env_var_name: the name of the boolean environment variable

:rtype: bool
:returns: True if the environment variable is set to a true value, False otherwise.
:returns: True unless the environment variable is set to 'false', False otherwise.
"""

env_var_value = getenv(env_var_name, "").lower().strip()
return env_var_value in ["1", "true"]
env_var_value = getenv(env_var_name, "true").lower().strip()
return env_var_value != "false"
8 changes: 7 additions & 1 deletion google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,13 @@ def delete(self):
current_span, "Deleting Session failed due to unset session_id"
)
raise ValueError("Session ID not set by back-end")

if self._is_multiplexed:
add_span_event(
current_span,
"Skipped deleting Multiplexed Session",
{"session.id": self._session_id},
)
return
add_span_event(
current_span, "Deleting Session", {"session.id": self._session_id}
)
Expand Down
27 changes: 19 additions & 8 deletions google/cloud/spanner_v1/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ def _restart_on_unavailable(
# Update the transaction from the response.
if transaction is not None:
transaction._update_for_result_set_pb(item)
if item.precommit_token is not None and transaction is not None:
transaction._update_for_precommit_token_pb(item.precommit_token)

if item.resume_token:
resume_token = item.resume_token
Expand Down Expand Up @@ -1013,9 +1015,6 @@ def _update_for_result_set_pb(
if result_set_pb.metadata and result_set_pb.metadata.transaction:
self._update_for_transaction_pb(result_set_pb.metadata.transaction)

if result_set_pb.precommit_token:
self._update_for_precommit_token_pb(result_set_pb.precommit_token)

def _update_for_transaction_pb(self, transaction_pb: Transaction) -> None:
"""Updates the snapshot for the given transaction.

Expand All @@ -1031,7 +1030,7 @@ def _update_for_transaction_pb(self, transaction_pb: Transaction) -> None:
self._transaction_id = transaction_pb.id

if transaction_pb.precommit_token:
self._update_for_precommit_token_pb(transaction_pb.precommit_token)
self._update_for_precommit_token_pb_unsafe(transaction_pb.precommit_token)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason why we need two methods?

_update_for_precommit_token_pb, _update_for_precommit_token_pb_unsafe

Copy link
Contributor Author

@rahul2393 rahul2393 Jul 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there was a deadlock happening, transaction was locked already when calling _update_for_transaction_pb, and in _update_for_precommit_token_pb we were again trying to lock it.

This was the issue from previous release which got uncovered once I tried making mux default enabled.

Copy link
Contributor Author

@rahul2393 rahul2393 Jul 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Read this comment The caller is responsible for locking until the transaction ID is updated.

https://github.com/googleapis/python-spanner/blob/main/google/cloud/spanner_v1/snapshot.py#L1026-L1029

And then when we were calling _update_for_precommit_token_pb we were taking the lock again
https://github.com/googleapis/python-spanner/blob/main/google/cloud/spanner_v1/snapshot.py#L1046

So we needed 2 methods which takes lock when updating precommit token from unlocked code, and one which is called from places which assume lock is already taken


def _update_for_precommit_token_pb(
self, precommit_token_pb: MultiplexedSessionPrecommitToken
Expand All @@ -1044,10 +1043,22 @@ def _update_for_precommit_token_pb(
# Because multiple threads can be used to perform operations within a
# transaction, we need to use a lock when updating the precommit token.
with self._lock:
if self._precommit_token is None or (
precommit_token_pb.seq_num > self._precommit_token.seq_num
):
self._precommit_token = precommit_token_pb
self._update_for_precommit_token_pb_unsafe(precommit_token_pb)

def _update_for_precommit_token_pb_unsafe(
self, precommit_token_pb: MultiplexedSessionPrecommitToken
) -> None:
"""Updates the snapshot for the given multiplexed session precommit token.
This method is unsafe because it does not acquire a lock before updating
the precommit token. It should only be used when the caller has already
acquired the lock.
:type precommit_token_pb: :class:`~google.cloud.spanner_v1.MultiplexedSessionPrecommitToken`
:param precommit_token_pb: The multiplexed session precommit token to update the snapshot with.
"""
if self._precommit_token is None or (
precommit_token_pb.seq_num > self._precommit_token.seq_num
):
self._precommit_token = precommit_token_pb


class Snapshot(_SnapshotBase):
Expand Down
29 changes: 21 additions & 8 deletions google/cloud/spanner_v1/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,18 @@ def commit(

def wrapped_method(*args, **kwargs):
attempt.increment()
commit_request_args = {
"mutations": mutations,
**common_commit_request_args,
}
# Check if session is multiplexed (safely handle mock sessions)
is_multiplexed = getattr(self._session, "is_multiplexed", False)
if is_multiplexed and self._precommit_token is not None:
commit_request_args["precommit_token"] = self._precommit_token

commit_method = functools.partial(
api.commit,
request=CommitRequest(
mutations=mutations,
precommit_token=self._precommit_token,
**common_commit_request_args,
),
request=CommitRequest(**commit_request_args),
metadata=database.metadata_with_request_id(
nth_request,
attempt.value,
Expand Down Expand Up @@ -516,6 +521,9 @@ def wrapped_method(*args, **kwargs):
if is_inline_begin:
self._lock.release()

if result_set_pb.precommit_token is not None:
self._update_for_precommit_token_pb(result_set_pb.precommit_token)

return result_set_pb.stats.row_count_exact

def batch_update(
Expand Down Expand Up @@ -660,6 +668,14 @@ def wrapped_method(*args, **kwargs):
if is_inline_begin:
self._lock.release()

if (
len(response_pb.result_sets) > 0
and response_pb.result_sets[0].precommit_token
):
self._update_for_precommit_token_pb(
response_pb.result_sets[0].precommit_token
)

row_counts = [
result_set.stats.row_count_exact for result_set in response_pb.result_sets
]
Expand Down Expand Up @@ -736,9 +752,6 @@ def _update_for_execute_batch_dml_response_pb(
:type response_pb: :class:`~google.cloud.spanner_v1.types.ExecuteBatchDmlResponse`
:param response_pb: The execute batch DML response to update the transaction with.
"""
if response_pb.precommit_token:
self._update_for_precommit_token_pb(response_pb.precommit_token)

# Only the first result set contains the result set metadata.
if len(response_pb.result_sets) > 0:
self._update_for_result_set_pb(response_pb.result_sets[0])
Expand Down
2 changes: 1 addition & 1 deletion tests/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def is_multiplexed_enabled(transaction_type: TransactionType) -> bool:
env_var_read_write = "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW"

def _getenv(val: str) -> bool:
return getenv(val, "false").lower() == "true"
return getenv(val, "true").lower().strip() != "false"

if transaction_type is TransactionType.READ_ONLY:
return _getenv(env_var)
Expand Down
107 changes: 107 additions & 0 deletions tests/mockserver_tests/mock_server_test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
SpannerServicer,
start_mock_server,
)
from tests._helpers import is_multiplexed_enabled


# Creates an aborted status with the smallest possible retry delay.
Expand Down Expand Up @@ -228,3 +229,109 @@ def database(self) -> Database:
enable_interceptors_in_tests=True,
)
return self._database

def assert_requests_sequence(
self,
requests,
expected_types,
transaction_type,
allow_multiple_batch_create=True,
):
"""Assert that the requests sequence matches the expected types, accounting for multiplexed sessions and retries.

Args:
requests: List of requests from spanner_service.requests
expected_types: List of expected request types (excluding session creation requests)
transaction_type: TransactionType enum value to check multiplexed session status
allow_multiple_batch_create: If True, skip all leading BatchCreateSessionsRequest and one optional CreateSessionRequest
"""
from google.cloud.spanner_v1 import (
BatchCreateSessionsRequest,
CreateSessionRequest,
)

mux_enabled = is_multiplexed_enabled(transaction_type)
idx = 0
# Skip all leading BatchCreateSessionsRequest (for retries)
if allow_multiple_batch_create:
while idx < len(requests) and isinstance(
requests[idx], BatchCreateSessionsRequest
):
idx += 1
# For multiplexed, optionally skip a CreateSessionRequest
if (
mux_enabled
and idx < len(requests)
and isinstance(requests[idx], CreateSessionRequest)
):
idx += 1
else:
if mux_enabled:
self.assertTrue(
isinstance(requests[idx], BatchCreateSessionsRequest),
f"Expected BatchCreateSessionsRequest at index {idx}, got {type(requests[idx])}",
)
idx += 1
self.assertTrue(
isinstance(requests[idx], CreateSessionRequest),
f"Expected CreateSessionRequest at index {idx}, got {type(requests[idx])}",
)
idx += 1
else:
self.assertTrue(
isinstance(requests[idx], BatchCreateSessionsRequest),
f"Expected BatchCreateSessionsRequest at index {idx}, got {type(requests[idx])}",
)
idx += 1
# Check the rest of the expected request types
for expected_type in expected_types:
self.assertTrue(
isinstance(requests[idx], expected_type),
f"Expected {expected_type} at index {idx}, got {type(requests[idx])}",
)
idx += 1
self.assertEqual(
idx, len(requests), f"Expected {idx} requests, got {len(requests)}"
)

def adjust_request_id_sequence(self, expected_segments, requests, transaction_type):
"""Adjust expected request ID sequence numbers based on actual session creation requests.

Args:
expected_segments: List of expected (method, (sequence_numbers)) tuples
requests: List of actual requests from spanner_service.requests
transaction_type: TransactionType enum value to check multiplexed session status

Returns:
List of adjusted expected segments with corrected sequence numbers
"""
from google.cloud.spanner_v1 import (
BatchCreateSessionsRequest,
CreateSessionRequest,
ExecuteSqlRequest,
BeginTransactionRequest,
)

# Count session creation requests that come before the first non-session request
session_requests_before = 0
for req in requests:
if isinstance(req, (BatchCreateSessionsRequest, CreateSessionRequest)):
session_requests_before += 1
elif isinstance(req, (ExecuteSqlRequest, BeginTransactionRequest)):
break

# For multiplexed sessions, we expect 2 session requests (BatchCreateSessions + CreateSession)
# For non-multiplexed, we expect 1 session request (BatchCreateSessions)
mux_enabled = is_multiplexed_enabled(transaction_type)
expected_session_requests = 2 if mux_enabled else 1
extra_session_requests = session_requests_before - expected_session_requests

# Adjust sequence numbers based on extra session requests
adjusted_segments = []
for method, seq_nums in expected_segments:
# Adjust the sequence number (5th element in the tuple)
adjusted_seq_nums = list(seq_nums)
adjusted_seq_nums[4] += extra_session_requests
adjusted_segments.append((method, tuple(adjusted_seq_nums)))

return adjusted_segments
Loading