Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions crates/goose/src/agents/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,12 @@ impl Agent {
cancellation_token: Option<CancellationToken>,
session: &Session,
) -> (String, Result<ToolCallResult, ErrorData>) {
let input_summary = serde_json::json!({
"tool": tool_call.name,
"arguments": tool_call.arguments,
Comment on lines +471 to +472
Copy link

Copilot AI Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

json!({"tool": tool_call.name, "arguments": tool_call.arguments}) moves fields out of tool_call, so subsequent uses of tool_call.name / tool_call.arguments in this function will not compile; serialize references (e.g., &tool_call.name / &tool_call.arguments) or clone the fields before building input_summary.

Suggested change
"tool": tool_call.name,
"arguments": tool_call.arguments,
"tool": &tool_call.name,
"arguments": &tool_call.arguments,

Copilot uses AI. Check for mistakes.
});
tracing::Span::current().record("input", tracing::field::display(&input_summary));

if tool_call.name == PLATFORM_MANAGE_SCHEDULE_TOOL_NAME {
let arguments = tool_call
.arguments
Expand Down Expand Up @@ -793,7 +799,10 @@ impl Agent {
}
}

#[instrument(skip(self, user_message, session_config), fields(user_message))]
#[instrument(
skip(self, user_message, session_config),
fields(user_message, trace_input)
)]
pub async fn reply(
&self,
user_message: Message,
Expand All @@ -802,6 +811,10 @@ impl Agent {
) -> Result<BoxStream<'_, Result<AgentEvent>>> {
let session_manager = self.config.session_manager.clone();

let message_text_for_trace = user_message.as_concat_text();
tracing::Span::current().record("user_message", message_text_for_trace.as_str());
tracing::Span::current().record("trace_input", message_text_for_trace.as_str());

for content in &user_message.content {
if let MessageContent::ActionRequired(action_required) = content {
if let ActionRequiredData::ElicitationResponse { id, user_data } =
Expand Down Expand Up @@ -1014,7 +1027,6 @@ impl Agent {
goose_mode,
initial_messages,
} = context;
let reply_span = tracing::Span::current();
self.reset_retry_attempts().await;

let provider = self.provider().await?;
Expand All @@ -1034,10 +1046,12 @@ impl Agent {

let working_dir = session.working_dir.clone();
Ok(Box::pin(async_stream::try_stream! {
let _ = reply_span.enter();
let reply_stream_span = tracing::info_span!(target: "goose::agents::agent", "reply_stream");
let _stream_guard = reply_stream_span.enter();
let mut turns_taken = 0u32;
let max_turns = session_config.max_turns.unwrap_or(DEFAULT_MAX_TURNS);
let mut compaction_attempts = 0;
let mut last_assistant_text = String::new();

loop {
if is_token_cancelled(&cancel_token) {
Expand Down Expand Up @@ -1138,6 +1152,10 @@ impl Agent {

let num_tool_requests = frontend_requests.len() + remaining_requests.len();
if num_tool_requests == 0 {
let text = filtered_response.as_concat_text();
if !text.is_empty() {
last_assistant_text = text;
}
messages_to_add.push(response.clone());
continue;
}
Expand Down Expand Up @@ -1508,6 +1526,10 @@ impl Agent {

tokio::task::yield_now().await;
}

if !last_assistant_text.is_empty() {
tracing::info!(target: "goose::agents::agent", trace_output = last_assistant_text.as_str());
}
}))
}

Expand Down
5 changes: 4 additions & 1 deletion crates/goose/src/tracing/langfuse_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,10 @@ pub fn create_langfuse_observer() -> Option<ObservationLayer> {
return None;
}

let base_url = env::var("LANGFUSE_URL").unwrap_or_else(|_| DEFAULT_LANGFUSE_URL.to_string());
let base_url = env::var("LANGFUSE_URL")
.or_else(|_| env::var("LANGFUSE_BASE_URL"))
.or_else(|_| env::var("LANGFUSE_HOST"))
.unwrap_or_else(|_| DEFAULT_LANGFUSE_URL.to_string());

let batch_manager = Arc::new(Mutex::new(LangfuseBatchManager::new(
public_key, secret_key, base_url,
Expand Down
130 changes: 124 additions & 6 deletions crates/goose/src/tracing/observation_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,44 @@ impl ObservationLayer {
trace_id
}

pub async fn update_trace(&self, updates: serde_json::Map<String, Value>) {
let trace_id = self.ensure_trace_id().await;
let mut body = json!({ "id": trace_id });
for (k, v) in updates {
body[k] = v;
Comment on lines +189 to +190
Copy link

Copilot AI Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

body[k] = v is indexing a serde_json::Value with a String, which doesn’t type-check; use body[&k] = v or mutate the underlying object via as_object_mut() and insert.

Suggested change
for (k, v) in updates {
body[k] = v;
if let Some(map) = body.as_object_mut() {
for (k, v) in updates {
map.insert(k, v);
}

Copilot uses AI. Check for mistakes.
}
let mut batch = self.batch_manager.lock().await;
batch.add_event("trace-create", body);
}

pub async fn handle_record(&self, span_id: u64, metadata: serde_json::Map<String, Value>) {
// Handle trace-level fields by updating the trace itself
let trace_fields: Vec<&str> = vec!["trace_input", "trace_output"];
let has_trace_fields = trace_fields.iter().any(|f| metadata.contains_key(*f));

if has_trace_fields {
let mut trace_updates = serde_json::Map::new();
if let Some(val) = metadata.get("trace_input") {
trace_updates.insert("input".to_string(), val.clone());
}
if let Some(val) = metadata.get("trace_output") {
trace_updates.insert("output".to_string(), val.clone());
}
if !trace_updates.is_empty() {
self.update_trace(trace_updates).await;
}
}

// Filter out trace-level fields from span metadata
let span_metadata: serde_json::Map<String, Value> = metadata
.into_iter()
.filter(|(k, _)| !trace_fields.contains(&k.as_str()))
.collect();

if span_metadata.is_empty() {
return;
}

let observation_id = {
let spans = self.span_tracker.lock().await;
spans.get_span(span_id).cloned()
Expand All @@ -199,20 +236,20 @@ impl ObservationLayer {
});

// Handle special fields
if let Some(val) = metadata.get("input") {
if let Some(val) = span_metadata.get("input") {
update["input"] = val.clone();
}

if let Some(val) = metadata.get("output") {
if let Some(val) = span_metadata.get("output") {
update["output"] = val.clone();
}

if let Some(val) = metadata.get("model_config") {
if let Some(val) = span_metadata.get("model_config") {
update["metadata"] = json!({ "model_config": val });
}

// Handle any remaining metadata
let remaining_metadata: serde_json::Map<String, Value> = metadata
let remaining_metadata: serde_json::Map<String, Value> = span_metadata
.iter()
.filter(|(k, _)| !["input", "output", "model_config"].contains(&k.as_str()))
.map(|(k, v)| (k.clone(), v.clone()))
Expand All @@ -221,14 +258,12 @@ impl ObservationLayer {
if !remaining_metadata.is_empty() {
let flattened = flatten_metadata(remaining_metadata);
if update.get("metadata").is_some() {
// If metadata exists (from model_config), merge with it
if let Some(obj) = update["metadata"].as_object_mut() {
for (k, v) in flattened {
obj.insert(k, v);
}
}
} else {
// Otherwise set it directly
update["metadata"] = json!(flattened);
}
}
Expand Down Expand Up @@ -502,6 +537,89 @@ mod tests {
assert_eq!(body["metadata"]["custom_field"], "custom value");
}

#[tokio::test]
async fn test_trace_input_output_updates() {
let (fixture, layer) = TestFixture::new().with_test_layer();
let span_id = 1u64;
let span_data = create_test_span_data();

layer.handle_span(span_id, span_data).await;

let mut metadata = serde_json::Map::new();
metadata.insert("trace_input".to_string(), json!("hello from user"));
metadata.insert("trace_output".to_string(), json!("response from assistant"));

layer.handle_record(span_id, metadata).await;
tokio::time::sleep(TEST_WAIT_DURATION).await;

let events = fixture.get_events().await;
// trace-create, observation-create, trace-create (update with input/output)
assert!(events.len() >= 3);

let trace_update = events
.iter()
.rfind(|(t, b)| t == "trace-create" && b.get("input").is_some_and(|v| v.is_string()))
.expect("should have a trace update with input");
assert_eq!(trace_update.1["input"], "hello from user");
assert_eq!(trace_update.1["output"], "response from assistant");
}

#[tokio::test]
async fn test_trace_fields_not_sent_as_span_metadata() {
let (fixture, layer) = TestFixture::new().with_test_layer();
let span_id = 1u64;
let span_data = create_test_span_data();

layer.handle_span(span_id, span_data).await;

// Only trace-level fields, no span-level fields
let mut metadata = serde_json::Map::new();
metadata.insert("trace_input".to_string(), json!("user msg"));

layer.handle_record(span_id, metadata).await;
tokio::time::sleep(TEST_WAIT_DURATION).await;

let events = fixture.get_events().await;
// Should NOT have a span-update since there are no span-level fields
let span_updates: Vec<_> = events.iter().filter(|(t, _)| t == "span-update").collect();
assert!(
span_updates.is_empty(),
"trace-only fields should not generate span-update events"
);
}

#[tokio::test]
async fn test_mixed_trace_and_span_fields() {
let (fixture, layer) = TestFixture::new().with_test_layer();
let span_id = 1u64;
let span_data = create_test_span_data();

layer.handle_span(span_id, span_data).await;

let mut metadata = serde_json::Map::new();
metadata.insert("trace_input".to_string(), json!("user msg"));
metadata.insert("input".to_string(), json!("tool input"));
metadata.insert("output".to_string(), json!("tool output"));

layer.handle_record(span_id, metadata).await;
tokio::time::sleep(TEST_WAIT_DURATION).await;

let events = fixture.get_events().await;

// Should have both a trace update and a span-update
let trace_updates: Vec<_> = events
.iter()
.filter(|(t, b)| t == "trace-create" && b.get("input").is_some_and(|v| v.is_string()))
.collect();
assert_eq!(trace_updates.len(), 1);
assert_eq!(trace_updates[0].1["input"], "user msg");

let span_updates: Vec<_> = events.iter().filter(|(t, _)| t == "span-update").collect();
assert_eq!(span_updates.len(), 1);
assert_eq!(span_updates[0].1["input"], "tool input");
assert_eq!(span_updates[0].1["output"], "tool output");
}

#[test]
fn test_flatten_metadata() {
let _fixture = TestFixture::new();
Expand Down
Loading