Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import logging
from typing import Collection

from opentelemetry import context as context_api


from opentelemetry._events import get_event_logger
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.langchain.callback_handler import (
Expand All @@ -13,7 +16,7 @@
from opentelemetry.instrumentation.langchain.version import __version__
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.metrics import get_meter
from opentelemetry.semconv_ai import Meters
from opentelemetry.semconv_ai import Meters, SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY
from opentelemetry.trace import get_tracer
from opentelemetry.trace.propagation import set_span_in_context
from opentelemetry.trace.propagation.tracecontext import (
Expand Down Expand Up @@ -183,8 +186,8 @@ def _uninstrument(self, **kwargs):


class _BaseCallbackManagerInitWrapper:
def __init__(self, callback_manager: "TraceloopCallbackHandler"):
self._callback_manager = callback_manager
def __init__(self, callback_handler: "TraceloopCallbackHandler"):
self._callback_handler = callback_handler

def __call__(
self,
Expand All @@ -195,10 +198,14 @@ def __call__(
) -> None:
wrapped(*args, **kwargs)
for handler in instance.inheritable_handlers:
if isinstance(handler, type(self._callback_manager)):
if isinstance(handler, type(self._callback_handler)):
break
else:
instance.add_handler(self._callback_manager, True)
# Add a property to the handler which indicates the CallbackManager instance.
# Since the CallbackHandler only propagates context for sync callbacks,
# we need a way to determine the type of CallbackManager being wrapped.
self._callback_handler._callback_manager = instance
instance.add_handler(self._callback_handler, True)


# This class wraps a function call to inject tracing information (trace headers) into
Expand Down Expand Up @@ -233,4 +240,10 @@ def __call__(
# Update kwargs to include the modified headers
kwargs["extra_headers"] = extra_headers

# In legacy chains like LLMChain, suppressing model instrumentations
# within create_llm_span doesn't work, so this should help as a fallback
context_api.attach(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In _OpenAITracingWrapper.call, a new suppression context is attached via context_api.attach(). Consider whether this token should be explicitly detached after the wrapped call to avoid any context leakage. The fallback looks intentional, but ensuring cleanup may prevent potential side effects.

context_api.set_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY, True)
)

return wrapped(*args, **kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from langchain_core.callbacks import (
BaseCallbackHandler,
CallbackManager,
AsyncCallbackManager,
)
from langchain_core.messages import (
AIMessage,
Expand Down Expand Up @@ -85,19 +87,6 @@ def _extract_class_name_from_serialized(serialized: Optional[dict[str, Any]]) ->
return ""


def _message_type_to_role(message_type: str) -> str:
if message_type == "human":
return "user"
elif message_type == "system":
return "system"
elif message_type == "ai":
return "assistant"
elif message_type == "tool":
return "tool"
else:
return "unknown"
Comment on lines -88 to -98
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this function from this file, as it was moved to span_utils.py in PR #2889



def _sanitize_metadata_value(value: Any) -> Any:
"""Convert metadata values to OpenTelemetry-compatible types."""
if value is None:
Expand Down Expand Up @@ -163,6 +152,7 @@ def __init__(
self.token_histogram = token_histogram
self.spans: dict[UUID, SpanHolder] = {}
self.run_inline = True
self._callback_manager: CallbackManager | AsyncCallbackManager = None

@staticmethod
def _get_name_from_callback(
Expand Down Expand Up @@ -192,6 +182,9 @@ def _end_span(self, span: Span, run_id: UUID) -> None:
if child_span.end_time is None: # avoid warning on ended spans
child_span.end()
span.end()
token = self.spans[run_id].token
if token:
context_api.detach(token)

def _create_span(
self,
Expand Down Expand Up @@ -230,13 +223,17 @@ def _create_span(
else:
span = self.tracer.start_span(span_name, kind=kind)

token = None
# TODO: make this unconditional once attach/detach works properly with async callbacks.
# Currently, it doesn't work due to this - https://github.com/langchain-ai/langchain/issues/31398
# As a sidenote, OTel Python users also report similar issues -
# https://github.com/open-telemetry/opentelemetry-python/issues/2606
if self._callback_manager and not self._callback_manager.is_async:
token = context_api.attach(set_span_in_context(span))

_set_span_attribute(span, SpanAttributes.TRACELOOP_WORKFLOW_NAME, workflow_name)
_set_span_attribute(span, SpanAttributes.TRACELOOP_ENTITY_PATH, entity_path)

token = context_api.attach(
context_api.set_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY, True)
)

self.spans[run_id] = SpanHolder(
span, token, None, [], workflow_name, entity_name, entity_path
)
Expand Down Expand Up @@ -300,6 +297,16 @@ def _create_llm_span(
_set_span_attribute(span, SpanAttributes.LLM_SYSTEM, vendor)
_set_span_attribute(span, SpanAttributes.LLM_REQUEST_TYPE, request_type.value)

# we already have an LLM span by this point,
# so skip any downstream instrumentation from here
token = context_api.attach(
context_api.set_value(SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY, True)
)

self.spans[run_id] = SpanHolder(
span, token, None, [], workflow_name, None, entity_path
)

return span

@dont_throw
Expand Down Expand Up @@ -464,7 +471,7 @@ def on_llm_end(
"model_name"
) or response.llm_output.get("model_id")
if model_name is not None:
_set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, model_name)
_set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, model_name or "unknown")

if self.spans[run_id].request_model is None:
_set_span_attribute(
Expand Down
Loading