27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

35 changes: 35 additions & 0 deletions crates/goose-cli/src/cli.rs
@@ -970,6 +970,17 @@ pub async fn cli() -> Result<()> {
provider,
model,
}) => {
// Capture recipe name before the match moves the recipe variable
let recipe_display_name = recipe
.as_ref()
.map(|r| {
std::path::Path::new(r)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(r)
})
.map(|s| s.to_string());

let (input_config, recipe_info) = match (instructions, input_text, recipe) {
(Some(file), _, _) if file == "-" => {
let mut input = String::new();
@@ -1108,7 +1119,31 @@ pub async fn cli() -> Result<()> {
"Headless session started"
);

// Get session ID for span tracking
let session_id = session
.session_id()
.map(|id| id.to_string())
.unwrap_or_else(|| "no-session".to_string());

// Create root span for recipe/run execution
let execution_span = if let Some(ref _info) = recipe_info {
let recipe_name = recipe_display_name.unwrap_or_else(|| "unknown".to_string());

tracing::info_span!(
"recipe.execute",
session.id = %session_id,
recipe.name = %recipe_name
)
} else {
tracing::info_span!(
"run.execute",
session.id = %session_id
)
};

let _enter = execution_span.enter();
let result = session.headless(contents).await;
drop(_enter);

let session_duration = session_start.elapsed();
let exit_type = if result.is_ok() { "normal" } else { "error" };
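A note on the new span handling in cli.rs above: the added code holds an entered span guard across the headless .await before dropping it. A minimal alternative sketch, using the tracing Instrument extension trait to attach the span to the future instead (illustrative only, not part of this diff):

// Illustrative alternative (not in this PR): attach the execution span to the
// future rather than holding an entered guard across the .await point.
use tracing::Instrument;

let result = session.headless(contents).instrument(execution_span).await;
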
9 changes: 8 additions & 1 deletion crates/goose-cli/src/logging.rs
@@ -102,12 +102,19 @@ fn setup_logging_internal(
}

if !force {
if let Ok((otlp_tracing_layer, otlp_metrics_layer)) = otlp_layer::init_otlp() {
// Initialize OpenTelemetry propagation for distributed tracing
otlp_layer::init_otel_propagation();

// Initialize traces and metrics independently so one can succeed even if the other fails
if let Ok(otlp_tracing_layer) = otlp_layer::create_otlp_tracing_layer() {
layers.push(
otlp_tracing_layer
.with_filter(otlp_layer::create_otlp_tracing_filter())
.boxed(),
);
}

if let Ok(otlp_metrics_layer) = otlp_layer::create_otlp_metrics_layer() {
layers.push(
otlp_metrics_layer
.with_filter(otlp_layer::create_otlp_metrics_filter())
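The body of otlp_layer::init_otel_propagation is not shown in this diff. A minimal sketch of what such a helper typically does, assuming it registers the W3C trace-context propagator from the opentelemetry crates already in the dependency tree:

// Hypothetical sketch — the actual otlp_layer::init_otel_propagation body is not shown in this diff.
use opentelemetry::global;
use opentelemetry_sdk::propagation::TraceContextPropagator;

pub fn init_otel_propagation() {
    // Register the W3C trace-context propagator so trace context can cross process boundaries.
    global::set_text_map_propagator(TraceContextPropagator::new());
}
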
26 changes: 5 additions & 21 deletions crates/goose-cli/src/main.rs
@@ -9,28 +9,12 @@ async fn main() -> Result<()> {

let result = cli().await;

// Only wait for telemetry flush if OTLP is configured
let should_wait = goose::config::Config::global()
.get_param::<String>("otel_exporter_otlp_endpoint")
.is_ok();
// Shutdown OTLP providers if they were initialized
if goose::tracing::is_otlp_initialized() {
// Give batch exporters a moment to flush pending data
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

if should_wait {
// Use a shorter, dynamic wait with max timeout
let max_wait = tokio::time::Duration::from_millis(500);
let start = tokio::time::Instant::now();

// Give telemetry a chance to flush, but don't wait too long
while start.elapsed() < max_wait {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

// In future, we could check if there are pending spans/metrics here
// For now, we just do a quick wait to allow batch exports to complete
if start.elapsed() >= tokio::time::Duration::from_millis(200) {
break; // Most exports should complete within 200ms
}
}

// Then shutdown the providers
// Shutdown providers (calls shutdown on meter provider and tracer provider)
goose::tracing::shutdown_otlp();
}

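goose::tracing::is_otlp_initialized and goose::tracing::shutdown_otlp are only called here, not defined in this diff. A plausible sketch of those helpers, assuming the SDK providers are stored in OnceLock statics during OTLP setup (the real implementation may differ):

// Hypothetical sketch of the goose::tracing shutdown helpers — not taken from this PR.
use std::sync::OnceLock;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use opentelemetry_sdk::trace::TracerProvider;

static TRACER_PROVIDER: OnceLock<TracerProvider> = OnceLock::new();
static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new();

pub fn is_otlp_initialized() -> bool {
    // True once either provider was stored during OTLP initialization.
    TRACER_PROVIDER.get().is_some() || METER_PROVIDER.get().is_some()
}

pub fn shutdown_otlp() {
    // Shutting down the providers flushes spans and metrics still queued in the batch exporters.
    if let Some(tracer) = TRACER_PROVIDER.get() {
        let _ = tracer.shutdown();
    }
    if let Some(meter) = METER_PROVIDER.get() {
        let _ = meter.shutdown();
    }
}
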
3 changes: 3 additions & 0 deletions crates/goose-server/src/logging.rs
@@ -74,6 +74,9 @@ pub fn setup_logging(name: Option<&str>) -> Result<()> {
console_layer.with_filter(LevelFilter::INFO).boxed(),
];

// Initialize OpenTelemetry propagation for distributed tracing
otlp_layer::init_otel_propagation();

if let Ok((otlp_tracing_layer, otlp_metrics_layer)) = otlp_layer::init_otlp() {
layers.push(
otlp_tracing_layer
1 change: 1 addition & 0 deletions crates/goose/Cargo.toml
@@ -69,6 +69,7 @@ tracing-opentelemetry = "0.28"
opentelemetry = "0.27"
opentelemetry_sdk = { version = "0.27", features = ["rt-tokio", "metrics"] }
opentelemetry-otlp = { version = "0.27", features = ["grpc-tonic", "http-proto", "reqwest-client"] }
opentelemetry-stdout = { version = "0.27", features = ["trace", "metrics"] }
tonic = "0.12"
keyring = { version = "3.6.2", features = ["apple-native", "windows-native", "sync-secret-service", "vendored"] }
serde_yaml = "0.9.34"
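The new opentelemetry-stdout dependency is presumably intended for inspecting telemetry locally without an OTLP collector; its usage is not shown in this diff. A minimal wiring sketch under that assumption:

// Illustrative only — exporting spans to stdout for local debugging.
fn init_stdout_tracing() -> opentelemetry_sdk::trace::TracerProvider {
    opentelemetry_sdk::trace::TracerProvider::builder()
        .with_simple_exporter(opentelemetry_stdout::SpanExporter::default())
        .build()
}
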
79 changes: 63 additions & 16 deletions crates/goose/src/agents/agent.rs
@@ -55,7 +55,7 @@ use rmcp::model::{
use serde_json::Value;
use tokio::sync::{mpsc, Mutex};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};
use tracing::{debug, error, info, warn};

use super::final_output_tool::FinalOutputTool;
use super::model_selector::autopilot::AutoPilot;
Expand Down Expand Up @@ -131,13 +131,23 @@ pub type ToolStream = Pin<Box<dyn Stream<Item = ToolStreamItem<ToolResult<Vec<Co
// tool_stream combines a stream of ServerNotifications with a future representing the
// final result of the tool call. MCP notifications are not request-scoped, but
// this lets us capture all notifications emitted during the tool call for
// simpler consumption
pub fn tool_stream<S, F>(rx: S, done: F) -> ToolStream
// simpler consumption.
// If tool_name is provided, a "gen_ai.tool.call" span will be created and kept alive
// during the tool execution.
pub fn tool_stream<S, F>(rx: S, done: F, tool_name: Option<String>) -> ToolStream
where
S: Stream<Item = ServerNotification> + Send + Unpin + 'static,
F: Future<Output = ToolResult<Vec<Content>>> + Send + 'static,
{
Box::pin(async_stream::stream! {
// Create span if tool name is provided
let tool_span = tool_name.as_ref().map(|name| {
tracing::info_span!("gen_ai.tool.call", tool.name = %name)
});

// Enter span if it exists
let _span_guard = tool_span.as_ref().map(|span| span.enter());

tokio::pin!(done);
let mut rx = rx;

Expand Down Expand Up @@ -305,6 +315,7 @@ impl Agent {
// Handle pre-approved and read-only tools
for request in &permission_check_result.approved {
if let Ok(tool_call) = request.tool_call.clone() {
let tool_name = tool_call.name.to_string();
let (req_id, tool_result) = self
.dispatch_tool_call(
tool_call,
@@ -322,10 +333,13 @@
.notification_stream
.unwrap_or_else(|| Box::new(stream::empty())),
result.result,
Some(tool_name.clone()),
),
Err(e) => tool_stream(
Box::new(stream::empty()),
futures::future::ready(Err(e)),
Some(tool_name),
),
Err(e) => {
tool_stream(Box::new(stream::empty()), futures::future::ready(Err(e)))
}
},
));
}
Expand Down Expand Up @@ -385,7 +399,8 @@ impl Agent {
}

/// Dispatch a single tool call to the appropriate client
#[instrument(skip(self, tool_call, request_id), fields(input, output))]
/// Note: Span instrumentation is handled in tool_stream where the future is actually consumed
#[allow(clippy::too_many_lines)]
pub async fn dispatch_tool_call(
&self,
tool_call: CallToolRequestParam,
Expand Down Expand Up @@ -956,7 +971,6 @@ impl Agent {
Ok(None)
}

#[instrument(skip(self, unfixed_conversation, session), fields(user_message))]
pub async fn reply(
&self,
unfixed_conversation: Conversation,
Expand Down Expand Up @@ -1005,7 +1019,25 @@ impl Agent {
initial_messages,
config,
} = context;
let reply_span = tracing::Span::current();

// Get model name for span attributes
let provider = self.provider().await?;
let model_name = provider.get_model_config().model_name.clone();

// Create a span for the agent reply loop (goose-specific, not semantic convention)
// Only add session.id attribute if a session exists
let reply_span = if let Some(session_config) = &session {
tracing::info_span!(
"agent.reply",
session.id = %session_config.id,
model = %model_name
)
} else {
tracing::info_span!(
"agent.reply",
model = %model_name
)
};
self.reset_retry_attempts().await;

// This will need further refactoring. In the ideal world we pass the new message into
Expand Down Expand Up @@ -1057,7 +1089,10 @@ impl Agent {
}

Ok(Box::pin(async_stream::try_stream! {
let _ = reply_span.enter();
// Enter span to set context for child spans (tool calls, LLM calls)
// Note: Holding guard across .await is not ideal per tracing docs, but necessary
// for context propagation in streams. The alternative (.instrument()) doesn't work for streams.
let _span_guard = reply_span.enter();
let mut turns_taken = 0u32;
let max_turns = session
.as_ref()
Expand Down Expand Up @@ -1501,18 +1536,29 @@ impl Agent {
}

pub async fn create_recipe(&self, mut messages: Conversation) -> Result<Recipe> {
tracing::info!("Starting recipe creation with {} messages", messages.len());

let extensions_info = self.extension_manager.get_extensions_info().await;
tracing::debug!("Retrieved {} extensions info", extensions_info.len());

// Get model name from provider
// Get model name from provider first for span creation
let provider = self.provider().await.map_err(|e| {
tracing::error!("Failed to get provider for recipe creation: {}", e);
e
})?;
let model_config = provider.get_model_config();
let model_name = &model_config.model_name;

// Create span following OpenTelemetry GenAI semantic conventions
let span = tracing::info_span!(
"recipe_creation",
otel.name = format!("recipe_creation {}", model_name),
gen_ai.request.model = %model_name,
gen_ai.system = "goose",
gen_ai.operation.name = "recipe_creation"
);
let _enter = span.enter();

tracing::info!("Starting recipe creation with {} messages", messages.len());

let extensions_info = self.extension_manager.get_extensions_info().await;
tracing::debug!("Retrieved {} extensions info", extensions_info.len());

tracing::debug!("Using model: {}", model_name);

let prompt_manager = self.prompt_manager.lock().await;
Expand Down Expand Up @@ -1556,6 +1602,7 @@ impl Agent {
);

tracing::info!("Calling provider to generate recipe content");

let (result, _usage) = self
.provider
.lock()