Skip to content

Commit

Permalink
Bugfix/Openai assistant thread not found (FlowiseAI#3426)
Browse files Browse the repository at this point in the history
fix openai assistant thread not found by exponential backoff retries
  • Loading branch information
HenryHengZJ authored Oct 29, 2024
1 parent cf06175 commit ebc4641
Showing 1 changed file with 147 additions and 90 deletions.
237 changes: 147 additions & 90 deletions packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,28 +267,54 @@ class OpenAIAssistant_Agents implements INode {
// List all runs, in case existing thread is still running
if (!isNewThread) {
const promise = (threadId: string) => {
return new Promise<void>((resolve) => {
return new Promise<void>((resolve, reject) => {
const maxWaitTime = 30000 // Maximum wait time of 30 seconds
const startTime = Date.now()
let delay = 500 // Initial delay between retries
const maxRetries = 10
let retries = 0

const timeout = setInterval(async () => {
const allRuns = await openai.beta.threads.runs.list(threadId)
if (allRuns.data && allRuns.data.length) {
const firstRunId = allRuns.data[0].id
const runStatus = allRuns.data.find((run) => run.id === firstRunId)?.status
if (
runStatus &&
(runStatus === 'cancelled' ||
runStatus === 'completed' ||
runStatus === 'expired' ||
runStatus === 'failed' ||
runStatus === 'requires_action')
) {
try {
const allRuns = await openai.beta.threads.runs.list(threadId)
if (allRuns.data && allRuns.data.length) {
const firstRunId = allRuns.data[0].id
const runStatus = allRuns.data.find((run) => run.id === firstRunId)?.status
if (
runStatus &&
(runStatus === 'cancelled' ||
runStatus === 'completed' ||
runStatus === 'expired' ||
runStatus === 'failed' ||
runStatus === 'requires_action')
) {
clearInterval(timeout)
resolve()
}
} else {
clearInterval(timeout)
resolve()
reject(new Error(`Empty Thread: ${threadId}`))
}
} else {
} catch (error: any) {
if (error.response?.status === 404) {
clearInterval(timeout)
reject(new Error(`Thread not found: ${threadId}`))
} else if (error.response?.status === 429 && retries < maxRetries) {
retries++
delay *= 2
console.warn(`Rate limit exceeded, retrying in ${delay}ms...`)
} else {
clearInterval(timeout)
reject(new Error(`Unexpected error: ${error.message}`))
}
}

// Timeout condition to stop the loop if maxWaitTime is exceeded
if (Date.now() - startTime > maxWaitTime) {
clearInterval(timeout)
resolve()
reject(new Error('Timeout waiting for thread to finish.'))
}
}, 500)
}, delay)
})
}
await promise(threadId)
Expand Down Expand Up @@ -576,96 +602,127 @@ class OpenAIAssistant_Agents implements INode {

const promise = (threadId: string, runId: string) => {
return new Promise((resolve, reject) => {
const maxWaitTime = 30000 // Maximum wait time of 30 seconds
const startTime = Date.now()
let delay = 500 // Initial delay between retries
const maxRetries = 10
let retries = 0

const timeout = setInterval(async () => {
const run = await openai.beta.threads.runs.retrieve(threadId, runId)
const state = run.status
if (state === 'completed') {
clearInterval(timeout)
resolve(state)
} else if (state === 'requires_action') {
if (run.required_action?.submit_tool_outputs.tool_calls) {
try {
const run = await openai.beta.threads.runs.retrieve(threadId, runId)
const state = run.status

if (state === 'completed') {
clearInterval(timeout)
const actions: ICommonObject[] = []
run.required_action.submit_tool_outputs.tool_calls.forEach((item) => {
const functionCall = item.function
let args = {}
try {
args = JSON.parse(functionCall.arguments)
} catch (e) {
console.error('Error parsing arguments, default to empty object')
}
actions.push({
tool: functionCall.name,
toolInput: args,
toolCallId: item.id
resolve(state)
} else if (state === 'requires_action') {
if (run.required_action?.submit_tool_outputs.tool_calls) {
clearInterval(timeout)
const actions: ICommonObject[] = []
run.required_action.submit_tool_outputs.tool_calls.forEach((item) => {
const functionCall = item.function
let args = {}
try {
args = JSON.parse(functionCall.arguments)
} catch (e) {
console.error('Error parsing arguments, default to empty object')
}
actions.push({
tool: functionCall.name,
toolInput: args,
toolCallId: item.id
})
})
})

const submitToolOutputs = []
for (let i = 0; i < actions.length; i += 1) {
const tool = tools.find((tool: any) => tool.name === actions[i].tool)
if (!tool) continue
const submitToolOutputs = []
for (let i = 0; i < actions.length; i += 1) {
const tool = tools.find((tool: any) => tool.name === actions[i].tool)
if (!tool) continue

// Start tool analytics
const toolIds = await analyticHandlers.onToolStart(tool.name, actions[i].toolInput, parentIds)
if (shouldStreamResponse && sseStreamer) {
sseStreamer.streamToolEvent(chatId, tool.name)
// Start tool analytics
const toolIds = await analyticHandlers.onToolStart(tool.name, actions[i].toolInput, parentIds)
if (shouldStreamResponse && sseStreamer) {
sseStreamer.streamToolEvent(chatId, tool.name)
}

try {
const toolOutput = await tool.call(actions[i].toolInput, undefined, undefined, {
sessionId: threadId,
chatId: options.chatId,
input
})
await analyticHandlers.onToolEnd(toolIds, toolOutput)
submitToolOutputs.push({
tool_call_id: actions[i].toolCallId,
output: toolOutput
})
usedTools.push({
tool: tool.name,
toolInput: actions[i].toolInput,
toolOutput
})
} catch (e) {
await analyticHandlers.onToolEnd(toolIds, e)
console.error('Error executing tool', e)
clearInterval(timeout)
reject(
new Error(
`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}, Tool: ${tool.name}`
)
)
return
}
}

const newRun = await openai.beta.threads.runs.retrieve(threadId, runId)
const newStatus = newRun?.status

try {
const toolOutput = await tool.call(actions[i].toolInput, undefined, undefined, {
sessionId: threadId,
chatId: options.chatId,
input
})
await analyticHandlers.onToolEnd(toolIds, toolOutput)
submitToolOutputs.push({
tool_call_id: actions[i].toolCallId,
output: toolOutput
})
usedTools.push({
tool: tool.name,
toolInput: actions[i].toolInput,
toolOutput
})
if (submitToolOutputs.length && newStatus === 'requires_action') {
await openai.beta.threads.runs.submitToolOutputs(threadId, runId, {
tool_outputs: submitToolOutputs
})
resolve(state)
} else {
await openai.beta.threads.runs.cancel(threadId, runId)
resolve('requires_action_retry')
}
} catch (e) {
await analyticHandlers.onToolEnd(toolIds, e)
console.error('Error executing tool', e)
clearInterval(timeout)
reject(
new Error(
`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}, Tool: ${tool.name}`
)
new Error(`Error submitting tool outputs: ${state}, Thread ID: ${threadId}, Run ID: ${runId}`)
)
break
}
}

const newRun = await openai.beta.threads.runs.retrieve(threadId, runId)
const newStatus = newRun?.status

try {
if (submitToolOutputs.length && newStatus === 'requires_action') {
await openai.beta.threads.runs.submitToolOutputs(threadId, runId, {
tool_outputs: submitToolOutputs
})
resolve(state)
} else {
await openai.beta.threads.runs.cancel(threadId, runId)
resolve('requires_action_retry')
}
} catch (e) {
clearInterval(timeout)
reject(new Error(`Error submitting tool outputs: ${state}, Thread ID: ${threadId}, Run ID: ${runId}`))
}
} else if (state === 'cancelled' || state === 'expired' || state === 'failed') {
clearInterval(timeout)
reject(
new Error(
`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}, Status: ${state}`
)
)
}
} else if (state === 'cancelled' || state === 'expired' || state === 'failed') {
} catch (error: any) {
if (error.response?.status === 404 || error.response?.status === 429) {
clearInterval(timeout)
reject(new Error(`API error: ${error.response?.status} for Thread ID: ${threadId}, Run ID: ${runId}`))
} else if (retries < maxRetries) {
retries++
delay *= 2 // Exponential backoff
console.warn(`Transient error, retrying in ${delay}ms...`)
} else {
clearInterval(timeout)
reject(new Error(`Max retries reached. Error: ${error.message}`))
}
}

// Stop the loop if maximum wait time is exceeded
if (Date.now() - startTime > maxWaitTime) {
clearInterval(timeout)
reject(
new Error(`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}, Status: ${state}`)
)
reject(new Error('Timeout waiting for thread to finish.'))
}
}, 500)
}, delay)
})
}

Expand Down

0 comments on commit ebc4641

Please sign in to comment.