Skip to content

Commit c07edca

Browse files
committed
Moved MetricCapture out of Tracer logic
1 parent 2dd3a0e commit c07edca

File tree

9 files changed

+42
-26
lines changed

9 files changed

+42
-26
lines changed

google/cloud/spanner_v1/batch.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from google.cloud.spanner_v1._helpers import _retry_on_aborted_exception
3333
from google.cloud.spanner_v1._helpers import _check_rst_stream_error
3434
from google.api_core.exceptions import InternalServerError
35+
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture
3536
import time
3637

3738
DEFAULT_RETRY_TIMEOUT_SECS = 30
@@ -226,7 +227,7 @@ def commit(
226227
self._session,
227228
trace_attributes,
228229
observability_options=observability_options,
229-
):
230+
), MetricsCapture():
230231
method = functools.partial(
231232
api.commit,
232233
request=request,
@@ -348,7 +349,7 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
348349
self._session,
349350
trace_attributes,
350351
observability_options=observability_options,
351-
):
352+
), MetricsCapture():
352353
method = functools.partial(
353354
api.batch_write,
354355
request=request,

google/cloud/spanner_v1/database.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
get_current_span,
7373
trace_call,
7474
)
75+
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture
7576

7677

7778
SPANNER_DATA_SCOPE = "https://www.googleapis.com/auth/spanner.data"
@@ -702,7 +703,7 @@ def execute_pdml():
702703
with trace_call(
703704
"CloudSpanner.Database.execute_partitioned_pdml",
704705
observability_options=self.observability_options,
705-
) as span:
706+
) as span, MetricsCapture():
706707
with SessionCheckout(self._pool) as session:
707708
add_span_event(span, "Starting BeginTransaction")
708709
txn = api.begin_transaction(
@@ -897,7 +898,7 @@ def run_in_transaction(self, func, *args, **kw):
897898
with trace_call(
898899
"CloudSpanner.Database.run_in_transaction",
899900
observability_options=observability_options,
900-
):
901+
), MetricsCapture():
901902
# Sanity check: Is there a transaction already running?
902903
# If there is, then raise a red flag. Otherwise, mark that this one
903904
# is running.
@@ -1489,7 +1490,7 @@ def generate_read_batches(
14891490
f"CloudSpanner.{type(self).__name__}.generate_read_batches",
14901491
extra_attributes=dict(table=table, columns=columns),
14911492
observability_options=self.observability_options,
1492-
):
1493+
), MetricsCapture():
14931494
partitions = self._get_snapshot().partition_read(
14941495
table=table,
14951496
columns=columns,
@@ -1540,7 +1541,7 @@ def process_read_batch(
15401541
with trace_call(
15411542
f"CloudSpanner.{type(self).__name__}.process_read_batch",
15421543
observability_options=observability_options,
1543-
):
1544+
), MetricsCapture():
15441545
kwargs = copy.deepcopy(batch["read"])
15451546
keyset_dict = kwargs.pop("keyset")
15461547
kwargs["keyset"] = KeySet._from_dict(keyset_dict)
@@ -1625,7 +1626,7 @@ def generate_query_batches(
16251626
f"CloudSpanner.{type(self).__name__}.generate_query_batches",
16261627
extra_attributes=dict(sql=sql),
16271628
observability_options=self.observability_options,
1628-
):
1629+
), MetricsCapture():
16291630
partitions = self._get_snapshot().partition_query(
16301631
sql=sql,
16311632
params=params,
@@ -1681,7 +1682,7 @@ def process_query_batch(
16811682
with trace_call(
16821683
f"CloudSpanner.{type(self).__name__}.process_query_batch",
16831684
observability_options=self.observability_options,
1684-
):
1685+
), MetricsCapture():
16851686
return self._get_snapshot().execute_sql(
16861687
partition=batch["partition"],
16871688
**batch["query"],
@@ -1746,7 +1747,7 @@ def run_partitioned_query(
17461747
f"CloudSpanner.${type(self).__name__}.run_partitioned_query",
17471748
extra_attributes=dict(sql=sql),
17481749
observability_options=self.observability_options,
1749-
):
1750+
), MetricsCapture():
17501751
partitions = list(
17511752
self.generate_query_batches(
17521753
sql,

google/cloud/spanner_v1/merged_result_set.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from threading import Lock, Event
1919

2020
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
21+
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture
2122

2223
if TYPE_CHECKING:
2324
from google.cloud.spanner_v1.database import BatchSnapshot
@@ -45,7 +46,7 @@ def run(self):
4546
with trace_call(
4647
"CloudSpanner.PartitionExecutor.run",
4748
observability_options=observability_options,
48-
):
49+
), MetricsCapture():
4950
self.__run()
5051

5152
def __run(self):

google/cloud/spanner_v1/pool.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
)
3333
from warnings import warn
3434

35+
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture
36+
3537
_NOW = datetime.datetime.utcnow # unit tests may replace
3638

3739

@@ -242,7 +244,7 @@ def bind(self, database):
242244
with trace_call(
243245
"CloudSpanner.FixedPool.BatchCreateSessions",
244246
observability_options=observability_options,
245-
) as span:
247+
) as span, MetricsCapture():
246248
returned_session_count = 0
247249
while not self._sessions.full():
248250
request.session_count = requested_session_count - self._sessions.qsize()
@@ -552,7 +554,7 @@ def bind(self, database):
552554
with trace_call(
553555
"CloudSpanner.PingingPool.BatchCreateSessions",
554556
observability_options=observability_options,
555-
) as span:
557+
) as span, MetricsCapture():
556558
returned_session_count = 0
557559
while returned_session_count < self.size:
558560
resp = api.batch_create_sessions(

google/cloud/spanner_v1/session.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
from google.cloud.spanner_v1.snapshot import Snapshot
4141
from google.cloud.spanner_v1.transaction import Transaction
4242

43+
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture
44+
4345

4446
DEFAULT_RETRY_TIMEOUT_SECS = 30
4547
"""Default timeout used by :meth:`Session.run_in_transaction`."""
@@ -165,7 +167,7 @@ def create(self):
165167
self,
166168
self._labels,
167169
observability_options=observability_options,
168-
):
170+
), MetricsCapture():
169171
session_pb = api.create_session(
170172
request=request,
171173
metadata=metadata,
@@ -205,7 +207,7 @@ def exists(self):
205207
observability_options = getattr(self._database, "observability_options", None)
206208
with trace_call(
207209
"CloudSpanner.GetSession", self, observability_options=observability_options
208-
) as span:
210+
) as span, MetricsCapture():
209211
try:
210212
api.get_session(name=self.name, metadata=metadata)
211213
if span:
@@ -248,7 +250,7 @@ def delete(self):
248250
"session.name": self.name,
249251
},
250252
observability_options=observability_options,
251-
):
253+
), MetricsCapture():
252254
api.delete_session(name=self.name, metadata=metadata)
253255

254256
def ping(self):
@@ -467,7 +469,7 @@ def run_in_transaction(self, func, *args, **kw):
467469
"CloudSpanner.Session.run_in_transaction",
468470
self,
469471
observability_options=observability_options,
470-
) as span:
472+
) as span, MetricsCapture():
471473
while True:
472474
if self._transaction is None:
473475
txn = self.transaction()

google/cloud/spanner_v1/snapshot.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
from google.cloud.spanner_v1.streamed import StreamedResultSet
4444
from google.cloud.spanner_v1 import RequestOptions
4545

46+
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture
47+
4648
_STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES = (
4749
"RST_STREAM",
4850
"Received unexpected EOS on DATA frame from server",
@@ -96,7 +98,7 @@ def _restart_on_unavailable(
9698
session,
9799
attributes,
98100
observability_options=observability_options,
99-
):
101+
), MetricsCapture():
100102
iterator = method(request=request)
101103
for item in iterator:
102104
item_buffer.append(item)
@@ -119,7 +121,7 @@ def _restart_on_unavailable(
119121
session,
120122
attributes,
121123
observability_options=observability_options,
122-
):
124+
), MetricsCapture():
123125
request.resume_token = resume_token
124126
if transaction is not None:
125127
transaction_selector = transaction._make_txn_selector()
@@ -139,7 +141,7 @@ def _restart_on_unavailable(
139141
session,
140142
attributes,
141143
observability_options=observability_options,
142-
):
144+
), MetricsCapture():
143145
request.resume_token = resume_token
144146
if transaction is not None:
145147
transaction_selector = transaction._make_txn_selector()
@@ -689,7 +691,7 @@ def partition_read(
689691
self._session,
690692
extra_attributes=trace_attributes,
691693
observability_options=getattr(database, "observability_options", None),
692-
):
694+
), MetricsCapture():
693695
method = functools.partial(
694696
api.partition_read,
695697
request=request,
@@ -792,7 +794,7 @@ def partition_query(
792794
self._session,
793795
trace_attributes,
794796
observability_options=getattr(database, "observability_options", None),
795-
):
797+
), MetricsCapture():
796798
method = functools.partial(
797799
api.partition_query,
798800
request=request,
@@ -938,7 +940,7 @@ def begin(self):
938940
f"CloudSpanner.{type(self).__name__}.begin",
939941
self._session,
940942
observability_options=getattr(database, "observability_options", None),
941-
):
943+
), MetricsCapture():
942944
method = functools.partial(
943945
api.begin_transaction,
944946
session=self._session.name,

google/cloud/spanner_v1/transaction.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
from google.cloud.spanner_v1.batch import _BatchBase
3535
from google.cloud.spanner_v1._opentelemetry_tracing import add_span_event, trace_call
3636
from google.cloud.spanner_v1 import RequestOptions
37+
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture
3738
from google.api_core import gapic_v1
3839
from google.api_core.exceptions import InternalServerError
3940
from dataclasses import dataclass
@@ -118,7 +119,7 @@ def _execute_request(
118119
request.transaction = transaction
119120
with trace_call(
120121
trace_name, session, attributes, observability_options=observability_options
121-
):
122+
), MetricsCapture():
122123
method = functools.partial(method, request=request)
123124
response = _retry(
124125
method,
@@ -160,7 +161,7 @@ def begin(self):
160161
f"CloudSpanner.{type(self).__name__}.begin",
161162
self._session,
162163
observability_options=observability_options,
163-
) as span:
164+
) as span, MetricsCapture():
164165
method = functools.partial(
165166
api.begin_transaction,
166167
session=self._session.name,
@@ -202,7 +203,7 @@ def rollback(self):
202203
f"CloudSpanner.{type(self).__name__}.rollback",
203204
self._session,
204205
observability_options=observability_options,
205-
):
206+
), MetricsCapture():
206207
method = functools.partial(
207208
api.rollback,
208209
session=self._session.name,
@@ -250,7 +251,7 @@ def commit(
250251
self._session,
251252
trace_attributes,
252253
observability_options,
253-
) as span:
254+
) as span, MetricsCapture():
254255
self._check_state()
255256
if self._transaction_id is None and len(self._mutations) > 0:
256257
self.begin()

testing/constraints-3.7.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ protobuf==3.20.2
1717
deprecated==1.2.14
1818
grpc-interceptor==0.15.4
1919
google-cloud-monitoring==2.16.0
20+
mmh3==4.1.0

tests/unit/test_metrics.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import pytest
1516
from unittest.mock import MagicMock
1617
from google.api_core.exceptions import ServiceUnavailable
1718
from google.cloud.spanner_v1.client import Client
@@ -21,6 +22,10 @@
2122
SpannerMetricsTracerFactory,
2223
)
2324

25+
pytest.importorskip("opentelemetry")
26+
# Skip if semconv attributes are not present, as tracing wont' be enabled either
27+
# pytest.importorskip("opentelemetry.semconv.attributes.otel_attributes")
28+
2429

2530
def test_metrics_emission_with_failure_attempt(monkeypatch):
2631
monkeypatch.setenv("SPANNER_ENABLE_BUILTIN_METRICS", "true")

0 commit comments

Comments
 (0)