diff --git a/crates/goose-cli/src/cli.rs b/crates/goose-cli/src/cli.rs index 346715d04804..2187a3c19ab8 100644 --- a/crates/goose-cli/src/cli.rs +++ b/crates/goose-cli/src/cli.rs @@ -16,7 +16,6 @@ use crate::commands::schedule::{ handle_schedule_sessions, }; use crate::commands::session::{handle_session_list, handle_session_remove}; -use crate::logging::setup_logging; use crate::recipes::extract_from_cli::extract_recipe_info_from_cli; use crate::recipes::recipe::{explain_recipe, render_recipe_as_yaml}; use crate::session; @@ -718,7 +717,7 @@ pub async fn cli() -> Result<()> { }; tracing::info!( - monotonic_counter.goose.cli_commands = 1, + counter.goose.cli_commands = 1, command = command_name, "CLI command executed" ); @@ -779,6 +778,16 @@ pub async fn cli() -> Result<()> { Ok(()) } None => { + let session_start = std::time::Instant::now(); + let session_type = if resume { "resumed" } else { "new" }; + + tracing::info!( + counter.goose.session_starts = 1, + session_type, + interactive = true, + "Session started" + ); + // Run session command by default let mut session: crate::Session = build_session(SessionBuilderConfig { identifier: identifier.map(extract_identifier), @@ -804,21 +813,46 @@ pub async fn cli() -> Result<()> { retry_config: None, }) .await; - setup_logging( - session - .session_file() - .as_ref() - .and_then(|p| p.file_stem()) - .and_then(|s| s.to_str()), - None, - )?; // Render previous messages if resuming a session and history flag is set if resume && history { session.render_message_history(); } - let _ = session.interactive(None).await; + let result = session.interactive(None).await; + + let session_duration = session_start.elapsed(); + let exit_type = if result.is_ok() { "normal" } else { "error" }; + + let (total_tokens, message_count) = session + .get_metadata() + .map(|m| (m.total_tokens.unwrap_or(0), m.message_count)) + .unwrap_or((0, 0)); + + tracing::info!( + counter.goose.session_completions = 1, + session_type, + exit_type, + duration_ms = session_duration.as_millis() as u64, + total_tokens, + message_count, + "Session completed" + ); + + tracing::info!( + counter.goose.session_duration_ms = session_duration.as_millis() as u64, + session_type, + "Session duration" + ); + + if total_tokens > 0 { + tracing::info!( + counter.goose.session_tokens = total_tokens, + session_type, + "Session tokens" + ); + } + Ok(()) } }; @@ -897,8 +931,13 @@ pub async fn cli() -> Result<()> { (input_config, None) } (_, _, Some(recipe_name)) => { - tracing::info!(monotonic_counter.goose.recipe_runs = 1, - recipe_name = %recipe_name, + let recipe_display_name = std::path::Path::new(&recipe_name) + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or(&recipe_name); + + tracing::info!(counter.goose.recipe_runs = 1, + recipe_name = %recipe_display_name, "Recipe execution started" ); @@ -952,19 +991,59 @@ pub async fn cli() -> Result<()> { }) .await; - setup_logging( - session - .session_file() - .as_ref() - .and_then(|p| p.file_stem()) - .and_then(|s| s.to_str()), - None, - )?; - if interactive { let _ = session.interactive(input_config.contents).await; } else if let Some(contents) = input_config.contents { - let _ = session.headless(contents).await; + let session_start = std::time::Instant::now(); + let session_type = if recipe_info.is_some() { + "recipe" + } else { + "run" + }; + + tracing::info!( + counter.goose.session_starts = 1, + session_type, + interactive = false, + "Headless session started" + ); + + let result = session.headless(contents).await; + + let session_duration = session_start.elapsed(); + let exit_type = if result.is_ok() { "normal" } else { "error" }; + + let (total_tokens, message_count) = session + .get_metadata() + .map(|m| (m.total_tokens.unwrap_or(0), m.message_count)) + .unwrap_or((0, 0)); + + tracing::info!( + counter.goose.session_completions = 1, + session_type, + exit_type, + duration_ms = session_duration.as_millis() as u64, + total_tokens, + message_count, + interactive = false, + "Headless session completed" + ); + + tracing::info!( + counter.goose.session_duration_ms = session_duration.as_millis() as u64, + session_type, + "Headless session duration" + ); + + if total_tokens > 0 { + tracing::info!( + counter.goose.session_tokens = total_tokens, + session_type, + "Headless session tokens" + ); + } + + result?; } else { eprintln!("Error: no text provided for prompt in headless mode"); std::process::exit(1); @@ -1083,14 +1162,6 @@ pub async fn cli() -> Result<()> { retry_config: None, }) .await; - setup_logging( - session - .session_file() - .as_ref() - .and_then(|p| p.file_stem()) - .and_then(|s| s.to_str()), - None, - )?; if let Err(e) = session.interactive(None).await { eprintln!("Session ended with error: {}", e); } diff --git a/crates/goose-cli/src/main.rs b/crates/goose-cli/src/main.rs index 278a32624212..0f54e53b76ca 100644 --- a/crates/goose-cli/src/main.rs +++ b/crates/goose-cli/src/main.rs @@ -3,5 +3,32 @@ use goose_cli::cli::cli; #[tokio::main] async fn main() -> Result<()> { - cli().await + if let Err(e) = goose_cli::logging::setup_logging(None, None) { + eprintln!("Warning: Failed to initialize telemetry: {}", e); + } + + let result = cli().await; + + // Only wait for telemetry flush if OTLP is configured + if std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").is_ok() { + // Use a shorter, dynamic wait with max timeout + let max_wait = tokio::time::Duration::from_millis(500); + let start = tokio::time::Instant::now(); + + // Give telemetry a chance to flush, but don't wait too long + while start.elapsed() < max_wait { + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // In future, we could check if there are pending spans/metrics here + // For now, we just do a quick wait to allow batch exports to complete + if start.elapsed() >= tokio::time::Duration::from_millis(200) { + break; // Most exports should complete within 200ms + } + } + + // Then shutdown the providers + goose::tracing::shutdown_otlp(); + } + + result } diff --git a/crates/goose-cli/src/session/mod.rs b/crates/goose-cli/src/session/mod.rs index 0cdda04bcf31..60c412222f1f 100644 --- a/crates/goose-cli/src/session/mod.rs +++ b/crates/goose-cli/src/session/mod.rs @@ -851,7 +851,8 @@ impl Session { pub async fn headless(&mut self, prompt: String) -> Result<()> { let message = Message::user().with_text(&prompt); self.process_message(message, CancellationToken::default()) - .await + .await?; + Ok(()) } async fn process_agent_response( @@ -1023,12 +1024,45 @@ impl Session { for content in &message.content { if let MessageContent::ToolRequest(tool_request) = content { if let Ok(tool_call) = &tool_request.tool_call { - tracing::info!(monotonic_counter.goose.tool_calls = 1, + tracing::info!(counter.goose.tool_calls = 1, tool_name = %tool_call.name, - "Tool call executed" + "Tool call started" ); } } + if let MessageContent::ToolResponse(tool_response) = content { + let tool_name = self.messages + .iter() + .rev() + .find_map(|msg| { + msg.content.iter().find_map(|c| { + if let MessageContent::ToolRequest(req) = c { + if req.id == tool_response.id { + if let Ok(tool_call) = &req.tool_call { + Some(tool_call.name.clone()) + } else { + None + } + } else { + None + } + } else { + None + } + }) + }) + .unwrap_or_else(|| "unknown".to_string()); + + let success = tool_response.tool_result.is_ok(); + let result_status = if success { "success" } else { "error" }; + + tracing::info!( + counter.goose.tool_completions = 1, + tool_name = %tool_name, + result = %result_status, + "Tool call completed" + ); + } } push_message(&mut self.messages, message.clone()); diff --git a/crates/goose-server/src/commands/agent.rs b/crates/goose-server/src/commands/agent.rs index 5fdfa89ae2ee..5f47e392f3d7 100644 --- a/crates/goose-server/src/commands/agent.rs +++ b/crates/goose-server/src/commands/agent.rs @@ -13,7 +13,7 @@ use tracing::info; use goose::providers::pricing::initialize_pricing_cache; pub async fn run() -> Result<()> { - // Initialize logging + // Initialize logging and telemetry crate::logging::setup_logging(Some("goosed"))?; let settings = configuration::Settings::new()?; diff --git a/crates/goose-server/src/routes/reply.rs b/crates/goose-server/src/routes/reply.rs index 6a3b91ca8c42..fb66e358f223 100644 --- a/crates/goose-server/src/routes/reply.rs +++ b/crates/goose-server/src/routes/reply.rs @@ -11,7 +11,7 @@ use bytes::Bytes; use futures::{stream::StreamExt, Stream}; use goose::{ agents::{AgentEvent, SessionConfig}, - message::{push_message, Message}, + message::{push_message, Message, MessageContent}, permission::permission_confirmation::PrincipalType, }; use goose::{ @@ -37,6 +37,53 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; use utoipa::ToSchema; +fn track_tool_telemetry(content: &MessageContent, all_messages: &[Message]) { + match content { + MessageContent::ToolRequest(tool_request) => { + if let Ok(tool_call) = &tool_request.tool_call { + tracing::info!(monotonic_counter.goose.tool_calls = 1, + tool_name = %tool_call.name, + "Tool call started" + ); + } + } + MessageContent::ToolResponse(tool_response) => { + let tool_name = all_messages + .iter() + .rev() + .find_map(|msg| { + msg.content.iter().find_map(|c| { + if let MessageContent::ToolRequest(req) = c { + if req.id == tool_response.id { + if let Ok(tool_call) = &req.tool_call { + Some(tool_call.name.clone()) + } else { + None + } + } else { + None + } + } else { + None + } + }) + }) + .unwrap_or_else(|| "unknown".to_string()); + + let success = tool_response.tool_result.is_ok(); + let result_status = if success { "success" } else { "error" }; + + tracing::info!( + counter.goose.tool_completions = 1, + tool_name = %tool_name, + result = %result_status, + "Tool call completed" + ); + } + _ => {} + } +} + #[derive(Debug, Deserialize, Serialize)] struct ChatRequest { messages: Vec, @@ -126,6 +173,15 @@ async fn reply_handler( ) -> Result { verify_secret_key(&headers, &state)?; + let session_start = std::time::Instant::now(); + + tracing::info!( + counter.goose.session_starts = 1, + session_type = "app", + interface = "ui", + "Session started" + ); + let (tx, rx) = mpsc::channel(100); let stream = ReceiverStream::new(rx); let cancel_token = CancellationToken::new(); @@ -222,8 +278,12 @@ async fn reply_handler( response = timeout(Duration::from_millis(500), stream.next()) => { match response { Ok(Some(Ok(AgentEvent::Message(message)))) => { - push_message(&mut all_messages, message.clone()); - stream_event(MessageEvent::Message { message }, &tx, &cancel_token).await; + for content in &message.content { + track_tool_telemetry(content, &all_messages); + } + + push_message(&mut all_messages, message.clone()); + stream_event(MessageEvent::Message { message }, &tx, &cancel_token).await; } Ok(Some(Ok(AgentEvent::HistoryReplaced(new_messages)))) => { // Replace the message history with the compacted messages @@ -269,10 +329,12 @@ async fn reply_handler( if all_messages.len() > saved_message_count { if let Ok(provider) = agent.provider().await { let provider = Arc::clone(&provider); + let session_path_clone = session_path.to_path_buf(); + let all_messages_clone = all_messages.clone(); tokio::spawn(async move { if let Err(e) = session::persist_messages( - &session_path, - &all_messages, + &session_path_clone, + &all_messages_clone, Some(provider), Some(PathBuf::from(&session_working_dir)), ) @@ -284,6 +346,58 @@ async fn reply_handler( } } + let session_duration = session_start.elapsed(); + + if let Ok(metadata) = session::read_metadata(&session_path) { + let total_tokens = metadata.total_tokens.unwrap_or(0); + let message_count = metadata.message_count; + + tracing::info!( + counter.goose.session_completions = 1, + session_type = "app", + interface = "ui", + exit_type = "normal", + duration_ms = session_duration.as_millis() as u64, + total_tokens, + message_count, + "Session completed" + ); + + tracing::info!( + counter.goose.session_duration_ms = session_duration.as_millis() as u64, + session_type = "app", + interface = "ui", + "Session duration" + ); + + if total_tokens > 0 { + tracing::info!( + counter.goose.session_tokens = total_tokens, + session_type = "app", + interface = "ui", + "Session tokens" + ); + } + } else { + tracing::info!( + counter.goose.session_completions = 1, + session_type = "app", + interface = "ui", + exit_type = "normal", + duration_ms = session_duration.as_millis() as u64, + total_tokens = 0u64, + message_count = all_messages.len(), + "Session completed" + ); + + tracing::info!( + counter.goose.session_duration_ms = session_duration.as_millis() as u64, + session_type = "app", + interface = "ui", + "Session duration" + ); + } + let _ = stream_event( MessageEvent::Finish { reason: "stop".to_string(), diff --git a/crates/goose/src/tracing/mod.rs b/crates/goose/src/tracing/mod.rs index 3d5f34c971a7..8acd9203c7d0 100644 --- a/crates/goose/src/tracing/mod.rs +++ b/crates/goose/src/tracing/mod.rs @@ -1,6 +1,7 @@ pub mod langfuse_layer; mod observation_layer; pub mod otlp_layer; +pub mod rate_limiter; pub use langfuse_layer::{create_langfuse_observer, LangfuseBatchManager}; pub use observation_layer::{ @@ -10,3 +11,6 @@ pub use otlp_layer::{ create_otlp_metrics_filter, create_otlp_tracing_filter, create_otlp_tracing_layer, init_otlp_metrics, init_otlp_tracing, init_otlp_tracing_only, shutdown_otlp, OtlpConfig, }; +pub use rate_limiter::{ + MetricData, RateLimitedTelemetrySender, SpanData as RateLimitedSpanData, TelemetryEvent, +}; diff --git a/crates/goose/src/tracing/otlp_layer.rs b/crates/goose/src/tracing/otlp_layer.rs index b2ce425bb719..e54712da52ac 100644 --- a/crates/goose/src/tracing/otlp_layer.rs +++ b/crates/goose/src/tracing/otlp_layer.rs @@ -121,9 +121,12 @@ pub fn create_otlp_tracing_layer() -> OtlpResult { let tracer_provider = trace::TracerProvider::builder() .with_batch_exporter(exporter, runtime::Tokio) + .with_max_events_per_span(2048) + .with_max_attributes_per_span(512) + .with_max_links_per_span(512) .with_resource(resource) .with_id_generator(RandomIdGenerator::default()) - .with_sampler(Sampler::AlwaysOn) + .with_sampler(Sampler::TraceIdRatioBased(0.1)) .build(); let tracer = tracer_provider.tracer("goose"); @@ -150,7 +153,7 @@ pub fn create_otlp_metrics_layer() -> OtlpResult { .with_resource(resource) .with_reader( opentelemetry_sdk::metrics::PeriodicReader::builder(exporter, runtime::Tokio) - .with_interval(Duration::from_secs(5)) // Reduced from 30s to 5s for faster metrics + .with_interval(Duration::from_millis(2000)) .build(), ) .build(); @@ -220,12 +223,13 @@ pub fn create_otlp_metrics_filter() -> FilterFn) -> bool> /// Shutdown OTLP providers gracefully pub fn shutdown_otlp() { + // Shutdown the tracer provider and flush any pending spans global::shutdown_tracer_provider(); - // Note: There's currently no clean way to shutdown the global meter provider - // in the OpenTelemetry Rust SDK. The meter provider will be cleaned up when - // the process exits. Individual meter providers can be shut down if you have - // a direct reference to them. + // Force flush of metrics by waiting a bit + // The meter provider doesn't have a direct shutdown method in the current SDK, + // but we can give it time to export any pending metrics + std::thread::sleep(std::time::Duration::from_millis(500)); } #[cfg(test)] diff --git a/crates/goose/src/tracing/rate_limiter.rs b/crates/goose/src/tracing/rate_limiter.rs new file mode 100644 index 000000000000..f1a9c9b7e0e4 --- /dev/null +++ b/crates/goose/src/tracing/rate_limiter.rs @@ -0,0 +1,143 @@ +use std::time::{Duration, Instant}; +use tokio::sync::mpsc; +use tokio::time::sleep; +use tracing::{info, warn}; + +pub struct RateLimitedTelemetrySender { + sender: mpsc::UnboundedSender, +} + +#[derive(Debug, Clone)] +pub enum TelemetryEvent { + Span(SpanData), + Metric(MetricData), +} + +#[derive(Debug, Clone)] +pub struct SpanData { + pub name: String, + pub attributes: Vec<(String, String)>, + pub duration: Option, +} + +#[derive(Debug, Clone)] +pub struct MetricData { + pub name: String, + pub value: f64, + pub labels: Vec<(String, String)>, +} + +impl RateLimitedTelemetrySender { + pub fn new(rate_limit_ms: u64) -> Self { + let (sender, mut receiver) = mpsc::unbounded_channel::(); + + tokio::spawn(async move { + let mut last_send = Instant::now(); + let rate_limit_duration = Duration::from_millis(rate_limit_ms); + + info!( + "Starting rate-limited telemetry sender with {}ms delay", + rate_limit_ms + ); + + while let Some(event) = receiver.recv().await { + let elapsed = last_send.elapsed(); + if elapsed < rate_limit_duration { + let sleep_duration = rate_limit_duration - elapsed; + sleep(sleep_duration).await; + } + + match event { + TelemetryEvent::Span(span_data) => { + Self::process_span(span_data).await; + } + TelemetryEvent::Metric(metric_data) => { + Self::process_metric(metric_data).await; + } + } + + last_send = Instant::now(); + } + + warn!("Rate-limited telemetry sender shutting down"); + }); + + Self { sender } + } + + pub fn send_span( + &self, + span_data: SpanData, + ) -> Result<(), mpsc::error::SendError> { + self.sender.send(TelemetryEvent::Span(span_data)) + } + + pub fn send_metric( + &self, + metric_data: MetricData, + ) -> Result<(), mpsc::error::SendError> { + self.sender.send(TelemetryEvent::Metric(metric_data)) + } + + async fn process_span(span_data: SpanData) { + let span = tracing::info_span!("telemetry_span", name = %span_data.name); + let _enter = span.enter(); + + for (key, value) in span_data.attributes { + tracing::Span::current().record(key.as_str(), value.as_str()); + } + + if let Some(duration) = span_data.duration { + info!(duration_ms = duration.as_millis(), "span_duration"); + } + } + + async fn process_metric(metric_data: MetricData) { + info!( + metric_name = %metric_data.name, + metric_value = metric_data.value, + labels = ?metric_data.labels, + "telemetry_metric" + ); + } +} + +impl Default for RateLimitedTelemetrySender { + fn default() -> Self { + Self::new(400) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::time::{timeout, Duration as TokioDuration}; + + #[tokio::test] + async fn test_rate_limited_sender() { + let sender = RateLimitedTelemetrySender::new(100); // 100ms rate limit for testing + + let span_data = SpanData { + name: "test_span".to_string(), + attributes: vec![("key".to_string(), "value".to_string())], + duration: Some(Duration::from_millis(50)), + }; + + let metric_data = MetricData { + name: "test_metric".to_string(), + value: 42.0, + labels: vec![("label".to_string(), "value".to_string())], + }; + + // Send events + assert!(sender.send_span(span_data).is_ok()); + assert!(sender.send_metric(metric_data).is_ok()); + + // Give time for processing + timeout(TokioDuration::from_millis(500), async { + tokio::time::sleep(TokioDuration::from_millis(300)).await; + }) + .await + .unwrap(); + } +}