Skip to content

Commit 3ad9ce9

Browse files
committed
fix systests
1 parent 679f9be commit 3ad9ce9

File tree

6 files changed

+47
-16
lines changed

6 files changed

+47
-16
lines changed

google/cloud/spanner_v1/database.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -848,7 +848,14 @@ def session(self, labels=None, database_role=None):
848848
# If role is specified in param, then that role is used
849849
# instead.
850850
role = database_role or self._database_role
851-
return Session(self, labels=labels, database_role=role)
851+
is_multiplexed = False
852+
if self.sessions_manager._use_multiplexed(
853+
transaction_type=TransactionType.READ_ONLY
854+
):
855+
is_multiplexed = True
856+
return Session(
857+
self, labels=labels, database_role=role, is_multiplexed=is_multiplexed
858+
)
852859

853860
def snapshot(self, **kw):
854861
"""Return an object which wraps a snapshot.

google/cloud/spanner_v1/session.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,13 @@ def delete(self):
275275
current_span, "Deleting Session failed due to unset session_id"
276276
)
277277
raise ValueError("Session ID not set by back-end")
278-
278+
if self._is_multiplexed:
279+
add_span_event(
280+
current_span,
281+
"Skipped deleting Multiplexed Session",
282+
{"session.id": self._session_id},
283+
)
284+
return
279285
add_span_event(
280286
current_span, "Deleting Session", {"session.id": self._session_id}
281287
)

google/cloud/spanner_v1/snapshot.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ def _restart_on_unavailable(
133133
# Update the transaction from the response.
134134
if transaction is not None:
135135
transaction._update_for_result_set_pb(item)
136+
if item.precommit_token is not None and transaction is not None:
137+
transaction._update_for_precommit_token_pb(item.precommit_token)
136138

137139
if item.resume_token:
138140
resume_token = item.resume_token
@@ -1013,9 +1015,6 @@ def _update_for_result_set_pb(
10131015
if result_set_pb.metadata and result_set_pb.metadata.transaction:
10141016
self._update_for_transaction_pb(result_set_pb.metadata.transaction)
10151017

1016-
if result_set_pb.precommit_token:
1017-
self._update_for_precommit_token_pb(result_set_pb.precommit_token)
1018-
10191018
def _update_for_transaction_pb(self, transaction_pb: Transaction) -> None:
10201019
"""Updates the snapshot for the given transaction.
10211020
@@ -1031,7 +1030,7 @@ def _update_for_transaction_pb(self, transaction_pb: Transaction) -> None:
10311030
self._transaction_id = transaction_pb.id
10321031

10331032
if transaction_pb.precommit_token:
1034-
self._update_for_precommit_token_pb(transaction_pb.precommit_token)
1033+
self._update_for_precommit_token_pb_unsafe(transaction_pb.precommit_token)
10351034

10361035
def _update_for_precommit_token_pb(
10371036
self, precommit_token_pb: MultiplexedSessionPrecommitToken
@@ -1044,10 +1043,22 @@ def _update_for_precommit_token_pb(
10441043
# Because multiple threads can be used to perform operations within a
10451044
# transaction, we need to use a lock when updating the precommit token.
10461045
with self._lock:
1047-
if self._precommit_token is None or (
1048-
precommit_token_pb.seq_num > self._precommit_token.seq_num
1049-
):
1050-
self._precommit_token = precommit_token_pb
1046+
self._update_for_precommit_token_pb_unsafe(precommit_token_pb)
1047+
1048+
def _update_for_precommit_token_pb_unsafe(
1049+
self, precommit_token_pb: MultiplexedSessionPrecommitToken
1050+
) -> None:
1051+
"""Updates the snapshot for the given multiplexed session precommit token.
1052+
This method is unsafe because it does not acquire a lock before updating
1053+
the precommit token. It should only be used when the caller has already
1054+
acquired the lock.
1055+
:type precommit_token_pb: :class:`~google.cloud.spanner_v1.MultiplexedSessionPrecommitToken`
1056+
:param precommit_token_pb: The multiplexed session precommit token to update the snapshot with.
1057+
"""
1058+
if self._precommit_token is None or (
1059+
precommit_token_pb.seq_num > self._precommit_token.seq_num
1060+
):
1061+
self._precommit_token = precommit_token_pb
10511062

10521063

10531064
class Snapshot(_SnapshotBase):

google/cloud/spanner_v1/transaction.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,9 @@ def wrapped_method(*args, **kwargs):
516516
if is_inline_begin:
517517
self._lock.release()
518518

519+
if result_set_pb.precommit_token is not None:
520+
self._update_for_precommit_token_pb(result_set_pb.precommit_token)
521+
519522
return result_set_pb.stats.row_count_exact
520523

521524
def batch_update(
@@ -660,6 +663,9 @@ def wrapped_method(*args, **kwargs):
660663
if is_inline_begin:
661664
self._lock.release()
662665

666+
if len(response_pb.result_sets) > 0 and response_pb.result_sets[0].precommit_token:
667+
self._update_for_precommit_token_pb(response_pb.result_sets[0].precommit_token)
668+
663669
row_counts = [
664670
result_set.stats.row_count_exact for result_set in response_pb.result_sets
665671
]
@@ -736,9 +742,6 @@ def _update_for_execute_batch_dml_response_pb(
736742
:type response_pb: :class:`~google.cloud.spanner_v1.types.ExecuteBatchDmlResponse`
737743
:param response_pb: The execute batch DML response to update the transaction with.
738744
"""
739-
if response_pb.precommit_token:
740-
self._update_for_precommit_token_pb(response_pb.precommit_token)
741-
742745
# Only the first result set contains the result set metadata.
743746
if len(response_pb.result_sets) > 0:
744747
self._update_for_result_set_pb(response_pb.result_sets[0])

tests/system/_sample_data.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ def _assert_timestamp(value, nano_value):
101101
# Allow up to 1 microsecond difference for timestamp precision issues
102102
us_diff = abs(value.microsecond - nano_value.microsecond)
103103
if us_diff > 1:
104-
print(f"DEBUG: Microsecond comparison failed:")
105104
print(f" Expected: {value} (microsecond: {value.microsecond})")
106105
print(f" Found: {nano_value} (microsecond: {nano_value.microsecond})")
107106
print(f" Difference: {us_diff} microseconds")

tests/system/test_session_api.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,9 @@ def handle_abort(self, database):
424424

425425

426426
def test_session_crud(sessions_database):
427+
if is_multiplexed_enabled(transaction_type=TransactionType.READ_ONLY):
428+
pytest.skip("Multiplexed sessions do not support CRUD operations.")
429+
427430
session = sessions_database.session()
428431
assert not session.exists()
429432

@@ -1412,11 +1415,13 @@ def unit_of_work(transaction):
14121415
for span in ot_exporter.get_finished_spans():
14131416
if span and span.name:
14141417
span_list.append(span)
1415-
1418+
multiplexed_enabled = is_multiplexed_enabled(TransactionType.READ_WRITE)
14161419
span_list = sorted(span_list, key=lambda v1: v1.start_time)
14171420
got_span_names = [span.name for span in span_list]
14181421
expected_span_names = [
1419-
"CloudSpanner.CreateSession",
1422+
"CloudSpanner.CreateMultiplexedSession"
1423+
if multiplexed_enabled
1424+
else "CloudSpanner.CreateSession",
14201425
"CloudSpanner.Batch.commit",
14211426
"Test Span",
14221427
"CloudSpanner.Session.run_in_transaction",

0 commit comments

Comments
 (0)