Skip to content

Commit ce3f230

Browse files
currantwrahul2393
andauthored
feat: Add support for multiplexed sessions - read/write (#1389)
* feat: Multiplexed sessions - Support multiplexed sessions for read/write transactions. Signed-off-by: Taylor Curran <[email protected]> * feat: Multiplexed sessions - Remove `Session._transaction` attribute, since each session may not correspond to multiple transactions. Signed-off-by: Taylor Curran <[email protected]> * feat: Multiplexed sessions - Refactor logic for creating transaction selector to base class. Signed-off-by: Taylor Curran <[email protected]> * feat: Multiplexed sessions - Add retry logic to run_in_transaction with previous transaction ID. Signed-off-by: Taylor Curran <[email protected]> * feat: Multiplexed sessions - Remove unnecessary divider comments Signed-off-by: Taylor Curran <[email protected]> * feat: Multiplexed sessions - Only populate previous transaction ID for transactions with multiplexed session. Signed-off-by: Taylor Curran <[email protected]> --------- Signed-off-by: Taylor Curran <[email protected]> Co-authored-by: rahul2393 <[email protected]>
1 parent cb25de4 commit ce3f230

File tree

9 files changed

+570
-548
lines changed

9 files changed

+570
-548
lines changed

google/cloud/spanner_v1/database_sessions_manager.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,10 @@ def get_session(self, transaction_type: TransactionType) -> Session:
8686
:returns: a session for the given transaction type.
8787
"""
8888

89-
use_multiplexed = self._use_multiplexed(transaction_type)
90-
91-
# TODO multiplexed: enable for read/write transactions
92-
if use_multiplexed and transaction_type == TransactionType.READ_WRITE:
93-
raise NotImplementedError(
94-
f"Multiplexed sessions are not yet supported for {transaction_type} transactions."
95-
)
96-
9789
session = (
98-
self._get_multiplexed_session() if use_multiplexed else self._pool.get()
90+
self._get_multiplexed_session()
91+
if self._use_multiplexed(transaction_type)
92+
else self._pool.get()
9993
)
10094

10195
add_span_event(

google/cloud/spanner_v1/session.py

Lines changed: 37 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,6 @@ def __init__(self, database, labels=None, database_role=None, is_multiplexed=Fal
7474
self._database = database
7575
self._session_id: Optional[str] = None
7676

77-
# TODO multiplexed - remove
78-
self._transaction: Optional[Transaction] = None
79-
8077
if labels is None:
8178
labels = {}
8279

@@ -467,23 +464,18 @@ def batch(self):
467464

468465
return Batch(self)
469466

470-
def transaction(self):
467+
def transaction(self) -> Transaction:
471468
"""Create a transaction to perform a set of reads with shared staleness.
472469
473470
:rtype: :class:`~google.cloud.spanner_v1.transaction.Transaction`
474471
:returns: a transaction bound to this session
472+
475473
:raises ValueError: if the session has not yet been created.
476474
"""
477475
if self._session_id is None:
478476
raise ValueError("Session has not been created.")
479477

480-
# TODO multiplexed - remove
481-
if self._transaction is not None:
482-
self._transaction.rolled_back = True
483-
self._transaction = None
484-
485-
txn = self._transaction = Transaction(self)
486-
return txn
478+
return Transaction(self)
487479

488480
def run_in_transaction(self, func, *args, **kw):
489481
"""Perform a unit of work in a transaction, retrying on abort.
@@ -528,42 +520,43 @@ def run_in_transaction(self, func, *args, **kw):
528520
)
529521
isolation_level = kw.pop("isolation_level", None)
530522

531-
attempts = 0
523+
database = self._database
524+
log_commit_stats = database.log_commit_stats
532525

533-
observability_options = getattr(self._database, "observability_options", None)
534526
with trace_call(
535527
"CloudSpanner.Session.run_in_transaction",
536528
self,
537-
observability_options=observability_options,
529+
observability_options=getattr(database, "observability_options", None),
538530
) as span, MetricsCapture():
531+
attempts: int = 0
532+
533+
# If a transaction using a multiplexed session is retried after an aborted
534+
# user operation, it should include the previous transaction ID in the
535+
# transaction options used to begin the transaction. This allows the backend
536+
# to recognize the transaction and increase the lock order for the new
537+
# transaction that is created.
538+
# See :attr:`~google.cloud.spanner_v1.types.TransactionOptions.ReadWrite.multiplexed_session_previous_transaction_id`
539+
previous_transaction_id: Optional[bytes] = None
540+
539541
while True:
540-
# TODO multiplexed - remove
541-
if self._transaction is None:
542-
txn = self.transaction()
543-
txn.transaction_tag = transaction_tag
544-
txn.exclude_txn_from_change_streams = (
545-
exclude_txn_from_change_streams
542+
txn = self.transaction()
543+
txn.transaction_tag = transaction_tag
544+
txn.exclude_txn_from_change_streams = exclude_txn_from_change_streams
545+
txn.isolation_level = isolation_level
546+
547+
if self.is_multiplexed:
548+
txn._multiplexed_session_previous_transaction_id = (
549+
previous_transaction_id
546550
)
547-
txn.isolation_level = isolation_level
548-
else:
549-
txn = self._transaction
550551

551-
span_attributes = dict()
552+
attempts += 1
553+
span_attributes = dict(attempt=attempts)
552554

553555
try:
554-
attempts += 1
555-
span_attributes["attempt"] = attempts
556-
txn_id = getattr(txn, "_transaction_id", "") or ""
557-
if txn_id:
558-
span_attributes["transaction.id"] = txn_id
559-
560556
return_value = func(txn, *args, **kw)
561557

562-
# TODO multiplexed: store previous transaction ID.
563558
except Aborted as exc:
564-
# TODO multiplexed - remove
565-
self._transaction = None
566-
559+
previous_transaction_id = txn._transaction_id
567560
if span:
568561
delay_seconds = _get_retry_delay(
569562
exc.errors[0],
@@ -582,16 +575,15 @@ def run_in_transaction(self, func, *args, **kw):
582575
exc, deadline, attempts, default_retry_delay=default_retry_delay
583576
)
584577
continue
585-
except GoogleAPICallError:
586-
# TODO multiplexed - remove
587-
self._transaction = None
588578

579+
except GoogleAPICallError:
589580
add_span_event(
590581
span,
591582
"User operation failed due to GoogleAPICallError, not retrying",
592583
span_attributes,
593584
)
594585
raise
586+
595587
except Exception:
596588
add_span_event(
597589
span,
@@ -603,14 +595,13 @@ def run_in_transaction(self, func, *args, **kw):
603595

604596
try:
605597
txn.commit(
606-
return_commit_stats=self._database.log_commit_stats,
598+
return_commit_stats=log_commit_stats,
607599
request_options=commit_request_options,
608600
max_commit_delay=max_commit_delay,
609601
)
610-
except Aborted as exc:
611-
# TODO multiplexed - remove
612-
self._transaction = None
613602

603+
except Aborted as exc:
604+
previous_transaction_id = txn._transaction_id
614605
if span:
615606
delay_seconds = _get_retry_delay(
616607
exc.errors[0],
@@ -621,26 +612,25 @@ def run_in_transaction(self, func, *args, **kw):
621612
attributes.update(span_attributes)
622613
add_span_event(
623614
span,
624-
"Transaction got aborted during commit, retrying afresh",
615+
"Transaction was aborted during commit, retrying",
625616
attributes,
626617
)
627618

628619
_delay_until_retry(
629620
exc, deadline, attempts, default_retry_delay=default_retry_delay
630621
)
631-
except GoogleAPICallError:
632-
# TODO multiplexed - remove
633-
self._transaction = None
634622

623+
except GoogleAPICallError:
635624
add_span_event(
636625
span,
637626
"Transaction.commit failed due to GoogleAPICallError, not retrying",
638627
span_attributes,
639628
)
640629
raise
630+
641631
else:
642-
if self._database.log_commit_stats and txn.commit_stats:
643-
self._database.logger.info(
632+
if log_commit_stats and txn.commit_stats:
633+
database.logger.info(
644634
"CommitStats: {}".format(txn.commit_stats),
645635
extra={"commit_stats": txn.commit_stats},
646636
)

google/cloud/spanner_v1/snapshot.py

Lines changed: 49 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def _restart_on_unavailable(
9393
item_buffer: List[PartialResultSet] = []
9494

9595
if transaction is not None:
96-
transaction_selector = transaction._make_txn_selector()
96+
transaction_selector = transaction._build_transaction_selector_pb()
9797
elif transaction_selector is None:
9898
raise InvalidArgument(
9999
"Either transaction or transaction_selector should be set"
@@ -149,7 +149,7 @@ def _restart_on_unavailable(
149149
) as span, MetricsCapture():
150150
request.resume_token = resume_token
151151
if transaction is not None:
152-
transaction_selector = transaction._make_txn_selector()
152+
transaction_selector = transaction._build_transaction_selector_pb()
153153
request.transaction = transaction_selector
154154
attempt += 1
155155
iterator = method(
@@ -180,7 +180,7 @@ def _restart_on_unavailable(
180180
) as span, MetricsCapture():
181181
request.resume_token = resume_token
182182
if transaction is not None:
183-
transaction_selector = transaction._make_txn_selector()
183+
transaction_selector = transaction._build_transaction_selector_pb()
184184
attempt += 1
185185
request.transaction = transaction_selector
186186
iterator = method(
@@ -238,17 +238,6 @@ def __init__(self, session):
238238
# threads, so we need to use a lock when updating the transaction.
239239
self._lock: threading.Lock = threading.Lock()
240240

241-
def _make_txn_selector(self):
242-
"""Helper for :meth:`read` / :meth:`execute_sql`.
243-
244-
Subclasses must override, returning an instance of
245-
:class:`transaction_pb2.TransactionSelector`
246-
appropriate for making ``read`` / ``execute_sql`` requests
247-
248-
:raises: NotImplementedError, always
249-
"""
250-
raise NotImplementedError
251-
252241
def begin(self) -> bytes:
253242
"""Begins a transaction on the database.
254243
@@ -732,7 +721,7 @@ def partition_read(
732721
metadata.append(
733722
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
734723
)
735-
transaction = self._make_txn_selector()
724+
transaction = self._build_transaction_selector_pb()
736725
partition_options = PartitionOptions(
737726
partition_size_bytes=partition_size_bytes, max_partitions=max_partitions
738727
)
@@ -854,7 +843,7 @@ def partition_query(
854843
metadata.append(
855844
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
856845
)
857-
transaction = self._make_txn_selector()
846+
transaction = self._build_transaction_selector_pb()
858847
partition_options = PartitionOptions(
859848
partition_size_bytes=partition_size_bytes, max_partitions=max_partitions
860849
)
@@ -944,7 +933,7 @@ def _begin_transaction(self, mutation: Mutation = None) -> bytes:
944933
def wrapped_method():
945934
begin_transaction_request = BeginTransactionRequest(
946935
session=session.name,
947-
options=self._make_txn_selector().begin,
936+
options=self._build_transaction_selector_pb().begin,
948937
mutation_key=mutation,
949938
)
950939
begin_transaction_method = functools.partial(
@@ -983,6 +972,34 @@ def before_next_retry(nth_retry, delay_in_seconds):
983972
self._update_for_transaction_pb(transaction_pb)
984973
return self._transaction_id
985974

975+
def _build_transaction_options_pb(self) -> TransactionOptions:
976+
"""Builds and returns the transaction options for this snapshot.
977+
978+
:rtype: :class:`transaction_pb2.TransactionOptions`
979+
:returns: the transaction options for this snapshot.
980+
"""
981+
raise NotImplementedError
982+
983+
def _build_transaction_selector_pb(self) -> TransactionSelector:
984+
"""Builds and returns a transaction selector for this snapshot.
985+
986+
:rtype: :class:`transaction_pb2.TransactionSelector`
987+
:returns: a transaction selector for this snapshot.
988+
"""
989+
990+
# Select a previously begun transaction.
991+
if self._transaction_id is not None:
992+
return TransactionSelector(id=self._transaction_id)
993+
994+
options = self._build_transaction_options_pb()
995+
996+
# Select a single-use transaction.
997+
if not self._multi_use:
998+
return TransactionSelector(single_use=options)
999+
1000+
# Select a new, multi-use transaction.
1001+
return TransactionSelector(begin=options)
1002+
9861003
def _update_for_result_set_pb(
9871004
self, result_set_pb: Union[ResultSet, PartialResultSet]
9881005
) -> None:
@@ -1101,38 +1118,28 @@ def __init__(
11011118
self._multi_use = multi_use
11021119
self._transaction_id = transaction_id
11031120

1104-
# TODO multiplexed - refactor to base class
1105-
def _make_txn_selector(self):
1106-
"""Helper for :meth:`read`."""
1107-
if self._transaction_id is not None:
1108-
return TransactionSelector(id=self._transaction_id)
1121+
def _build_transaction_options_pb(self) -> TransactionOptions:
1122+
"""Builds and returns transaction options for this snapshot.
1123+
1124+
:rtype: :class:`transaction_pb2.TransactionOptions`
1125+
:returns: transaction options for this snapshot.
1126+
"""
1127+
1128+
read_only_pb_args = dict(return_read_timestamp=True)
11091129

11101130
if self._read_timestamp:
1111-
key = "read_timestamp"
1112-
value = self._read_timestamp
1131+
read_only_pb_args["read_timestamp"] = self._read_timestamp
11131132
elif self._min_read_timestamp:
1114-
key = "min_read_timestamp"
1115-
value = self._min_read_timestamp
1133+
read_only_pb_args["min_read_timestamp"] = self._min_read_timestamp
11161134
elif self._max_staleness:
1117-
key = "max_staleness"
1118-
value = self._max_staleness
1135+
read_only_pb_args["max_staleness"] = self._max_staleness
11191136
elif self._exact_staleness:
1120-
key = "exact_staleness"
1121-
value = self._exact_staleness
1137+
read_only_pb_args["exact_staleness"] = self._exact_staleness
11221138
else:
1123-
key = "strong"
1124-
value = True
1125-
1126-
options = TransactionOptions(
1127-
read_only=TransactionOptions.ReadOnly(
1128-
**{key: value, "return_read_timestamp": True}
1129-
)
1130-
)
1139+
read_only_pb_args["strong"] = True
11311140

1132-
if self._multi_use:
1133-
return TransactionSelector(begin=options)
1134-
else:
1135-
return TransactionSelector(single_use=options)
1141+
read_only_pb = TransactionOptions.ReadOnly(**read_only_pb_args)
1142+
return TransactionOptions(read_only=read_only_pb)
11361143

11371144
def _update_for_transaction_pb(self, transaction_pb: Transaction) -> None:
11381145
"""Updates the snapshot for the given transaction.

0 commit comments

Comments
 (0)