diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 5ce23cab74..81af6b5f57 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -33,6 +33,8 @@ except ImportError: HAS_OPENTELEMETRY_INSTALLED = False +from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture + TRACER_NAME = "cloud.google.com/python/spanner" TRACER_VERSION = gapic_version.__version__ extended_tracing_globally_disabled = ( @@ -111,26 +113,27 @@ def trace_call(name, session=None, extra_attributes=None, observability_options= with tracer.start_as_current_span( name, kind=trace.SpanKind.CLIENT, attributes=attributes ) as span: - try: - yield span - except Exception as error: - span.set_status(Status(StatusCode.ERROR, str(error))) - # OpenTelemetry-Python imposes invoking span.record_exception on __exit__ - # on any exception. We should file a bug later on with them to only - # invoke .record_exception if not already invoked, hence we should not - # invoke .record_exception on our own else we shall have 2 exceptions. - raise - else: - # All spans still have set_status available even if for example - # NonRecordingSpan doesn't have "_status". - absent_span_status = getattr(span, "_status", None) is None - if absent_span_status or span._status.status_code == StatusCode.UNSET: - # OpenTelemetry-Python only allows a status change - # if the current code is UNSET or ERROR. At the end - # of the generator's consumption, only set it to OK - # it wasn't previously set otherwise. - # https://github.com/googleapis/python-spanner/issues/1246 - span.set_status(Status(StatusCode.OK)) + with MetricsCapture(): + try: + yield span + except Exception as error: + span.set_status(Status(StatusCode.ERROR, str(error))) + # OpenTelemetry-Python imposes invoking span.record_exception on __exit__ + # on any exception. We should file a bug later on with them to only + # invoke .record_exception if not already invoked, hence we should not + # invoke .record_exception on our own else we shall have 2 exceptions. + raise + else: + # All spans still have set_status available even if for example + # NonRecordingSpan doesn't have "_status". + absent_span_status = getattr(span, "_status", None) is None + if absent_span_status or span._status.status_code == StatusCode.UNSET: + # OpenTelemetry-Python only allows a status change + # if the current code is UNSET or ERROR. At the end + # of the generator's consumption, only set it to OK + # it wasn't previously set otherwise. + # https://github.com/googleapis/python-spanner/issues/1246 + span.set_status(Status(StatusCode.OK)) def get_current_span(): diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 6a9f1f48f5..71550f4a0a 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -32,6 +32,7 @@ from google.cloud.spanner_v1._helpers import _retry_on_aborted_exception from google.cloud.spanner_v1._helpers import _check_rst_stream_error from google.api_core.exceptions import InternalServerError +from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture import time DEFAULT_RETRY_TIMEOUT_SECS = 30 @@ -226,7 +227,7 @@ def commit( self._session, trace_attributes, observability_options=observability_options, - ): + ), MetricsCapture(): method = functools.partial( api.commit, request=request, @@ -348,7 +349,7 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals self._session, trace_attributes, observability_options=observability_options, - ): + ), MetricsCapture(): method = functools.partial( api.batch_write, request=request, diff --git a/google/cloud/spanner_v1/client.py b/google/cloud/spanner_v1/client.py index afe6264717..a8db70d3af 100644 --- a/google/cloud/spanner_v1/client.py +++ b/google/cloud/spanner_v1/client.py @@ -48,9 +48,30 @@ from google.cloud.spanner_v1._helpers import _merge_query_options from google.cloud.spanner_v1._helpers import _metadata_with_prefix from google.cloud.spanner_v1.instance import Instance +from google.cloud.spanner_v1.metrics.constants import ( + ENABLE_SPANNER_METRICS_ENV_VAR, + METRIC_EXPORT_INTERVAL_MS, +) +from google.cloud.spanner_v1.metrics.spanner_metrics_tracer_factory import ( + SpannerMetricsTracerFactory, +) +from google.cloud.spanner_v1.metrics.metrics_exporter import ( + CloudMonitoringMetricsExporter, +) + +try: + from opentelemetry import metrics + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + + HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = True +except ImportError: # pragma: NO COVER + HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = False + _CLIENT_INFO = client_info.ClientInfo(client_library_version=__version__) EMULATOR_ENV_VAR = "SPANNER_EMULATOR_HOST" +ENABLE_BUILTIN_METRICS_ENV_VAR = "SPANNER_ENABLE_BUILTIN_METRICS" _EMULATOR_HOST_HTTP_SCHEME = ( "%s contains a http scheme. When used with a scheme it may cause gRPC's " "DNS resolver to endlessly attempt to resolve. %s is intended to be used " @@ -73,6 +94,10 @@ def _get_spanner_optimizer_statistics_package(): return os.getenv(OPTIMIZER_STATISITCS_PACKAGE_ENV_VAR, "") +def _get_spanner_enable_builtin_metrics(): + return os.getenv(ENABLE_SPANNER_METRICS_ENV_VAR) == "true" + + class Client(ClientWithProject): """Client for interacting with Cloud Spanner API. @@ -195,6 +220,25 @@ def __init__( "http://" in self._emulator_host or "https://" in self._emulator_host ): warnings.warn(_EMULATOR_HOST_HTTP_SCHEME) + # Check flag to enable Spanner builtin metrics + if ( + _get_spanner_enable_builtin_metrics() + and HAS_GOOGLE_CLOUD_MONITORING_INSTALLED + ): + meter_provider = metrics.NoOpMeterProvider() + if not _get_spanner_emulator_host(): + meter_provider = MeterProvider( + metric_readers=[ + PeriodicExportingMetricReader( + CloudMonitoringMetricsExporter(), + export_interval_millis=METRIC_EXPORT_INTERVAL_MS, + ) + ] + ) + metrics.set_meter_provider(meter_provider) + SpannerMetricsTracerFactory() + else: + SpannerMetricsTracerFactory(enabled=False) self._route_to_leader_enabled = route_to_leader_enabled self._directed_read_options = directed_read_options diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 963debdab8..cc21591a13 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -72,6 +72,7 @@ get_current_span, trace_call, ) +from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture SPANNER_DATA_SCOPE = "https://www.googleapis.com/auth/spanner.data" @@ -702,7 +703,7 @@ def execute_pdml(): with trace_call( "CloudSpanner.Database.execute_partitioned_pdml", observability_options=self.observability_options, - ) as span: + ) as span, MetricsCapture(): with SessionCheckout(self._pool) as session: add_span_event(span, "Starting BeginTransaction") txn = api.begin_transaction( @@ -897,7 +898,7 @@ def run_in_transaction(self, func, *args, **kw): with trace_call( "CloudSpanner.Database.run_in_transaction", observability_options=observability_options, - ): + ), MetricsCapture(): # Sanity check: Is there a transaction already running? # If there is, then raise a red flag. Otherwise, mark that this one # is running. @@ -1489,7 +1490,7 @@ def generate_read_batches( f"CloudSpanner.{type(self).__name__}.generate_read_batches", extra_attributes=dict(table=table, columns=columns), observability_options=self.observability_options, - ): + ), MetricsCapture(): partitions = self._get_snapshot().partition_read( table=table, columns=columns, @@ -1540,7 +1541,7 @@ def process_read_batch( with trace_call( f"CloudSpanner.{type(self).__name__}.process_read_batch", observability_options=observability_options, - ): + ), MetricsCapture(): kwargs = copy.deepcopy(batch["read"]) keyset_dict = kwargs.pop("keyset") kwargs["keyset"] = KeySet._from_dict(keyset_dict) @@ -1625,7 +1626,7 @@ def generate_query_batches( f"CloudSpanner.{type(self).__name__}.generate_query_batches", extra_attributes=dict(sql=sql), observability_options=self.observability_options, - ): + ), MetricsCapture(): partitions = self._get_snapshot().partition_query( sql=sql, params=params, @@ -1681,7 +1682,7 @@ def process_query_batch( with trace_call( f"CloudSpanner.{type(self).__name__}.process_query_batch", observability_options=self.observability_options, - ): + ), MetricsCapture(): return self._get_snapshot().execute_sql( partition=batch["partition"], **batch["query"], @@ -1746,7 +1747,7 @@ def run_partitioned_query( f"CloudSpanner.${type(self).__name__}.run_partitioned_query", extra_attributes=dict(sql=sql), observability_options=self.observability_options, - ): + ), MetricsCapture(): partitions = list( self.generate_query_batches( sql, diff --git a/google/cloud/spanner_v1/merged_result_set.py b/google/cloud/spanner_v1/merged_result_set.py index bfecad1e46..7af989d696 100644 --- a/google/cloud/spanner_v1/merged_result_set.py +++ b/google/cloud/spanner_v1/merged_result_set.py @@ -18,6 +18,7 @@ from threading import Lock, Event from google.cloud.spanner_v1._opentelemetry_tracing import trace_call +from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture if TYPE_CHECKING: from google.cloud.spanner_v1.database import BatchSnapshot @@ -45,7 +46,7 @@ def run(self): with trace_call( "CloudSpanner.PartitionExecutor.run", observability_options=observability_options, - ): + ), MetricsCapture(): self.__run() def __run(self): diff --git a/google/cloud/spanner_v1/metrics/constants.py b/google/cloud/spanner_v1/metrics/constants.py index 5eca1fa83d..a47aecc9ed 100644 --- a/google/cloud/spanner_v1/metrics/constants.py +++ b/google/cloud/spanner_v1/metrics/constants.py @@ -1,4 +1,4 @@ -# Copyright 2025 Google LLC +# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,6 +15,12 @@ BUILT_IN_METRICS_METER_NAME = "gax-python" NATIVE_METRICS_PREFIX = "spanner.googleapis.com/internal/client" SPANNER_RESOURCE_TYPE = "spanner_instance_client" +SPANNER_SERVICE_NAME = "spanner-python" +GOOGLE_CLOUD_RESOURCE_KEY = "google-cloud-resource-prefix" +GOOGLE_CLOUD_REGION_KEY = "cloud.region" +GOOGLE_CLOUD_REGION_GLOBAL = "global" +SPANNER_METHOD_PREFIX = "/google.spanner.v1." +ENABLE_SPANNER_METRICS_ENV_VAR = "SPANNER_ENABLE_BUILTIN_METRICS" # Monitored resource labels MONITORED_RES_LABEL_KEY_PROJECT = "project_id" @@ -61,3 +67,5 @@ METRIC_NAME_OPERATION_COUNT, METRIC_NAME_ATTEMPT_COUNT, ] + +METRIC_EXPORT_INTERVAL_MS = 60000 # 1 Minute diff --git a/google/cloud/spanner_v1/metrics/metrics_capture.py b/google/cloud/spanner_v1/metrics/metrics_capture.py new file mode 100644 index 0000000000..6197ae5257 --- /dev/null +++ b/google/cloud/spanner_v1/metrics/metrics_capture.py @@ -0,0 +1,75 @@ +# Copyright 2025 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +This module provides functionality for capturing metrics in Cloud Spanner operations. + +It includes a context manager class, MetricsCapture, which automatically handles the +start and completion of metrics tracing for a given operation. This ensures that metrics +are consistently recorded for Cloud Spanner operations, facilitating observability and +performance monitoring. +""" + +from .spanner_metrics_tracer_factory import SpannerMetricsTracerFactory + + +class MetricsCapture: + """Context manager for capturing metrics in Cloud Spanner operations. + + This class provides a context manager interface to automatically handle + the start and completion of metrics tracing for a given operation. + """ + + def __enter__(self): + """Enter the runtime context related to this object. + + This method initializes a new metrics tracer for the operation and + records the start of the operation. + + Returns: + MetricsCapture: The instance of the context manager. + """ + # Short circuit out if metrics are disabled + factory = SpannerMetricsTracerFactory() + if not factory.enabled: + return self + + # Define a new metrics tracer for the new operation + SpannerMetricsTracerFactory.current_metrics_tracer = ( + factory.create_metrics_tracer() + ) + if SpannerMetricsTracerFactory.current_metrics_tracer: + SpannerMetricsTracerFactory.current_metrics_tracer.record_operation_start() + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Exit the runtime context related to this object. + + This method records the completion of the operation. If an exception + occurred, it will be propagated after the metrics are recorded. + + Args: + exc_type (Type[BaseException]): The exception type. + exc_value (BaseException): The exception value. + traceback (TracebackType): The traceback object. + + Returns: + bool: False to propagate the exception if any occurred. + """ + # Short circuit out if metrics are disable + if not SpannerMetricsTracerFactory().enabled: + return False + + if SpannerMetricsTracerFactory.current_metrics_tracer: + SpannerMetricsTracerFactory.current_metrics_tracer.record_operation_completion() + return False # Propagate the exception if any diff --git a/google/cloud/spanner_v1/metrics/metrics_exporter.py b/google/cloud/spanner_v1/metrics/metrics_exporter.py index fb32985365..e10cf6a2f1 100644 --- a/google/cloud/spanner_v1/metrics/metrics_exporter.py +++ b/google/cloud/spanner_v1/metrics/metrics_exporter.py @@ -23,7 +23,7 @@ ) import logging -from typing import Optional, List, Union, NoReturn, Tuple +from typing import Optional, List, Union, NoReturn, Tuple, Dict import google.auth from google.api.distribution_pb2 import ( # pylint: disable=no-name-in-module @@ -39,10 +39,6 @@ MonitoredResource, ) -from google.cloud.monitoring_v3.services.metric_service.transports.grpc import ( - MetricServiceGrpcTransport, -) - # pylint: disable=no-name-in-module from google.protobuf.timestamp_pb2 import Timestamp from google.cloud.spanner_v1.gapic_version import __version__ @@ -60,12 +56,9 @@ Sum, ) from opentelemetry.sdk.resources import Resource - - HAS_OPENTELEMETRY_INSTALLED = True -except ImportError: # pragma: NO COVER - HAS_OPENTELEMETRY_INSTALLED = False - -try: + from google.cloud.monitoring_v3.services.metric_service.transports.grpc import ( + MetricServiceGrpcTransport, + ) from google.cloud.monitoring_v3 import ( CreateTimeSeriesRequest, MetricServiceClient, @@ -75,13 +68,10 @@ TypedValue, ) - HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = True -except ImportError: - HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = False - -HAS_DEPENDENCIES_INSTALLED = ( - HAS_OPENTELEMETRY_INSTALLED and HAS_GOOGLE_CLOUD_MONITORING_INSTALLED -) + HAS_OPENTELEMETRY_INSTALLED = True +except ImportError: # pragma: NO COVER + HAS_OPENTELEMETRY_INSTALLED = False + MetricExporter = object logger = logging.getLogger(__name__) MAX_BATCH_WRITE = 200 @@ -120,7 +110,7 @@ class CloudMonitoringMetricsExporter(MetricExporter): def __init__( self, project_id: Optional[str] = None, - client: Optional[MetricServiceClient] = None, + client: Optional["MetricServiceClient"] = None, ): """Initialize a custom exporter to send metrics for the Spanner Service Metrics.""" # Default preferred_temporality is all CUMULATIVE so need to customize @@ -144,7 +134,7 @@ def __init__( self.project_id = project_id self.project_name = self.client.common_project_path(self.project_id) - def _batch_write(self, series: List[TimeSeries], timeout_millis: float) -> None: + def _batch_write(self, series: List["TimeSeries"], timeout_millis: float) -> None: """Cloud Monitoring allows writing up to 200 time series at once. :param series: ProtoBuf TimeSeries @@ -166,8 +156,8 @@ def _batch_write(self, series: List[TimeSeries], timeout_millis: float) -> None: @staticmethod def _resource_to_monitored_resource_pb( - resource: Resource, labels: any - ) -> MonitoredResource: + resource: "Resource", labels: Dict[str, str] + ) -> "MonitoredResource": """ Convert the resource to a Google Cloud Monitoring monitored resource. @@ -182,7 +172,7 @@ def _resource_to_monitored_resource_pb( return monitored_resource @staticmethod - def _to_metric_kind(metric: Metric) -> MetricDescriptor.MetricKind: + def _to_metric_kind(metric: "Metric") -> MetricDescriptor.MetricKind: """ Convert the metric to a Google Cloud Monitoring metric kind. @@ -210,7 +200,7 @@ def _to_metric_kind(metric: Metric) -> MetricDescriptor.MetricKind: @staticmethod def _extract_metric_labels( - data_point: Union[NumberDataPoint, HistogramDataPoint] + data_point: Union["NumberDataPoint", "HistogramDataPoint"] ) -> Tuple[dict, dict]: """ Extract the metric labels from the data point. @@ -233,8 +223,8 @@ def _extract_metric_labels( @staticmethod def _to_point( kind: "MetricDescriptor.MetricKind.V", - data_point: Union[NumberDataPoint, HistogramDataPoint], - ) -> Point: + data_point: Union["NumberDataPoint", "HistogramDataPoint"], + ) -> "Point": # Create a Google Cloud Monitoring data point value based on the OpenTelemetry metric data point type ## For histograms, we need to calculate the mean and bucket counts if isinstance(data_point, HistogramDataPoint): @@ -281,7 +271,7 @@ def _data_point_to_timeseries_pb( metric, monitored_resource, labels, - ) -> TimeSeries: + ) -> "TimeSeries": """ Convert the data point to a Google Cloud Monitoring time series. @@ -308,8 +298,8 @@ def _data_point_to_timeseries_pb( @staticmethod def _resource_metrics_to_timeseries_pb( - metrics_data: MetricsData, - ) -> List[TimeSeries]: + metrics_data: "MetricsData", + ) -> List["TimeSeries"]: """ Convert the metrics data to a list of Google Cloud Monitoring time series. @@ -346,10 +336,10 @@ def _resource_metrics_to_timeseries_pb( def export( self, - metrics_data: MetricsData, + metrics_data: "MetricsData", timeout_millis: float = 10_000, **kwargs, - ) -> MetricExportResult: + ) -> "MetricExportResult": """ Export the metrics data to Google Cloud Monitoring. @@ -357,10 +347,9 @@ def export( :param timeout_millis: timeout in milliseconds :return: MetricExportResult """ - if not HAS_DEPENDENCIES_INSTALLED: + if not HAS_OPENTELEMETRY_INSTALLED: logger.warning("Metric exporter called without dependencies installed.") return False - time_series_list = self._resource_metrics_to_timeseries_pb(metrics_data) self._batch_write(time_series_list, timeout_millis) return True @@ -370,8 +359,8 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: return True def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: - """Not implemented.""" - pass + """Safely shuts down the exporter and closes all opened GRPC channels.""" + self.client.transport.close() def _timestamp_from_nanos(nanos: int) -> Timestamp: diff --git a/google/cloud/spanner_v1/metrics/metrics_interceptor.py b/google/cloud/spanner_v1/metrics/metrics_interceptor.py new file mode 100644 index 0000000000..4b55056dab --- /dev/null +++ b/google/cloud/spanner_v1/metrics/metrics_interceptor.py @@ -0,0 +1,156 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Interceptor for collecting Cloud Spanner metrics.""" + +from grpc_interceptor import ClientInterceptor +from .constants import ( + GOOGLE_CLOUD_RESOURCE_KEY, + SPANNER_METHOD_PREFIX, +) + +from typing import Dict +from .spanner_metrics_tracer_factory import SpannerMetricsTracerFactory +import re + + +class MetricsInterceptor(ClientInterceptor): + """Interceptor that collects metrics for Cloud Spanner operations.""" + + @staticmethod + def _parse_resource_path(path: str) -> dict: + """Parse the resource path to extract project, instance and database. + + Args: + path (str): The resource path from the request + + Returns: + dict: Extracted resource components + """ + # Match paths like: + # projects/{project}/instances/{instance}/databases/{database}/sessions/{session} + # projects/{project}/instances/{instance}/databases/{database} + # projects/{project}/instances/{instance} + pattern = r"^projects/(?P[^/]+)(/instances/(?P[^/]+))?(/databases/(?P[^/]+))?(/sessions/(?P[^/]+))?.*$" + match = re.match(pattern, path) + if match: + return {k: v for k, v in match.groupdict().items() if v is not None} + return {} + + @staticmethod + def _extract_resource_from_path(metadata: Dict[str, str]) -> Dict[str, str]: + """ + Extracts resource information from the metadata based on the path. + + This method iterates through the metadata dictionary to find the first tuple containing the key 'google-cloud-resource-prefix'. It then extracts the path from this tuple and parses it to extract project, instance, and database information using the _parse_resource_path method. + + Args: + metadata (Dict[str, str]): A dictionary containing metadata information. + + Returns: + Dict[str, str]: A dictionary containing extracted project, instance, and database information. + """ + # Extract resource info from the first metadata tuple containing :path + path = next( + (value for key, value in metadata if key == GOOGLE_CLOUD_RESOURCE_KEY), "" + ) + + resources = MetricsInterceptor._parse_resource_path(path) + return resources + + @staticmethod + def _remove_prefix(s: str, prefix: str) -> str: + """ + This function removes the prefix from the given string. + + Args: + s (str): The string from which the prefix is to be removed. + prefix (str): The prefix to be removed from the string. + + Returns: + str: The string with the prefix removed. + + Note: + This function is used because the `removeprefix` method does not exist in Python 3.8. + """ + if s.startswith(prefix): + return s[len(prefix) :] + return s + + def _set_metrics_tracer_attributes(self, resources: Dict[str, str]) -> None: + """ + Sets the metric tracer attributes based on the provided resources. + + This method updates the current metric tracer's attributes with the project, instance, and database information extracted from the resources dictionary. If the current metric tracer is not set, the method does nothing. + + Args: + resources (Dict[str, str]): A dictionary containing project, instance, and database information. + """ + if SpannerMetricsTracerFactory.current_metrics_tracer is None: + return + + if resources: + if "project" in resources: + SpannerMetricsTracerFactory.current_metrics_tracer.set_project( + resources["project"] + ) + if "instance" in resources: + SpannerMetricsTracerFactory.current_metrics_tracer.set_instance( + resources["instance"] + ) + if "database" in resources: + SpannerMetricsTracerFactory.current_metrics_tracer.set_database( + resources["database"] + ) + + def intercept(self, invoked_method, request_or_iterator, call_details): + """Intercept gRPC calls to collect metrics. + + Args: + invoked_method: The RPC method + request_or_iterator: The RPC request + call_details: Details about the RPC call + + Returns: + The RPC response + """ + factory = SpannerMetricsTracerFactory() + if ( + SpannerMetricsTracerFactory.current_metrics_tracer is None + or not factory.enabled + ): + return invoked_method(request_or_iterator, call_details) + + # Setup Metric Tracer attributes from call details + ## Extract Project / Instance / Databse from header information + resources = self._extract_resource_from_path(call_details.metadata) + self._set_metrics_tracer_attributes(resources) + + ## Format method to be be spanner. + method_name = self._remove_prefix( + call_details.method, SPANNER_METHOD_PREFIX + ).replace("/", ".") + + SpannerMetricsTracerFactory.current_metrics_tracer.set_method(method_name) + SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_start() + response = invoked_method(request_or_iterator, call_details) + SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_completion() + + # Process and send GFE metrics if enabled + if SpannerMetricsTracerFactory.current_metrics_tracer.gfe_enabled: + metadata = response.initial_metadata() + SpannerMetricsTracerFactory.current_metrics_trace.record_gfe_metrics( + metadata + ) + return response diff --git a/google/cloud/spanner_v1/metrics/metrics_tracer.py b/google/cloud/spanner_v1/metrics/metrics_tracer.py index 60525d6e4e..87035d9c22 100644 --- a/google/cloud/spanner_v1/metrics/metrics_tracer.py +++ b/google/cloud/spanner_v1/metrics/metrics_tracer.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -56,7 +55,7 @@ class MetricAttemptTracer: direct_path_used: bool status: str - def __init__(self): + def __init__(self) -> None: """ Initialize a MetricAttemptTracer instance with default values. @@ -177,37 +176,42 @@ class should not have any knowledge about the observability framework used for m """ _client_attributes: Dict[str, str] - _instrument_attempt_counter: Counter - _instrument_attempt_latency: Histogram - _instrument_operation_counter: Counter - _instrument_operation_latency: Histogram + _instrument_attempt_counter: "Counter" + _instrument_attempt_latency: "Histogram" + _instrument_operation_counter: "Counter" + _instrument_operation_latency: "Histogram" + _instrument_gfe_latency: "Histogram" + _instrument_gfe_missing_header_count: "Counter" current_op: MetricOpTracer enabled: bool + gfe_enabled: bool method: str def __init__( self, enabled: bool, - instrument_attempt_latency: Histogram, - instrument_attempt_counter: Counter, - instrument_operation_latency: Histogram, - instrument_operation_counter: Counter, + instrument_attempt_latency: "Histogram", + instrument_attempt_counter: "Counter", + instrument_operation_latency: "Histogram", + instrument_operation_counter: "Counter", client_attributes: Dict[str, str], + gfe_enabled: bool = False, ): """ Initialize a MetricsTracer instance with the given parameters. - This constructor initializes a MetricsTracer instance with the provided method name, enabled status, direct path enabled status, - instrumented metrics for attempt latency, attempt counter, operation latency, operation counter, and client attributes. - It sets up the necessary metrics tracing infrastructure for recording metrics related to RPC operations. + This constructor sets up a MetricsTracer instance with the specified parameters, including the enabled status, + instruments for measuring and counting attempt and operation metrics, and client attributes. It prepares the + infrastructure needed for recording metrics related to RPC operations. Args: - enabled (bool): A flag indicating if metrics tracing is enabled. - instrument_attempt_latency (Histogram): The instrument for measuring attempt latency. - instrument_attempt_counter (Counter): The instrument for counting attempts. - instrument_operation_latency (Histogram): The instrument for measuring operation latency. - instrument_operation_counter (Counter): The instrument for counting operations. - client_attributes (dict[str, str]): A dictionary of client attributes used for metrics tracing. + enabled (bool): Indicates if metrics tracing is enabled. + instrument_attempt_latency (Histogram): Instrument for measuring attempt latency. + instrument_attempt_counter (Counter): Instrument for counting attempts. + instrument_operation_latency (Histogram): Instrument for measuring operation latency. + instrument_operation_counter (Counter): Instrument for counting operations. + client_attributes (Dict[str, str]): Dictionary of client attributes used for metrics tracing. + gfe_enabled (bool, optional): Indicates if GFE metrics are enabled. Defaults to False. """ self.current_op = MetricOpTracer() self._client_attributes = client_attributes @@ -216,6 +220,7 @@ def __init__( self._instrument_operation_latency = instrument_operation_latency self._instrument_operation_counter = instrument_operation_counter self.enabled = enabled + self.gfe_enabled = gfe_enabled @staticmethod def _get_ms_time_diff(start: datetime, end: datetime) -> float: @@ -251,7 +256,7 @@ def client_attributes(self) -> Dict[str, str]: return self._client_attributes @property - def instrument_attempt_counter(self) -> Counter: + def instrument_attempt_counter(self) -> "Counter": """ Return the instrument for counting attempts. @@ -264,7 +269,7 @@ def instrument_attempt_counter(self) -> Counter: return self._instrument_attempt_counter @property - def instrument_attempt_latency(self) -> Histogram: + def instrument_attempt_latency(self) -> "Histogram": """ Return the instrument for measuring attempt latency. @@ -277,7 +282,7 @@ def instrument_attempt_latency(self) -> Histogram: return self._instrument_attempt_latency @property - def instrument_operation_counter(self) -> Counter: + def instrument_operation_counter(self) -> "Counter": """ Return the instrument for counting operations. @@ -290,7 +295,7 @@ def instrument_operation_counter(self) -> Counter: return self._instrument_operation_counter @property - def instrument_operation_latency(self) -> Histogram: + def instrument_operation_latency(self) -> "Histogram": """ Return the instrument for measuring operation latency. @@ -322,7 +327,7 @@ def record_attempt_completion(self, status: str = StatusCode.OK.name) -> None: If metrics tracing is not enabled, this method does not perform any operations. """ - if not self.enabled: + if not self.enabled or not HAS_OPENTELEMETRY_INSTALLED: return self.current_op.current_attempt.status = status @@ -347,7 +352,7 @@ def record_operation_start(self) -> None: It is used to track the start time of an operation, which is essential for calculating operation latency and other metrics. If metrics tracing is not enabled, this method does not perform any operations. """ - if not self.enabled: + if not self.enabled or not HAS_OPENTELEMETRY_INSTALLED: return self.current_op.start() @@ -360,7 +365,7 @@ def record_operation_completion(self) -> None: Additionally, it increments the operation count and records the attempt count for the operation. If metrics tracing is not enabled, this method does not perform any operations. """ - if not self.enabled: + if not self.enabled or not HAS_OPENTELEMETRY_INSTALLED: return end_time = datetime.now() # Build Attributes @@ -385,6 +390,29 @@ def record_operation_completion(self) -> None: self.current_op.attempt_count, attributes=attempt_attributes ) + def record_gfe_latency(self, latency: int) -> None: + """ + Records the GFE latency using the Histogram instrument. + + Args: + latency (int): The latency duration to be recorded. + """ + if not self.enabled or not HAS_OPENTELEMETRY_INSTALLED or not self.gfe_enabled: + return + self._instrument_gfe_latency.record( + amount=latency, attributes=self.client_attributes + ) + + def record_gfe_missing_header_count(self) -> None: + """ + Increments the counter for missing GFE headers. + """ + if not self.enabled or not HAS_OPENTELEMETRY_INSTALLED or not self.gfe_enabled: + return + self._instrument_gfe_missing_header_count.add( + amount=1, attributes=self.client_attributes + ) + def _create_operation_otel_attributes(self) -> dict: """ Create additional attributes for operation metrics tracing. @@ -392,11 +420,11 @@ def _create_operation_otel_attributes(self) -> dict: This method populates the client attributes dictionary with the operation status if metrics tracing is enabled. It returns the updated client attributes dictionary. """ - if not self.enabled: + if not self.enabled or not HAS_OPENTELEMETRY_INSTALLED: return {} - - self._client_attributes[METRIC_LABEL_KEY_STATUS] = self.current_op.status - return self._client_attributes + attributes = self._client_attributes.copy() + attributes[METRIC_LABEL_KEY_STATUS] = self.current_op.status + return attributes def _create_attempt_otel_attributes(self) -> dict: """ @@ -405,14 +433,16 @@ def _create_attempt_otel_attributes(self) -> dict: This method populates the attributes dictionary with the attempt status if metrics tracing is enabled and an attempt exists. It returns the updated attributes dictionary. """ - if not self.enabled: + if not self.enabled or not HAS_OPENTELEMETRY_INSTALLED: return {} - attributes = {} + attributes = self._client_attributes.copy() + # Short circuit out if we don't have an attempt - if self.current_op.current_attempt is not None: - attributes[METRIC_LABEL_KEY_STATUS] = self.current_op.current_attempt.status + if self.current_op.current_attempt is None: + return attributes + attributes[METRIC_LABEL_KEY_STATUS] = self.current_op.current_attempt.status return attributes def set_project(self, project: str) -> "MetricsTracer": diff --git a/google/cloud/spanner_v1/metrics/metrics_tracer_factory.py b/google/cloud/spanner_v1/metrics/metrics_tracer_factory.py index f7a4088019..ed4b270f06 100644 --- a/google/cloud/spanner_v1/metrics/metrics_tracer_factory.py +++ b/google/cloud/spanner_v1/metrics/metrics_tracer_factory.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -32,6 +31,8 @@ METRIC_LABEL_KEY_DATABASE, METRIC_LABEL_KEY_DIRECT_PATH_ENABLED, BUILT_IN_METRICS_METER_NAME, + METRIC_NAME_GFE_LATENCY, + METRIC_NAME_GFE_MISSING_HEADER_COUNT, ) from typing import Dict @@ -50,26 +51,29 @@ class MetricsTracerFactory: """Factory class for creating MetricTracer instances. This class facilitates the creation of MetricTracer objects, which are responsible for collecting and tracing metrics.""" enabled: bool - _instrument_attempt_latency: Histogram - _instrument_attempt_counter: Counter - _instrument_operation_latency: Histogram - _instrument_operation_counter: Counter + gfe_enabled: bool + _instrument_attempt_latency: "Histogram" + _instrument_attempt_counter: "Counter" + _instrument_operation_latency: "Histogram" + _instrument_operation_counter: "Counter" + _instrument_gfe_latency: "Histogram" + _instrument_gfe_missing_header_count: "Counter" _client_attributes: Dict[str, str] @property - def instrument_attempt_latency(self) -> Histogram: + def instrument_attempt_latency(self) -> "Histogram": return self._instrument_attempt_latency @property - def instrument_attempt_counter(self) -> Counter: + def instrument_attempt_counter(self) -> "Counter": return self._instrument_attempt_counter @property - def instrument_operation_latency(self) -> Histogram: + def instrument_operation_latency(self) -> "Histogram": return self._instrument_operation_latency @property - def instrument_operation_counter(self) -> Counter: + def instrument_operation_counter(self) -> "Counter": return self._instrument_operation_counter def __init__(self, enabled: bool, service_name: str): @@ -255,6 +259,9 @@ def create_metrics_tracer(self) -> MetricsTracer: Returns: MetricsTracer: A MetricsTracer instance with default settings and client attributes. """ + if not HAS_OPENTELEMETRY_INSTALLED: + return None + metrics_tracer = MetricsTracer( enabled=self.enabled and HAS_OPENTELEMETRY_INSTALLED, instrument_attempt_latency=self._instrument_attempt_latency, @@ -307,3 +314,15 @@ def _create_metric_instruments(self, service_name: str) -> None: unit="1", description="Number of operations.", ) + + self._instrument_gfe_latency = meter.create_histogram( + name=METRIC_NAME_GFE_LATENCY, + unit="ms", + description="GFE Latency.", + ) + + self._instrument_gfe_missing_header_count = meter.create_counter( + name=METRIC_NAME_GFE_MISSING_HEADER_COUNT, + unit="1", + description="GFE missing header count.", + ) diff --git a/google/cloud/spanner_v1/metrics/spanner_metrics_tracer_factory.py b/google/cloud/spanner_v1/metrics/spanner_metrics_tracer_factory.py new file mode 100644 index 0000000000..fd00c4de9c --- /dev/null +++ b/google/cloud/spanner_v1/metrics/spanner_metrics_tracer_factory.py @@ -0,0 +1,172 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""This module provides a singleton factory for creating SpannerMetricsTracer instances.""" + +from .metrics_tracer_factory import MetricsTracerFactory +import os +from .constants import ( + SPANNER_SERVICE_NAME, + GOOGLE_CLOUD_REGION_KEY, + GOOGLE_CLOUD_REGION_GLOBAL, +) + +try: + from opentelemetry.resourcedetector import gcp_resource_detector + + # Overwrite the requests timeout for the detector. + # This is necessary as the client will wait the full timeout if the + # code is not run in a GCP environment, with the location endpoints available. + gcp_resource_detector._TIMEOUT_SEC = 0.2 + + import mmh3 + + # Override Resource detector logging to not warn when GCP resources are not detected + import logging + + logging.getLogger("opentelemetry.resourcedetector.gcp_resource_detector").setLevel( + logging.ERROR + ) + + HAS_OPENTELEMETRY_INSTALLED = True +except ImportError: # pragma: NO COVER + HAS_OPENTELEMETRY_INSTALLED = False + +from .metrics_tracer import MetricsTracer +from google.cloud.spanner_v1 import __version__ +from uuid import uuid4 + + +class SpannerMetricsTracerFactory(MetricsTracerFactory): + """A factory for creating SpannerMetricsTracer instances.""" + + _metrics_tracer_factory: "SpannerMetricsTracerFactory" = None + current_metrics_tracer: MetricsTracer = None + + def __new__( + cls, enabled: bool = True, gfe_enabled: bool = False + ) -> "SpannerMetricsTracerFactory": + """ + Create a new instance of SpannerMetricsTracerFactory if it doesn't already exist. + + This method implements the singleton pattern for the SpannerMetricsTracerFactory class. + It initializes the factory with the necessary client attributes and configuration settings + if it hasn't been created yet. + + Args: + enabled (bool): A flag indicating whether metrics tracing is enabled. Defaults to True. + gfe_enabled (bool): A flag indicating whether GFE metrics are enabled. Defaults to False. + + Returns: + SpannerMetricsTracerFactory: The singleton instance of SpannerMetricsTracerFactory. + """ + if cls._metrics_tracer_factory is None: + cls._metrics_tracer_factory = MetricsTracerFactory( + enabled, SPANNER_SERVICE_NAME + ) + if not HAS_OPENTELEMETRY_INSTALLED: + return cls._metrics_tracer_factory + + client_uid = cls._generate_client_uid() + cls._metrics_tracer_factory.set_client_uid(client_uid) + cls._metrics_tracer_factory.set_instance_config(cls._get_instance_config()) + cls._metrics_tracer_factory.set_client_name(cls._get_client_name()) + cls._metrics_tracer_factory.set_client_hash( + cls._generate_client_hash(client_uid) + ) + cls._metrics_tracer_factory.set_location(cls._get_location()) + cls._metrics_tracer_factory.gfe_enabled = gfe_enabled + + if cls._metrics_tracer_factory.enabled != enabled: + cls._metrics_tracer_factory.enabeld = enabled + + return cls._metrics_tracer_factory + + @staticmethod + def _generate_client_uid() -> str: + """Generate a client UID in the form of uuidv4@pid@hostname. + + This method generates a unique client identifier (UID) by combining a UUID version 4, + the process ID (PID), and the hostname. The PID is limited to the first 10 characters. + + Returns: + str: A string representing the client UID in the format uuidv4@pid@hostname. + """ + try: + hostname = os.uname()[1] + pid = str(os.getpid())[0:10] # Limit PID to 10 characters + uuid = uuid4() + return f"{uuid}@{pid}@{hostname}" + except Exception: + return "" + + @staticmethod + def _get_instance_config() -> str: + """Get the instance configuration.""" + # TODO: unknown until there's a good way to get it. + return "unknown" + + @staticmethod + def _get_client_name() -> str: + """Get the client name.""" + return f"{SPANNER_SERVICE_NAME}/{__version__}" + + @staticmethod + def _generate_client_hash(client_uid: str) -> str: + """ + Generate a 6-digit zero-padded lowercase hexadecimal hash using the 10 most significant bits of a 64-bit hash value. + + The primary purpose of this function is to generate a hash value for the `client_hash` + resource label using `client_uid` metric field. The range of values is chosen to be small + enough to keep the cardinality of the Resource targets under control. Note: If at later time + the range needs to be increased, it can be done by increasing the value of `kPrefixLength` to + up to 24 bits without changing the format of the returned value. + + Args: + client_uid (str): The client UID used to generate the hash. + + Returns: + str: A 6-digit zero-padded lowercase hexadecimal hash. + """ + if not client_uid: + return "000000" + hashed_client = mmh3.hash64(client_uid) + + # Join the hashes back together since mmh3 splits into high and low 32bits + full_hash = (hashed_client[0] << 32) | (hashed_client[1] & 0xFFFFFFFF) + unsigned_hash = full_hash & 0xFFFFFFFFFFFFFFFF + + k_prefix_length = 10 + sig_figs = unsigned_hash >> (64 - k_prefix_length) + + # Return as 6 digit zero padded hex string + return f"{sig_figs:06x}" + + @staticmethod + def _get_location() -> str: + """Get the location of the resource. + + Returns: + str: The location of the resource. If OpenTelemetry is not installed, returns a global region. + """ + if not HAS_OPENTELEMETRY_INSTALLED: + return GOOGLE_CLOUD_REGION_GLOBAL + detector = gcp_resource_detector.GoogleCloudResourceDetector() + resources = detector.detect() + + if GOOGLE_CLOUD_REGION_KEY not in resources.attributes: + return GOOGLE_CLOUD_REGION_GLOBAL + else: + return resources[GOOGLE_CLOUD_REGION_KEY] diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index 596f76a1f1..26de7a2bf8 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -32,6 +32,8 @@ ) from warnings import warn +from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture + _NOW = datetime.datetime.utcnow # unit tests may replace @@ -242,7 +244,7 @@ def bind(self, database): with trace_call( "CloudSpanner.FixedPool.BatchCreateSessions", observability_options=observability_options, - ) as span: + ) as span, MetricsCapture(): returned_session_count = 0 while not self._sessions.full(): request.session_count = requested_session_count - self._sessions.qsize() @@ -552,7 +554,7 @@ def bind(self, database): with trace_call( "CloudSpanner.PingingPool.BatchCreateSessions", observability_options=observability_options, - ) as span: + ) as span, MetricsCapture(): returned_session_count = 0 while returned_session_count < self.size: resp = api.batch_create_sessions( diff --git a/google/cloud/spanner_v1/services/spanner/client.py b/google/cloud/spanner_v1/services/spanner/client.py index 2bf6d6ce90..e0768ce742 100644 --- a/google/cloud/spanner_v1/services/spanner/client.py +++ b/google/cloud/spanner_v1/services/spanner/client.py @@ -74,6 +74,7 @@ from .transports.grpc import SpannerGrpcTransport from .transports.grpc_asyncio import SpannerGrpcAsyncIOTransport from .transports.rest import SpannerRestTransport +from google.cloud.spanner_v1.metrics.metrics_interceptor import MetricsInterceptor class SpannerClientMeta(type): @@ -714,6 +715,7 @@ def __init__( client_info=client_info, always_use_jwt_access=True, api_audience=self._client_options.api_audience, + metrics_interceptor=MetricsInterceptor(), ) if "async" not in str(self._transport): diff --git a/google/cloud/spanner_v1/services/spanner/transports/base.py b/google/cloud/spanner_v1/services/spanner/transports/base.py index 14c8e8d02f..8fa85af24d 100644 --- a/google/cloud/spanner_v1/services/spanner/transports/base.py +++ b/google/cloud/spanner_v1/services/spanner/transports/base.py @@ -30,6 +30,7 @@ from google.cloud.spanner_v1.types import result_set from google.cloud.spanner_v1.types import spanner from google.cloud.spanner_v1.types import transaction +from google.cloud.spanner_v1.metrics.metrics_interceptor import MetricsInterceptor from google.protobuf import empty_pb2 # type: ignore DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( @@ -58,6 +59,7 @@ def __init__( client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, always_use_jwt_access: Optional[bool] = False, api_audience: Optional[str] = None, + metrics_interceptor: Optional[MetricsInterceptor] = None, **kwargs, ) -> None: """Instantiate the transport. diff --git a/google/cloud/spanner_v1/services/spanner/transports/grpc.py b/google/cloud/spanner_v1/services/spanner/transports/grpc.py index 4c54921674..d325442dc9 100644 --- a/google/cloud/spanner_v1/services/spanner/transports/grpc.py +++ b/google/cloud/spanner_v1/services/spanner/transports/grpc.py @@ -34,6 +34,8 @@ from google.cloud.spanner_v1.types import result_set from google.cloud.spanner_v1.types import spanner from google.cloud.spanner_v1.types import transaction + +from google.cloud.spanner_v1.metrics.metrics_interceptor import MetricsInterceptor from google.protobuf import empty_pb2 # type: ignore from .base import SpannerTransport, DEFAULT_CLIENT_INFO @@ -147,6 +149,7 @@ def __init__( client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, always_use_jwt_access: Optional[bool] = False, api_audience: Optional[str] = None, + metrics_interceptor: Optional[MetricsInterceptor] = None, ) -> None: """Instantiate the transport. @@ -202,6 +205,7 @@ def __init__( self._grpc_channel = None self._ssl_channel_credentials = ssl_channel_credentials self._stubs: Dict[str, Callable] = {} + self._metrics_interceptor = None if api_mtls_endpoint: warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning) @@ -268,6 +272,13 @@ def __init__( ], ) + # Wrap the gRPC channel with the metric interceptor + if metrics_interceptor is not None: + self._metrics_interceptor = metrics_interceptor + self._grpc_channel = grpc.intercept_channel( + self._grpc_channel, metrics_interceptor + ) + self._interceptor = _LoggingClientInterceptor() self._logged_channel = grpc.intercept_channel( self._grpc_channel, self._interceptor diff --git a/google/cloud/spanner_v1/services/spanner/transports/grpc_asyncio.py b/google/cloud/spanner_v1/services/spanner/transports/grpc_asyncio.py index 6f6c4c91d5..475717ae2a 100644 --- a/google/cloud/spanner_v1/services/spanner/transports/grpc_asyncio.py +++ b/google/cloud/spanner_v1/services/spanner/transports/grpc_asyncio.py @@ -37,6 +37,7 @@ from google.cloud.spanner_v1.types import result_set from google.cloud.spanner_v1.types import spanner from google.cloud.spanner_v1.types import transaction +from google.cloud.spanner_v1.metrics.metrics_interceptor import MetricsInterceptor from google.protobuf import empty_pb2 # type: ignore from .base import SpannerTransport, DEFAULT_CLIENT_INFO from .grpc import SpannerGrpcTransport @@ -195,6 +196,7 @@ def __init__( client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, always_use_jwt_access: Optional[bool] = False, api_audience: Optional[str] = None, + metrics_interceptor: Optional[MetricsInterceptor] = None, ) -> None: """Instantiate the transport. diff --git a/google/cloud/spanner_v1/services/spanner/transports/rest.py b/google/cloud/spanner_v1/services/spanner/transports/rest.py index 7575772497..344416c265 100644 --- a/google/cloud/spanner_v1/services/spanner/transports/rest.py +++ b/google/cloud/spanner_v1/services/spanner/transports/rest.py @@ -36,9 +36,9 @@ from google.cloud.spanner_v1.types import result_set from google.cloud.spanner_v1.types import spanner from google.cloud.spanner_v1.types import transaction +from google.cloud.spanner_v1.metrics.metrics_interceptor import MetricsInterceptor from google.protobuf import empty_pb2 # type: ignore - from .rest_base import _BaseSpannerRestTransport from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO @@ -915,6 +915,7 @@ def __init__( url_scheme: str = "https", interceptor: Optional[SpannerRestInterceptor] = None, api_audience: Optional[str] = None, + metrics_interceptor: Optional[MetricsInterceptor] = None, ) -> None: """Instantiate the transport. diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index ccc0c4ebdc..8194359a58 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -40,6 +40,8 @@ from google.cloud.spanner_v1.snapshot import Snapshot from google.cloud.spanner_v1.transaction import Transaction +from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture + DEFAULT_RETRY_TIMEOUT_SECS = 30 """Default timeout used by :meth:`Session.run_in_transaction`.""" @@ -165,7 +167,7 @@ def create(self): self, self._labels, observability_options=observability_options, - ): + ), MetricsCapture(): session_pb = api.create_session( request=request, metadata=metadata, @@ -205,7 +207,7 @@ def exists(self): observability_options = getattr(self._database, "observability_options", None) with trace_call( "CloudSpanner.GetSession", self, observability_options=observability_options - ) as span: + ) as span, MetricsCapture(): try: api.get_session(name=self.name, metadata=metadata) if span: @@ -248,7 +250,7 @@ def delete(self): "session.name": self.name, }, observability_options=observability_options, - ): + ), MetricsCapture(): api.delete_session(name=self.name, metadata=metadata) def ping(self): @@ -467,7 +469,7 @@ def run_in_transaction(self, func, *args, **kw): "CloudSpanner.Session.run_in_transaction", self, observability_options=observability_options, - ) as span: + ) as span, MetricsCapture(): while True: if self._transaction is None: txn = self.transaction() diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 314980f177..88ecfdd2b9 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -43,6 +43,8 @@ from google.cloud.spanner_v1.streamed import StreamedResultSet from google.cloud.spanner_v1 import RequestOptions +from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture + _STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES = ( "RST_STREAM", "Received unexpected EOS on DATA frame from server", @@ -96,7 +98,7 @@ def _restart_on_unavailable( session, attributes, observability_options=observability_options, - ): + ), MetricsCapture(): iterator = method(request=request) for item in iterator: item_buffer.append(item) @@ -119,7 +121,7 @@ def _restart_on_unavailable( session, attributes, observability_options=observability_options, - ): + ), MetricsCapture(): request.resume_token = resume_token if transaction is not None: transaction_selector = transaction._make_txn_selector() @@ -139,7 +141,7 @@ def _restart_on_unavailable( session, attributes, observability_options=observability_options, - ): + ), MetricsCapture(): request.resume_token = resume_token if transaction is not None: transaction_selector = transaction._make_txn_selector() @@ -704,7 +706,7 @@ def partition_read( self._session, extra_attributes=trace_attributes, observability_options=getattr(database, "observability_options", None), - ): + ), MetricsCapture(): method = functools.partial( api.partition_read, request=request, @@ -807,7 +809,7 @@ def partition_query( self._session, trace_attributes, observability_options=getattr(database, "observability_options", None), - ): + ), MetricsCapture(): method = functools.partial( api.partition_query, request=request, @@ -953,7 +955,7 @@ def begin(self): f"CloudSpanner.{type(self).__name__}.begin", self._session, observability_options=getattr(database, "observability_options", None), - ): + ), MetricsCapture(): method = functools.partial( api.begin_transaction, session=self._session.name, diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index 789e001275..a6a24c47ad 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -34,6 +34,7 @@ from google.cloud.spanner_v1.batch import _BatchBase from google.cloud.spanner_v1._opentelemetry_tracing import add_span_event, trace_call from google.cloud.spanner_v1 import RequestOptions +from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture from google.api_core import gapic_v1 from google.api_core.exceptions import InternalServerError from dataclasses import dataclass @@ -118,7 +119,7 @@ def _execute_request( request.transaction = transaction with trace_call( trace_name, session, attributes, observability_options=observability_options - ): + ), MetricsCapture(): method = functools.partial(method, request=request) response = _retry( method, @@ -160,7 +161,7 @@ def begin(self): f"CloudSpanner.{type(self).__name__}.begin", self._session, observability_options=observability_options, - ) as span: + ) as span, MetricsCapture(): method = functools.partial( api.begin_transaction, session=self._session.name, @@ -202,7 +203,7 @@ def rollback(self): f"CloudSpanner.{type(self).__name__}.rollback", self._session, observability_options=observability_options, - ): + ), MetricsCapture(): method = functools.partial( api.rollback, session=self._session.name, @@ -250,7 +251,7 @@ def commit( self._session, trace_attributes, observability_options, - ) as span: + ) as span, MetricsCapture(): self._check_state() if self._transaction_id is None and len(self._mutations) > 0: self.begin() diff --git a/setup.py b/setup.py index 619607b794..6d01b265cc 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,9 @@ "opentelemetry-api >= 1.22.0", "opentelemetry-sdk >= 1.22.0", "opentelemetry-semantic-conventions >= 0.43b0", + "opentelemetry-resourcedetector-gcp >= 1.8.0a0", "google-cloud-monitoring >= 2.16.0", + "mmh3 >= 4.1.0 ", ], "libcst": "libcst >= 0.2.5", } diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index af33b0c8e8..58482dcd03 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -17,3 +17,4 @@ protobuf==3.20.2 deprecated==1.2.14 grpc-interceptor==0.15.4 google-cloud-monitoring==2.16.0 +mmh3==4.1.0 diff --git a/tests/mockserver_tests/test_tags.py b/tests/mockserver_tests/test_tags.py index c84d69b7bd..f44a9fb9a9 100644 --- a/tests/mockserver_tests/test_tags.py +++ b/tests/mockserver_tests/test_tags.py @@ -181,10 +181,16 @@ def test_request_tag_is_cleared(self): # This query will not have a request tag. cursor.execute("select name from singers") requests = self.spanner_service.requests - self.assertTrue(isinstance(requests[1], ExecuteSqlRequest)) - self.assertTrue(isinstance(requests[2], ExecuteSqlRequest)) - self.assertEqual("my_tag", requests[1].request_options.request_tag) - self.assertEqual("", requests[2].request_options.request_tag) + + # Filter for SQL requests calls + sql_requests = [ + request for request in requests if isinstance(request, ExecuteSqlRequest) + ] + + self.assertTrue(isinstance(sql_requests[0], ExecuteSqlRequest)) + self.assertTrue(isinstance(sql_requests[1], ExecuteSqlRequest)) + self.assertEqual("my_tag", sql_requests[0].request_options.request_tag) + self.assertEqual("", sql_requests[1].request_options.request_tag) def _execute_and_verify_select_singers( self, connection: Connection, request_tag: str = "", transaction_tag: str = "" diff --git a/tests/unit/gapic/spanner_v1/test_spanner.py b/tests/unit/gapic/spanner_v1/test_spanner.py index 999daf2a8e..a1227d4861 100644 --- a/tests/unit/gapic/spanner_v1/test_spanner.py +++ b/tests/unit/gapic/spanner_v1/test_spanner.py @@ -485,6 +485,7 @@ def test_spanner_client_client_options(client_class, transport_class, transport_ client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) # Check the case api_endpoint is not provided and GOOGLE_API_USE_MTLS_ENDPOINT is @@ -505,6 +506,7 @@ def test_spanner_client_client_options(client_class, transport_class, transport_ client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) # Check the case api_endpoint is not provided and GOOGLE_API_USE_MTLS_ENDPOINT is @@ -523,6 +525,7 @@ def test_spanner_client_client_options(client_class, transport_class, transport_ client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) # Check the case api_endpoint is not provided and GOOGLE_API_USE_MTLS_ENDPOINT has @@ -563,6 +566,7 @@ def test_spanner_client_client_options(client_class, transport_class, transport_ client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) # Check the case api_endpoint is provided options = client_options.ClientOptions( @@ -583,6 +587,7 @@ def test_spanner_client_client_options(client_class, transport_class, transport_ client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience="https://language.googleapis.com", + metrics_interceptor=mock.ANY, ) @@ -655,6 +660,7 @@ def test_spanner_client_mtls_env_auto( client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) # Check the case ADC client cert is provided. Whether client cert is used depends on @@ -692,6 +698,7 @@ def test_spanner_client_mtls_env_auto( client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) # Check the case client_cert_source and ADC client cert are not provided. @@ -717,6 +724,7 @@ def test_spanner_client_mtls_env_auto( client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) @@ -932,6 +940,7 @@ def test_spanner_client_client_options_scopes( client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) @@ -969,6 +978,7 @@ def test_spanner_client_client_options_credentials_file( client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) @@ -988,6 +998,7 @@ def test_spanner_client_client_options_from_dict(): client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) @@ -1024,6 +1035,7 @@ def test_spanner_client_create_channel_credentials_file( client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) # test that the credentials from file are saved and used as the credentials. @@ -12717,4 +12729,5 @@ def test_api_key_credentials(client_class, transport_class): client_info=transports.base.DEFAULT_CLIENT_INFO, always_use_jwt_access=True, api_audience=None, + metrics_interceptor=mock.ANY, ) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 174e5116c2..88033dae6f 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -14,6 +14,7 @@ import unittest +import os import mock from google.cloud.spanner_v1 import DirectedReadOptions @@ -158,6 +159,8 @@ def test_constructor_custom_client_info(self): creds = _make_credentials() self._constructor_test_helper(expected_scopes, creds, client_info=client_info) + # Disable metrics to avoid google.auth.default calls from Metric Exporter + @mock.patch.dict(os.environ, {"SPANNER_ENABLE_BUILTIN_METRICS": ""}) def test_constructor_implicit_credentials(self): from google.cloud.spanner_v1 import client as MUT diff --git a/tests/unit/test_metrics.py b/tests/unit/test_metrics.py new file mode 100644 index 0000000000..6622bc3503 --- /dev/null +++ b/tests/unit/test_metrics.py @@ -0,0 +1,78 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from unittest.mock import MagicMock +from google.api_core.exceptions import ServiceUnavailable +from google.cloud.spanner_v1.client import Client +from unittest.mock import patch +from grpc._interceptor import _UnaryOutcome +from google.cloud.spanner_v1.metrics.spanner_metrics_tracer_factory import ( + SpannerMetricsTracerFactory, +) + +pytest.importorskip("opentelemetry") +# Skip if semconv attributes are not present, as tracing wont' be enabled either +# pytest.importorskip("opentelemetry.semconv.attributes.otel_attributes") + + +def test_metrics_emission_with_failure_attempt(monkeypatch): + monkeypatch.setenv("SPANNER_ENABLE_BUILTIN_METRICS", "true") + + # Remove the Tracer factory to avoid previously disabled factory polluting from other tests + if SpannerMetricsTracerFactory._metrics_tracer_factory is not None: + SpannerMetricsTracerFactory._metrics_tracer_factory = None + + client = Client() + instance = client.instance("test-instance") + database = instance.database("example-db") + factory = SpannerMetricsTracerFactory() + + assert factory.enabled + + transport = database.spanner_api._transport + metrics_interceptor = transport._metrics_interceptor + original_intercept = metrics_interceptor.intercept + first_attempt = True + + def mocked_raise(*args, **kwargs): + raise ServiceUnavailable("Service Unavailable") + + def mocked_call(*args, **kwargs): + return _UnaryOutcome(MagicMock(), MagicMock()) + + def intercept_wrapper(invoked_method, request_or_iterator, call_details): + nonlocal original_intercept + nonlocal first_attempt + invoked_method = mocked_call + if first_attempt: + first_attempt = False + invoked_method = mocked_raise + response = original_intercept( + invoked_method=invoked_method, + request_or_iterator=request_or_iterator, + call_details=call_details, + ) + return response + + metrics_interceptor.intercept = intercept_wrapper + patch_path = "google.cloud.spanner_v1.metrics.metrics_exporter.CloudMonitoringMetricsExporter.export" + with patch(patch_path): + with database.snapshot(): + pass + + # Verify that the attempt count increased from the failed initial attempt + assert ( + SpannerMetricsTracerFactory.current_metrics_tracer.current_op.attempt_count + ) == 2 diff --git a/tests/unit/test_metrics_capture.py b/tests/unit/test_metrics_capture.py new file mode 100644 index 0000000000..107e9daeb4 --- /dev/null +++ b/tests/unit/test_metrics_capture.py @@ -0,0 +1,50 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from unittest import mock +from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture +from google.cloud.spanner_v1.metrics.metrics_tracer_factory import MetricsTracerFactory +from google.cloud.spanner_v1.metrics.spanner_metrics_tracer_factory import ( + SpannerMetricsTracerFactory, +) + + +@pytest.fixture +def mock_tracer_factory(): + SpannerMetricsTracerFactory(enabled=True) + with mock.patch.object( + MetricsTracerFactory, "create_metrics_tracer" + ) as mock_create: + yield mock_create + + +def test_metrics_capture_enter(mock_tracer_factory): + mock_tracer = mock.Mock() + mock_tracer_factory.return_value = mock_tracer + + with MetricsCapture() as capture: + assert capture is not None + mock_tracer_factory.assert_called_once() + mock_tracer.record_operation_start.assert_called_once() + + +def test_metrics_capture_exit(mock_tracer_factory): + mock_tracer = mock.Mock() + mock_tracer_factory.return_value = mock_tracer + + with MetricsCapture(): + pass + + mock_tracer.record_operation_completion.assert_called_once() diff --git a/tests/unit/test_metric_exporter.py b/tests/unit/test_metrics_exporter.py similarity index 99% rename from tests/unit/test_metric_exporter.py rename to tests/unit/test_metrics_exporter.py index 08ae9ecf21..62fb531345 100644 --- a/tests/unit/test_metric_exporter.py +++ b/tests/unit/test_metrics_exporter.py @@ -1,4 +1,4 @@ -# Copyright 2016 Google LLC All rights reserved. +# Copyright 2025 Google LLC All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -333,7 +333,7 @@ def create_tsr_side_effect(name, time_series): self.assertEqual(len(mockClient.create_service_time_series.mock_calls), 2) @patch( - "google.cloud.spanner_v1.metrics.metrics_exporter.HAS_DEPENDENCIES_INSTALLED", + "google.cloud.spanner_v1.metrics.metrics_exporter.HAS_OPENTELEMETRY_INSTALLED", False, ) def test_export_early_exit_if_extras_not_installed(self): diff --git a/tests/unit/test_metrics_interceptor.py b/tests/unit/test_metrics_interceptor.py new file mode 100644 index 0000000000..e32003537f --- /dev/null +++ b/tests/unit/test_metrics_interceptor.py @@ -0,0 +1,128 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from google.cloud.spanner_v1.metrics.metrics_interceptor import MetricsInterceptor +from google.cloud.spanner_v1.metrics.spanner_metrics_tracer_factory import ( + SpannerMetricsTracerFactory, +) +from unittest.mock import MagicMock + + +@pytest.fixture +def interceptor(): + SpannerMetricsTracerFactory(enabled=True) + return MetricsInterceptor() + + +def test_parse_resource_path_valid(interceptor): + path = "projects/my_project/instances/my_instance/databases/my_database" + expected = { + "project": "my_project", + "instance": "my_instance", + "database": "my_database", + } + assert interceptor._parse_resource_path(path) == expected + + +def test_parse_resource_path_invalid(interceptor): + path = "invalid/path" + expected = {} + assert interceptor._parse_resource_path(path) == expected + + +def test_extract_resource_from_path(interceptor): + metadata = [ + ( + "google-cloud-resource-prefix", + "projects/my_project/instances/my_instance/databases/my_database", + ) + ] + expected = { + "project": "my_project", + "instance": "my_instance", + "database": "my_database", + } + assert interceptor._extract_resource_from_path(metadata) == expected + + +def test_set_metrics_tracer_attributes(interceptor): + SpannerMetricsTracerFactory.current_metrics_tracer = MockMetricTracer() + resources = { + "project": "my_project", + "instance": "my_instance", + "database": "my_database", + } + + interceptor._set_metrics_tracer_attributes(resources) + assert SpannerMetricsTracerFactory.current_metrics_tracer.project == "my_project" + assert SpannerMetricsTracerFactory.current_metrics_tracer.instance == "my_instance" + assert SpannerMetricsTracerFactory.current_metrics_tracer.database == "my_database" + + +def test_intercept_with_tracer(interceptor): + SpannerMetricsTracerFactory.current_metrics_tracer = MockMetricTracer() + SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_start = ( + MagicMock() + ) + SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_completion = ( + MagicMock() + ) + SpannerMetricsTracerFactory.current_metrics_tracer.gfe_enabled = False + + invoked_response = MagicMock() + invoked_response.initial_metadata.return_value = {} + + mock_invoked_method = MagicMock(return_value=invoked_response) + call_details = MagicMock( + method="spanner.someMethod", + metadata=[ + ( + "google-cloud-resource-prefix", + "projects/my_project/instances/my_instance/databases/my_database", + ) + ], + ) + + response = interceptor.intercept(mock_invoked_method, "request", call_details) + assert response == invoked_response + SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_start.assert_called_once() + SpannerMetricsTracerFactory.current_metrics_tracer.record_attempt_completion.assert_called_once() + mock_invoked_method.assert_called_once_with("request", call_details) + + +class MockMetricTracer: + def __init__(self): + self.project = None + self.instance = None + self.database = None + self.method = None + + def set_project(self, project): + self.project = project + + def set_instance(self, instance): + self.instance = instance + + def set_database(self, database): + self.database = database + + def set_method(self, method): + self.method = method + + def record_attempt_start(self): + pass + + def record_attempt_completion(self): + pass diff --git a/tests/unit/test_metrics_tracer.py b/tests/unit/test_metrics_tracer.py index 9b59c59a7c..70491ef5b2 100644 --- a/tests/unit/test_metrics_tracer.py +++ b/tests/unit/test_metrics_tracer.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -222,3 +221,45 @@ def test_set_method(metrics_tracer): # Ensure it does not overwrite metrics_tracer.set_method("new_method") assert metrics_tracer.client_attributes["method"] == "test_method" + + +def test_record_gfe_latency(metrics_tracer): + mock_gfe_latency = mock.create_autospec(Histogram, instance=True) + metrics_tracer._instrument_gfe_latency = mock_gfe_latency + metrics_tracer.gfe_enabled = True # Ensure GFE is enabled + + # Test when tracing is enabled + metrics_tracer.record_gfe_latency(100) + assert mock_gfe_latency.record.call_count == 1 + assert mock_gfe_latency.record.call_args[1]["amount"] == 100 + assert ( + mock_gfe_latency.record.call_args[1]["attributes"] + == metrics_tracer.client_attributes + ) + + # Test when tracing is disabled + metrics_tracer.enabled = False + metrics_tracer.record_gfe_latency(200) + assert mock_gfe_latency.record.call_count == 1 # Should not increment + metrics_tracer.enabled = True # Reset for next test + + +def test_record_gfe_missing_header_count(metrics_tracer): + mock_gfe_missing_header_count = mock.create_autospec(Counter, instance=True) + metrics_tracer._instrument_gfe_missing_header_count = mock_gfe_missing_header_count + metrics_tracer.gfe_enabled = True # Ensure GFE is enabled + + # Test when tracing is enabled + metrics_tracer.record_gfe_missing_header_count() + assert mock_gfe_missing_header_count.add.call_count == 1 + assert mock_gfe_missing_header_count.add.call_args[1]["amount"] == 1 + assert ( + mock_gfe_missing_header_count.add.call_args[1]["attributes"] + == metrics_tracer.client_attributes + ) + + # Test when tracing is disabled + metrics_tracer.enabled = False + metrics_tracer.record_gfe_missing_header_count() + assert mock_gfe_missing_header_count.add.call_count == 1 # Should not increment + metrics_tracer.enabled = True # Reset for next test diff --git a/tests/unit/test_metrics_tracer_factory.py b/tests/unit/test_metrics_tracer_factory.py index 637bc4c06a..64fb4d83d1 100644 --- a/tests/unit/test_metrics_tracer_factory.py +++ b/tests/unit/test_metrics_tracer_factory.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/tests/unit/test_spanner_metrics_tracer_factory.py b/tests/unit/test_spanner_metrics_tracer_factory.py new file mode 100644 index 0000000000..8ee4d53d3d --- /dev/null +++ b/tests/unit/test_spanner_metrics_tracer_factory.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.cloud.spanner_v1.metrics.spanner_metrics_tracer_factory import ( + SpannerMetricsTracerFactory, +) + + +class TestSpannerMetricsTracerFactory: + def test_new_instance_creation(self): + factory1 = SpannerMetricsTracerFactory(enabled=True) + factory2 = SpannerMetricsTracerFactory(enabled=True) + assert factory1 is factory2 # Should return the same instance + + def test_generate_client_uid_format(self): + client_uid = SpannerMetricsTracerFactory._generate_client_uid() + assert isinstance(client_uid, str) + assert len(client_uid.split("@")) == 3 # Should contain uuid, pid, and hostname + + def test_generate_client_hash(self): + client_uid = "123e4567-e89b-12d3-a456-426614174000@1234@hostname" + client_hash = SpannerMetricsTracerFactory._generate_client_hash(client_uid) + assert isinstance(client_hash, str) + assert len(client_hash) == 6 # Should be a 6-digit hex string + + def test_get_instance_config(self): + instance_config = SpannerMetricsTracerFactory._get_instance_config() + assert instance_config == "unknown" # As per the current implementation + + def test_get_client_name(self): + client_name = SpannerMetricsTracerFactory._get_client_name() + assert isinstance(client_name, str) + assert "spanner-python" in client_name + + def test_get_location(self): + location = SpannerMetricsTracerFactory._get_location() + assert isinstance(location, str) + assert location # Simply asserting for non empty as this can change depending on the instance this test runs in.