Skip to content

Commit 80faca0

Browse files
committed
Adjust with updates
1 parent a5fdebd commit 80faca0

File tree

8 files changed

+154
-66
lines changed

8 files changed

+154
-66
lines changed

google/cloud/spanner_v1/database.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -759,19 +759,15 @@ def execute_partitioned_dml(
759759
_metadata_with_leader_aware_routing(self._route_to_leader_enabled)
760760
)
761761

762-
begin_txn_nth_request = self._next_nth_request
763-
begin_txn_attempt = AtomicCounter(0)
764-
partial_nth_request = self._next_nth_request
765-
# partial_attempt will be incremented inside _restart_on_unavailable.
766-
partial_attempt = AtomicCounter(0)
767-
768762
def execute_pdml():
769763
with trace_call(
770764
"CloudSpanner.Database.execute_partitioned_pdml",
771765
observability_options=self.observability_options,
772766
) as span, MetricsCapture():
773767
with SessionCheckout(self._pool) as session:
774768
add_span_event(span, "Starting BeginTransaction")
769+
begin_txn_nth_request = self._next_nth_request
770+
begin_txn_attempt = AtomicCounter(0)
775771
txn = api.begin_transaction(
776772
session=session.name,
777773
options=txn_options,

google/cloud/spanner_v1/request_id_header.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,6 @@ def generate_rand_uint64():
3737

3838
def with_request_id(client_id, channel_id, nth_request, attempt, other_metadata=[]):
3939
req_id = f"{REQ_ID_VERSION}.{REQ_RAND_PROCESS_ID}.{client_id}.{channel_id}.{nth_request}.{attempt}"
40-
all_metadata = other_metadata.copy()
40+
all_metadata = (other_metadata or []).copy()
4141
all_metadata.append((REQ_ID_HEADER_KEY, req_id))
4242
return all_metadata

google/cloud/spanner_v1/session.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,9 +268,7 @@ def delete(self):
268268
), MetricsCapture():
269269
api.delete_session(
270270
name=self.name,
271-
metadata=database.metadata_with_request_id(
272-
database._next_nth_request, 1, metadata
273-
),
271+
metadata=metadata,
274272
)
275273

276274
def ping(self):

google/cloud/spanner_v1/snapshot.py

Lines changed: 48 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -355,13 +355,10 @@ def read(
355355
directed_read_options=directed_read_options,
356356
)
357357

358-
nth_request = getattr(database, "_next_nth_request", 0)
359-
all_metadata = database.metadata_with_request_id(nth_request, 1, metadata)
360-
361358
restart = functools.partial(
362359
api.streaming_read,
363360
request=request,
364-
metadata=all_metadata,
361+
metadata=metadata,
365362
retry=retry,
366363
timeout=timeout,
367364
)
@@ -751,17 +748,24 @@ def partition_read(
751748
metadata=metadata,
752749
), MetricsCapture():
753750
nth_request = getattr(database, "_next_nth_request", 0)
754-
all_metadata = database.metadata_with_request_id(nth_request, 1, metadata)
755-
method = functools.partial(
756-
api.partition_read,
757-
request=request,
758-
metadata=all_metadata,
759-
retry=retry,
760-
timeout=timeout,
761-
)
751+
counters = dict(attempt=0)
752+
753+
def attempt_tracking_method():
754+
counters["attempt"] += 1
755+
all_metadata = database.metadata_with_request_id(
756+
nth_request, counters["attempt"], metadata
757+
)
758+
method = functools.partial(
759+
api.partition_read,
760+
request=request,
761+
metadata=all_metadata,
762+
retry=retry,
763+
timeout=timeout,
764+
)
765+
return method()
762766

763767
response = _retry(
764-
method,
768+
attempt_tracking_method,
765769
allowed_exceptions={InternalServerError: _check_rst_stream_error},
766770
)
767771

@@ -858,17 +862,24 @@ def partition_query(
858862
metadata=metadata,
859863
), MetricsCapture():
860864
nth_request = getattr(database, "_next_nth_request", 0)
861-
all_metadata = database.metadata_with_request_id(nth_request, 1, metadata)
862-
method = functools.partial(
863-
api.partition_query,
864-
request=request,
865-
metadata=all_metadata,
866-
retry=retry,
867-
timeout=timeout,
868-
)
865+
counters = dict(attempt=0)
866+
867+
def attempt_tracking_method():
868+
counters["attempt"] += 1
869+
all_metadata = database.metadata_with_request_id(
870+
nth_request, counters["attempt"], metadata
871+
)
872+
method = functools.partial(
873+
api.partition_query,
874+
request=request,
875+
metadata=all_metadata,
876+
retry=retry,
877+
timeout=timeout,
878+
)
879+
return method()
869880

870881
response = _retry(
871-
method,
882+
attempt_tracking_method,
872883
allowed_exceptions={InternalServerError: _check_rst_stream_error},
873884
)
874885

@@ -1008,16 +1019,23 @@ def begin(self):
10081019
metadata=metadata,
10091020
), MetricsCapture():
10101021
nth_request = getattr(database, "_next_nth_request", 0)
1011-
all_metadata = database.metadata_with_request_id(nth_request, 1, metadata)
1012-
method = functools.partial(
1013-
api.begin_transaction,
1014-
session=self._session.name,
1015-
options=txn_selector.begin,
1016-
metadata=all_metadata,
1017-
)
1022+
counters = dict(attempt=0)
1023+
1024+
def attempt_tracking_method():
1025+
counters["attempt"] += 1
1026+
all_metadata = database.metadata_with_request_id(
1027+
nth_request, counters["attempt"], metadata
1028+
)
1029+
method = functools.partial(
1030+
api.begin_transaction,
1031+
session=self._session.name,
1032+
options=txn_selector.begin,
1033+
metadata=all_metadata,
1034+
)
1035+
return method()
10181036

10191037
response = _retry(
1020-
method,
1038+
attempt_tracking_method,
10211039
allowed_exceptions={InternalServerError: _check_rst_stream_error},
10221040
)
10231041
self._transaction_id = response.id

tests/unit/test_batch.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -590,10 +590,6 @@ def _test_batch_write_with_request_options(
590590
expected_metadata = [
591591
("google-cloud-resource-prefix", database.name),
592592
("x-goog-spanner-route-to-leader", "true"),
593-
(
594-
"x-goog-spanner-request-id",
595-
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1",
596-
),
597593
]
598594

599595
if enable_end_to_end_tracing and ot_helpers.HAS_OPENTELEMETRY_INSTALLED:
@@ -603,6 +599,12 @@ def _test_batch_write_with_request_options(
603599
"traceparent is missing in metadata",
604600
)
605601

602+
expected_metadata.append(
603+
(
604+
"x-goog-spanner-request-id",
605+
f"1.{REQ_RAND_PROCESS_ID}.{database._nth_client_id}.1.1.1",
606+
)
607+
)
606608
# Remove traceparent from actual metadata for comparison
607609
filtered_metadata = [item for item in metadata if item[0] != "traceparent"]
608610

tests/unit/test_session.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2065,6 +2065,10 @@ def unit_of_work(txn, *args, **kw):
20652065
metadata=[
20662066
("google-cloud-resource-prefix", database.name),
20672067
("x-goog-spanner-route-to-leader", "true"),
2068+
(
2069+
"x-goog-spanner-request-id",
2070+
f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1",
2071+
),
20682072
],
20692073
)
20702074

@@ -2099,6 +2103,10 @@ def unit_of_work(txn, *args, **kw):
20992103
metadata=[
21002104
("google-cloud-resource-prefix", database.name),
21012105
("x-goog-spanner-route-to-leader", "true"),
2106+
(
2107+
"x-goog-spanner-request-id",
2108+
f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1",
2109+
),
21022110
],
21032111
)
21042112

@@ -2137,6 +2145,10 @@ def unit_of_work(txn, *args, **kw):
21372145
metadata=[
21382146
("google-cloud-resource-prefix", database.name),
21392147
("x-goog-spanner-route-to-leader", "true"),
2148+
(
2149+
"x-goog-spanner-request-id",
2150+
f"1.{REQ_RAND_PROCESS_ID}.{database.NTH_CLIENT.value}.1.1.1",
2151+
),
21402152
],
21412153
)
21422154

0 commit comments

Comments
 (0)