Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 65 additions & 60 deletions assistant/src/runtime/routes/conversation-routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1890,78 +1890,83 @@ export async function handleSendMessage(

if (slashResult.kind === "clean") {
conversation.processing = true;
const provenance = provenanceFromTrustContext(conversation.trustContext);
const channelMeta = {
...provenance,
userMessageChannel: sourceChannel,
assistantMessageChannel: sourceChannel,
userMessageInterface: sourceInterface,
assistantMessageInterface: sourceInterface,
};
const cleanMsg = createUserMessage(rawContent, attachments);
const persisted = await addMessage(
mapping.conversationId,
"user",
JSON.stringify(cleanMsg.content),
channelMeta,
);
conversation.getMessages().push(cleanMsg);

const conversationId = mapping.conversationId;

let assistantMessagePersisted = false;
// Outer try/finally guarantees the processing flag is cleared (and the
// queue drained) on every failure path — including a throw from the
// initial user-message persist below, which would otherwise leave the
// conversation stuck in queued mode indefinitely.
try {
broadcastMessage({
type: "user_message_echo",
text: rawContent,
conversationId,
messageId: persisted.id,
clientMessageId,
});
publishConversationMessagesChanged(conversationId, originClientId);

const result = await conversation.forceClean();
const responseText = formatCleanResult(result);

const assistantMsg = createAssistantMessage(responseText);
await addMessage(
conversationId,
"assistant",
JSON.stringify(assistantMsg.content),
const provenance = provenanceFromTrustContext(conversation.trustContext);
const channelMeta = {
...provenance,
userMessageChannel: sourceChannel,
assistantMessageChannel: sourceChannel,
userMessageInterface: sourceInterface,
assistantMessageInterface: sourceInterface,
};
const cleanMsg = createUserMessage(rawContent, attachments);
const persisted = await addMessage(
mapping.conversationId,
"user",
JSON.stringify(cleanMsg.content),
channelMeta,
);
assistantMessagePersisted = true;
conversation.getMessages().push(assistantMsg);
conversation.getMessages().push(cleanMsg);

broadcastMessage({
type: "assistant_text_delta",
text: responseText,
conversationId,
});
broadcastMessage({ type: "message_complete", conversationId });
publishConversationMessagesChanged(conversationId, originClientId);
} catch (err) {
if (assistantMessagePersisted) {
let assistantMessagePersisted = false;
try {
broadcastMessage({
type: "user_message_echo",
text: rawContent,
conversationId,
messageId: persisted.id,
clientMessageId,
});
publishConversationMessagesChanged(conversationId, originClientId);

const result = await conversation.forceClean();
const responseText = formatCleanResult(result);

const assistantMsg = createAssistantMessage(responseText);
await addMessage(
conversationId,
"assistant",
JSON.stringify(assistantMsg.content),
channelMeta,
);
assistantMessagePersisted = true;
conversation.getMessages().push(assistantMsg);

broadcastMessage({
type: "assistant_text_delta",
text: responseText,
conversationId,
});
broadcastMessage({ type: "message_complete", conversationId });
publishConversationMessagesChanged(conversationId, originClientId);
} catch (err) {
if (assistantMessagePersisted) {
publishConversationMessagesChanged(conversationId, originClientId);
}
log.error({ err, conversationId }, "Clean command failed");
broadcastMessage({
type: "conversation_error",
conversationId,
code: "UNKNOWN",
userMessage: `Clean failed: ${err instanceof Error ? err.message : String(err)}`,
retryable: true,
});
}
log.error({ err, conversationId }, "Clean command failed");
broadcastMessage({
type: "conversation_error",

return {
accepted: true,
messageId: persisted.id,
conversationId,
code: "UNKNOWN",
userMessage: `Clean failed: ${err instanceof Error ? err.message : String(err)}`,
retryable: true,
});
};
} finally {
conversation.processing = false;
silentlyWithLog(conversation.drainQueue(), "clean-command queue drain");
}

return {
accepted: true,
messageId: persisted.id,
conversationId,
};
}

const resolvedContent = slashResult.content;
Expand Down
Loading