Skip to content

Commit fb21d9a

Browse files
lszinv and rahul2393 authored
feat: Add Attempt, Operation and GFE Metrics (#1302)
* Feat: Added Metric Interceptor integration with Attempt metrics * Feat: Added Operation and GFE Metrics * Removed warning from GCP Resource Detector * Added Attempt failure test * Moved MetricCapture out of Tracer logic * Adjustments to handle-disabled behaviour of MetricsCapture * Added higher-level short circuiting of metric logic when disabled --------- Co-authored-by: rahul2393 <[email protected]>
1 parent d025867 commit fb21d9a

33 files changed

+1029
-134
lines changed

google/cloud/spanner_v1/_opentelemetry_tracing.py

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
except ImportError:
3434
HAS_OPENTELEMETRY_INSTALLED = False
3535

36+
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture
37+
3638
TRACER_NAME = "cloud.google.com/python/spanner"
3739
TRACER_VERSION = gapic_version.__version__
3840
extended_tracing_globally_disabled = (
@@ -111,26 +113,27 @@ def trace_call(name, session=None, extra_attributes=None, observability_options=
111113
with tracer.start_as_current_span(
112114
name, kind=trace.SpanKind.CLIENT, attributes=attributes
113115
) as span:
114-
try:
115-
yield span
116-
except Exception as error:
117-
span.set_status(Status(StatusCode.ERROR, str(error)))
118-
# OpenTelemetry-Python imposes invoking span.record_exception on __exit__
119-
# on any exception. We should file a bug later on with them to only
120-
# invoke .record_exception if not already invoked, hence we should not
121-
# invoke .record_exception on our own else we shall have 2 exceptions.
122-
raise
123-
else:
124-
# All spans still have set_status available even if for example
125-
# NonRecordingSpan doesn't have "_status".
126-
absent_span_status = getattr(span, "_status", None) is None
127-
if absent_span_status or span._status.status_code == StatusCode.UNSET:
128-
# OpenTelemetry-Python only allows a status change
129-
# if the current code is UNSET or ERROR. At the end
130-
# of the generator's consumption, only set it to OK
131-
# if it wasn't previously set otherwise.
132-
# https://github.com/googleapis/python-spanner/issues/1246
133-
span.set_status(Status(StatusCode.OK))
116+
with MetricsCapture():
117+
try:
118+
yield span
119+
except Exception as error:
120+
span.set_status(Status(StatusCode.ERROR, str(error)))
121+
# OpenTelemetry-Python imposes invoking span.record_exception on __exit__
122+
# on any exception. We should file a bug later on with them to only
123+
# invoke .record_exception if not already invoked, hence we should not
124+
# invoke .record_exception on our own else we shall have 2 exceptions.
125+
raise
126+
else:
127+
# All spans still have set_status available even if for example
128+
# NonRecordingSpan doesn't have "_status".
129+
absent_span_status = getattr(span, "_status", None) is None
130+
if absent_span_status or span._status.status_code == StatusCode.UNSET:
131+
# OpenTelemetry-Python only allows a status change
132+
# if the current code is UNSET or ERROR. At the end
133+
# of the generator's consumption, only set it to OK
134+
# if it wasn't previously set otherwise.
135+
# https://github.com/googleapis/python-spanner/issues/1246
136+
span.set_status(Status(StatusCode.OK))
134137

135138

136139
def get_current_span():

google/cloud/spanner_v1/batch.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from google.cloud.spanner_v1._helpers import _retry_on_aborted_exception
3333
from google.cloud.spanner_v1._helpers import _check_rst_stream_error
3434
from google.api_core.exceptions import InternalServerError
35+
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture
3536
import time
3637

3738
DEFAULT_RETRY_TIMEOUT_SECS = 30
@@ -226,7 +227,7 @@ def commit(
226227
self._session,
227228
trace_attributes,
228229
observability_options=observability_options,
229-
):
230+
), MetricsCapture():
230231
method = functools.partial(
231232
api.commit,
232233
request=request,
@@ -348,7 +349,7 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
348349
self._session,
349350
trace_attributes,
350351
observability_options=observability_options,
351-
):
352+
), MetricsCapture():
352353
method = functools.partial(
353354
api.batch_write,
354355
request=request,

google/cloud/spanner_v1/client.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,30 @@
4848
from google.cloud.spanner_v1._helpers import _merge_query_options
4949
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
5050
from google.cloud.spanner_v1.instance import Instance
51+
from google.cloud.spanner_v1.metrics.constants import (
52+
ENABLE_SPANNER_METRICS_ENV_VAR,
53+
METRIC_EXPORT_INTERVAL_MS,
54+
)
55+
from google.cloud.spanner_v1.metrics.spanner_metrics_tracer_factory import (
56+
SpannerMetricsTracerFactory,
57+
)
58+
from google.cloud.spanner_v1.metrics.metrics_exporter import (
59+
CloudMonitoringMetricsExporter,
60+
)
61+
62+
try:
63+
from opentelemetry import metrics
64+
from opentelemetry.sdk.metrics import MeterProvider
65+
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
66+
67+
HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = True
68+
except ImportError: # pragma: NO COVER
69+
HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = False
70+
5171

5272
_CLIENT_INFO = client_info.ClientInfo(client_library_version=__version__)
5373
EMULATOR_ENV_VAR = "SPANNER_EMULATOR_HOST"
74+
ENABLE_BUILTIN_METRICS_ENV_VAR = "SPANNER_ENABLE_BUILTIN_METRICS"
5475
_EMULATOR_HOST_HTTP_SCHEME = (
5576
"%s contains a http scheme. When used with a scheme it may cause gRPC's "
5677
"DNS resolver to endlessly attempt to resolve. %s is intended to be used "
@@ -73,6 +94,10 @@ def _get_spanner_optimizer_statistics_package():
7394
return os.getenv(OPTIMIZER_STATISITCS_PACKAGE_ENV_VAR, "")
7495

7596

97+
def _get_spanner_enable_builtin_metrics():
98+
return os.getenv(ENABLE_SPANNER_METRICS_ENV_VAR) == "true"
99+
100+
76101
class Client(ClientWithProject):
77102
"""Client for interacting with Cloud Spanner API.
78103
@@ -195,6 +220,25 @@ def __init__(
195220
"http://" in self._emulator_host or "https://" in self._emulator_host
196221
):
197222
warnings.warn(_EMULATOR_HOST_HTTP_SCHEME)
223+
# Check flag to enable Spanner builtin metrics
224+
if (
225+
_get_spanner_enable_builtin_metrics()
226+
and HAS_GOOGLE_CLOUD_MONITORING_INSTALLED
227+
):
228+
meter_provider = metrics.NoOpMeterProvider()
229+
if not _get_spanner_emulator_host():
230+
meter_provider = MeterProvider(
231+
metric_readers=[
232+
PeriodicExportingMetricReader(
233+
CloudMonitoringMetricsExporter(),
234+
export_interval_millis=METRIC_EXPORT_INTERVAL_MS,
235+
)
236+
]
237+
)
238+
metrics.set_meter_provider(meter_provider)
239+
SpannerMetricsTracerFactory()
240+
else:
241+
SpannerMetricsTracerFactory(enabled=False)
198242

199243
self._route_to_leader_enabled = route_to_leader_enabled
200244
self._directed_read_options = directed_read_options

google/cloud/spanner_v1/database.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
get_current_span,
7373
trace_call,
7474
)
75+
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture
7576

7677

7778
SPANNER_DATA_SCOPE = "https://www.googleapis.com/auth/spanner.data"
@@ -702,7 +703,7 @@ def execute_pdml():
702703
with trace_call(
703704
"CloudSpanner.Database.execute_partitioned_pdml",
704705
observability_options=self.observability_options,
705-
) as span:
706+
) as span, MetricsCapture():
706707
with SessionCheckout(self._pool) as session:
707708
add_span_event(span, "Starting BeginTransaction")
708709
txn = api.begin_transaction(
@@ -897,7 +898,7 @@ def run_in_transaction(self, func, *args, **kw):
897898
with trace_call(
898899
"CloudSpanner.Database.run_in_transaction",
899900
observability_options=observability_options,
900-
):
901+
), MetricsCapture():
901902
# Sanity check: Is there a transaction already running?
902903
# If there is, then raise a red flag. Otherwise, mark that this one
903904
# is running.
@@ -1489,7 +1490,7 @@ def generate_read_batches(
14891490
f"CloudSpanner.{type(self).__name__}.generate_read_batches",
14901491
extra_attributes=dict(table=table, columns=columns),
14911492
observability_options=self.observability_options,
1492-
):
1493+
), MetricsCapture():
14931494
partitions = self._get_snapshot().partition_read(
14941495
table=table,
14951496
columns=columns,
@@ -1540,7 +1541,7 @@ def process_read_batch(
15401541
with trace_call(
15411542
f"CloudSpanner.{type(self).__name__}.process_read_batch",
15421543
observability_options=observability_options,
1543-
):
1544+
), MetricsCapture():
15441545
kwargs = copy.deepcopy(batch["read"])
15451546
keyset_dict = kwargs.pop("keyset")
15461547
kwargs["keyset"] = KeySet._from_dict(keyset_dict)
@@ -1625,7 +1626,7 @@ def generate_query_batches(
16251626
f"CloudSpanner.{type(self).__name__}.generate_query_batches",
16261627
extra_attributes=dict(sql=sql),
16271628
observability_options=self.observability_options,
1628-
):
1629+
), MetricsCapture():
16291630
partitions = self._get_snapshot().partition_query(
16301631
sql=sql,
16311632
params=params,
@@ -1681,7 +1682,7 @@ def process_query_batch(
16811682
with trace_call(
16821683
f"CloudSpanner.{type(self).__name__}.process_query_batch",
16831684
observability_options=self.observability_options,
1684-
):
1685+
), MetricsCapture():
16851686
return self._get_snapshot().execute_sql(
16861687
partition=batch["partition"],
16871688
**batch["query"],
@@ -1746,7 +1747,7 @@ def run_partitioned_query(
17461747
f"CloudSpanner.${type(self).__name__}.run_partitioned_query",
17471748
extra_attributes=dict(sql=sql),
17481749
observability_options=self.observability_options,
1749-
):
1750+
), MetricsCapture():
17501751
partitions = list(
17511752
self.generate_query_batches(
17521753
sql,

google/cloud/spanner_v1/merged_result_set.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from threading import Lock, Event
1919

2020
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
21+
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture
2122

2223
if TYPE_CHECKING:
2324
from google.cloud.spanner_v1.database import BatchSnapshot
@@ -45,7 +46,7 @@ def run(self):
4546
with trace_call(
4647
"CloudSpanner.PartitionExecutor.run",
4748
observability_options=observability_options,
48-
):
49+
), MetricsCapture():
4950
self.__run()
5051

5152
def __run(self):

google/cloud/spanner_v1/metrics/constants.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2025 Google LLC
1+
# Copyright 2025 Google LLC
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -15,6 +15,12 @@
1515
BUILT_IN_METRICS_METER_NAME = "gax-python"
1616
NATIVE_METRICS_PREFIX = "spanner.googleapis.com/internal/client"
1717
SPANNER_RESOURCE_TYPE = "spanner_instance_client"
18+
SPANNER_SERVICE_NAME = "spanner-python"
19+
GOOGLE_CLOUD_RESOURCE_KEY = "google-cloud-resource-prefix"
20+
GOOGLE_CLOUD_REGION_KEY = "cloud.region"
21+
GOOGLE_CLOUD_REGION_GLOBAL = "global"
22+
SPANNER_METHOD_PREFIX = "/google.spanner.v1."
23+
ENABLE_SPANNER_METRICS_ENV_VAR = "SPANNER_ENABLE_BUILTIN_METRICS"
1824

1925
# Monitored resource labels
2026
MONITORED_RES_LABEL_KEY_PROJECT = "project_id"
@@ -61,3 +67,5 @@
6167
METRIC_NAME_OPERATION_COUNT,
6268
METRIC_NAME_ATTEMPT_COUNT,
6369
]
70+
71+
METRIC_EXPORT_INTERVAL_MS = 60000 # 1 Minute
google/cloud/spanner_v1/metrics/metrics_capture.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Copyright 2025 Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""
15+
This module provides functionality for capturing metrics in Cloud Spanner operations.
16+
17+
It includes a context manager class, MetricsCapture, which automatically handles the
18+
start and completion of metrics tracing for a given operation. This ensures that metrics
19+
are consistently recorded for Cloud Spanner operations, facilitating observability and
20+
performance monitoring.
21+
"""
22+
23+
from .spanner_metrics_tracer_factory import SpannerMetricsTracerFactory
24+
25+
26+
class MetricsCapture:
27+
"""Context manager for capturing metrics in Cloud Spanner operations.
28+
29+
This class provides a context manager interface to automatically handle
30+
the start and completion of metrics tracing for a given operation.
31+
"""
32+
33+
def __enter__(self):
34+
"""Enter the runtime context related to this object.
35+
36+
This method initializes a new metrics tracer for the operation and
37+
records the start of the operation.
38+
39+
Returns:
40+
MetricsCapture: The instance of the context manager.
41+
"""
42+
# Short circuit out if metrics are disabled
43+
factory = SpannerMetricsTracerFactory()
44+
if not factory.enabled:
45+
return self
46+
47+
# Define a new metrics tracer for the new operation
48+
SpannerMetricsTracerFactory.current_metrics_tracer = (
49+
factory.create_metrics_tracer()
50+
)
51+
if SpannerMetricsTracerFactory.current_metrics_tracer:
52+
SpannerMetricsTracerFactory.current_metrics_tracer.record_operation_start()
53+
return self
54+
55+
def __exit__(self, exc_type, exc_value, traceback):
56+
"""Exit the runtime context related to this object.
57+
58+
This method records the completion of the operation. If an exception
59+
occurred, it will be propagated after the metrics are recorded.
60+
61+
Args:
62+
exc_type (Type[BaseException]): The exception type.
63+
exc_value (BaseException): The exception value.
64+
traceback (TracebackType): The traceback object.
65+
66+
Returns:
67+
bool: False to propagate the exception if any occurred.
68+
"""
69+
# Short circuit out if metrics are disabled
70+
if not SpannerMetricsTracerFactory().enabled:
71+
return False
72+
73+
if SpannerMetricsTracerFactory.current_metrics_tracer:
74+
SpannerMetricsTracerFactory.current_metrics_tracer.record_operation_completion()
75+
return False # Propagate the exception if any

0 commit comments

Comments
 (0)