diff --git a/elastic_transport/_async_transport.py b/elastic_transport/_async_transport.py index 7c20e7b..0e70202 100644 --- a/elastic_transport/_async_transport.py +++ b/elastic_transport/_async_transport.py @@ -220,8 +220,8 @@ async def perform_request( # type: ignore[override] method, endpoint_id=resolve_default(endpoint_id, None), path_parts=resolve_default(path_parts, {}), - ): - return await self._perform_request( + ) as span: + response = await self._perform_request( method, target, body=body, @@ -232,6 +232,8 @@ async def perform_request( # type: ignore[override] request_timeout=request_timeout, client_meta=client_meta, ) + span.set_elastic_cloud_metadata(response.meta.headers) + return response async def _perform_request( # type: ignore[override,return] self, diff --git a/elastic_transport/_otel.py b/elastic_transport/_otel.py index a413252..7e3e98b 100644 --- a/elastic_transport/_otel.py +++ b/elastic_transport/_otel.py @@ -23,6 +23,7 @@ try: from opentelemetry import trace + from opentelemetry.trace import Span _tracer: trace.Tracer | None = trace.get_tracer("elastic-transport") except ModuleNotFoundError: @@ -32,6 +33,23 @@ ENABLED_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED" +class OpenTelemetrySpan: + def __init__(self, otel_span: Optional[Span]): + self.otel_span = otel_span + + def set_attribute(self, key: str, value: str) -> None: + if self.otel_span is not None: + self.otel_span.set_attribute(key, value) + + def set_elastic_cloud_metadata(self, headers: Mapping[str, str]) -> None: + cluster_name = headers.get("X-Found-Handling-Cluster") + if cluster_name is not None: + self.set_attribute("db.elasticsearch.cluster.name", cluster_name) + node_name = headers.get("X-Found-Handling-Instance") + if node_name is not None: + self.set_attribute("db.elasticsearch.node.name", node_name) + + class OpenTelemetry: def __init__(self, enabled: bool | None = None, tracer: trace.Tracer | None = None): if enabled is None: @@ -46,17 +64,17 @@ def span( *, endpoint_id: Optional[str], path_parts: Mapping[str, str], - ) -> Generator[None, None, None]: + ) -> Generator[OpenTelemetrySpan, None, None]: if not self.enabled or self.tracer is None: - yield + yield OpenTelemetrySpan(None) return span_name = endpoint_id or method - with self.tracer.start_as_current_span(span_name) as span: - span.set_attribute("http.request.method", method) - span.set_attribute("db.system", "elasticsearch") + with self.tracer.start_as_current_span(span_name) as otel_span: + otel_span.set_attribute("http.request.method", method) + otel_span.set_attribute("db.system", "elasticsearch") if endpoint_id is not None: - span.set_attribute("db.operation", endpoint_id) + otel_span.set_attribute("db.operation", endpoint_id) for key, value in path_parts.items(): - span.set_attribute(f"db.elasticsearch.path_parts.{key}", value) - yield + otel_span.set_attribute(f"db.elasticsearch.path_parts.{key}", value) + yield OpenTelemetrySpan(otel_span) diff --git a/elastic_transport/_transport.py b/elastic_transport/_transport.py index 23afadc..ed013f2 100644 --- a/elastic_transport/_transport.py +++ b/elastic_transport/_transport.py @@ -303,8 +303,8 @@ def perform_request( method, endpoint_id=resolve_default(endpoint_id, None), path_parts=resolve_default(path_parts, {}), - ): - return self._perform_request( + ) as span: + api_response = self._perform_request( method, target, body=body, @@ -315,6 +315,8 @@ def perform_request( request_timeout=request_timeout, client_meta=client_meta, ) + span.set_elastic_cloud_metadata(api_response.meta.headers) + return api_response def _perform_request( # type: ignore[return] self, diff --git a/tests/test_otel.py b/tests/test_otel.py index f404779..e6cb36a 100644 --- a/tests/test_otel.py +++ b/tests/test_otel.py @@ -53,8 +53,13 @@ def test_detailed_span(): otel = OpenTelemetry(enabled=True, tracer=tracer) with otel.span( "GET", endpoint_id="ml.close_job", path_parts={"job_id": "my-job", "foo": "bar"} - ): - pass + ) as span: + span.set_elastic_cloud_metadata( + { + "X-Found-Handling-Cluster": "e9106fc68e3044f0b1475b04bf4ffd5f", + "X-Found-Handling-Instance": "instance-0000000001", + } + ) spans = memory_exporter.get_finished_spans() assert len(spans) == 1 @@ -65,4 +70,6 @@ def test_detailed_span(): "db.operation": "ml.close_job", "db.elasticsearch.path_parts.job_id": "my-job", "db.elasticsearch.path_parts.foo": "bar", + "db.elasticsearch.cluster.name": "e9106fc68e3044f0b1475b04bf4ffd5f", + "db.elasticsearch.node.name": "instance-0000000001", }