From da102c4bb488400c3e2f00900cfa1fb4d0106e0b Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Sat, 9 Nov 2024 11:21:25 -0800 Subject: [PATCH 01/19] all: implement custom tracer_provider injection An important feature for observability is to allow the injection of a custom tracer_provider instead of always using the global tracer_provider by sending in observability_options=dict( tracer_provider=tracer_provider, enable_extended_tracing=True, ) --- docs/opentelemetry-tracing.rst | 10 ++++-- examples/trace.py | 11 ++++--- .../spanner_v1/_opentelemetry_tracing.py | 33 +++++++++++++++++-- google/cloud/spanner_v1/client.py | 10 ++++++ google/cloud/spanner_v1/database.py | 15 +++++++-- google/cloud/spanner_v1/instance.py | 10 ++++++ google/cloud/spanner_v1/pool.py | 33 ++++++++++++++++--- google/cloud/spanner_v1/session.py | 5 ++- tests/unit/test__opentelemetry_tracing.py | 8 ++++- 9 files changed, 117 insertions(+), 18 deletions(-) diff --git a/docs/opentelemetry-tracing.rst b/docs/opentelemetry-tracing.rst index cb9a2b1350..8101b831ef 100644 --- a/docs/opentelemetry-tracing.rst +++ b/docs/opentelemetry-tracing.rst @@ -25,12 +25,16 @@ We also need to tell OpenTelemetry which exporter to use. To export Spanner trac # Create and export one trace every 1000 requests sampler = TraceIdRatioBased(1/1000) - # Use the default tracer provider - trace.set_tracer_provider(TracerProvider(sampler=sampler)) - trace.get_tracer_provider().add_span_processor( + tracer_provider = TracerProvider(sampler=sampler) + tracer_provider.add_span_processor( # Initialize the cloud tracing exporter BatchSpanProcessor(CloudTraceSpanExporter()) ) + observability_options = dict( + tracer_provider=tracer_provider, + enable_extended_tracing=True, + ) + spanner = spanner.NewClient(project_id, observability_options=observability_options) To get more fine-grained traces from gRPC, you can enable the gRPC instrumentation by the following diff --git a/examples/trace.py b/examples/trace.py index 791b6cd20b..e7659e13e2 100644 --- a/examples/trace.py +++ b/examples/trace.py @@ -32,15 +32,18 @@ def main(): tracer_provider = TracerProvider(sampler=ALWAYS_ON) trace_exporter = CloudTraceSpanExporter(project_id=project_id) tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter)) - trace.set_tracer_provider(tracer_provider) - # Retrieve a tracer from the global tracer provider. - tracer = tracer_provider.get_tracer('MyApp') # Setup the Cloud Spanner Client. - spanner_client = spanner.Client(project_id) + spanner_client = spanner.Client( + project_id, + observability_options=dict(tracer_provider=tracer_provider, enable_extended_tracing=True), + ) instance = spanner_client.instance('test-instance') database = instance.database('test-db') + # Retrieve a tracer from our custom tracer provider. + tracer = tracer_provider.get_tracer('MyApp') + # Now run our queries with tracer.start_as_current_span('QueryInformationSchema'): with database.snapshot() as snapshot: diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 51501a07a3..5cc17139b4 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -35,6 +35,20 @@ TRACER_VERSION = gapic_version.__version__ +class ObservabilityOptions: + def __init__(self, tracer_provider=None, enable_extended_tracing=False): + self.__tracer_provider = tracer_provider + self.__enable_extended_tracing = enable_extended_tracing + + @property + def tracer_provider(self): + return self.__tracer_provider + + @property + def enable_extended_tracing(self): + return self.__enable_extended_tracing + + def get_tracer(tracer_provider=None): """ get_tracer is a utility to unify and simplify retrieval of the tracer, without @@ -51,13 +65,21 @@ def get_tracer(tracer_provider=None): @contextmanager -def trace_call(name, session, extra_attributes=None): +def trace_call(name, session, extra_attributes=None, observability_options=None): if not HAS_OPENTELEMETRY_INSTALLED or not session: # Empty context manager. Users will have to check if the generated value is None or a span yield None return - tracer = get_tracer() + tracer_provider = None + enable_extended_tracing = False + if getattr(session, "_observability_options", None): + opts = session._observability_options + if opts: + tracer_provider = opts.tracer_provider + enable_extended_tracing = opts.enable_extended_tracing + + tracer = get_tracer(tracer_provider) # Set base attributes that we know for every trace created attributes = { @@ -72,6 +94,13 @@ def trace_call(name, session, extra_attributes=None): if extra_attributes: attributes.update(extra_attributes) + # TODO(@odeke-em) enable after discussion with team and agreement + # over extended tracing changes as the legacy default is always to + # record SQL statements on spans. + if False and not enable_extended_tracing: + attributes.pop("db.statement", False) + attributes.pop("sql", False) + with tracer.start_as_current_span( name, kind=trace.SpanKind.CLIENT, attributes=attributes ) as span: diff --git a/google/cloud/spanner_v1/client.py b/google/cloud/spanner_v1/client.py index f8f3fdb72c..172f396fc2 100644 --- a/google/cloud/spanner_v1/client.py +++ b/google/cloud/spanner_v1/client.py @@ -126,6 +126,13 @@ class Client(ClientWithProject): for all ReadRequests and ExecuteSqlRequests that indicates which replicas or regions should be used for non-transactional reads or queries. + :type labels: dict (str -> any) or None + :param observability_options: (Optional) the configuration to control + the tracer's behavior. + tracer_provider is the injected tracer provider + enable_extended_tracing: :type:boolean when set to true will allow for + spans that issue SQL statements to be annotated with SQL. + :raises: :class:`ValueError ` if both ``read_only`` and ``admin`` are :data:`True` """ @@ -146,6 +153,7 @@ def __init__( query_options=None, route_to_leader_enabled=True, directed_read_options=None, + observability_options=None, ): self._emulator_host = _get_spanner_emulator_host() @@ -187,6 +195,7 @@ def __init__( self._route_to_leader_enabled = route_to_leader_enabled self._directed_read_options = directed_read_options + self._observability_options = observability_options @property def credentials(self): @@ -371,6 +380,7 @@ def instance( self._emulator_host, labels, processing_units, + observability_options=self._observability_options, ) def list_instances(self, filter_="", page_size=None): diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index f6c4ceb667..cca0a8b4bf 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -156,6 +156,7 @@ def __init__( database_role=None, enable_drop_protection=False, proto_descriptors=None, + observability_options=None, ): self.database_id = database_id self._instance = instance @@ -178,11 +179,16 @@ def __init__( self._reconciling = False self._directed_read_options = self._instance._client.directed_read_options self._proto_descriptors = proto_descriptors + self._observability_options = observability_options if pool is None: - pool = BurstyPool(database_role=database_role) + pool = BurstyPool( + database_role=database_role, + observability_options=self._observability_options, + ) self._pool = pool + self._pool._observability_options = observability_options pool.bind(self) @classmethod @@ -742,7 +748,12 @@ def session(self, labels=None, database_role=None): # If role is specified in param, then that role is used # instead. role = database_role or self._database_role - return Session(self, labels=labels, database_role=role) + return Session( + self, + labels=labels, + database_role=role, + observability_options=self._observability_options, + ) def snapshot(self, **kw): """Return an object which wraps a snapshot. diff --git a/google/cloud/spanner_v1/instance.py b/google/cloud/spanner_v1/instance.py index a67e0e630b..0df145d842 100644 --- a/google/cloud/spanner_v1/instance.py +++ b/google/cloud/spanner_v1/instance.py @@ -110,6 +110,13 @@ class Instance(object): :type labels: dict (str -> str) or None :param labels: (Optional) User-assigned labels for this instance. + + :type labels: dict (str -> any) or None + :param observability_options: (Optional) the configuration to control + the tracer's behavior. + tracer_provider is the injected tracer provider + enable_extended_tracing: :type:boolean when set to true will allow for + spans that issue SQL statements to be annotated with SQL. """ def __init__( @@ -122,6 +129,7 @@ def __init__( emulator_host=None, labels=None, processing_units=None, + observability_options=None, ): self.instance_id = instance_id self._client = client @@ -145,6 +153,7 @@ def __init__( if labels is None: labels = {} self.labels = labels + self._observability_options = observability_options def _update_from_pb(self, instance_pb): """Refresh self from the server-provided protobuf. @@ -499,6 +508,7 @@ def database( database_role=database_role, enable_drop_protection=enable_drop_protection, proto_descriptors=proto_descriptors, + observability_options=self._observability_options, ) else: return TestDatabase( diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index 56837bfc0b..408c122443 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -42,11 +42,12 @@ class AbstractSessionPool(object): _database = None - def __init__(self, labels=None, database_role=None): + def __init__(self, labels=None, database_role=None, observability_options=None): if labels is None: labels = {} self._labels = labels self._database_role = database_role + self._observability_options = observability_options @property def labels(self): @@ -178,8 +179,13 @@ def __init__( default_timeout=DEFAULT_TIMEOUT, labels=None, database_role=None, + observability_options=None, ): - super(FixedSizePool, self).__init__(labels=labels, database_role=database_role) + super(FixedSizePool, self).__init__( + labels=labels, + database_role=database_role, + observability_options=observability_options, + ) self.size = size self.default_timeout = default_timeout self._sessions = queue.LifoQueue(size) @@ -284,8 +290,18 @@ class BurstyPool(AbstractSessionPool): :param database_role: (Optional) user-assigned database_role for the session. """ - def __init__(self, target_size=10, labels=None, database_role=None): - super(BurstyPool, self).__init__(labels=labels, database_role=database_role) + def __init__( + self, + target_size=10, + labels=None, + database_role=None, + observability_options=None, + ): + super(BurstyPool, self).__init__( + labels=labels, + database_role=database_role, + observability_options=observability_options, + ) self.target_size = target_size self._database = None self._sessions = queue.LifoQueue(target_size) @@ -392,8 +408,13 @@ def __init__( ping_interval=3000, labels=None, database_role=None, + observability_options=None, ): - super(PingingPool, self).__init__(labels=labels, database_role=database_role) + super(PingingPool, self).__init__( + labels=labels, + database_role=database_role, + observability_options=observability_options, + ) self.size = size self.default_timeout = default_timeout self._delta = datetime.timedelta(seconds=ping_interval) @@ -546,6 +567,7 @@ def __init__( ping_interval=3000, labels=None, database_role=None, + observability_options=None, ): """This throws a deprecation warning on initialization.""" warn( @@ -561,6 +583,7 @@ def __init__( ping_interval, labels=labels, database_role=database_role, + observability_options=observability_options, ) self.begin_pending_transactions() diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index 28280282f4..c49ec627c2 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -63,12 +63,15 @@ class Session(object): _session_id = None _transaction = None - def __init__(self, database, labels=None, database_role=None): + def __init__( + self, database, labels=None, database_role=None, observability_options=None + ): self._database = database if labels is None: labels = {} self._labels = labels self._database_role = database_role + self._observability_options = observability_options def __lt__(self, other): return self._session_id < other._session_id diff --git a/tests/unit/test__opentelemetry_tracing.py b/tests/unit/test__opentelemetry_tracing.py index 20e31d9ea6..dd1d5640b0 100644 --- a/tests/unit/test__opentelemetry_tracing.py +++ b/tests/unit/test__opentelemetry_tracing.py @@ -30,7 +30,13 @@ def _make_rpc_error(error_cls, trailing_metadata=None): def _make_session(): from google.cloud.spanner_v1.session import Session - return mock.Mock(autospec=Session, instance=True) + session = mock.Mock(autospec=Session, instance=True) + # Setting _observability_options to None is to avoid the nasty spill-over + # of mock._tracer_provider spuriously failing tests, because per + # unittest.mock.Mock's definition invoking any attribute or method + # returns another mock. + setattr(session, "_observability_options", None) + return session # Skip all of these tests if we don't have OpenTelemetry From 2181ca982656b02f56f6c77ee7836ee9629b5525 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 11 Nov 2024 22:02:19 -0800 Subject: [PATCH 02/19] Address review feedback by attaching observability_options to Client only --- .../spanner_v1/_opentelemetry_tracing.py | 27 ++++++------------- google/cloud/spanner_v1/client.py | 1 - google/cloud/spanner_v1/database.py | 9 +++---- google/cloud/spanner_v1/instance.py | 14 +++------- google/cloud/spanner_v1/pool.py | 11 +------- google/cloud/spanner_v1/session.py | 5 +--- tests/unit/test__opentelemetry_tracing.py | 3 ++- 7 files changed, 20 insertions(+), 50 deletions(-) diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 5cc17139b4..c854f42c0c 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -35,20 +35,6 @@ TRACER_VERSION = gapic_version.__version__ -class ObservabilityOptions: - def __init__(self, tracer_provider=None, enable_extended_tracing=False): - self.__tracer_provider = tracer_provider - self.__enable_extended_tracing = enable_extended_tracing - - @property - def tracer_provider(self): - return self.__tracer_provider - - @property - def enable_extended_tracing(self): - return self.__enable_extended_tracing - - def get_tracer(tracer_provider=None): """ get_tracer is a utility to unify and simplify retrieval of the tracer, without @@ -73,11 +59,14 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) tracer_provider = None enable_extended_tracing = False - if getattr(session, "_observability_options", None): - opts = session._observability_options - if opts: - tracer_provider = opts.tracer_provider - enable_extended_tracing = opts.enable_extended_tracing + if observability_options is None and getattr(session, "_database", None): + observability_options = getattr( + session._database, "observability_options", None + ) + + if observability_options: + tracer_provider = observability_options.tracer_provider + enable_extended_tracing = observability_options.enable_extended_tracing tracer = get_tracer(tracer_provider) diff --git a/google/cloud/spanner_v1/client.py b/google/cloud/spanner_v1/client.py index 172f396fc2..84d0bc7a8f 100644 --- a/google/cloud/spanner_v1/client.py +++ b/google/cloud/spanner_v1/client.py @@ -380,7 +380,6 @@ def instance( self._emulator_host, labels, processing_units, - observability_options=self._observability_options, ) def list_instances(self, filter_="", page_size=None): diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index cca0a8b4bf..d297ca15af 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -156,7 +156,6 @@ def __init__( database_role=None, enable_drop_protection=False, proto_descriptors=None, - observability_options=None, ): self.database_id = database_id self._instance = instance @@ -179,16 +178,13 @@ def __init__( self._reconciling = False self._directed_read_options = self._instance._client.directed_read_options self._proto_descriptors = proto_descriptors - self._observability_options = observability_options if pool is None: pool = BurstyPool( database_role=database_role, - observability_options=self._observability_options, ) self._pool = pool - self._pool._observability_options = observability_options pool.bind(self) @classmethod @@ -752,7 +748,6 @@ def session(self, labels=None, database_role=None): self, labels=labels, database_role=role, - observability_options=self._observability_options, ) def snapshot(self, **kw): @@ -1722,6 +1717,10 @@ def close(self): if self._session is not None: self._session.delete() + @property + def observability_options(self): + return self._instance.observability_options + def _check_ddl_statements(value): """Validate DDL Statements used to define database schema. diff --git a/google/cloud/spanner_v1/instance.py b/google/cloud/spanner_v1/instance.py index 0df145d842..f16d2088b9 100644 --- a/google/cloud/spanner_v1/instance.py +++ b/google/cloud/spanner_v1/instance.py @@ -110,13 +110,6 @@ class Instance(object): :type labels: dict (str -> str) or None :param labels: (Optional) User-assigned labels for this instance. - - :type labels: dict (str -> any) or None - :param observability_options: (Optional) the configuration to control - the tracer's behavior. - tracer_provider is the injected tracer provider - enable_extended_tracing: :type:boolean when set to true will allow for - spans that issue SQL statements to be annotated with SQL. """ def __init__( @@ -129,7 +122,6 @@ def __init__( emulator_host=None, labels=None, processing_units=None, - observability_options=None, ): self.instance_id = instance_id self._client = client @@ -153,7 +145,6 @@ def __init__( if labels is None: labels = {} self.labels = labels - self._observability_options = observability_options def _update_from_pb(self, instance_pb): """Refresh self from the server-provided protobuf. @@ -508,7 +499,6 @@ def database( database_role=database_role, enable_drop_protection=enable_drop_protection, proto_descriptors=proto_descriptors, - observability_options=self._observability_options, ) else: return TestDatabase( @@ -743,3 +733,7 @@ def _item_to_operation(self, operation_pb): return google.api_core.operation.from_gapic( operation_pb, operations_client, response_type, metadata_type=metadata_type ) + + @property + def observability_options(self): + return self._client.observability_options diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index 408c122443..9e3d569d4a 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -42,12 +42,11 @@ class AbstractSessionPool(object): _database = None - def __init__(self, labels=None, database_role=None, observability_options=None): + def __init__(self, labels=None, database_role=None): if labels is None: labels = {} self._labels = labels self._database_role = database_role - self._observability_options = observability_options @property def labels(self): @@ -179,12 +178,10 @@ def __init__( default_timeout=DEFAULT_TIMEOUT, labels=None, database_role=None, - observability_options=None, ): super(FixedSizePool, self).__init__( labels=labels, database_role=database_role, - observability_options=observability_options, ) self.size = size self.default_timeout = default_timeout @@ -295,12 +292,10 @@ def __init__( target_size=10, labels=None, database_role=None, - observability_options=None, ): super(BurstyPool, self).__init__( labels=labels, database_role=database_role, - observability_options=observability_options, ) self.target_size = target_size self._database = None @@ -408,12 +403,10 @@ def __init__( ping_interval=3000, labels=None, database_role=None, - observability_options=None, ): super(PingingPool, self).__init__( labels=labels, database_role=database_role, - observability_options=observability_options, ) self.size = size self.default_timeout = default_timeout @@ -567,7 +560,6 @@ def __init__( ping_interval=3000, labels=None, database_role=None, - observability_options=None, ): """This throws a deprecation warning on initialization.""" warn( @@ -583,7 +575,6 @@ def __init__( ping_interval, labels=labels, database_role=database_role, - observability_options=observability_options, ) self.begin_pending_transactions() diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index c49ec627c2..28280282f4 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -63,15 +63,12 @@ class Session(object): _session_id = None _transaction = None - def __init__( - self, database, labels=None, database_role=None, observability_options=None - ): + def __init__(self, database, labels=None, database_role=None): self._database = database if labels is None: labels = {} self._labels = labels self._database_role = database_role - self._observability_options = observability_options def __lt__(self, other): return self._session_id < other._session_id diff --git a/tests/unit/test__opentelemetry_tracing.py b/tests/unit/test__opentelemetry_tracing.py index dd1d5640b0..50e3a69aba 100644 --- a/tests/unit/test__opentelemetry_tracing.py +++ b/tests/unit/test__opentelemetry_tracing.py @@ -35,7 +35,8 @@ def _make_session(): # of mock._tracer_provider spuriously failing tests, because per # unittest.mock.Mock's definition invoking any attribute or method # returns another mock. - setattr(session, "_observability_options", None) + db = session._database + setattr(db, "observability_options", None) return session From 540d0b8f1cce58da05e28c5412305f7abeb0bd4b Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 11 Nov 2024 23:26:01 -0800 Subject: [PATCH 03/19] Attach observability_options directly before trace_call --- .../spanner_v1/_opentelemetry_tracing.py | 8 ++--- google/cloud/spanner_v1/batch.py | 16 ++++++++-- google/cloud/spanner_v1/database.py | 12 ++------ google/cloud/spanner_v1/instance.py | 4 --- google/cloud/spanner_v1/session.py | 20 +++++++++++-- google/cloud/spanner_v1/snapshot.py | 9 +++++- google/cloud/spanner_v1/transaction.py | 29 ++++++++++++++++--- 7 files changed, 70 insertions(+), 28 deletions(-) diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index c854f42c0c..7175ef60dd 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -59,10 +59,6 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) tracer_provider = None enable_extended_tracing = False - if observability_options is None and getattr(session, "_database", None): - observability_options = getattr( - session._database, "observability_options", None - ) if observability_options: tracer_provider = observability_options.tracer_provider @@ -85,7 +81,9 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) # TODO(@odeke-em) enable after discussion with team and agreement # over extended tracing changes as the legacy default is always to - # record SQL statements on spans. + # record SQL statements on spans, because the prior behavior was + # to always record the SQL statement and changing it is 's considered + # a breaking change by the Python-Spanner team. if False and not enable_extended_tracing: attributes.pop("db.statement", False) attributes.pop("sql", False) diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index e3d681189c..948740d7d4 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -205,7 +205,13 @@ def commit( max_commit_delay=max_commit_delay, request_options=request_options, ) - with trace_call("CloudSpanner.Commit", self._session, trace_attributes): + observability_options = getattr(database, "observability_options", None) + with trace_call( + "CloudSpanner.Commit", + self._session, + trace_attributes, + observability_options=observability_options, + ): method = functools.partial( api.commit, request=request, @@ -318,7 +324,13 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals request_options=request_options, exclude_txn_from_change_streams=exclude_txn_from_change_streams, ) - with trace_call("CloudSpanner.BatchWrite", self._session, trace_attributes): + observability_options = getattr(database, "observability_options", None) + with trace_call( + "CloudSpanner.BatchWrite", + self._session, + trace_attributes, + observability_options=observability_options, + ): method = functools.partial( api.batch_write, request=request, diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index d297ca15af..e0e66a38e9 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -180,9 +180,7 @@ def __init__( self._proto_descriptors = proto_descriptors if pool is None: - pool = BurstyPool( - database_role=database_role, - ) + pool = BurstyPool(database_role=database_role) self._pool = pool pool.bind(self) @@ -744,11 +742,7 @@ def session(self, labels=None, database_role=None): # If role is specified in param, then that role is used # instead. role = database_role or self._database_role - return Session( - self, - labels=labels, - database_role=role, - ) + return Session(self, labels=labels, database_role=role) def snapshot(self, **kw): """Return an object which wraps a snapshot. @@ -1719,7 +1713,7 @@ def close(self): @property def observability_options(self): - return self._instance.observability_options + return self._instance._observability_options def _check_ddl_statements(value): diff --git a/google/cloud/spanner_v1/instance.py b/google/cloud/spanner_v1/instance.py index f16d2088b9..a67e0e630b 100644 --- a/google/cloud/spanner_v1/instance.py +++ b/google/cloud/spanner_v1/instance.py @@ -733,7 +733,3 @@ def _item_to_operation(self, operation_pb): return google.api_core.operation.from_gapic( operation_pb, operations_client, response_type, metadata_type=metadata_type ) - - @property - def observability_options(self): - return self._client.observability_options diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index 28280282f4..6281148590 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -142,7 +142,13 @@ def create(self): if self._labels: request.session.labels = self._labels - with trace_call("CloudSpanner.CreateSession", self, self._labels): + observability_options = getattr(self._database, "observability_options", None) + with trace_call( + "CloudSpanner.CreateSession", + self, + self._labels, + observability_options=observability_options, + ): session_pb = api.create_session( request=request, metadata=metadata, @@ -169,7 +175,10 @@ def exists(self): ) ) - with trace_call("CloudSpanner.GetSession", self) as span: + observability_options = getattr(self._database, "observability_options", None) + with trace_call( + "CloudSpanner.GetSession", self, observability_options=observability_options + ) as span: try: api.get_session(name=self.name, metadata=metadata) if span: @@ -194,7 +203,12 @@ def delete(self): raise ValueError("Session ID not set by back-end") api = self._database.spanner_api metadata = _metadata_with_prefix(self._database.name) - with trace_call("CloudSpanner.DeleteSession", self): + observability_options = getattr(self._database, "observability_options", None) + with trace_call( + "CloudSpanner.DeleteSession", + self, + observability_options=observability_options, + ): api.delete_session(name=self.name, metadata=metadata) def ping(self): diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 3bc1a746bd..a20e494f78 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -84,7 +84,14 @@ def _restart_on_unavailable( ) request.transaction = transaction_selector - with trace_call(trace_name, session, attributes): + observability_options = None + if session and session._database: + observability_options = getattr( + session._database, "observability_options", None + ) + with trace_call( + trace_name, session, attributes, observability_options=observability_options + ): iterator = method(request=request) while True: try: diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index c872cc380d..b12141427e 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -110,7 +110,12 @@ def _execute_request( """ transaction = self._make_txn_selector() request.transaction = transaction - with trace_call(trace_name, session, attributes): + observability_options = getattr( + self._session._database, "observability_options", None + ) + with trace_call( + trace_name, session, attributes, observability_options=observability_options + ): method = functools.partial(method, request=request) response = _retry( method, @@ -147,7 +152,12 @@ def begin(self): read_write=TransactionOptions.ReadWrite(), exclude_txn_from_change_streams=self.exclude_txn_from_change_streams, ) - with trace_call("CloudSpanner.BeginTransaction", self._session): + observability_options = getattr(database, "observability_options", None) + with trace_call( + "CloudSpanner.BeginTransaction", + self._session, + observability_options=observability_options, + ): method = functools.partial( api.begin_transaction, session=self._session.name, @@ -175,7 +185,12 @@ def rollback(self): database._route_to_leader_enabled ) ) - with trace_call("CloudSpanner.Rollback", self._session): + observability_options = getattr(database, "observability_options", None) + with trace_call( + "CloudSpanner.Rollback", + self._session, + observability_options=observability_options, + ): method = functools.partial( api.rollback, session=self._session.name, @@ -248,7 +263,13 @@ def commit( max_commit_delay=max_commit_delay, request_options=request_options, ) - with trace_call("CloudSpanner.Commit", self._session, trace_attributes): + observability_options = getattr(database, "observability_options", None) + with trace_call( + "CloudSpanner.Commit", + self._session, + trace_attributes, + observability_options=observability_options, + ): method = functools.partial( api.commit, request=request, From 0b9ca49e1ddff92b9e072c444af395937d047611 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 11 Nov 2024 23:50:41 -0800 Subject: [PATCH 04/19] More reverts for formatting --- google/cloud/spanner_v1/pool.py | 22 ++++------------------ google/cloud/spanner_v1/transaction.py | 2 +- 2 files changed, 5 insertions(+), 19 deletions(-) diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index 9e3d569d4a..56837bfc0b 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -179,10 +179,7 @@ def __init__( labels=None, database_role=None, ): - super(FixedSizePool, self).__init__( - labels=labels, - database_role=database_role, - ) + super(FixedSizePool, self).__init__(labels=labels, database_role=database_role) self.size = size self.default_timeout = default_timeout self._sessions = queue.LifoQueue(size) @@ -287,16 +284,8 @@ class BurstyPool(AbstractSessionPool): :param database_role: (Optional) user-assigned database_role for the session. """ - def __init__( - self, - target_size=10, - labels=None, - database_role=None, - ): - super(BurstyPool, self).__init__( - labels=labels, - database_role=database_role, - ) + def __init__(self, target_size=10, labels=None, database_role=None): + super(BurstyPool, self).__init__(labels=labels, database_role=database_role) self.target_size = target_size self._database = None self._sessions = queue.LifoQueue(target_size) @@ -404,10 +393,7 @@ def __init__( labels=None, database_role=None, ): - super(PingingPool, self).__init__( - labels=labels, - database_role=database_role, - ) + super(PingingPool, self).__init__(labels=labels, database_role=database_role) self.size = size self.default_timeout = default_timeout self._delta = datetime.timedelta(seconds=ping_interval) diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index b12141427e..fbe8b379eb 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -268,7 +268,7 @@ def commit( "CloudSpanner.Commit", self._session, trace_attributes, - observability_options=observability_options, + observability_options, ): method = functools.partial( api.commit, From 854a3ffd61123cd2566f59064baafdf03b1516e6 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 12 Nov 2024 16:55:44 -0800 Subject: [PATCH 05/19] Plumb observability_options into _restart_on_unavailable --- google/cloud/spanner_v1/database.py | 7 ++++- google/cloud/spanner_v1/snapshot.py | 47 +++++++++++++++++++++++------ 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index e0e66a38e9..cc058a367a 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -718,6 +718,7 @@ def execute_pdml(): method=method, request=request, transaction_selector=txn_selector, + observability_options=self.observability_options, ) result_set = StreamedResultSet(iterator) @@ -1713,7 +1714,11 @@ def close(self): @property def observability_options(self): - return self._instance._observability_options + """ + Returns the observability options that you set when creating + the SpannerClient. + """ + return self._instance._client._observability_options def _check_ddl_statements(value): diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index a20e494f78..4035c57f28 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -56,6 +56,7 @@ def _restart_on_unavailable( attributes=None, transaction=None, transaction_selector=None, + observability_options=None, ): """Restart iteration after :exc:`.ServiceUnavailable`. @@ -84,11 +85,7 @@ def _restart_on_unavailable( ) request.transaction = transaction_selector - observability_options = None - if session and session._database: - observability_options = getattr( - session._database, "observability_options", None - ) + with trace_call( trace_name, session, attributes, observability_options=observability_options ): @@ -111,7 +108,12 @@ def _restart_on_unavailable( break except ServiceUnavailable: del item_buffer[:] - with trace_call(trace_name, session, attributes): + with trace_call( + trace_name, + session, + attributes, + observability_options=observability_options, + ): request.resume_token = resume_token if transaction is not None: transaction_selector = transaction._make_txn_selector() @@ -126,7 +128,12 @@ def _restart_on_unavailable( if not resumable_error: raise del item_buffer[:] - with trace_call(trace_name, session, attributes): + with trace_call( + trace_name, + session, + attributes, + observability_options=observability_options, + ): request.resume_token = resume_token if transaction is not None: transaction_selector = transaction._make_txn_selector() @@ -317,6 +324,7 @@ def read( self._session, trace_attributes, transaction=self, + observability_options=session_observability_options(self._session), ) self._read_request_count += 1 if self._multi_use: @@ -333,6 +341,7 @@ def read( self._session, trace_attributes, transaction=self, + observability_options=session_observability_options(self._session), ) self._read_request_count += 1 @@ -516,6 +525,7 @@ def _get_streamed_result_set(self, restart, request, trace_attributes, column_in self._session, trace_attributes, transaction=self, + observability_options=session_observability_options(self._session), ) self._read_request_count += 1 self._execute_sql_count += 1 @@ -605,7 +615,10 @@ def partition_read( trace_attributes = {"table_id": table, "columns": columns} with trace_call( - "CloudSpanner.PartitionReadOnlyTransaction", self._session, trace_attributes + "CloudSpanner.PartitionReadOnlyTransaction", + self._session, + trace_attributes, + observability_options=observability_options, ): method = functools.partial( api.partition_read, @@ -708,6 +721,7 @@ def partition_query( "CloudSpanner.PartitionReadWriteTransaction", self._session, trace_attributes, + observability_options=observability_options, ): method = functools.partial( api.partition_query, @@ -850,7 +864,11 @@ def begin(self): (_metadata_with_leader_aware_routing(database._route_to_leader_enabled)) ) txn_selector = self._make_txn_selector() - with trace_call("CloudSpanner.BeginTransaction", self._session): + with trace_call( + "CloudSpanner.BeginTransaction", + self._session, + observability_options=observability_options, + ): method = functools.partial( api.begin_transaction, session=self._session.name, @@ -864,3 +882,14 @@ def begin(self): self._transaction_id = response.id self._transaction_read_timestamp = response.read_timestamp return self._transaction_id + + +def session_observability_options(session): + if not session: + return None + + db = getattr(session, "_database", None) + if not db: + return None + + return getattr(db, "observability_options", None) From 3e9a8caae501140e22032fc4b31ab034660fb02c Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 12 Nov 2024 17:55:03 -0800 Subject: [PATCH 06/19] completely decouple observability_options from session --- google/cloud/spanner_v1/database.py | 7 +++-- google/cloud/spanner_v1/snapshot.py | 42 ++++++++++++++------------ google/cloud/spanner_v1/transaction.py | 19 +++++++++--- 3 files changed, 42 insertions(+), 26 deletions(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index cc058a367a..626124eefb 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -718,7 +718,7 @@ def execute_pdml(): method=method, request=request, transaction_selector=txn_selector, - observability_options=self.observability_options, + observability_options=getattr(self, "observability_options", None), ) result_set = StreamedResultSet(iterator) @@ -1718,7 +1718,10 @@ def observability_options(self): Returns the observability options that you set when creating the SpannerClient. """ - return self._instance._client._observability_options + if not self._instance: + return None + + return getattr(self._instance._client, "observability_options", None) def _check_ddl_statements(value): diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 4035c57f28..0d7984e54e 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -324,7 +324,9 @@ def read( self._session, trace_attributes, transaction=self, - observability_options=session_observability_options(self._session), + observability_options=getattr( + database, "observability_options", None + ), ) self._read_request_count += 1 if self._multi_use: @@ -341,7 +343,7 @@ def read( self._session, trace_attributes, transaction=self, - observability_options=session_observability_options(self._session), + observability_options=getattr(database, "observability_options", None), ) self._read_request_count += 1 @@ -505,19 +507,30 @@ def execute_sql( ) trace_attributes = {"db.statement": sql} + observability_options = getattr(database, "observability_options", None) if self._transaction_id is None: # lock is added to handle the inline begin for first rpc with self._lock: return self._get_streamed_result_set( - restart, request, trace_attributes, column_info + restart, + request, + trace_attributes, + column_info, + observability_options, ) else: return self._get_streamed_result_set( - restart, request, trace_attributes, column_info + restart, + request, + trace_attributes, + column_info, + observability_options, ) - def _get_streamed_result_set(self, restart, request, trace_attributes, column_info): + def _get_streamed_result_set( + self, restart, request, trace_attributes, column_info, observability_options + ): iterator = _restart_on_unavailable( restart, request, @@ -525,7 +538,7 @@ def _get_streamed_result_set(self, restart, request, trace_attributes, column_in self._session, trace_attributes, transaction=self, - observability_options=session_observability_options(self._session), + observability_options=observability_options, ) self._read_request_count += 1 self._execute_sql_count += 1 @@ -618,7 +631,7 @@ def partition_read( "CloudSpanner.PartitionReadOnlyTransaction", self._session, trace_attributes, - observability_options=observability_options, + observability_options=getattr(database, "observability_options", None), ): method = functools.partial( api.partition_read, @@ -721,7 +734,7 @@ def partition_query( "CloudSpanner.PartitionReadWriteTransaction", self._session, trace_attributes, - observability_options=observability_options, + observability_options=getattr(database, "observability_options", None), ): method = functools.partial( api.partition_query, @@ -867,7 +880,7 @@ def begin(self): with trace_call( "CloudSpanner.BeginTransaction", self._session, - observability_options=observability_options, + observability_options=getattr(database, "observability_options", None), ): method = functools.partial( api.begin_transaction, @@ -882,14 +895,3 @@ def begin(self): self._transaction_id = response.id self._transaction_read_timestamp = response.read_timestamp return self._transaction_id - - -def session_observability_options(session): - if not session: - return None - - db = getattr(session, "_database", None) - if not db: - return None - - return getattr(db, "observability_options", None) diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index fbe8b379eb..4ff824f192 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -98,7 +98,13 @@ def _make_txn_selector(self): return TransactionSelector(id=self._transaction_id) def _execute_request( - self, method, request, trace_name=None, session=None, attributes=None + self, + method, + request, + trace_name=None, + session=None, + attributes=None, + observability_options=None, ): """Helper method to execute request after fetching transaction selector. @@ -110,9 +116,6 @@ def _execute_request( """ transaction = self._make_txn_selector() request.transaction = transaction - observability_options = getattr( - self._session._database, "observability_options", None - ) with trace_call( trace_name, session, attributes, observability_options=observability_options ): @@ -383,6 +386,9 @@ def execute_update( # environment-level options default_query_options = database._instance._client._query_options query_options = _merge_query_options(default_query_options, query_options) + observability_options = getattr( + database._instance._client, "observability_options", None + ) if request_options is None: request_options = RequestOptions() @@ -420,6 +426,7 @@ def execute_update( "CloudSpanner.ReadWriteTransaction", self._session, trace_attributes, + observability_options, ) # Setting the transaction id because the transaction begin was inlined for first rpc. if ( @@ -436,6 +443,7 @@ def execute_update( "CloudSpanner.ReadWriteTransaction", self._session, trace_attributes, + observability_options, ) return response.stats.row_count_exact @@ -502,6 +510,7 @@ def batch_update( _metadata_with_leader_aware_routing(database._route_to_leader_enabled) ) api = database.spanner_api + observability_options = getattr(database, "observability_options", None) seqno, self._execute_sql_count = ( self._execute_sql_count, @@ -542,6 +551,7 @@ def batch_update( "CloudSpanner.DMLTransaction", self._session, trace_attributes, + observability_options=observability_options, ) # Setting the transaction id because the transaction begin was inlined for first rpc. for result_set in response.result_sets: @@ -559,6 +569,7 @@ def batch_update( "CloudSpanner.DMLTransaction", self._session, trace_attributes, + observability_options=observability_options, ) row_counts = [ From 8af9e75dbdd11226699ba18f2ca7d68f708150e3 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 12 Nov 2024 18:43:32 -0800 Subject: [PATCH 07/19] apply SPANNER_ENABLE_EXTENDED_TRACING but in inverse due to compatibility --- docs/opentelemetry-tracing.rst | 7 ++++++- .../spanner_v1/_opentelemetry_tracing.py | 20 ++++++++++++------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/docs/opentelemetry-tracing.rst b/docs/opentelemetry-tracing.rst index 8101b831ef..df507cb648 100644 --- a/docs/opentelemetry-tracing.rst +++ b/docs/opentelemetry-tracing.rst @@ -32,7 +32,12 @@ We also need to tell OpenTelemetry which exporter to use. To export Spanner trac ) observability_options = dict( tracer_provider=tracer_provider, - enable_extended_tracing=True, + + # By default extended_tracing is set to True due + # to legacy reasons to avoid breaking changes, you + # can modify it though using the environment variable + # SPANNER_ENABLE_EXTENDED_TRACING=false. + enable_extended_tracing=False, ) spanner = spanner.NewClient(project_id, observability_options=observability_options) diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 7175ef60dd..b8e940b7c3 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -15,6 +15,7 @@ """Manages OpenTelemetry trace creation and handling""" from contextlib import contextmanager +import os from google.cloud.spanner_v1 import SpannerClient from google.cloud.spanner_v1 import gapic_version @@ -33,6 +34,9 @@ TRACER_NAME = "cloud.google.com/python/spanner" TRACER_VERSION = gapic_version.__version__ +extendedTracingGloballyDisabled = ( + os.environ.get("SPANNER_ENABLE_EXTENDED_TRACING", None) == "false" +) def get_tracer(tracer_provider=None): @@ -58,7 +62,11 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) return tracer_provider = None - enable_extended_tracing = False + + # By default enable_extended_tracing=True because in a bid to minimize + # breaking changes and preserve legacy behavior, we are keeping it turned + # on by default. + enable_extended_tracing = True if observability_options: tracer_provider = observability_options.tracer_provider @@ -79,12 +87,10 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) if extra_attributes: attributes.update(extra_attributes) - # TODO(@odeke-em) enable after discussion with team and agreement - # over extended tracing changes as the legacy default is always to - # record SQL statements on spans, because the prior behavior was - # to always record the SQL statement and changing it is 's considered - # a breaking change by the Python-Spanner team. - if False and not enable_extended_tracing: + if extendedTracingGloballyDisabled: + enable_extended_tracing = False + + if not enable_extended_tracing: attributes.pop("db.statement", False) attributes.pop("sql", False) From 7adab38a63c04d8376226facd3f950496027c353 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 12 Nov 2024 18:55:36 -0800 Subject: [PATCH 08/19] Document SPANNER_ENABLE_EXTENDED_TRACING in environment --- docs/opentelemetry-tracing.rst | 10 ++++++++++ google/cloud/spanner_v1/_opentelemetry_tracing.py | 3 +-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/docs/opentelemetry-tracing.rst b/docs/opentelemetry-tracing.rst index df507cb648..c715ad58ad 100644 --- a/docs/opentelemetry-tracing.rst +++ b/docs/opentelemetry-tracing.rst @@ -61,3 +61,13 @@ Generated spanner traces should now be available on `Cloud Trace `_ + +Annotating spans with SQL +~~~~~~~~~~~~~~~~~~~~~~~~~ + +By default your spans will be annotated with SQL statements where appropriate, but that can be a PII (Personally Identifiable Information) +leak. Sadly due to legacy behavior, we cannot simply turn off this behavior by default. However you can control this behavior by setting + + SPANNER_ENABLE_EXTENDED_TRACING=false + +to turn it off globally or when creating each SpannerClient, please set `observability_options.enable_extended_tracing=false` diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index b8e940b7c3..9c658278e1 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -35,7 +35,7 @@ TRACER_NAME = "cloud.google.com/python/spanner" TRACER_VERSION = gapic_version.__version__ extendedTracingGloballyDisabled = ( - os.environ.get("SPANNER_ENABLE_EXTENDED_TRACING", None) == "false" + os.environ.get("SPANNER_ENABLE_EXTENDED_TRACING", "").lower() == "false" ) @@ -92,7 +92,6 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) if not enable_extended_tracing: attributes.pop("db.statement", False) - attributes.pop("sql", False) with tracer.start_as_current_span( name, kind=trace.SpanKind.CLIENT, attributes=attributes From afd01e1bea9f2e97b3fc3057181eb50de57f2c03 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 12 Nov 2024 19:24:00 -0800 Subject: [PATCH 09/19] Revert a vestige of mock --- tests/unit/test__opentelemetry_tracing.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/tests/unit/test__opentelemetry_tracing.py b/tests/unit/test__opentelemetry_tracing.py index 50e3a69aba..20e31d9ea6 100644 --- a/tests/unit/test__opentelemetry_tracing.py +++ b/tests/unit/test__opentelemetry_tracing.py @@ -30,14 +30,7 @@ def _make_rpc_error(error_cls, trailing_metadata=None): def _make_session(): from google.cloud.spanner_v1.session import Session - session = mock.Mock(autospec=Session, instance=True) - # Setting _observability_options to None is to avoid the nasty spill-over - # of mock._tracer_provider spuriously failing tests, because per - # unittest.mock.Mock's definition invoking any attribute or method - # returns another mock. - db = session._database - setattr(db, "observability_options", None) - return session + return mock.Mock(autospec=Session, instance=True) # Skip all of these tests if we don't have OpenTelemetry From ca8d598cfa60db5fd23b11eb8e78256270d7542c Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 12 Nov 2024 20:14:23 -0800 Subject: [PATCH 10/19] tests: add unit test for propagating TracerProvider --- tests/unit/test_client.py | 48 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 174e5116c2..2acde11aab 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -17,6 +17,20 @@ import mock from google.cloud.spanner_v1 import DirectedReadOptions +hasOtelInstalled = False + +try: + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.sampling import ALWAYS_ON + from opentelemetry import trace + hasOtelInstalled = True +except ImportError: + pass + def _make_credentials(): import google.auth.credentials @@ -686,3 +700,37 @@ def test_list_instances_w_options(self): retry=mock.ANY, timeout=mock.ANY, ) + + def test_observability_options(self): + if not hasOtelInstalled: + return + + global_tracer_provider = TracerProvider(sampler=ALWAYS_ON) + trace.set_tracer_provider(global_tracer_provider) + global_trace_exporter = InMemorySpanExporter() + global_tracer_provider.add_span_processor(SimpleSpanProcessor(global_trace_exporter)) + + inject_tracer_provider = TracerProvider(sampler=ALWAYS_ON) + inject_trace_exporter = InMemorySpanExporter() + inject_tracer_provider.add_span_processor(SimpleSpanProcessor(inject_trace_exporter)) + observability_options = dict(tracer_provder=inject_tracer_provider, enable_extended_tracing=True) + credentials = _make_credentials() + client = self._make_one(project=self.PROJECT, credentials=credentials, observability_options=observability_options) + + instance = client.instance( + self.INSTANCE_ID, + self.CONFIGURATION_NAME, + display_name=self.DISPLAY_NAME, + node_count=self.NODE_COUNT, + labels=self.LABELS, + ) + + db = instance.database(self.DATABASE_ID, enable_interceptors_in_tests=True) + response = dict() + db.execute_sql.return_value = response + db.execute_sql('SELECT 1') + + from_global_spans = global_trace_exporter.get_finished_spans() + from_inject_spans = inject_trace_exporter.get_finished_spans() + self.assertEqual(len(from_global_spans), 0, 'Expecting no spans from the global trace exporter') + self.assertEqual(len(from_global_spans) > 0, 'Expecting at least 1 span from the injected trace exporter') From 6fb046c91a3138d1d18b66286f953da84e6df15b Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 13 Nov 2024 01:15:25 -0800 Subject: [PATCH 11/19] Add preliminary end-to-end test to check for injection of observability_options --- .../spanner_v1/_opentelemetry_tracing.py | 6 +- google/cloud/spanner_v1/client.py | 9 +++ google/cloud/spanner_v1/database.py | 24 +++--- tests/unit/test_client.py | 77 ++++++++++++++++--- 4 files changed, 91 insertions(+), 25 deletions(-) diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 9c658278e1..db53f4eb48 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -69,8 +69,10 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) enable_extended_tracing = True if observability_options: - tracer_provider = observability_options.tracer_provider - enable_extended_tracing = observability_options.enable_extended_tracing + tracer_provider = observability_options.get("tracer_provider", None) + enable_extended_tracing = observability_options.get( + "enable_extended_tracing", enable_extended_tracing + ) tracer = get_tracer(tracer_provider) diff --git a/google/cloud/spanner_v1/client.py b/google/cloud/spanner_v1/client.py index 84d0bc7a8f..b60ec7fb77 100644 --- a/google/cloud/spanner_v1/client.py +++ b/google/cloud/spanner_v1/client.py @@ -277,6 +277,15 @@ def route_to_leader_enabled(self): """ return self._route_to_leader_enabled + @property + def observability_options(self): + """Getter for observability_options. + + :rtype: dict + :returns: The configured observability_options if set. + """ + return self._observability_options + @property def directed_read_options(self): """Getter for directed_read_options. diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 626124eefb..3329bd6e69 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -718,7 +718,7 @@ def execute_pdml(): method=method, request=request, transaction_selector=txn_selector, - observability_options=getattr(self, "observability_options", None), + observability_options=self.observability_options, ) result_set = StreamedResultSet(iterator) @@ -1107,6 +1107,17 @@ def set_iam_policy(self, policy): response = api.set_iam_policy(request=request, metadata=metadata) return response + @property + def observability_options(self): + """ + Returns the observability options that you set when creating + the SpannerClient. + """ + if not self._instance: + return None + + return self._instance._client.observability_options + class BatchCheckout(object): """Context manager for using a batch from a database. @@ -1712,17 +1723,6 @@ def close(self): if self._session is not None: self._session.delete() - @property - def observability_options(self): - """ - Returns the observability options that you set when creating - the SpannerClient. - """ - if not self._instance: - return None - - return getattr(self._instance._client, "observability_options", None) - def _check_ddl_statements(value): """Validate DDL Statements used to define database schema. diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 2acde11aab..5d816e3416 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os import unittest import mock @@ -27,6 +28,7 @@ from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.sampling import ALWAYS_ON from opentelemetry import trace + hasOtelInstalled = True except ImportError: pass @@ -50,6 +52,7 @@ class TestClient(unittest.TestCase): INSTANCE_ID = "instance-id" INSTANCE_NAME = "%s/instances/%s" % (PATH, INSTANCE_ID) DISPLAY_NAME = "display-name" + DATABASE_ID = "database" NODE_COUNT = 5 PROCESSING_UNITS = 5000 LABELS = {"test": "true"} @@ -701,21 +704,45 @@ def test_list_instances_w_options(self): timeout=mock.ANY, ) - def test_observability_options(self): + def test_observability_options_propagated_extended_tracing_off(self): + self.__test_observability_options(True) + + def test_observability_options_propagated(self): + self.__test_observability_options(False) + + def __test_observability_options(self, enable_extended_tracing): if not hasOtelInstalled: return + # This test needs the spanner emulator setup so as to run. + # TODO: This test should be fully enabled to not use the Spanner Emulator + # once Python mockSpanner is available as it tests end-to-end that + # observability_options were propagated. + if os.environ.get("SPANNER_EMULATOR_HOST", None) is None: + return + global_tracer_provider = TracerProvider(sampler=ALWAYS_ON) trace.set_tracer_provider(global_tracer_provider) global_trace_exporter = InMemorySpanExporter() - global_tracer_provider.add_span_processor(SimpleSpanProcessor(global_trace_exporter)) + global_tracer_provider.add_span_processor( + SimpleSpanProcessor(global_trace_exporter) + ) inject_tracer_provider = TracerProvider(sampler=ALWAYS_ON) inject_trace_exporter = InMemorySpanExporter() - inject_tracer_provider.add_span_processor(SimpleSpanProcessor(inject_trace_exporter)) - observability_options = dict(tracer_provder=inject_tracer_provider, enable_extended_tracing=True) + inject_tracer_provider.add_span_processor( + SimpleSpanProcessor(inject_trace_exporter) + ) + observability_options = dict( + tracer_provider=inject_tracer_provider, + enable_extended_tracing=enable_extended_tracing, + ) credentials = _make_credentials() - client = self._make_one(project=self.PROJECT, credentials=credentials, observability_options=observability_options) + client = self._make_one( + project=self.PROJECT, + credentials=credentials, + observability_options=observability_options, + ) instance = client.instance( self.INSTANCE_ID, @@ -725,12 +752,40 @@ def test_observability_options(self): labels=self.LABELS, ) - db = instance.database(self.DATABASE_ID, enable_interceptors_in_tests=True) - response = dict() - db.execute_sql.return_value = response - db.execute_sql('SELECT 1') + db = instance.database(self.DATABASE_ID) + self.assertEqual(db.observability_options, observability_options) + with db.snapshot() as snapshot: + res = snapshot.execute_sql("SELECT 1") + for val in res: + _ = val from_global_spans = global_trace_exporter.get_finished_spans() from_inject_spans = inject_trace_exporter.get_finished_spans() - self.assertEqual(len(from_global_spans), 0, 'Expecting no spans from the global trace exporter') - self.assertEqual(len(from_global_spans) > 0, 'Expecting at least 1 span from the injected trace exporter') + self.assertEqual( + len(from_global_spans), + 0, + "Expecting no spans from the global trace exporter", + ) + self.assertEqual( + len(from_inject_spans) >= 2, + True, + "Expecting at least 2 spans from the injected trace exporter", + ) + gotNames = [span.name for span in from_inject_spans] + wantNames = ["CloudSpanner.CreateSession", "CloudSpanner.ReadWriteTransaction"] + self.assertEqual( + gotNames, + wantNames, + "Span names mismatch", + ) + + # Check for conformance of enable_extended_tracing + lastSpan = from_inject_spans[len(from_inject_spans) - 1] + wantAnnotatedSQL = "SELECT 1" + if not enable_extended_tracing: + wantAnnotatedSQL = None + self.assertEqual( + lastSpan.attributes.get("db.statement", None), + wantAnnotatedSQL, + "Mismatch in annotated sql", + ) From fb3e8eaea0a800f6c797013b963b2aa9440bdc69 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 13 Nov 2024 10:16:57 -0800 Subject: [PATCH 12/19] Document default enable_extended_tracing value --- google/cloud/spanner_v1/client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/google/cloud/spanner_v1/client.py b/google/cloud/spanner_v1/client.py index b60ec7fb77..bc6cbb076c 100644 --- a/google/cloud/spanner_v1/client.py +++ b/google/cloud/spanner_v1/client.py @@ -132,6 +132,9 @@ class Client(ClientWithProject): tracer_provider is the injected tracer provider enable_extended_tracing: :type:boolean when set to true will allow for spans that issue SQL statements to be annotated with SQL. + Default `True`, please set it to `False` to turn it off + or you can use the environment variable `SPANNER_ENABLE_EXTENDED_TRACING=` + to control it. :raises: :class:`ValueError ` if both ``read_only`` and ``admin`` are :data:`True` From 762390c5511e0e74f03bde06b8341e76cdd52fce Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 13 Nov 2024 10:48:07 -0800 Subject: [PATCH 13/19] Carve out observability_options test --- .../spanner_v1/_opentelemetry_tracing.py | 4 +- tests/system/test_observability.py | 149 ++++++++++++++++++ tests/unit/test_client.py | 103 ------------ 3 files changed, 151 insertions(+), 105 deletions(-) create mode 100644 tests/system/test_observability.py diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index db53f4eb48..4adef9d289 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -34,7 +34,7 @@ TRACER_NAME = "cloud.google.com/python/spanner" TRACER_VERSION = gapic_version.__version__ -extendedTracingGloballyDisabled = ( +extended_tracing_globally_disabled = ( os.environ.get("SPANNER_ENABLE_EXTENDED_TRACING", "").lower() == "false" ) @@ -89,7 +89,7 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) if extra_attributes: attributes.update(extra_attributes) - if extendedTracingGloballyDisabled: + if extended_tracing_globally_disabled: enable_extended_tracing = False if not enable_extended_tracing: diff --git a/tests/system/test_observability.py b/tests/system/test_observability.py new file mode 100644 index 0000000000..6c74c12a1a --- /dev/null +++ b/tests/system/test_observability.py @@ -0,0 +1,149 @@ +# Copyright 2024 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import unittest +import mock + +from . import _helpers +from google.cloud.spanner_v1 import Client, DirectedReadOptions + +HAS_OTEL_INSTALLED = False + +try: + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.sampling import ALWAYS_ON + from opentelemetry import trace + + HAS_OTEL_INSTALLED = True +except ImportError: + pass + + +@pytest.mark.skipif(not HAS_OTEL_INSTALLED, reason="OpenTelemetry needed.") +@pytest.mark.skipif(not _helpers.USE_EMULATOR, reason="Emulator needed.") +class TestObservability(unittest.TestCase): + PROJECT = "PROJECT" + PATH = "projects/%s" % (PROJECT,) + CONFIGURATION_NAME = "config-name" + INSTANCE_ID = "instance-id" + INSTANCE_NAME = "%s/instances/%s" % (PATH, INSTANCE_ID) + DISPLAY_NAME = "display-name" + DATABASE_ID = "database" + NODE_COUNT = 5 + PROCESSING_UNITS = 5000 + LABELS = {"test": "true"} + TIMEOUT_SECONDS = 80 + LEADER_OPTIONS = ["leader1", "leader2"] + DIRECTED_READ_OPTIONS = { + "include_replicas": { + "replica_selections": [ + { + "location": "us-west1", + "type_": DirectedReadOptions.ReplicaSelection.Type.READ_ONLY, + }, + ], + "auto_failover_disabled": True, + }, + } + + def test_observability_options_propagated_extended_tracing_off(self): + self.__test_observability_options(True) + + def test_observability_options_propagated(self): + self.__test_observability_options(False) + + def __test_observability_options(self, enable_extended_tracing): + global_tracer_provider = TracerProvider(sampler=ALWAYS_ON) + trace.set_tracer_provider(global_tracer_provider) + global_trace_exporter = InMemorySpanExporter() + global_tracer_provider.add_span_processor( + SimpleSpanProcessor(global_trace_exporter) + ) + + inject_tracer_provider = TracerProvider(sampler=ALWAYS_ON) + inject_trace_exporter = InMemorySpanExporter() + inject_tracer_provider.add_span_processor( + SimpleSpanProcessor(inject_trace_exporter) + ) + observability_options = dict( + tracer_provider=inject_tracer_provider, + enable_extended_tracing=enable_extended_tracing, + ) + client = Client( + project=self.PROJECT, + observability_options=observability_options, + credentials=_make_credentials(), + ) + + instance = client.instance( + self.INSTANCE_ID, + self.CONFIGURATION_NAME, + display_name=self.DISPLAY_NAME, + node_count=self.NODE_COUNT, + labels=self.LABELS, + ) + + db = instance.database(self.DATABASE_ID) + self.assertEqual(db.observability_options, observability_options) + with db.snapshot() as snapshot: + res = snapshot.execute_sql("SELECT 1") + for val in res: + _ = val + + from_global_spans = global_trace_exporter.get_finished_spans() + from_inject_spans = inject_trace_exporter.get_finished_spans() + self.assertEqual( + len(from_global_spans), + 0, + "Expecting no spans from the global trace exporter", + ) + self.assertEqual( + len(from_inject_spans) >= 2, + True, + "Expecting at least 2 spans from the injected trace exporter", + ) + gotNames = [span.name for span in from_inject_spans] + wantNames = ["CloudSpanner.CreateSession", "CloudSpanner.ReadWriteTransaction"] + self.assertEqual( + gotNames, + wantNames, + "Span names mismatch", + ) + + # Check for conformance of enable_extended_tracing + lastSpan = from_inject_spans[len(from_inject_spans) - 1] + wantAnnotatedSQL = "SELECT 1" + if not enable_extended_tracing: + wantAnnotatedSQL = None + self.assertEqual( + lastSpan.attributes.get("db.statement", None), + wantAnnotatedSQL, + "Mismatch in annotated sql", + ) + + +def _make_credentials(): + import google.auth.credentials + + class _CredentialsWithScopes( + google.auth.credentials.Credentials, google.auth.credentials.Scoped + ): + pass + + return mock.Mock(spec=_CredentialsWithScopes) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 5d816e3416..174e5116c2 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -12,27 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os import unittest import mock from google.cloud.spanner_v1 import DirectedReadOptions -hasOtelInstalled = False - -try: - from opentelemetry.sdk.trace.export import SimpleSpanProcessor - from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( - InMemorySpanExporter, - ) - from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.sdk.trace.sampling import ALWAYS_ON - from opentelemetry import trace - - hasOtelInstalled = True -except ImportError: - pass - def _make_credentials(): import google.auth.credentials @@ -52,7 +36,6 @@ class TestClient(unittest.TestCase): INSTANCE_ID = "instance-id" INSTANCE_NAME = "%s/instances/%s" % (PATH, INSTANCE_ID) DISPLAY_NAME = "display-name" - DATABASE_ID = "database" NODE_COUNT = 5 PROCESSING_UNITS = 5000 LABELS = {"test": "true"} @@ -703,89 +686,3 @@ def test_list_instances_w_options(self): retry=mock.ANY, timeout=mock.ANY, ) - - def test_observability_options_propagated_extended_tracing_off(self): - self.__test_observability_options(True) - - def test_observability_options_propagated(self): - self.__test_observability_options(False) - - def __test_observability_options(self, enable_extended_tracing): - if not hasOtelInstalled: - return - - # This test needs the spanner emulator setup so as to run. - # TODO: This test should be fully enabled to not use the Spanner Emulator - # once Python mockSpanner is available as it tests end-to-end that - # observability_options were propagated. - if os.environ.get("SPANNER_EMULATOR_HOST", None) is None: - return - - global_tracer_provider = TracerProvider(sampler=ALWAYS_ON) - trace.set_tracer_provider(global_tracer_provider) - global_trace_exporter = InMemorySpanExporter() - global_tracer_provider.add_span_processor( - SimpleSpanProcessor(global_trace_exporter) - ) - - inject_tracer_provider = TracerProvider(sampler=ALWAYS_ON) - inject_trace_exporter = InMemorySpanExporter() - inject_tracer_provider.add_span_processor( - SimpleSpanProcessor(inject_trace_exporter) - ) - observability_options = dict( - tracer_provider=inject_tracer_provider, - enable_extended_tracing=enable_extended_tracing, - ) - credentials = _make_credentials() - client = self._make_one( - project=self.PROJECT, - credentials=credentials, - observability_options=observability_options, - ) - - instance = client.instance( - self.INSTANCE_ID, - self.CONFIGURATION_NAME, - display_name=self.DISPLAY_NAME, - node_count=self.NODE_COUNT, - labels=self.LABELS, - ) - - db = instance.database(self.DATABASE_ID) - self.assertEqual(db.observability_options, observability_options) - with db.snapshot() as snapshot: - res = snapshot.execute_sql("SELECT 1") - for val in res: - _ = val - - from_global_spans = global_trace_exporter.get_finished_spans() - from_inject_spans = inject_trace_exporter.get_finished_spans() - self.assertEqual( - len(from_global_spans), - 0, - "Expecting no spans from the global trace exporter", - ) - self.assertEqual( - len(from_inject_spans) >= 2, - True, - "Expecting at least 2 spans from the injected trace exporter", - ) - gotNames = [span.name for span in from_inject_spans] - wantNames = ["CloudSpanner.CreateSession", "CloudSpanner.ReadWriteTransaction"] - self.assertEqual( - gotNames, - wantNames, - "Span names mismatch", - ) - - # Check for conformance of enable_extended_tracing - lastSpan = from_inject_spans[len(from_inject_spans) - 1] - wantAnnotatedSQL = "SELECT 1" - if not enable_extended_tracing: - wantAnnotatedSQL = None - self.assertEqual( - lastSpan.attributes.get("db.statement", None), - wantAnnotatedSQL, - "Mismatch in annotated sql", - ) From 21bfbcd91cfaeb9e705a5a457a53ba64c38de655 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 13 Nov 2024 12:10:49 -0800 Subject: [PATCH 14/19] Ensure that observability_options test sets up and deletes database --- google/cloud/spanner_v1/snapshot.py | 14 +++++++++----- google/cloud/spanner_v1/transaction.py | 4 ++-- tests/system/test_observability.py | 16 +++++++++++++--- 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 0d7984e54e..a02776b27c 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -313,6 +313,7 @@ def read( ) trace_attributes = {"table_id": table, "columns": columns} + observability_options = getattr(database, "observability_options", None) if self._transaction_id is None: # lock is added to handle the inline begin for first rpc @@ -324,9 +325,7 @@ def read( self._session, trace_attributes, transaction=self, - observability_options=getattr( - database, "observability_options", None - ), + observability_options=observability_options, ) self._read_request_count += 1 if self._multi_use: @@ -343,7 +342,7 @@ def read( self._session, trace_attributes, transaction=self, - observability_options=getattr(database, "observability_options", None), + observability_options=observability_options, ) self._read_request_count += 1 @@ -529,7 +528,12 @@ def execute_sql( ) def _get_streamed_result_set( - self, restart, request, trace_attributes, column_info, observability_options + self, + restart, + request, + trace_attributes, + column_info, + observability_options=None, ): iterator = _restart_on_unavailable( restart, diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index 4ff824f192..beb3e46edb 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -426,7 +426,7 @@ def execute_update( "CloudSpanner.ReadWriteTransaction", self._session, trace_attributes, - observability_options, + observability_options=observability_options, ) # Setting the transaction id because the transaction begin was inlined for first rpc. if ( @@ -443,7 +443,7 @@ def execute_update( "CloudSpanner.ReadWriteTransaction", self._session, trace_attributes, - observability_options, + observability_options=observability_options, ) return response.stats.row_count_exact diff --git a/tests/system/test_observability.py b/tests/system/test_observability.py index 6c74c12a1a..1c73302f9e 100644 --- a/tests/system/test_observability.py +++ b/tests/system/test_observability.py @@ -38,13 +38,13 @@ @pytest.mark.skipif(not HAS_OTEL_INSTALLED, reason="OpenTelemetry needed.") @pytest.mark.skipif(not _helpers.USE_EMULATOR, reason="Emulator needed.") class TestObservability(unittest.TestCase): - PROJECT = "PROJECT" + PROJECT = _helpers.EMULATOR_PROJECT PATH = "projects/%s" % (PROJECT,) CONFIGURATION_NAME = "config-name" - INSTANCE_ID = "instance-id" + INSTANCE_ID = _helpers.INSTANCE_ID INSTANCE_NAME = "%s/instances/%s" % (PATH, INSTANCE_ID) DISPLAY_NAME = "display-name" - DATABASE_ID = "database" + DATABASE_ID = _helpers.unique_id("temp_db") NODE_COUNT = 5 PROCESSING_UNITS = 5000 LABELS = {"test": "true"} @@ -100,6 +100,11 @@ def __test_observability_options(self, enable_extended_tracing): ) db = instance.database(self.DATABASE_ID) + try: + db.create() + except: + pass + self.assertEqual(db.observability_options, observability_options) with db.snapshot() as snapshot: res = snapshot.execute_sql("SELECT 1") @@ -137,6 +142,11 @@ def __test_observability_options(self, enable_extended_tracing): "Mismatch in annotated sql", ) + try: + db.delete() + except: + pass + def _make_credentials(): import google.auth.credentials From 797241fa53d3535972240f8fe12de556d2733294 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 13 Nov 2024 19:29:01 -0800 Subject: [PATCH 15/19] Ensure instance.create() is invoked in system tests --- tests/system/test_observability.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/system/test_observability.py b/tests/system/test_observability.py index 1c73302f9e..0ee5b413ab 100644 --- a/tests/system/test_observability.py +++ b/tests/system/test_observability.py @@ -99,6 +99,11 @@ def __test_observability_options(self, enable_extended_tracing): labels=self.LABELS, ) + try: + instance.create() + except: + pass + db = instance.database(self.DATABASE_ID) try: db.create() @@ -144,6 +149,7 @@ def __test_observability_options(self, enable_extended_tracing): try: db.delete() + instance.delete() except: pass From 25edeaae84b7aa0fdf7fcedf88733ba3a7e6ec77 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 13 Nov 2024 21:02:20 -0800 Subject: [PATCH 16/19] Use getattr for mock _Client --- google/cloud/spanner_v1/database.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 3329bd6e69..4b6ff4bb4c 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -1113,10 +1113,10 @@ def observability_options(self): Returns the observability options that you set when creating the SpannerClient. """ - if not self._instance: + if not (self._instance and self._instance._client): return None - return self._instance._client.observability_options + return getattr(self._instance._client, 'observability_options', None) class BatchCheckout(object): From 42b9a4c2fd2861e8def2de93649c6b01fd5eaf95 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 13 Nov 2024 22:30:27 -0800 Subject: [PATCH 17/19] Update with code review suggestions --- google/cloud/spanner_v1/database.py | 2 +- ...ility.py => test_observability_options.py} | 62 ++++++++----------- 2 files changed, 26 insertions(+), 38 deletions(-) rename tests/system/{test_observability.py => test_observability_options.py} (76%) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 4b6ff4bb4c..abddd5d97d 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -1116,7 +1116,7 @@ def observability_options(self): if not (self._instance and self._instance._client): return None - return getattr(self._instance._client, 'observability_options', None) + return getattr(self._instance._client, "observability_options", None) class BatchCheckout(object): diff --git a/tests/system/test_observability.py b/tests/system/test_observability_options.py similarity index 76% rename from tests/system/test_observability.py rename to tests/system/test_observability_options.py index 0ee5b413ab..1f9058b2d5 100644 --- a/tests/system/test_observability.py +++ b/tests/system/test_observability_options.py @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import mock import pytest import unittest -import mock from . import _helpers from google.cloud.spanner_v1 import Client, DirectedReadOptions @@ -37,7 +37,7 @@ @pytest.mark.skipif(not HAS_OTEL_INSTALLED, reason="OpenTelemetry needed.") @pytest.mark.skipif(not _helpers.USE_EMULATOR, reason="Emulator needed.") -class TestObservability(unittest.TestCase): +def test_observability_options_propagation(): PROJECT = _helpers.EMULATOR_PROJECT PATH = "projects/%s" % (PROJECT,) CONFIGURATION_NAME = "config-name" @@ -62,13 +62,7 @@ class TestObservability(unittest.TestCase): }, } - def test_observability_options_propagated_extended_tracing_off(self): - self.__test_observability_options(True) - - def test_observability_options_propagated(self): - self.__test_observability_options(False) - - def __test_observability_options(self, enable_extended_tracing): + def test_propagation(enable_extended_tracing): global_tracer_provider = TracerProvider(sampler=ALWAYS_ON) trace.set_tracer_provider(global_tracer_provider) global_trace_exporter = InMemorySpanExporter() @@ -86,17 +80,17 @@ def __test_observability_options(self, enable_extended_tracing): enable_extended_tracing=enable_extended_tracing, ) client = Client( - project=self.PROJECT, + project=PROJECT, observability_options=observability_options, credentials=_make_credentials(), ) instance = client.instance( - self.INSTANCE_ID, - self.CONFIGURATION_NAME, - display_name=self.DISPLAY_NAME, - node_count=self.NODE_COUNT, - labels=self.LABELS, + INSTANCE_ID, + CONFIGURATION_NAME, + display_name=DISPLAY_NAME, + node_count=NODE_COUNT, + labels=LABELS, ) try: @@ -104,13 +98,13 @@ def __test_observability_options(self, enable_extended_tracing): except: pass - db = instance.database(self.DATABASE_ID) + db = instance.database(DATABASE_ID) try: db.create() except: pass - self.assertEqual(db.observability_options, observability_options) + assert db.observability_options == observability_options with db.snapshot() as snapshot: res = snapshot.execute_sql("SELECT 1") for val in res: @@ -118,34 +112,24 @@ def __test_observability_options(self, enable_extended_tracing): from_global_spans = global_trace_exporter.get_finished_spans() from_inject_spans = inject_trace_exporter.get_finished_spans() - self.assertEqual( - len(from_global_spans), - 0, - "Expecting no spans from the global trace exporter", - ) - self.assertEqual( - len(from_inject_spans) >= 2, - True, - "Expecting at least 2 spans from the injected trace exporter", - ) + assert ( + len(from_global_spans) == 0 + ) # "Expecting no spans from the global trace exporter" + assert ( + len(from_inject_spans) >= 2 + ) # "Expecting at least 2 spans from the injected trace exporter" gotNames = [span.name for span in from_inject_spans] wantNames = ["CloudSpanner.CreateSession", "CloudSpanner.ReadWriteTransaction"] - self.assertEqual( - gotNames, - wantNames, - "Span names mismatch", - ) + assert gotNames == wantNames # Check for conformance of enable_extended_tracing lastSpan = from_inject_spans[len(from_inject_spans) - 1] wantAnnotatedSQL = "SELECT 1" if not enable_extended_tracing: wantAnnotatedSQL = None - self.assertEqual( - lastSpan.attributes.get("db.statement", None), - wantAnnotatedSQL, - "Mismatch in annotated sql", - ) + assert ( + lastSpan.attributes.get("db.statement", None) == wantAnnotatedSQL + ) # "Mismatch in annotated sql" try: db.delete() @@ -153,6 +137,10 @@ def __test_observability_options(self, enable_extended_tracing): except: pass + # Test the respective options for enable_extended_tracing + test_propagation(True) + test_propagation(False) + def _make_credentials(): import google.auth.credentials From 6c8bb88b5b06950e436691964b32063c3a1c574d Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 13 Nov 2024 22:53:01 -0800 Subject: [PATCH 18/19] Deal with mock.Mock false positives failing tests --- google/cloud/spanner_v1/_opentelemetry_tracing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 4adef9d289..8e2e6b7de7 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -68,7 +68,7 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) # on by default. enable_extended_tracing = True - if observability_options: + if type(observability_options) == dict: # Avoid false positives with mock.Mock tracer_provider = observability_options.get("tracer_provider", None) enable_extended_tracing = observability_options.get( "enable_extended_tracing", enable_extended_tracing From 196e9a0fe1e711c33a3fb4ae73819a73cf1c9181 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Thu, 14 Nov 2024 06:20:37 -0800 Subject: [PATCH 19/19] Address review feedback --- .../spanner_v1/_opentelemetry_tracing.py | 4 +- google/cloud/spanner_v1/client.py | 2 +- tests/system/test_observability_options.py | 43 ++++++------------- 3 files changed, 15 insertions(+), 34 deletions(-) diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 8e2e6b7de7..feb3b92756 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -35,7 +35,7 @@ TRACER_NAME = "cloud.google.com/python/spanner" TRACER_VERSION = gapic_version.__version__ extended_tracing_globally_disabled = ( - os.environ.get("SPANNER_ENABLE_EXTENDED_TRACING", "").lower() == "false" + os.getenv("SPANNER_ENABLE_EXTENDED_TRACING", "").lower() == "false" ) @@ -68,7 +68,7 @@ def trace_call(name, session, extra_attributes=None, observability_options=None) # on by default. enable_extended_tracing = True - if type(observability_options) == dict: # Avoid false positives with mock.Mock + if isinstance(observability_options, dict): # Avoid false positives with mock.Mock tracer_provider = observability_options.get("tracer_provider", None) enable_extended_tracing = observability_options.get( "enable_extended_tracing", enable_extended_tracing diff --git a/google/cloud/spanner_v1/client.py b/google/cloud/spanner_v1/client.py index bc6cbb076c..afe6264717 100644 --- a/google/cloud/spanner_v1/client.py +++ b/google/cloud/spanner_v1/client.py @@ -126,7 +126,7 @@ class Client(ClientWithProject): for all ReadRequests and ExecuteSqlRequests that indicates which replicas or regions should be used for non-transactional reads or queries. - :type labels: dict (str -> any) or None + :type observability_options: dict (str -> any) or None :param observability_options: (Optional) the configuration to control the tracer's behavior. tracer_provider is the injected tracer provider diff --git a/tests/system/test_observability_options.py b/tests/system/test_observability_options.py index 1f9058b2d5..8382255c15 100644 --- a/tests/system/test_observability_options.py +++ b/tests/system/test_observability_options.py @@ -12,12 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import mock import pytest -import unittest from . import _helpers -from google.cloud.spanner_v1 import Client, DirectedReadOptions +from google.cloud.spanner_v1 import Client HAS_OTEL_INSTALLED = False @@ -35,32 +33,20 @@ pass -@pytest.mark.skipif(not HAS_OTEL_INSTALLED, reason="OpenTelemetry needed.") -@pytest.mark.skipif(not _helpers.USE_EMULATOR, reason="Emulator needed.") +@pytest.mark.skipif( + not HAS_OTEL_INSTALLED, reason="OpenTelemetry is necessary to test traces." +) +@pytest.mark.skipif( + not _helpers.USE_EMULATOR, reason="mulator is necessary to test traces." +) def test_observability_options_propagation(): PROJECT = _helpers.EMULATOR_PROJECT - PATH = "projects/%s" % (PROJECT,) CONFIGURATION_NAME = "config-name" INSTANCE_ID = _helpers.INSTANCE_ID - INSTANCE_NAME = "%s/instances/%s" % (PATH, INSTANCE_ID) DISPLAY_NAME = "display-name" DATABASE_ID = _helpers.unique_id("temp_db") NODE_COUNT = 5 - PROCESSING_UNITS = 5000 LABELS = {"test": "true"} - TIMEOUT_SECONDS = 80 - LEADER_OPTIONS = ["leader1", "leader2"] - DIRECTED_READ_OPTIONS = { - "include_replicas": { - "replica_selections": [ - { - "location": "us-west1", - "type_": DirectedReadOptions.ReplicaSelection.Type.READ_ONLY, - }, - ], - "auto_failover_disabled": True, - }, - } def test_propagation(enable_extended_tracing): global_tracer_provider = TracerProvider(sampler=ALWAYS_ON) @@ -95,13 +81,13 @@ def test_propagation(enable_extended_tracing): try: instance.create() - except: + except Exception: pass db = instance.database(DATABASE_ID) try: db.create() - except: + except Exception: pass assert db.observability_options == observability_options @@ -134,7 +120,7 @@ def test_propagation(enable_extended_tracing): try: db.delete() instance.delete() - except: + except Exception: pass # Test the respective options for enable_extended_tracing @@ -143,11 +129,6 @@ def test_propagation(enable_extended_tracing): def _make_credentials(): - import google.auth.credentials + from google.auth.credentials import AnonymousCredentials - class _CredentialsWithScopes( - google.auth.credentials.Credentials, google.auth.credentials.Scoped - ): - pass - - return mock.Mock(spec=_CredentialsWithScopes) + return AnonymousCredentials()