Skip to content

Commit b45500d

Browse files
committed
feat: Add support for multiplexed sessions
1 parent b433281 commit b45500d

File tree

10 files changed

+705
-180
lines changed

10 files changed

+705
-180
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
on:
2+
push:
3+
branches:
4+
- main
5+
pull_request:
6+
name: Run Spanner integration tests against emulator
7+
jobs:
8+
system-tests:
9+
runs-on: ubuntu-latest
10+
11+
services:
12+
emulator:
13+
image: gcr.io/cloud-spanner-emulator/emulator:latest
14+
ports:
15+
- 9010:9010
16+
- 9020:9020
17+
18+
steps:
19+
- name: Checkout code
20+
uses: actions/checkout@v4
21+
- name: Setup Python
22+
uses: actions/setup-python@v5
23+
with:
24+
python-version: 3.8
25+
- name: Install nox
26+
run: python -m pip install nox
27+
- name: Run system tests
28+
run: nox -s system
29+
env:
30+
SPANNER_EMULATOR_HOST: localhost:9010
31+
GOOGLE_CLOUD_PROJECT: emulator-test-project
32+
GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE: true
33+
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: true
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Format: //devtools/kokoro/config/proto/build.proto
2+
3+
# Only run a subset of all nox sessions
4+
env_vars: {
5+
key: "NOX_SESSION"
6+
value: "unit-3.8 unit-3.12 system-3.8"
7+
}
8+
9+
env_vars: {
10+
key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"
11+
value: "true"
12+
}
13+
14+
env_vars: {
15+
key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"
16+
value: "true"
17+
}

google/cloud/spanner_dbapi/connection.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from google.cloud.spanner_dbapi.transaction_helper import TransactionRetryHelper
2929
from google.cloud.spanner_dbapi.cursor import Cursor
3030
from google.cloud.spanner_v1 import RequestOptions, TransactionOptions
31+
from google.cloud.spanner_v1.session_options import TransactionType
3132
from google.cloud.spanner_v1.snapshot import Snapshot
3233

3334
from google.cloud.spanner_dbapi.exceptions import (
@@ -357,7 +358,12 @@ def _session_checkout(self):
357358
if self.database is None:
358359
raise ValueError("Database needs to be passed for this operation")
359360
if not self._session:
360-
self._session = self.database._pool.get()
361+
transaction_type = (
362+
TransactionType.READ_ONLY
363+
if self.read_only
364+
else TransactionType.READ_WRITE
365+
)
366+
self._session = self.database._session_manager.get_session(transaction_type)
361367

362368
return self._session
363369

@@ -370,7 +376,7 @@ def _release_session(self):
370376
return
371377
if self.database is None:
372378
raise ValueError("Database needs to be passed for this operation")
373-
self.database._pool.put(self._session)
379+
self.database._session_manager.put_session(self._session)
374380
self._session = None
375381

376382
def transaction_checkout(self):
@@ -432,7 +438,7 @@ def close(self):
432438
self._transaction.rollback()
433439

434440
if self._own_pool and self.database:
435-
self.database._pool.clear()
441+
self.database._session_manager._pool.clear()
436442

437443
self.is_closed = True
438444

google/cloud/spanner_v1/client.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
from google.cloud.spanner_v1.metrics.metrics_exporter import (
6161
CloudMonitoringMetricsExporter,
6262
)
63+
from google.cloud.spanner_v1.session_options import SessionOptions
6364

6465
try:
6566
from opentelemetry import metrics
@@ -172,6 +173,9 @@ class Client(ClientWithProject):
172173
or :class:`dict`
173174
:param default_transaction_options: (Optional) Default options to use for all transactions.
174175
176+
:type session_options: :class:`~google.cloud.spanner_v1.SessionOptions`
177+
:param session_options: (Optional) Options for client sessions.
178+
175179
:raises: :class:`ValueError <exceptions.ValueError>` if both ``read_only``
176180
and ``admin`` are :data:`True`
177181
"""
@@ -196,6 +200,7 @@ def __init__(
196200
directed_read_options=None,
197201
observability_options=None,
198202
default_transaction_options: Optional[DefaultTransactionOptions] = None,
203+
session_options=None,
199204
):
200205
self._emulator_host = _get_spanner_emulator_host()
201206

@@ -268,6 +273,7 @@ def __init__(
268273
self._default_transaction_options = default_transaction_options
269274
self._nth_client_id = Client.NTH_CLIENT.increment()
270275
self._nth_request = AtomicCounter(0)
276+
self._session_options = session_options or SessionOptions()
271277

272278
@property
273279
def _next_nth_request(self):
@@ -373,6 +379,14 @@ def default_transaction_options(self):
373379
"""
374380
return self._default_transaction_options
375381

382+
@property
383+
def session_options(self):
384+
"""Returns the session options for the client.
385+
:rtype: :class:`~google.cloud.spanner_v1.SessionOptions`
386+
:returns: The session options for the client.
387+
"""
388+
return self._session_options
389+
376390
@property
377391
def directed_read_options(self):
378392
"""Getter for directed_read_options.

google/cloud/spanner_v1/database.py

Lines changed: 83 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,11 @@
5757
)
5858
from google.cloud.spanner_v1.batch import Batch
5959
from google.cloud.spanner_v1.batch import MutationGroups
60+
from google.cloud.spanner_v1.database_sessions_manager import DatabaseSessionsManager
6061
from google.cloud.spanner_v1.keyset import KeySet
6162
from google.cloud.spanner_v1.merged_result_set import MergedResultSet
6263
from google.cloud.spanner_v1.pool import BurstyPool
63-
from google.cloud.spanner_v1.pool import SessionCheckout
64-
from google.cloud.spanner_v1.session import Session
64+
from google.cloud.spanner_v1.session_options import SessionOptions, TransactionType
6565
from google.cloud.spanner_v1.snapshot import _restart_on_unavailable
6666
from google.cloud.spanner_v1.snapshot import Snapshot
6767
from google.cloud.spanner_v1.streamed import StreamedResultSet
@@ -196,9 +196,9 @@ def __init__(
196196

197197
if pool is None:
198198
pool = BurstyPool(database_role=database_role)
199-
200-
self._pool = pool
201199
pool.bind(self)
200+
self._session_manager = DatabaseSessionsManager(database=self, pool=pool)
201+
202202

203203
@classmethod
204204
def from_pb(cls, database_pb, instance, pool=None):
@@ -462,6 +462,14 @@ def spanner_api(self):
462462

463463
return self._spanner_api
464464

465+
@property
466+
def session_options(self) -> SessionOptions:
467+
"""Session options for the database.
468+
:rtype: :class:`~google.cloud.spanner_v1.session_options.SessionOptions`
469+
:returns: the session options
470+
"""
471+
return self._instance._client.session_options
472+
465473
def metadata_with_request_id(
466474
self, nth_request, nth_attempt, prior_metadata=[], span=None
467475
):
@@ -759,18 +767,31 @@ def execute_pdml():
759767
"CloudSpanner.Database.execute_partitioned_pdml",
760768
observability_options=self.observability_options,
761769
) as span, MetricsCapture():
762-
with SessionCheckout(self._pool) as session:
770+
with SessionCheckout(self, TransactionType.PARTITIONED) as session:
763771
add_span_event(span, "Starting BeginTransaction")
764-
txn = api.begin_transaction(
765-
session=session.name,
766-
options=txn_options,
767-
metadata=self.metadata_with_request_id(
768-
self._next_nth_request,
769-
1,
770-
metadata,
771-
span,
772-
),
773-
)
772+
try:
773+
txn = api.begin_transaction(
774+
session=session.name,
775+
options=txn_options,
776+
metadata=self.metadata_with_request_id(
777+
self._next_nth_request,
778+
1,
779+
metadata,
780+
span,
781+
),
782+
)
783+
# If partitioned DML is not supported with multiplexed sessions,
784+
# disable multiplexed sessions for partitioned transactions before
785+
# re-raising the error.
786+
except NotImplementedError as exc:
787+
if (
788+
"Transaction type partitioned_dml not supported with multiplexed sessions"
789+
in str(exc)
790+
):
791+
self.session_options.disable_multiplexed(
792+
self.logger, TransactionType.PARTITIONED
793+
)
794+
raise exc
774795

775796
txn_selector = TransactionSelector(id=txn.id)
776797

@@ -792,6 +813,7 @@ def execute_pdml():
792813
method=method,
793814
trace_name="CloudSpanner.ExecuteStreamingSql",
794815
request=request,
816+
session=session,
795817
metadata=metadata,
796818
transaction_selector=txn_selector,
797819
observability_options=self.observability_options,
@@ -817,23 +839,6 @@ def _nth_client_id(self):
817839
return self._instance._client._nth_client_id
818840
return 0
819841

820-
def session(self, labels=None, database_role=None):
821-
"""Factory to create a session for this database.
822-
823-
:type labels: dict (str -> str) or None
824-
:param labels: (Optional) user-assigned labels for the session.
825-
826-
:type database_role: str
827-
:param database_role: (Optional) user-assigned database_role for the session.
828-
829-
:rtype: :class:`~google.cloud.spanner_v1.session.Session`
830-
:returns: a session bound to this database.
831-
"""
832-
# If role is specified in param, then that role is used
833-
# instead.
834-
role = database_role or self._database_role
835-
return Session(self, labels=labels, database_role=role)
836-
837842
def snapshot(self, **kw):
838843
"""Return an object which wraps a snapshot.
839844
@@ -995,7 +1000,7 @@ def run_in_transaction(self, func, *args, **kw):
9951000
# Check out a session and run the function in a transaction; once
9961001
# done, flip the sanity check bit back.
9971002
try:
998-
with SessionCheckout(self._pool) as session:
1003+
with SessionCheckout(self) as session:
9991004
return session.run_in_transaction(func, *args, **kw)
10001005
finally:
10011006
self._local.transaction_running = False
@@ -1241,6 +1246,50 @@ def observability_options(self):
12411246
return opts
12421247

12431248

1249+
class SessionCheckout(object):
1250+
"""Context manager for using a session from a database.
1251+
:type database: :class:`~google.cloud.spanner_v1.database.Database`
1252+
:param database: database to use the session from
1253+
"""
1254+
1255+
_session = None # Not checked out until '__enter__'.
1256+
1257+
def __init__(
1258+
self,
1259+
database, # type: ignore
1260+
transaction_type: TransactionType = TransactionType.READ_WRITE,
1261+
):
1262+
# Move import here to avoid circular import
1263+
from google.cloud.spanner_v1.database import Database
1264+
if not isinstance(database, Database):
1265+
raise TypeError(
1266+
"{class_name} must receive an instance of {expected_class_name}. Received: {actual_class_name}".format(
1267+
class_name=self.__class__.__name__,
1268+
expected_class_name=Database.__name__,
1269+
actual_class_name=database.__class__.__name__,
1270+
)
1271+
)
1272+
1273+
if not isinstance(transaction_type, TransactionType):
1274+
raise TypeError(
1275+
"{class_name} must receive an instance of {expected_class_name}. Received: {actual_class_name}".format(
1276+
class_name=self.__class__.__name__,
1277+
expected_class_name=TransactionType.__name__,
1278+
actual_class_name=transaction_type.__class__.__name__,
1279+
)
1280+
)
1281+
1282+
self._database = database
1283+
self._transaction_type = transaction_type
1284+
1285+
def __enter__(self):
1286+
session_manager = self._database._session_manager
1287+
self._session = session_manager.get_session(self._transaction_type)
1288+
return self._session
1289+
1290+
def __exit__(self, *ignored):
1291+
self._database._session_manager.put_session(self._session)
1292+
12441293
class BatchCheckout(object):
12451294
"""Context manager for using a batch from a database.
12461295
@@ -1929,3 +1978,4 @@ def _retry_on_aborted(func, retry_config):
19291978
"""
19301979
retry = retry_config.with_predicate(if_exception_type(Aborted))
19311980
return retry(func)
1981+

0 commit comments

Comments
 (0)