Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/opentelemetry-tracing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ We also need to tell OpenTelemetry which exporter to use. To export Spanner trac
# can modify it though using the environment variable
# SPANNER_ENABLE_EXTENDED_TRACING=false.
enable_extended_tracing=False,

# End-to-end tracing is disabled by default. Set this to True
# to also get spans from the Spanner server.
enable_end_to_end_tracing=True,
)
spanner = spanner.NewClient(project_id, observability_options=observability_options)

Expand Down Expand Up @@ -71,3 +75,22 @@ leak. Sadly due to legacy behavior, we cannot simply turn off this behavior by d
SPANNER_ENABLE_EXTENDED_TRACING=false

to turn it off globally or when creating each SpannerClient, please set `observability_options.enable_extended_tracing=false`

End to end tracing
~~~~~~~~~~~~~~~~~~~~~~~~~

In addition to client-side tracing, you can opt in to end-to-end tracing. End-to-end tracing helps you understand and debug latency issues that are specific to Spanner. Refer to the `tracing overview <https://cloud.google.com/spanner/docs/tracing-overview>`_ for more information.

To configure end-to-end tracing:

1. Opt in to end-to-end tracing. You can opt in by either:
* Setting the environment variable `SPANNER_ENABLE_END_TO_END_TRACING=true` before your application is started, or
* Setting `observability_options.enable_end_to_end_tracing=true` in code when creating each SpannerClient.

2. Set the trace context propagation in OpenTelemetry.

.. code:: python

from opentelemetry.propagate import set_global_textmap
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
set_global_textmap(TraceContextTextMapPropagator())
7 changes: 6 additions & 1 deletion examples/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ALWAYS_ON
from opentelemetry import trace
from opentelemetry.propagate import set_global_textmap
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator


def main():
Expand All @@ -36,10 +38,13 @@ def main():
# Setup the Cloud Spanner Client.
spanner_client = spanner.Client(
project_id,
observability_options=dict(tracer_provider=tracer_provider, enable_extended_tracing=True),
observability_options=dict(tracer_provider=tracer_provider, enable_extended_tracing=True, enable_end_to_end_tracing=True),
)
instance = spanner_client.instance('test-instance')
database = instance.database('test-db')

# Set W3C Trace Context as the global propagator for end to end tracing.
set_global_textmap(TraceContextTextMapPropagator())

# Retrieve a tracer from our custom tracer provider.
tracer = tracer_provider.get_tracer('MyApp')
Expand Down
46 changes: 46 additions & 0 deletions google/cloud/spanner_v1/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@
from google.cloud.spanner_v1.request_id_header import with_request_id
from google.rpc.error_details_pb2 import RetryInfo

try:
from opentelemetry.propagate import inject
from opentelemetry.propagators.textmap import Setter

HAS_OPENTELEMETRY_INSTALLED = True
except ImportError:
HAS_OPENTELEMETRY_INSTALLED = False
from typing import List, Tuple
import random

# Validation error messages
Expand All @@ -47,6 +55,29 @@
)


if HAS_OPENTELEMETRY_INSTALLED:

    class OpenTelemetryContextSetter(Setter):
        """Setter used by OpenTelemetry to propagate context via Spanner metadata.

        The carrier is a list of ``(key, value)`` string tuples; each
        propagated entry is appended to that list.
        """

        def set(self, carrier: List[Tuple[str, str]], key: str, value: str) -> None:
            """Append one OpenTelemetry context entry to the metadata list.

            Args:
                carrier (list[tuple[str, str]]): The metadata list that
                    carries the OpenTelemetry data. It is modified in place.
                key (str): The key under which the OpenTelemetry context
                    data is set.
                value (str): The OpenTelemetry context value to be set.

            Returns:
                None
            """
            # Entries are appended rather than replaced, so an existing entry
            # with the same key is left untouched.
            carrier.append((key, value))


def _try_to_coerce_bytes(bytestring):
"""Try to coerce a byte string into the right thing based on Python
version and whether or not it is base64 encoded.
Expand Down Expand Up @@ -550,6 +581,21 @@ def _metadata_with_leader_aware_routing(value, **kw):
return ("x-goog-spanner-route-to-leader", str(value).lower())


def _metadata_with_span_context(metadata: List[Tuple[str, str]], **kw) -> None:
"""
Appends metadata with end to end tracing header and OpenTelemetry span context .

Args:
metadata (list[tuple[str, str]]): The metadata carrier where the OpenTelemetry context
should be injected.
Returns:
None
"""
if HAS_OPENTELEMETRY_INSTALLED:
metadata.append(("x-goog-spanner-end-to-end-tracing", "true"))
inject(setter=OpenTelemetryContextSetter(), carrier=metadata)


def _delay_until_retry(exc, deadline, attempts):
"""Helper for :meth:`Session.run_in_transaction`.

Expand Down
20 changes: 19 additions & 1 deletion google/cloud/spanner_v1/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

from google.cloud.spanner_v1 import SpannerClient
from google.cloud.spanner_v1 import gapic_version
from google.cloud.spanner_v1._helpers import (
_metadata_with_span_context,
)

try:
from opentelemetry import trace
Expand All @@ -40,6 +43,9 @@
extended_tracing_globally_disabled = (
os.getenv("SPANNER_ENABLE_EXTENDED_TRACING", "").lower() == "false"
)
end_to_end_tracing_globally_enabled = (
os.getenv("SPANNER_ENABLE_END_TO_END_TRACING", "").lower() == "true"
)


def get_tracer(tracer_provider=None):
Expand All @@ -58,7 +64,9 @@ def get_tracer(tracer_provider=None):


@contextmanager
def trace_call(name, session=None, extra_attributes=None, observability_options=None):
def trace_call(
name, session=None, extra_attributes=None, observability_options=None, metadata=None
):
if session:
session._last_use_time = datetime.now()

Expand All @@ -74,6 +82,8 @@ def trace_call(name, session=None, extra_attributes=None, observability_options=
# on by default.
enable_extended_tracing = True

enable_end_to_end_tracing = False

db_name = ""
if session and getattr(session, "_database", None):
db_name = session._database.name
Expand All @@ -83,6 +93,9 @@ def trace_call(name, session=None, extra_attributes=None, observability_options=
enable_extended_tracing = observability_options.get(
"enable_extended_tracing", enable_extended_tracing
)
enable_end_to_end_tracing = observability_options.get(
"enable_end_to_end_tracing", enable_end_to_end_tracing
)
db_name = observability_options.get("db_name", db_name)

tracer = get_tracer(tracer_provider)
Expand Down Expand Up @@ -110,11 +123,16 @@ def trace_call(name, session=None, extra_attributes=None, observability_options=
if not enable_extended_tracing:
attributes.pop("db.statement", False)

if end_to_end_tracing_globally_enabled:
enable_end_to_end_tracing = True

with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT, attributes=attributes
) as span:
with MetricsCapture():
try:
if enable_end_to_end_tracing:
_metadata_with_span_context(metadata)
yield span
except Exception as error:
span.set_status(Status(StatusCode.ERROR, str(error)))
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/spanner_v1/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ def commit(
self._session,
trace_attributes,
observability_options=observability_options,
metadata=metadata,
), MetricsCapture():
method = functools.partial(
api.commit,
Expand Down Expand Up @@ -349,6 +350,7 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
self._session,
trace_attributes,
observability_options=observability_options,
metadata=metadata,
), MetricsCapture():
method = functools.partial(
api.batch_write,
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/spanner_v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ class Client(ClientWithProject):
Default `True`, please set it to `False` to turn it off
or you can use the environment variable `SPANNER_ENABLE_EXTENDED_TRACING=<boolean>`
to control it.
enable_end_to_end_tracing: :type:boolean when set to true, also produces spans from the Spanner server side.
Default `False`, please set it to `True` to turn it on,
or set the environment variable `SPANNER_ENABLE_END_TO_END_TRACING=true`
to turn it on globally.

:raises: :class:`ValueError <exceptions.ValueError>` if both ``read_only``
and ``admin`` are :data:`True`
Expand Down
1 change: 1 addition & 0 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ def execute_pdml():
method=method,
trace_name="CloudSpanner.ExecuteStreamingSql",
request=request,
metadata=metadata,
transaction_selector=txn_selector,
observability_options=self.observability_options,
)
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/spanner_v1/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ def bind(self, database):
with trace_call(
"CloudSpanner.FixedPool.BatchCreateSessions",
observability_options=observability_options,
metadata=metadata,
) as span, MetricsCapture():
returned_session_count = 0
while not self._sessions.full():
Expand Down Expand Up @@ -554,6 +555,7 @@ def bind(self, database):
with trace_call(
"CloudSpanner.PingingPool.BatchCreateSessions",
observability_options=observability_options,
metadata=metadata,
) as span, MetricsCapture():
returned_session_count = 0
while returned_session_count < self.size:
Expand Down
7 changes: 6 additions & 1 deletion google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def create(self):
self,
self._labels,
observability_options=observability_options,
metadata=metadata,
), MetricsCapture():
session_pb = api.create_session(
request=request,
Expand Down Expand Up @@ -206,7 +207,10 @@ def exists(self):

observability_options = getattr(self._database, "observability_options", None)
with trace_call(
"CloudSpanner.GetSession", self, observability_options=observability_options
"CloudSpanner.GetSession",
self,
observability_options=observability_options,
metadata=metadata,
) as span, MetricsCapture():
try:
api.get_session(name=self.name, metadata=metadata)
Expand Down Expand Up @@ -250,6 +254,7 @@ def delete(self):
"session.name": self.name,
},
observability_options=observability_options,
metadata=metadata,
), MetricsCapture():
api.delete_session(name=self.name, metadata=metadata)

Expand Down
15 changes: 14 additions & 1 deletion google/cloud/spanner_v1/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
def _restart_on_unavailable(
method,
request,
metadata=None,
trace_name=None,
session=None,
attributes=None,
Expand Down Expand Up @@ -98,8 +99,9 @@ def _restart_on_unavailable(
session,
attributes,
observability_options=observability_options,
metadata=metadata,
), MetricsCapture():
iterator = method(request=request)
iterator = method(request=request, metadata=metadata)
for item in iterator:
item_buffer.append(item)
# Setting the transaction id because the transaction begin was inlined for first rpc.
Expand All @@ -121,6 +123,7 @@ def _restart_on_unavailable(
session,
attributes,
observability_options=observability_options,
metadata=metadata,
), MetricsCapture():
request.resume_token = resume_token
if transaction is not None:
Expand All @@ -141,6 +144,7 @@ def _restart_on_unavailable(
session,
attributes,
observability_options=observability_options,
metadata=metadata,
), MetricsCapture():
request.resume_token = resume_token
if transaction is not None:
Expand Down Expand Up @@ -342,6 +346,7 @@ def read(
iterator = _restart_on_unavailable(
restart,
request,
metadata,
f"CloudSpanner.{type(self).__name__}.read",
self._session,
trace_attributes,
Expand All @@ -364,6 +369,7 @@ def read(
iterator = _restart_on_unavailable(
restart,
request,
metadata,
f"CloudSpanner.{type(self).__name__}.read",
self._session,
trace_attributes,
Expand Down Expand Up @@ -573,6 +579,7 @@ def execute_sql(
return self._get_streamed_result_set(
restart,
request,
metadata,
trace_attributes,
column_info,
observability_options,
Expand All @@ -582,6 +589,7 @@ def execute_sql(
return self._get_streamed_result_set(
restart,
request,
metadata,
trace_attributes,
column_info,
observability_options,
Expand All @@ -592,6 +600,7 @@ def _get_streamed_result_set(
self,
restart,
request,
metadata,
trace_attributes,
column_info,
observability_options=None,
Expand All @@ -600,6 +609,7 @@ def _get_streamed_result_set(
iterator = _restart_on_unavailable(
restart,
request,
metadata,
f"CloudSpanner.{type(self).__name__}.execute_sql",
self._session,
trace_attributes,
Expand Down Expand Up @@ -706,6 +716,7 @@ def partition_read(
self._session,
extra_attributes=trace_attributes,
observability_options=getattr(database, "observability_options", None),
metadata=metadata,
), MetricsCapture():
method = functools.partial(
api.partition_read,
Expand Down Expand Up @@ -809,6 +820,7 @@ def partition_query(
self._session,
trace_attributes,
observability_options=getattr(database, "observability_options", None),
metadata=metadata,
), MetricsCapture():
method = functools.partial(
api.partition_query,
Expand Down Expand Up @@ -955,6 +967,7 @@ def begin(self):
f"CloudSpanner.{type(self).__name__}.begin",
self._session,
observability_options=getattr(database, "observability_options", None),
metadata=metadata,
), MetricsCapture():
method = functools.partial(
api.begin_transaction,
Expand Down
Loading
Loading