diff --git a/crates/goose/src/agents/agent.rs b/crates/goose/src/agents/agent.rs index 72b3acab87a6..c7d8a0ab9989 100644 --- a/crates/goose/src/agents/agent.rs +++ b/crates/goose/src/agents/agent.rs @@ -467,6 +467,12 @@ impl Agent { cancellation_token: Option, session: &Session, ) -> (String, Result) { + let input_summary = serde_json::json!({ + "tool": tool_call.name, + "arguments": tool_call.arguments, + }); + tracing::Span::current().record("input", tracing::field::display(&input_summary)); + if tool_call.name == PLATFORM_MANAGE_SCHEDULE_TOOL_NAME { let arguments = tool_call .arguments @@ -793,7 +799,10 @@ impl Agent { } } - #[instrument(skip(self, user_message, session_config), fields(user_message))] + #[instrument( + skip(self, user_message, session_config), + fields(user_message, trace_input) + )] pub async fn reply( &self, user_message: Message, @@ -802,6 +811,10 @@ impl Agent { ) -> Result>> { let session_manager = self.config.session_manager.clone(); + let message_text_for_trace = user_message.as_concat_text(); + tracing::Span::current().record("user_message", message_text_for_trace.as_str()); + tracing::Span::current().record("trace_input", message_text_for_trace.as_str()); + for content in &user_message.content { if let MessageContent::ActionRequired(action_required) = content { if let ActionRequiredData::ElicitationResponse { id, user_data } = @@ -1014,7 +1027,6 @@ impl Agent { goose_mode, initial_messages, } = context; - let reply_span = tracing::Span::current(); self.reset_retry_attempts().await; let provider = self.provider().await?; @@ -1034,10 +1046,12 @@ impl Agent { let working_dir = session.working_dir.clone(); Ok(Box::pin(async_stream::try_stream! { - let _ = reply_span.enter(); + let reply_stream_span = tracing::info_span!(target: "goose::agents::agent", "reply_stream"); + let _stream_guard = reply_stream_span.enter(); let mut turns_taken = 0u32; let max_turns = session_config.max_turns.unwrap_or(DEFAULT_MAX_TURNS); let mut compaction_attempts = 0; + let mut last_assistant_text = String::new(); loop { if is_token_cancelled(&cancel_token) { @@ -1138,6 +1152,10 @@ impl Agent { let num_tool_requests = frontend_requests.len() + remaining_requests.len(); if num_tool_requests == 0 { + let text = filtered_response.as_concat_text(); + if !text.is_empty() { + last_assistant_text = text; + } messages_to_add.push(response.clone()); continue; } @@ -1508,6 +1526,10 @@ impl Agent { tokio::task::yield_now().await; } + + if !last_assistant_text.is_empty() { + tracing::info!(target: "goose::agents::agent", trace_output = last_assistant_text.as_str()); + } })) } diff --git a/crates/goose/src/tracing/langfuse_layer.rs b/crates/goose/src/tracing/langfuse_layer.rs index 2ac418cf1ab0..df2541a4493b 100644 --- a/crates/goose/src/tracing/langfuse_layer.rs +++ b/crates/goose/src/tracing/langfuse_layer.rs @@ -166,7 +166,10 @@ pub fn create_langfuse_observer() -> Option { return None; } - let base_url = env::var("LANGFUSE_URL").unwrap_or_else(|_| DEFAULT_LANGFUSE_URL.to_string()); + let base_url = env::var("LANGFUSE_URL") + .or_else(|_| env::var("LANGFUSE_BASE_URL")) + .or_else(|_| env::var("LANGFUSE_HOST")) + .unwrap_or_else(|_| DEFAULT_LANGFUSE_URL.to_string()); let batch_manager = Arc::new(Mutex::new(LangfuseBatchManager::new( public_key, secret_key, base_url, diff --git a/crates/goose/src/tracing/observation_layer.rs b/crates/goose/src/tracing/observation_layer.rs index 81ebe21a8e10..5d85feb1daa6 100644 --- a/crates/goose/src/tracing/observation_layer.rs +++ b/crates/goose/src/tracing/observation_layer.rs @@ -183,7 +183,44 @@ impl ObservationLayer { trace_id } + pub async fn update_trace(&self, updates: serde_json::Map) { + let trace_id = self.ensure_trace_id().await; + let mut body = json!({ "id": trace_id }); + for (k, v) in updates { + body[k] = v; + } + let mut batch = self.batch_manager.lock().await; + batch.add_event("trace-create", body); + } + pub async fn handle_record(&self, span_id: u64, metadata: serde_json::Map) { + // Handle trace-level fields by updating the trace itself + let trace_fields: Vec<&str> = vec!["trace_input", "trace_output"]; + let has_trace_fields = trace_fields.iter().any(|f| metadata.contains_key(*f)); + + if has_trace_fields { + let mut trace_updates = serde_json::Map::new(); + if let Some(val) = metadata.get("trace_input") { + trace_updates.insert("input".to_string(), val.clone()); + } + if let Some(val) = metadata.get("trace_output") { + trace_updates.insert("output".to_string(), val.clone()); + } + if !trace_updates.is_empty() { + self.update_trace(trace_updates).await; + } + } + + // Filter out trace-level fields from span metadata + let span_metadata: serde_json::Map = metadata + .into_iter() + .filter(|(k, _)| !trace_fields.contains(&k.as_str())) + .collect(); + + if span_metadata.is_empty() { + return; + } + let observation_id = { let spans = self.span_tracker.lock().await; spans.get_span(span_id).cloned() @@ -199,20 +236,20 @@ impl ObservationLayer { }); // Handle special fields - if let Some(val) = metadata.get("input") { + if let Some(val) = span_metadata.get("input") { update["input"] = val.clone(); } - if let Some(val) = metadata.get("output") { + if let Some(val) = span_metadata.get("output") { update["output"] = val.clone(); } - if let Some(val) = metadata.get("model_config") { + if let Some(val) = span_metadata.get("model_config") { update["metadata"] = json!({ "model_config": val }); } // Handle any remaining metadata - let remaining_metadata: serde_json::Map = metadata + let remaining_metadata: serde_json::Map = span_metadata .iter() .filter(|(k, _)| !["input", "output", "model_config"].contains(&k.as_str())) .map(|(k, v)| (k.clone(), v.clone())) @@ -221,14 +258,12 @@ impl ObservationLayer { if !remaining_metadata.is_empty() { let flattened = flatten_metadata(remaining_metadata); if update.get("metadata").is_some() { - // If metadata exists (from model_config), merge with it if let Some(obj) = update["metadata"].as_object_mut() { for (k, v) in flattened { obj.insert(k, v); } } } else { - // Otherwise set it directly update["metadata"] = json!(flattened); } } @@ -502,6 +537,89 @@ mod tests { assert_eq!(body["metadata"]["custom_field"], "custom value"); } + #[tokio::test] + async fn test_trace_input_output_updates() { + let (fixture, layer) = TestFixture::new().with_test_layer(); + let span_id = 1u64; + let span_data = create_test_span_data(); + + layer.handle_span(span_id, span_data).await; + + let mut metadata = serde_json::Map::new(); + metadata.insert("trace_input".to_string(), json!("hello from user")); + metadata.insert("trace_output".to_string(), json!("response from assistant")); + + layer.handle_record(span_id, metadata).await; + tokio::time::sleep(TEST_WAIT_DURATION).await; + + let events = fixture.get_events().await; + // trace-create, observation-create, trace-create (update with input/output) + assert!(events.len() >= 3); + + let trace_update = events + .iter() + .rfind(|(t, b)| t == "trace-create" && b.get("input").is_some_and(|v| v.is_string())) + .expect("should have a trace update with input"); + assert_eq!(trace_update.1["input"], "hello from user"); + assert_eq!(trace_update.1["output"], "response from assistant"); + } + + #[tokio::test] + async fn test_trace_fields_not_sent_as_span_metadata() { + let (fixture, layer) = TestFixture::new().with_test_layer(); + let span_id = 1u64; + let span_data = create_test_span_data(); + + layer.handle_span(span_id, span_data).await; + + // Only trace-level fields, no span-level fields + let mut metadata = serde_json::Map::new(); + metadata.insert("trace_input".to_string(), json!("user msg")); + + layer.handle_record(span_id, metadata).await; + tokio::time::sleep(TEST_WAIT_DURATION).await; + + let events = fixture.get_events().await; + // Should NOT have a span-update since there are no span-level fields + let span_updates: Vec<_> = events.iter().filter(|(t, _)| t == "span-update").collect(); + assert!( + span_updates.is_empty(), + "trace-only fields should not generate span-update events" + ); + } + + #[tokio::test] + async fn test_mixed_trace_and_span_fields() { + let (fixture, layer) = TestFixture::new().with_test_layer(); + let span_id = 1u64; + let span_data = create_test_span_data(); + + layer.handle_span(span_id, span_data).await; + + let mut metadata = serde_json::Map::new(); + metadata.insert("trace_input".to_string(), json!("user msg")); + metadata.insert("input".to_string(), json!("tool input")); + metadata.insert("output".to_string(), json!("tool output")); + + layer.handle_record(span_id, metadata).await; + tokio::time::sleep(TEST_WAIT_DURATION).await; + + let events = fixture.get_events().await; + + // Should have both a trace update and a span-update + let trace_updates: Vec<_> = events + .iter() + .filter(|(t, b)| t == "trace-create" && b.get("input").is_some_and(|v| v.is_string())) + .collect(); + assert_eq!(trace_updates.len(), 1); + assert_eq!(trace_updates[0].1["input"], "user msg"); + + let span_updates: Vec<_> = events.iter().filter(|(t, _)| t == "span-update").collect(); + assert_eq!(span_updates.len(), 1); + assert_eq!(span_updates[0].1["input"], "tool input"); + assert_eq!(span_updates[0].1["output"], "tool output"); + } + #[test] fn test_flatten_metadata() { let _fixture = TestFixture::new();