Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion dabgent/dabgent_agent/src/processor/sandbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,18 @@ impl<E: EventStore> Processor<Event> for ToolProcessor<E> {
}
}
// Phase 1: AgentMessage with ToolUse -> emit ToolResult
// IMPORTANT: We check if self.recipient is None OR matches the event's recipient
// This is crucial because:
// 1. Some ToolProcessors (like for task execution) have recipient: None to process all tools
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, can we make ToolProcessor echo the recipient based on who spawned the task? Maybe that's part of #462 though

// 2. Others (like the planner) have a specific recipient to filter events
// Without the is_none() check, ToolProcessors with recipient: None would never process
// any tool calls, breaking task execution in planning mode
Event::AgentMessage {
response,
recipient,
..
} if response.finish_reason == FinishReason::ToolUse
&& recipient.eq(&self.recipient) =>
&& (self.recipient.is_none() || recipient.eq(&self.recipient)) =>
{
let tool_results = self.run_tools(&response, &event.stream_id, &event.aggregate_id).await?;
let tool_result_event = Event::ToolResult(tool_results);
Expand Down
141 changes: 111 additions & 30 deletions dabgent/dabgent_agent/src/processor/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct Thread {
pub preamble: Option<String>,
pub tools: Option<Vec<ToolDefinition>>,
pub messages: Vec<rig::completion::Message>,
pub is_completed: bool,
}

impl Aggregate for Thread {
Expand Down Expand Up @@ -82,26 +83,35 @@ impl Aggregate for Thread {
});
}
Event::ToolResult(tool_results) => {
// Convert tool results to User message with ToolResult content
let tool_contents: Vec<rig::message::UserContent> = tool_results
.iter()
.map(|typed_result| {
// Convert TypedToolResult to ToolResult
let tool_result = rig::message::ToolResult {
id: typed_result.result.id.clone(),
content: typed_result.result.content.clone(),
call_id: None, // Add call_id if available
};
rig::message::UserContent::ToolResult(tool_result)
})
.collect();

if !tool_contents.is_empty() {
self.messages.push(rig::completion::Message::User {
content: rig::OneOrMany::many(tool_contents).unwrap(),
});
// Check if this is a done tool result - if so, don't convert to user message
let is_done_tool = tool_results.iter().any(|tr| matches!(tr.tool_name, crate::event::ToolKind::Done));
tracing::debug!("Thread applying ToolResult. Done tool: {}, Tool count: {}", is_done_tool, tool_results.len());

if !is_done_tool {
// Convert tool results to User message with ToolResult content
let tool_contents: Vec<rig::message::UserContent> = tool_results
.iter()
.map(|typed_result| {
// Convert TypedToolResult to ToolResult
let tool_result = rig::message::ToolResult {
id: typed_result.result.id.clone(),
content: typed_result.result.content.clone(),
call_id: None, // Add call_id if available
};
rig::message::UserContent::ToolResult(tool_result)
})
.collect();

if !tool_contents.is_empty() {
self.messages.push(rig::completion::Message::User {
content: rig::OneOrMany::many(tool_contents).unwrap(),
});
}
}
}
Event::TaskCompleted { .. } => {
self.is_completed = true;
}
_ => {}
}
}
Expand Down Expand Up @@ -143,7 +153,14 @@ impl Thread {
None | Some(rig::completion::Message::Assistant { .. }) => {
Ok(vec![Event::UserMessage(content)])
}
_ => Err(Error::WrongTurn),
_ => {
tracing::warn!("Rejecting UserMessage - last message is not Assistant. Last: {:?}",
self.messages.last().map(|m| match m {
rig::completion::Message::User { .. } => "User",
rig::completion::Message::Assistant { .. } => "Assistant",
}));
Err(Error::WrongTurn)
}
},
_ => unreachable!(),
}
Expand All @@ -166,26 +183,81 @@ impl Thread {
pub struct ThreadProcessor<T: LLMClient, E: EventStore> {
llm: T,
event_store: E,
recipient_filter: Option<String>,
}

impl<T: LLMClient, E: EventStore> Processor<Event> for ThreadProcessor<T, E> {
async fn run(&mut self, event: &EventDb<Event>) -> eyre::Result<()> {
let query = Query::stream(&event.stream_id).aggregate(&event.aggregate_id);
match &event.data {
Event::UserMessage(..) | Event::ToolResult(..) => {
tracing::info!("ThreadProcessor processing event for aggregate {}: {:?}",
event.aggregate_id,
match &event.data {
Event::UserMessage(_) => "UserMessage",
Event::ToolResult(_) => "ToolResult",
_ => "Other"
});
let events = self.event_store.load_events::<Event>(&query, None).await?;
let mut thread = Thread::fold(&events);

// Check recipient filter
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part looks brittle. Maybe it should be a lambda function if we really, really need it?

if let Some(ref filter) = self.recipient_filter {
tracing::debug!("ThreadProcessor checking recipient. Thread recipient: {:?}, Filter: {}",
thread.recipient, filter);
if let Some(ref thread_recipient) = thread.recipient {
// Check if the thread's recipient matches our filter
// Support prefix matching for patterns like "task-*"
if filter.ends_with("*") {
let prefix = &filter[..filter.len() - 1];
if !thread_recipient.starts_with(prefix) {
tracing::debug!("Skipping thread with recipient {} (filter: {})", thread_recipient, filter);
return Ok(());
}
} else if thread_recipient != filter {
tracing::debug!("Skipping thread with recipient {} (filter: {})", thread_recipient, filter);
return Ok(());
}
} else {
// Thread has no recipient but we have a filter - skip
tracing::debug!("Skipping thread with no recipient (filter: {})", filter);
return Ok(());
}
}

tracing::info!("ThreadProcessor recipient check passed for aggregate {}", event.aggregate_id);

// Don't process if thread is already completed
if thread.is_completed {
tracing::info!("Thread {} is completed, skipping processing", event.aggregate_id);
return Ok(());
}

tracing::debug!("Thread {} - Last message type: {:?}", event.aggregate_id,
thread.messages.last().map(|m| match m {
rig::completion::Message::User { .. } => "User",
rig::completion::Message::Assistant { .. } => "Assistant",
}));
let completion = self.completion(&thread).await?;
let new_events = thread.process(Command::Agent(completion))?;
for new_event in new_events.iter() {
self.event_store
.push_event(
&event.stream_id,
&event.aggregate_id,
new_event,
&Default::default(),
)
.await?;
tracing::info!("ThreadProcessor generated completion for aggregate {}", event.aggregate_id);
match thread.process(Command::Agent(completion.clone())) {
Ok(new_events) => {
tracing::info!("ThreadProcessor processed {} new events for aggregate {}", new_events.len(), event.aggregate_id);
for new_event in new_events.iter() {
self.event_store
.push_event(
&event.stream_id,
&event.aggregate_id,
new_event,
&Default::default(),
)
.await?;
}
}
Err(e) => {
tracing::error!("ThreadProcessor failed to process command for aggregate {}: {:?}", event.aggregate_id, e);
return Err(eyre::eyre!("Failed to process command: {:?}", e));
}
}
}
_ => {}
Expand All @@ -196,7 +268,16 @@ impl<T: LLMClient, E: EventStore> Processor<Event> for ThreadProcessor<T, E> {

impl<T: LLMClient, E: EventStore> ThreadProcessor<T, E> {
pub fn new(llm: T, event_store: E) -> Self {
Self { llm, event_store }
Self {
llm,
event_store,
recipient_filter: None,
}
}

pub fn with_recipient_filter(mut self, filter: String) -> Self {
self.recipient_filter = Some(filter);
self
}

pub async fn completion(&self, thread: &Thread) -> Result<CompletionResponse> {
Expand Down
Loading