diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/main.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/main.py new file mode 100644 index 0000000000..d74f5efa73 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/main.py @@ -0,0 +1,115 @@ +""" +LangGraph StateGraph example with an LLM node. + +Similar to the manual example (../manual/main.py) but uses LangGraph's StateGraph +with a node that calls ChatOpenAI. OpenTelemetry LangChain instrumentation traces +the LLM calls made from within the graph node. +""" + +from typing import Annotated + + +from langchain_core.messages import HumanMessage, SystemMessage +from langchain_openai import ChatOpenAI +from langgraph.graph import END, START, StateGraph +from langgraph.graph.message import add_messages +from typing_extensions import TypedDict + +from opentelemetry import _logs, metrics, trace +from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( + OTLPLogExporter, +) +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter, +) +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, +) +from opentelemetry.instrumentation.langchain import LangChainInstrumentor +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor + + +# Configure tracing +trace.set_tracer_provider(TracerProvider()) +span_processor = BatchSpanProcessor(OTLPSpanExporter()) +trace.get_tracer_provider().add_span_processor(span_processor) + +# Configure logging +_logs.set_logger_provider(LoggerProvider()) +_logs.get_logger_provider().add_log_record_processor( + BatchLogRecordProcessor(OTLPLogExporter()) +) + +# Configure metrics +metrics.set_meter_provider( + MeterProvider( + metric_readers=[ + PeriodicExportingMetricReader( + OTLPMetricExporter(), + ), + ] + ) +) + + +class GraphState(TypedDict): + """State for the graph; messages are accumulated with add_messages.""" + + messages: Annotated[list, add_messages] + + +def build_graph(llm: ChatOpenAI): + """Build a StateGraph with a single LLM node.""" + + def llm_node(state: GraphState) -> dict: + """Node that invokes the LLM with the current messages.""" + response = llm.invoke(state["messages"]) + return {"messages": [response]} + + builder = StateGraph(GraphState) + builder.add_node("llm", llm_node) + builder.add_edge(START, "llm") + builder.add_edge("llm", END) + return builder.compile() + + +def main(): + # Set up instrumentation (traces LLM calls from within graph nodes) + LangChainInstrumentor().instrument() + + # ChatOpenAI setup + llm = ChatOpenAI( + model="gpt-3.5-turbo", + temperature=0.1, + max_tokens=100, + top_p=0.9, + frequency_penalty=0.5, + presence_penalty=0.5, + stop_sequences=["\n", "Human:", "AI:"], + seed=100, + ) + + graph = build_graph(llm) + + initial_messages = [ + SystemMessage(content="You are a helpful assistant!"), + HumanMessage(content="What is the capital of France?"), + ] + + result = graph.invoke({"messages": initial_messages}) + + print("LangGraph output (messages):") + for msg in result.get("messages", []): + print(f" {type(msg).__name__}: {msg.content}") + + # Un-instrument after use + 
LangChainInstrumentor().uninstrument() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/requirements.txt b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/requirements.txt new file mode 100644 index 0000000000..f27cb4a3c1 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/workflow/requirements.txt @@ -0,0 +1,8 @@ +langchain==0.3.21 +langchain_openai +langgraph +opentelemetry-sdk>=1.39.0 +opentelemetry-exporter-otlp-proto-grpc>=1.39.0 + +# Uncomment after langchain instrumentation is released +# opentelemetry-instrumentation-langchain~=2.0b0.dev \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/__init__.py index acb9a9bf7d..d134c1a635 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/__init__.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/__init__.py @@ -83,7 +83,7 @@ def _instrument(self, **kwargs: Any): ) wrap_function_wrapper( - module="langchain_core.callbacks", + target="langchain_core.callbacks", name="BaseCallbackManager.__init__", wrapper=_BaseCallbackManagerInitWrapper(otel_callback_handler), ) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py index d694857da4..b20db49017 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py @@ -32,6 +32,7 @@ MessagePart, OutputMessage, Text, + WorkflowInvocation, ) @@ -45,6 +46,79 @@ def __init__(self, telemetry_handler: TelemetryHandler) -> None: self._telemetry_handler = telemetry_handler self._invocation_manager = _InvocationManager() + def on_chain_start( + self, + serialized: dict[str, Any], + inputs: dict[str, Any], + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + tags: Optional[list[str]] = None, + metadata: Optional[dict[str, Any]] = None, + **kwargs: Any, + ) -> Any: + payload = serialized or {} + name_source = ( + payload.get("name") + or payload.get("id") + or kwargs.get("name") + or (metadata.get("langgraph_node") if metadata else None) + ) + name = _safe_str(name_source or "chain") + + if parent_run_id is None: + workflow_name_override = metadata.get("workflow_name") if metadata else None + wf = WorkflowInvocation(name=workflow_name_override or name) + self._telemetry_handler.start(wf) + self._invocation_manager.add_invocation_state(run_id, None, wf) + return + else: + self._invocation_manager.add_invocation_state(run_id, parent_run_id) + + + def on_chain_end( + self, + outputs: dict[str, Any], + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> Any: + invocation = self._invocation_manager.get_invocation(run_id=run_id) + if invocation is None or not isinstance( + invocation, WorkflowInvocation + ): + # If the invocation does not 
exist, we cannot set attributes or end it + return + + self._telemetry_handler.stop(invocation) + + if invocation.span and not invocation.span.is_recording(): + self._invocation_manager.delete_invocation_state(run_id) + + + def on_chain_error( + self, + error: BaseException, + *, + run_id: UUID, + parent_run_id: Optional[UUID] = None, + **kwargs: Any, + ) -> Any: + invocation = self._invocation_manager.get_invocation(run_id=run_id) + if invocation is None or not isinstance( + invocation, WorkflowInvocation + ): + # If the invocation does not exist, we cannot set attributes or end it + return + + error_otel = Error(message=str(error), type=type(error)) + invocation = self._telemetry_handler.fail( + invocation=invocation, error=error_otel + ) + if invocation.span and not invocation.span.is_recording(): + self._invocation_manager.delete_invocation_state(run_id=run_id) + def on_chat_model_start( self, serialized: dict[str, Any], @@ -152,7 +226,7 @@ def on_chat_model_start( temperature=temperature, max_tokens=max_tokens, ) - llm_invocation = self._telemetry_handler.start_llm( + llm_invocation = self._telemetry_handler.start( invocation=llm_invocation ) self._invocation_manager.add_invocation_state( @@ -246,7 +320,7 @@ def on_llm_end( if response_id is not None: llm_invocation.response_id = str(response_id) - llm_invocation = self._telemetry_handler.stop_llm( + llm_invocation = self._telemetry_handler.stop( invocation=llm_invocation ) if llm_invocation.span and not llm_invocation.span.is_recording(): @@ -268,8 +342,14 @@ def on_llm_error( return error_otel = Error(message=str(error), type=type(error)) - llm_invocation = self._telemetry_handler.fail_llm( + llm_invocation = self._telemetry_handler.fail( invocation=llm_invocation, error=error_otel ) if llm_invocation.span and not llm_invocation.span.is_recording(): self._invocation_manager.delete_invocation_state(run_id=run_id) + +def _safe_str(value: Any) -> str: + try: + return str(value) + except (TypeError, ValueError): + return "" diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py index e8d2293bae..0431934f1e 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/invocation_manager.py @@ -39,7 +39,7 @@ def add_invocation_state( self, run_id: UUID, parent_run_id: Optional[UUID], - invocation: GenAIInvocation, + invocation: GenAIInvocation = None, ): invocation_state = _InvocationState(invocation=invocation) self._invocations[run_id] = invocation_state diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/__init__.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/__init__.py index b0a6f42841..b5a3432f2a 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/__init__.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/__init__.py @@ -11,3 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+ +from opentelemetry.util.genai.context_attributes import ( # noqa: F401 + get_context_scoped_attributes, + set_context_scoped_attributes, +) + +__all__ = [ + "get_context_scoped_attributes", + "set_context_scoped_attributes", +] diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/context_attributes.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/context_attributes.py new file mode 100644 index 0000000000..daa2f63a7e --- /dev/null +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/context_attributes.py @@ -0,0 +1,80 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Local Context-Scoped Attributes (CSA) layer for GenAI instrumentation. + +This module provides a process-local alternative to W3C Baggage for propagating +instrumentation attributes (e.g. ``gen_ai.workflow.name``) to child spans within +the same process. It follows the API shape proposed by OTel spec PR #4931 so +migration to the upstream CSA implementation will be straightforward. + +Key properties: +- Attributes are stored under a private OTel context key — they are **never** + serialised into W3C Baggage headers or any outbound propagation format. +- Lower-priority semantics: keys already present in the context are **not** + overwritten when new attributes are merged in. +""" + +from __future__ import annotations + +from typing import Any + +from opentelemetry import context as otel_context +from opentelemetry.context import Context + +_GENAI_CONTEXT_ATTRS_KEY = otel_context.create_key( + "opentelemetry.util.genai.context_scoped_attrs" +) + + +def set_context_scoped_attributes( + attrs: dict[str, Any], + context: Context | None = None, +) -> Context: + """Return a new Context with *attrs* merged in (existing keys win). + + Args: + attrs: Attributes to add to the context. Keys that are already + present in the context are **not** overwritten (lower-priority + semantics matching the CSA spec). + context: Base context to merge into. Defaults to the current context. + + Returns: + A new :class:`~opentelemetry.context.Context` containing the merged + attributes. The caller is responsible for attaching it if needed. + """ + ctx = context if context is not None else otel_context.get_current() + existing: dict[str, Any] = ( + otel_context.get_value(_GENAI_CONTEXT_ATTRS_KEY, context=ctx) or {} + ) + # existing keys win — new attrs fill in gaps only + merged = {**attrs, **existing} + return otel_context.set_value(_GENAI_CONTEXT_ATTRS_KEY, merged, ctx) + + +def get_context_scoped_attributes( + context: Context | None = None, +) -> dict[str, Any]: + """Return context-scoped attributes from *context*, or an empty dict. + + Args: + context: Context to read from. Defaults to the current context. + + Returns: + A dict of attributes previously set via + :func:`set_context_scoped_attributes`, or ``{}`` if none are present. 
+ """ + ctx = context if context is not None else otel_context.get_current() + return otel_context.get_value(_GENAI_CONTEXT_ATTRS_KEY, context=ctx) or {} diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py index a1f5848372..7aa2a33372 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py @@ -69,3 +69,13 @@ The maximum number of concurrent uploads to queue. New uploads will be dropped if the queue is full. Defaults to 20. """ + +OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE = "OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE" +""" +.. envvar:: OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE + +Opt-in flag that enables writing ``gen_ai.workflow.name`` to W3C Baggage in +addition to the process-local context-scoped attributes. Set to ``true`` or +``1`` to enable cross-process propagation of the workflow name via Baggage +headers. Defaults to ``false``. +""" diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py index 858e8a8237..2880c3dcfc 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py @@ -65,7 +65,12 @@ from contextlib import contextmanager from typing import Iterator, TypeVar +from opentelemetry import baggage from opentelemetry import context as otel_context +from opentelemetry.util.genai.context_attributes import ( + get_context_scoped_attributes, + set_context_scoped_attributes, +) from opentelemetry._logs import ( LoggerProvider, get_logger, @@ -80,6 +85,7 @@ set_span_in_context, ) from opentelemetry.util.genai.metrics import InvocationMetricsRecorder +from opentelemetry.util.genai.utils import is_baggage_propagation_enabled from opentelemetry.util.genai.span_utils import ( _apply_embedding_finish_attributes, _apply_error_attributes, @@ -179,13 +185,23 @@ def _record_embedding_metrics( def _start(self, invocation: _T) -> _T: """Start a GenAI invocation and create a pending span entry.""" + ctx = otel_context.get_current() + workflow_name = "" if isinstance(invocation, LLMInvocation): span_name = _get_llm_span_name(invocation) kind = SpanKind.CLIENT + # Read workflow name from context-scoped attributes (primary mechanism). + # Fall back to baggage for backwards compatibility with older code that + # only wrote the workflow name to baggage. + csa = get_context_scoped_attributes(ctx) + invocation.workflow_name = csa.get( + "gen_ai.workflow.name" + ) or baggage.get_baggage("gen_ai.workflow.name", ctx) elif isinstance(invocation, EmbeddingInvocation): span_name = _get_embedding_span_name(invocation) kind = SpanKind.CLIENT elif isinstance(invocation, WorkflowInvocation): + workflow_name = invocation.name span_name = _get_workflow_span_name(invocation) kind = SpanKind.INTERNAL else: @@ -199,9 +215,20 @@ def _start(self, invocation: _T) -> _T: # calculation using timeit.default_timer. invocation.monotonic_start_s = timeit.default_timer() invocation.span = span - invocation.context_token = otel_context.attach( - set_span_in_context(span) - ) + ctx = set_span_in_context(span) + if isinstance(invocation, WorkflowInvocation): + # Primary mechanism: store workflow name as a process-local + # context-scoped attribute (never leaked to W3C Baggage headers). 
+ ctx = set_context_scoped_attributes( + {"gen_ai.workflow.name": workflow_name}, ctx + ) + # Opt-in: also write to baggage for cross-process propagation. + if is_baggage_propagation_enabled(): + ctx = baggage.set_baggage( + "gen_ai.workflow.name", workflow_name, ctx + ) + invocation.context_token = otel_context.attach(ctx) + return invocation def _stop(self, invocation: _T) -> _T: diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py index 0a82462c1b..dae52d99c1 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/span_utils.py @@ -63,6 +63,7 @@ def _get_llm_common_attributes( (GenAI.GEN_AI_PROVIDER_NAME, invocation.provider), (server_attributes.SERVER_ADDRESS, invocation.server_address), (server_attributes.SERVER_PORT, invocation.server_port), + ("gen_ai.workflow.name", invocation.workflow_name), ) return { diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py index 6d59f03bf5..222aa8d4fd 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/types.py @@ -268,6 +268,9 @@ class GenAIInvocation: by the TelemetryHandler when starting an invocation. """ + workflow_name: str | None = None + agent_name: str | None = None + @dataclass class WorkflowInvocation(GenAIInvocation): diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/utils.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/utils.py index 8b6c5e8a2a..73b7b7e921 100644 --- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/utils.py +++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/utils.py @@ -27,12 +27,27 @@ from opentelemetry.util.genai.environment_variables import ( OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT, + OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE, ) from opentelemetry.util.genai.types import ContentCapturingMode logger = logging.getLogger(__name__) +def is_baggage_propagation_enabled() -> bool: + """Return True if baggage writing for gen_ai context is opted in. + + Controlled by the ``OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE`` environment + variable. Set to ``true`` or ``1`` to enable writing + ``gen_ai.workflow.name`` to W3C Baggage (cross-process propagation). + Defaults to ``False``. + """ + return os.environ.get(OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE, "").lower() in ( + "true", + "1", + ) + + def is_experimental_mode() -> bool: return ( _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( diff --git a/util/opentelemetry-util-genai/tests/test_context_attributes.py b/util/opentelemetry-util-genai/tests/test_context_attributes.py new file mode 100644 index 0000000000..71d6a73eb5 --- /dev/null +++ b/util/opentelemetry-util-genai/tests/test_context_attributes.py @@ -0,0 +1,91 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from unittest import TestCase + +from opentelemetry import context as otel_context +from opentelemetry.util.genai.context_attributes import ( + get_context_scoped_attributes, + set_context_scoped_attributes, +) + + +class TestSetContextScopedAttributes(TestCase): + def test_returns_new_context(self) -> None: + original = otel_context.get_current() + new_ctx = set_context_scoped_attributes({"key": "value"}, original) + self.assertIsNot(new_ctx, original) + + def test_values_readable_from_new_context(self) -> None: + ctx = set_context_scoped_attributes({"gen_ai.workflow.name": "wf1"}) + attrs = get_context_scoped_attributes(ctx) + self.assertEqual(attrs["gen_ai.workflow.name"], "wf1") + + def test_multiple_attributes_stored(self) -> None: + ctx = set_context_scoped_attributes({"a": "1", "b": "2"}) + attrs = get_context_scoped_attributes(ctx) + self.assertEqual(attrs["a"], "1") + self.assertEqual(attrs["b"], "2") + + def test_existing_key_not_overwritten(self) -> None: + """Lower-priority semantics: a key already in context is not replaced.""" + ctx = set_context_scoped_attributes({"gen_ai.workflow.name": "original"}) + ctx2 = set_context_scoped_attributes( + {"gen_ai.workflow.name": "new_value"}, ctx + ) + attrs = get_context_scoped_attributes(ctx2) + self.assertEqual(attrs["gen_ai.workflow.name"], "original") + + def test_new_key_added_alongside_existing(self) -> None: + ctx = set_context_scoped_attributes({"first": "a"}) + ctx2 = set_context_scoped_attributes({"second": "b"}, ctx) + attrs = get_context_scoped_attributes(ctx2) + self.assertEqual(attrs["first"], "a") + self.assertEqual(attrs["second"], "b") + + def test_defaults_to_current_context(self) -> None: + """set_context_scoped_attributes without explicit context uses current.""" + ctx = set_context_scoped_attributes({"implicit": "yes"}) + token = otel_context.attach(ctx) + try: + attrs = get_context_scoped_attributes() + self.assertEqual(attrs["implicit"], "yes") + finally: + otel_context.detach(token) + + +class TestGetContextScopedAttributes(TestCase): + def test_empty_context_returns_empty_dict(self) -> None: + fresh_ctx = otel_context.get_current() + attrs = get_context_scoped_attributes(fresh_ctx) + self.assertEqual(attrs, {}) + + def test_no_argument_uses_current_context(self) -> None: + """get_context_scoped_attributes() with no arg reads current context.""" + ctx = set_context_scoped_attributes({"k": "v"}) + token = otel_context.attach(ctx) + try: + attrs = get_context_scoped_attributes() + self.assertEqual(attrs["k"], "v") + finally: + otel_context.detach(token) + + def test_returns_same_dict_instance(self) -> None: + """get_context_scoped_attributes returns the stored dict (same reference).""" + ctx = set_context_scoped_attributes({"x": "1"}) + attrs1 = get_context_scoped_attributes(ctx) + attrs2 = get_context_scoped_attributes(ctx) + self.assertIs(attrs1, attrs2) diff --git a/util/opentelemetry-util-genai/tests/test_handler_workflow.py b/util/opentelemetry-util-genai/tests/test_handler_workflow.py index d3af4e9ec7..6d1cb0d2e5 100644 --- 
a/util/opentelemetry-util-genai/tests/test_handler_workflow.py +++ b/util/opentelemetry-util-genai/tests/test_handler_workflow.py @@ -1,10 +1,13 @@ from __future__ import annotations +import os from unittest import TestCase from unittest.mock import patch import pytest +from opentelemetry import baggage +from opentelemetry import context as otel_context from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( @@ -15,10 +18,15 @@ ) from opentelemetry.trace import SpanKind from opentelemetry.trace.status import StatusCode +from opentelemetry.util.genai.context_attributes import ( + get_context_scoped_attributes, + set_context_scoped_attributes, +) from opentelemetry.util.genai.handler import TelemetryHandler from opentelemetry.util.genai.types import ( Error, InputMessage, + LLMInvocation, OutputMessage, Text, WorkflowInvocation, @@ -308,3 +316,134 @@ def test_workflow_context_manager_swallows_fail_workflow_failure( with pytest.raises(ValueError, match="original"): with self.handler.workflow(invocation): raise ValueError("original") + + +class TelemetryHandlerWorkflowCSATest(_WorkflowTestBase): + """Tests for context-scoped attribute propagation of gen_ai.workflow.name.""" + + # ------------------------------------------------------------------ + # CSA set/read via handler + # ------------------------------------------------------------------ + + def test_workflow_sets_context_scoped_attribute(self) -> None: + """Starting a workflow stores gen_ai.workflow.name in CSA.""" + invocation = WorkflowInvocation(name="my_workflow") + self.handler.start(invocation) + try: + attrs = get_context_scoped_attributes() + self.assertEqual(attrs.get("gen_ai.workflow.name"), "my_workflow") + finally: + self.handler.stop(invocation) + + def test_llm_reads_workflow_name_from_csa(self) -> None: + """LLM invocation inside a workflow gets gen_ai.workflow.name via CSA.""" + wf_inv = WorkflowInvocation(name="pipeline") + llm_inv = LLMInvocation(request_model="test-model", provider="test") + + self.handler.start(wf_inv) + try: + self.handler.start_llm(llm_inv) + self.handler.stop_llm(llm_inv) + finally: + self.handler.stop(wf_inv) + + spans = self._get_finished_spans() + # Find the LLM span (CLIENT kind) + llm_spans = [s for s in spans if s.kind == SpanKind.CLIENT] + self.assertEqual(len(llm_spans), 1) + self.assertEqual( + llm_spans[0].attributes.get(GenAI.GEN_AI_OPERATION_NAME), + "chat", + ) + # workflow name should be stamped on the LLM span + self.assertEqual( + llm_spans[0].attributes.get("gen_ai.request.model"), "test-model" + ) + # Verify it was captured on the invocation object + self.assertEqual(llm_inv.workflow_name, "pipeline") + + def test_csa_not_visible_outside_workflow_scope(self) -> None: + """After the workflow span ends, the CSA is no longer in current context.""" + invocation = WorkflowInvocation(name="scoped_wf") + self.handler.start(invocation) + self.handler.stop(invocation) + + # After stop the context_token is detached, so CSA should be gone + attrs = get_context_scoped_attributes() + self.assertIsNone(attrs.get("gen_ai.workflow.name")) + + # ------------------------------------------------------------------ + # Baggage opt-in behaviour + # ------------------------------------------------------------------ + + def test_baggage_not_written_by_default(self) -> None: + """Without OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE, baggage is not set.""" + env = {k: v for k, v in 
os.environ.items() if k != "OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE"} + with patch.dict(os.environ, env, clear=True): + invocation = WorkflowInvocation(name="wf_no_baggage") + self.handler.start(invocation) + try: + value = baggage.get_baggage("gen_ai.workflow.name") + self.assertIsNone(value) + finally: + self.handler.stop(invocation) + + def test_baggage_written_when_opted_in(self) -> None: + """With OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE=true, baggage is also written.""" + with patch.dict( + os.environ, {"OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE": "true"} + ): + invocation = WorkflowInvocation(name="wf_with_baggage") + self.handler.start(invocation) + try: + value = baggage.get_baggage("gen_ai.workflow.name") + self.assertEqual(value, "wf_with_baggage") + finally: + self.handler.stop(invocation) + + def test_baggage_written_when_opted_in_with_one(self) -> None: + """OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE=1 also enables baggage writing.""" + with patch.dict( + os.environ, {"OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE": "1"} + ): + invocation = WorkflowInvocation(name="wf_baggage_one") + self.handler.start(invocation) + try: + value = baggage.get_baggage("gen_ai.workflow.name") + self.assertEqual(value, "wf_baggage_one") + finally: + self.handler.stop(invocation) + + # ------------------------------------------------------------------ + # Backwards compatibility: baggage fallback for LLM + # ------------------------------------------------------------------ + + def test_baggage_fallback_for_llm_when_no_csa(self) -> None: + """LLM picks up workflow name from baggage when CSA is absent (legacy context).""" + # Manually inject baggage without using WorkflowInvocation + ctx = baggage.set_baggage("gen_ai.workflow.name", "legacy_workflow") + token = otel_context.attach(ctx) + try: + llm_inv = LLMInvocation(request_model="model", provider="test") + self.handler.start_llm(llm_inv) + self.handler.stop_llm(llm_inv) + self.assertEqual(llm_inv.workflow_name, "legacy_workflow") + finally: + otel_context.detach(token) + + def test_csa_takes_priority_over_baggage_for_llm(self) -> None: + """CSA value wins over baggage when both are present.""" + # Set baggage with one name + ctx = baggage.set_baggage("gen_ai.workflow.name", "baggage_workflow") + # Also set CSA with a different name + ctx = set_context_scoped_attributes( + {"gen_ai.workflow.name": "csa_workflow"}, ctx + ) + token = otel_context.attach(ctx) + try: + llm_inv = LLMInvocation(request_model="model", provider="test") + self.handler.start_llm(llm_inv) + self.handler.stop_llm(llm_inv) + self.assertEqual(llm_inv.workflow_name, "csa_workflow") + finally: + otel_context.detach(token)
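
Illustrative usage note (not part of the patch): the sketch below shows how the new context-scoped attribute (CSA) helpers exported from opentelemetry.util.genai are meant to be consumed. Only set_context_scoped_attributes, get_context_scoped_attributes, the lower-priority merge semantics, and the OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE opt-in come from this change; the "order_pipeline" name and the manual attach/detach are illustrative and simply mirror what TelemetryHandler._start does for a WorkflowInvocation.

    # Minimal sketch of the CSA helpers added in util/opentelemetry-util-genai.
    from opentelemetry import context as otel_context
    from opentelemetry.util.genai import (
        get_context_scoped_attributes,
        set_context_scoped_attributes,
    )

    # Merge the workflow name into a new Context; keys already present win,
    # so a later write with the same key is a no-op (lower-priority semantics).
    ctx = set_context_scoped_attributes({"gen_ai.workflow.name": "order_pipeline"})

    # Attach so child instrumentation (e.g. the LLM span created inside a
    # LangGraph node) can read the attribute from the current context.
    token = otel_context.attach(ctx)
    try:
        attrs = get_context_scoped_attributes()
        assert attrs["gen_ai.workflow.name"] == "order_pipeline"

        # Existing key is not overwritten when merged again.
        ctx2 = set_context_scoped_attributes({"gen_ai.workflow.name": "other"}, ctx)
        assert (
            get_context_scoped_attributes(ctx2)["gen_ai.workflow.name"]
            == "order_pipeline"
        )
    finally:
        otel_context.detach(token)

Cross-process propagation stays opt-in: setting OTEL_PYTHON_GENAI_CAPTURE_BAGGAGE to "true" or "1" additionally writes gen_ai.workflow.name to W3C Baggage, as exercised by the baggage tests above.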