-
Notifications
You must be signed in to change notification settings - Fork 955
fix(agno): add streaming support for Agent.run() and Agent.arun() #3483
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,7 @@ | |
| _FunctionCallAExecuteWrapper, | ||
| ) | ||
| from opentelemetry.instrumentation.agno.config import Config | ||
| from opentelemetry.instrumentation.agno.streaming import AgnoAsyncStream, AgnoStream | ||
| from opentelemetry.instrumentation.agno.utils import ( | ||
| dont_throw, | ||
| should_send_prompts, | ||
|
|
@@ -149,12 +150,16 @@ def __call__(self, wrapped, instance, args, kwargs): | |
| ) or context_api.get_value("suppress_agno_instrumentation"): | ||
| return wrapped(*args, **kwargs) | ||
|
|
||
| span_name = f"{getattr(instance, 'name', 'unknown')}.agent" | ||
| is_streaming = kwargs.get("stream", False) | ||
|
|
||
| if is_streaming: | ||
| span_name = f"{getattr(instance, 'name', 'unknown')}.agent" | ||
|
|
||
| span = self._tracer.start_span( | ||
| span_name, | ||
| kind=SpanKind.CLIENT, | ||
| ) | ||
|
Comment on lines
+155
to
+161
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Check streaming.py for span/context handling
rg -nP --type=py "start_span|set_current|attach|context|use_span|Token" packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.pyRepository: traceloop/openllmetry Length of output: 47 🏁 Script executed: #!/bin/bash
# Check if streaming.py imports or uses any context-related OpenTelemetry modules
rg -nP --type=py "from.*context|import.*context" packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.pyRepository: traceloop/openllmetry Length of output: 47 🏁 Script executed: #!/bin/bash
# Look at the full streaming.py file to understand the structure
wc -l packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.pyRepository: traceloop/openllmetry Length of output: 160 🏁 Script executed: #!/bin/bash
# Check how tool spans are created within the streaming context
rg -nP --type=py "tool.*span|_create_tool|wrap_tool" packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py -A 5 | head -50Repository: traceloop/openllmetry Length of output: 47 🏁 Script executed: #!/bin/bash
# Read streaming.py to understand the implementation
cat -n packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.pyRepository: traceloop/openllmetry Length of output: 9702 🏁 Script executed: #!/bin/bash
# Also check the imports in __init__.py to understand span management
head -50 packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py | cat -nRepository: traceloop/openllmetry Length of output: 2160 🏁 Script executed: #!/bin/bash
# Look for tool execution in the streaming branch
sed -n '155,180p' packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py | cat -nRepository: traceloop/openllmetry Length of output: 1272 🏁 Script executed: #!/bin/bash
# Check tool wrapper implementation
head -80 packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/_tool_wrappers.py | cat -nRepository: traceloop/openllmetry Length of output: 4007 🏁 Script executed: #!/bin/bash
# Look for start_span calls in tool wrappers
rg -n "start_span|get_span|context" packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/_tool_wrappers.py -A 3 | head -60Repository: traceloop/openllmetry Length of output: 970 Span context propagation is broken for tool calls in streaming mode. The streaming branch creates the agent span with The non-streaming branch uses |
||
|
|
||
| with self._tracer.start_as_current_span( | ||
| span_name, | ||
| kind=SpanKind.CLIENT, | ||
| ) as span: | ||
| try: | ||
| span.set_attribute(GenAIAttributes.GEN_AI_SYSTEM, "agno") | ||
| span.set_attribute( | ||
|
|
@@ -181,51 +186,100 @@ def __call__(self, wrapped, instance, args, kwargs): | |
|
|
||
| start_time = time.time() | ||
|
|
||
| result = wrapped(*args, **kwargs) | ||
| response = wrapped(*args, **kwargs) | ||
|
|
||
| duration = time.time() - start_time | ||
| return AgnoStream( | ||
| span, | ||
| response, | ||
| instance, | ||
| start_time, | ||
| self._duration_histogram, | ||
| self._token_histogram, | ||
| ) | ||
|
|
||
| if hasattr(result, "content") and should_send_prompts(): | ||
| except Exception as e: | ||
| span.set_status(Status(StatusCode.ERROR, str(e))) | ||
| span.record_exception(e) | ||
| span.end() | ||
| raise | ||
| else: | ||
| span_name = f"{getattr(instance, 'name', 'unknown')}.agent" | ||
|
|
||
| with self._tracer.start_as_current_span( | ||
| span_name, | ||
| kind=SpanKind.CLIENT, | ||
| ) as span: | ||
| try: | ||
| span.set_attribute(GenAIAttributes.GEN_AI_SYSTEM, "agno") | ||
| span.set_attribute( | ||
| SpanAttributes.TRACELOOP_ENTITY_OUTPUT, str(result.content) | ||
| SpanAttributes.TRACELOOP_SPAN_KIND, | ||
| TraceloopSpanKindValues.AGENT.value, | ||
| ) | ||
|
|
||
| if hasattr(result, "run_id"): | ||
| span.set_attribute("agno.run.id", result.run_id) | ||
| if hasattr(instance, "name"): | ||
| span.set_attribute(GenAIAttributes.GEN_AI_AGENT_NAME, instance.name) | ||
|
|
||
| if hasattr(result, "metrics"): | ||
| metrics = result.metrics | ||
| if hasattr(metrics, "input_tokens"): | ||
| span.set_attribute( | ||
| GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS, | ||
| metrics.input_tokens, | ||
| if hasattr(instance, "model") and instance.model: | ||
| model_name = getattr( | ||
| instance.model, "id", getattr(instance.model, "name", "unknown") | ||
| ) | ||
| if hasattr(metrics, "output_tokens"): | ||
| span.set_attribute(GenAIAttributes.GEN_AI_REQUEST_MODEL, model_name) | ||
|
|
||
| if args and should_send_prompts(): | ||
| input_message = str(args[0]) | ||
| span.set_attribute( | ||
| GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS, | ||
| metrics.output_tokens, | ||
| SpanAttributes.TRACELOOP_ENTITY_INPUT, input_message | ||
| ) | ||
| if hasattr(metrics, "total_tokens"): | ||
|
|
||
| import time | ||
|
|
||
| start_time = time.time() | ||
|
|
||
| result = wrapped(*args, **kwargs) | ||
|
|
||
| duration = time.time() - start_time | ||
|
|
||
| if hasattr(result, "content") and should_send_prompts(): | ||
| span.set_attribute( | ||
| SpanAttributes.LLM_USAGE_TOTAL_TOKENS, metrics.total_tokens | ||
| SpanAttributes.TRACELOOP_ENTITY_OUTPUT, str(result.content) | ||
| ) | ||
|
|
||
| span.set_status(Status(StatusCode.OK)) | ||
|
|
||
| self._duration_histogram.record( | ||
| duration, | ||
| attributes={ | ||
| GenAIAttributes.GEN_AI_SYSTEM: "agno", | ||
| SpanAttributes.TRACELOOP_SPAN_KIND: TraceloopSpanKindValues.AGENT.value, | ||
| }, | ||
| ) | ||
| if hasattr(result, "run_id"): | ||
| span.set_attribute("agno.run.id", result.run_id) | ||
|
|
||
| if hasattr(result, "metrics"): | ||
| metrics = result.metrics | ||
| if hasattr(metrics, "input_tokens"): | ||
| span.set_attribute( | ||
| GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS, | ||
| metrics.input_tokens, | ||
| ) | ||
| if hasattr(metrics, "output_tokens"): | ||
| span.set_attribute( | ||
| GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS, | ||
| metrics.output_tokens, | ||
| ) | ||
| if hasattr(metrics, "total_tokens"): | ||
| span.set_attribute( | ||
| SpanAttributes.LLM_USAGE_TOTAL_TOKENS, metrics.total_tokens | ||
| ) | ||
|
|
||
| span.set_status(Status(StatusCode.OK)) | ||
|
|
||
| self._duration_histogram.record( | ||
| duration, | ||
| attributes={ | ||
| GenAIAttributes.GEN_AI_SYSTEM: "agno", | ||
| SpanAttributes.TRACELOOP_SPAN_KIND: TraceloopSpanKindValues.AGENT.value, | ||
| }, | ||
| ) | ||
|
|
||
| return result | ||
| return result | ||
|
|
||
| except Exception as e: | ||
| span.set_status(Status(StatusCode.ERROR, str(e))) | ||
| span.record_exception(e) | ||
| raise | ||
| except Exception as e: | ||
| span.set_status(Status(StatusCode.ERROR, str(e))) | ||
| span.record_exception(e) | ||
| raise | ||
|
|
||
|
|
||
| class _AgentARunWrapper: | ||
|
|
@@ -238,19 +292,23 @@ def __init__(self, tracer, duration_histogram, token_histogram): | |
| self._token_histogram = token_histogram | ||
|
|
||
| @dont_throw | ||
| async def __call__(self, wrapped, instance, args, kwargs): | ||
| def __call__(self, wrapped, instance, args, kwargs): | ||
| """Wrap the Agent.arun() call with tracing instrumentation.""" | ||
| if context_api.get_value( | ||
| context_api._SUPPRESS_INSTRUMENTATION_KEY | ||
| ) or context_api.get_value("suppress_agno_instrumentation"): | ||
| return await wrapped(*args, **kwargs) | ||
| return wrapped(*args, **kwargs) | ||
|
|
||
| span_name = f"{getattr(instance, 'name', 'unknown')}.agent" | ||
| is_streaming = kwargs.get("stream", False) | ||
|
|
||
| if is_streaming: | ||
| span_name = f"{getattr(instance, 'name', 'unknown')}.agent" | ||
|
|
||
| span = self._tracer.start_span( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same question |
||
| span_name, | ||
| kind=SpanKind.CLIENT, | ||
| ) | ||
|
|
||
| with self._tracer.start_as_current_span( | ||
| span_name, | ||
| kind=SpanKind.CLIENT, | ||
| ) as span: | ||
| try: | ||
| span.set_attribute(GenAIAttributes.GEN_AI_SYSTEM, "agno") | ||
| span.set_attribute( | ||
|
|
@@ -277,51 +335,103 @@ async def __call__(self, wrapped, instance, args, kwargs): | |
|
|
||
| start_time = time.time() | ||
|
|
||
| result = await wrapped(*args, **kwargs) | ||
| response = wrapped(*args, **kwargs) | ||
|
|
||
| duration = time.time() - start_time | ||
|
|
||
| if hasattr(result, "content") and should_send_prompts(): | ||
| span.set_attribute( | ||
| SpanAttributes.TRACELOOP_ENTITY_OUTPUT, str(result.content) | ||
| ) | ||
|
|
||
| if hasattr(result, "run_id"): | ||
| span.set_attribute("agno.run.id", result.run_id) | ||
| return AgnoAsyncStream( | ||
| span, | ||
| response, | ||
| instance, | ||
| start_time, | ||
| self._duration_histogram, | ||
| self._token_histogram, | ||
| ) | ||
|
|
||
| if hasattr(result, "metrics"): | ||
| metrics = result.metrics | ||
| if hasattr(metrics, "input_tokens"): | ||
| span.set_attribute( | ||
| GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS, | ||
| metrics.input_tokens, | ||
| ) | ||
| if hasattr(metrics, "output_tokens"): | ||
| span.set_attribute( | ||
| GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS, | ||
| metrics.output_tokens, | ||
| ) | ||
| if hasattr(metrics, "total_tokens"): | ||
| except Exception as e: | ||
| span.set_status(Status(StatusCode.ERROR, str(e))) | ||
| span.record_exception(e) | ||
| span.end() | ||
| raise | ||
| else: | ||
| async def async_wrapper(): | ||
| span_name = f"{getattr(instance, 'name', 'unknown')}.agent" | ||
|
|
||
| with self._tracer.start_as_current_span( | ||
| span_name, | ||
| kind=SpanKind.CLIENT, | ||
| ) as span: | ||
| try: | ||
| span.set_attribute(GenAIAttributes.GEN_AI_SYSTEM, "agno") | ||
| span.set_attribute( | ||
| SpanAttributes.LLM_USAGE_TOTAL_TOKENS, metrics.total_tokens | ||
| SpanAttributes.TRACELOOP_SPAN_KIND, | ||
| TraceloopSpanKindValues.AGENT.value, | ||
| ) | ||
|
|
||
| span.set_status(Status(StatusCode.OK)) | ||
| if hasattr(instance, "name"): | ||
| span.set_attribute(GenAIAttributes.GEN_AI_AGENT_NAME, instance.name) | ||
|
|
||
| if hasattr(instance, "model") and instance.model: | ||
| model_name = getattr( | ||
| instance.model, "id", getattr(instance.model, "name", "unknown") | ||
| ) | ||
| span.set_attribute(GenAIAttributes.GEN_AI_REQUEST_MODEL, model_name) | ||
|
|
||
| if args and should_send_prompts(): | ||
| input_message = str(args[0]) | ||
| span.set_attribute( | ||
| SpanAttributes.TRACELOOP_ENTITY_INPUT, input_message | ||
| ) | ||
|
|
||
| import time | ||
|
|
||
| start_time = time.time() | ||
|
|
||
| result = await wrapped(*args, **kwargs) | ||
|
|
||
| duration = time.time() - start_time | ||
|
|
||
| if hasattr(result, "content") and should_send_prompts(): | ||
| span.set_attribute( | ||
| SpanAttributes.TRACELOOP_ENTITY_OUTPUT, str(result.content) | ||
| ) | ||
|
|
||
| if hasattr(result, "run_id"): | ||
| span.set_attribute("agno.run.id", result.run_id) | ||
|
|
||
| if hasattr(result, "metrics"): | ||
| metrics = result.metrics | ||
| if hasattr(metrics, "input_tokens"): | ||
| span.set_attribute( | ||
| GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS, | ||
| metrics.input_tokens, | ||
| ) | ||
| if hasattr(metrics, "output_tokens"): | ||
| span.set_attribute( | ||
| GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS, | ||
| metrics.output_tokens, | ||
| ) | ||
| if hasattr(metrics, "total_tokens"): | ||
| span.set_attribute( | ||
| SpanAttributes.LLM_USAGE_TOTAL_TOKENS, metrics.total_tokens | ||
| ) | ||
|
|
||
| span.set_status(Status(StatusCode.OK)) | ||
|
|
||
| self._duration_histogram.record( | ||
| duration, | ||
| attributes={ | ||
| GenAIAttributes.GEN_AI_SYSTEM: "agno", | ||
| SpanAttributes.TRACELOOP_SPAN_KIND: TraceloopSpanKindValues.AGENT.value, | ||
| }, | ||
| ) | ||
|
|
||
| self._duration_histogram.record( | ||
| duration, | ||
| attributes={ | ||
| GenAIAttributes.GEN_AI_SYSTEM: "agno", | ||
| SpanAttributes.TRACELOOP_SPAN_KIND: TraceloopSpanKindValues.AGENT.value, | ||
| }, | ||
| ) | ||
| return result | ||
|
|
||
| return result | ||
| except Exception as e: | ||
| span.set_status(Status(StatusCode.ERROR, str(e))) | ||
| span.record_exception(e) | ||
| raise | ||
|
|
||
| except Exception as e: | ||
| span.set_status(Status(StatusCode.ERROR, str(e))) | ||
| span.record_exception(e) | ||
| raise | ||
| return async_wrapper() | ||
|
|
||
|
|
||
| class _TeamRunWrapper: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not
start_as_current_span?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@doronkopit5 bwcause it will be ended outside of this method (only when the stream ends)