Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
TraceloopCallbackHandler,
)
from opentelemetry.instrumentation.langchain.config import Config
from opentelemetry.instrumentation.langchain.patch import (
create_graph_invocation_wrapper,
create_command_init_wrapper,
create_middleware_hook_wrapper,
create_async_middleware_hook_wrapper,
create_agent_wrapper,
)
from opentelemetry.instrumentation.langchain.utils import is_package_available
from opentelemetry.instrumentation.langchain.version import __version__
from opentelemetry.instrumentation.utils import unwrap
Expand Down Expand Up @@ -96,6 +103,9 @@ def _instrument(self, **kwargs):
wrapper=_BaseCallbackManagerInitWrapper(traceloopCallbackHandler),
)

# Wrap LangGraph components if available
self._wrap_langgraph_components(tracer)

if not self.disable_trace_context_propagation:
self._wrap_openai_functions_for_tracing(traceloopCallbackHandler)

Expand Down Expand Up @@ -179,8 +189,160 @@ def _wrap_openai_functions_for_tracing(self, traceloopCallbackHandler):
# wrapper=openai_tracing_wrapper,
# )

def _wrap_langgraph_components(self, tracer):
"""Wrap LangGraph components for instrumentation."""
# Wrap Pregel.stream and Pregel.astream (graph invocation)
if is_package_available("langgraph"):
try:
wrap_function_wrapper(
module="langgraph.pregel",
name="Pregel.stream",
wrapper=create_graph_invocation_wrapper(tracer, is_async=False),
)
wrap_function_wrapper(
module="langgraph.pregel",
name="Pregel.astream",
wrapper=create_graph_invocation_wrapper(tracer, is_async=True),
)
except Exception as e:
logger.debug("Failed to wrap Pregel methods: %s", e)

# Wrap Command.__init__ to capture routing commands
try:
wrap_function_wrapper(
module="langgraph.types",
name="Command.__init__",
wrapper=create_command_init_wrapper(tracer),
)
except Exception as e:
logger.debug("Failed to wrap Command.__init__: %s", e)

# Wrap AgentMiddleware hooks if langchain is available
if is_package_available("langchain"):
self._wrap_middleware_hooks(tracer)

# Wrap agent factories (method checks langgraph/langchain availability internally)
self._wrap_agent_factories(tracer)

def _wrap_agent_factories(self, tracer):
"""Wrap agent factory functions for instrumentation."""
# LangGraph prebuilt agents - patch both actual module and re-export location
if is_package_available("langgraph"):
langgraph_agent_wrapper = create_agent_wrapper(tracer, provider_name="langgraph")
# Patch the actual module where the function is defined
try:
wrap_function_wrapper(
module="langgraph.prebuilt.chat_agent_executor",
name="create_react_agent",
wrapper=langgraph_agent_wrapper,
)
except Exception as e:
logger.debug("Failed to wrap langgraph.prebuilt.chat_agent_executor.create_react_agent: %s", e)
# Also patch the re-export location for imports from langgraph.prebuilt
try:
wrap_function_wrapper(
module="langgraph.prebuilt",
name="create_react_agent",
wrapper=langgraph_agent_wrapper,
)
except Exception as e:
logger.debug("Failed to wrap langgraph.prebuilt.create_react_agent: %s", e)

# LangChain agents - patch both actual module and re-export location
if is_package_available("langchain"):
agent_wrapper = create_agent_wrapper(tracer, provider_name="langchain")
# Patch the actual module where the function is defined
try:
wrap_function_wrapper(
module="langchain.agents.factory",
name="create_agent",
wrapper=agent_wrapper,
)
except Exception as e:
logger.debug("Failed to wrap langchain.agents.factory.create_agent: %s", e)
# Also patch the re-export location for imports from langchain.agents
try:
wrap_function_wrapper(
module="langchain.agents",
name="create_agent",
wrapper=agent_wrapper,
)
except Exception as e:
logger.debug("Failed to wrap langchain.agents.create_agent: %s", e)

def _wrap_middleware_hooks(self, tracer):
"""Wrap AgentMiddleware hook methods for instrumentation."""
# Sync hooks
sync_hooks = ["before_model", "after_model", "before_agent", "after_agent"]
for hook_name in sync_hooks:
try:
wrap_function_wrapper(
module="langchain.agents.middleware.types",
name=f"AgentMiddleware.{hook_name}",
wrapper=create_middleware_hook_wrapper(tracer, hook_name),
)
except Exception as e:
logger.debug("Failed to wrap AgentMiddleware.%s: %s", hook_name, e)

# Async hooks
async_hooks = ["abefore_model", "aafter_model", "abefore_agent", "aafter_agent"]
for hook_name in async_hooks:
try:
wrap_function_wrapper(
module="langchain.agents.middleware.types",
name=f"AgentMiddleware.{hook_name}",
wrapper=create_async_middleware_hook_wrapper(tracer, hook_name),
)
except Exception as e:
logger.debug("Failed to wrap AgentMiddleware.%s: %s", hook_name, e)

def _uninstrument(self, **kwargs):
unwrap("langchain_core.callbacks", "BaseCallbackManager.__init__")

# Unwrap LangGraph components
if is_package_available("langgraph"):
try:
unwrap("langgraph.pregel", "Pregel.stream")
unwrap("langgraph.pregel", "Pregel.astream")
except Exception:
pass
try:
unwrap("langgraph.types", "Command.__init__")
except Exception:
pass

# Unwrap AgentMiddleware hooks
if is_package_available("langchain"):
sync_hooks = ["before_model", "after_model", "before_agent", "after_agent"]
async_hooks = ["abefore_model", "aafter_model", "abefore_agent", "aafter_agent"]
for hook_name in sync_hooks + async_hooks:
try:
unwrap("langchain.agents.middleware.types", f"AgentMiddleware.{hook_name}")
except Exception:
pass

# Unwrap LangGraph agent factories (both actual module and re-export)
if is_package_available("langgraph"):
try:
unwrap("langgraph.prebuilt.chat_agent_executor", "create_react_agent")
except Exception:
pass
try:
unwrap("langgraph.prebuilt", "create_react_agent")
except Exception:
pass

# Unwrap LangChain agent factories (both actual module and re-export)
if is_package_available("langchain"):
try:
unwrap("langchain.agents.factory", "create_agent")
except Exception:
pass
try:
unwrap("langchain.agents", "create_agent")
except Exception:
pass

if not self.disable_trace_context_propagation:
if is_package_available("langchain_community"):
unwrap("langchain_community.llms.openai", "BaseOpenAI._generate")
Expand Down
Loading