
Commit 26444ac

Feature/sse (#3125)
* Base changes for ServerSide Events (instead of socket.io)
* lint fixes
* adding of interface and separate methods for streaming events
* lint
* first draft, handles both internal and external prediction end points.
* lint fixes
* additional internal end point for streaming and associated changes
* return streamresponse as true to build agent flow
* 1) JSON formatting for internal events 2) other fixes
* 1) convert internal event to metadata to maintain consistency with external response
* fix action and metadata streaming
* fix for error when agent flow is aborted
* prevent subflows from streaming and other code cleanup
* prevent streaming from enclosed tools
* add fix for preventing chaintool streaming
* update lock file
* add open when hidden to sse
* Streaming errors
* Streaming errors
* add fix for showing error message

---------

Co-authored-by: Henry <[email protected]>
1 parent 7a4c7ef commit 26444ac
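
The per-node diffs below replace direct socket.io emits with calls on an injected streamer. Judging purely from the call sites in this commit (streamStartEvent, streamTokenEvent, streamSourceDocumentsEvent, streamUsedToolsEvent, streamEndEvent), the new IServerSideEventStreamer contract looks roughly like the sketch below. The parameter types, and any further members such as the action and metadata events mentioned in the commit message, are assumptions rather than the committed declaration in src/Interface.ts.

// Approximate shape of IServerSideEventStreamer, inferred from the call sites
// in this commit; types are assumed and the real declaration may carry more methods.
interface IServerSideEventStreamer {
    streamStartEvent(chatId: string, data: string): void
    streamTokenEvent(chatId: string, token: string): void
    streamSourceDocumentsEvent(chatId: string, data: any): void
    streamUsedToolsEvent(chatId: string, data: any): void
    streamEndEvent(chatId: string): void
}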

File tree

47 files changed (+1021 -327 lines)


packages/components/nodes/agents/AirtableAgent/AirtableAgent.ts (+10 -4)

@@ -2,7 +2,7 @@ import axios from 'axios'
  import { BaseLanguageModel } from '@langchain/core/language_models/base'
  import { AgentExecutor } from 'langchain/agents'
  import { LLMChain } from 'langchain/chains'
- import { ICommonObject, INode, INodeData, INodeParams, PromptTemplate } from '../../../src/Interface'
+ import { ICommonObject, INode, INodeData, INodeParams, IServerSideEventStreamer, PromptTemplate } from '../../../src/Interface'
  import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'
  import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler'
  import { LoadPyodide, finalSystemPrompt, systemPrompt } from './core'

@@ -104,11 +104,17 @@ class Airtable_Agents implements INode {
                  input = await checkInputs(moderations, input)
              } catch (e) {
                  await new Promise((resolve) => setTimeout(resolve, 500))
-                 //streamResponse(options.socketIO && options.socketIOClientId, e.message, options.socketIO, options.socketIOClientId)
+                 // if (options.shouldStreamResponse) {
+                 //     streamResponse(options.sseStreamer, options.chatId, e.message)
+                 // }
                  return formatResponse(e.message)
              }
          }

+         const shouldStreamResponse = options.shouldStreamResponse
+         const sseStreamer: IServerSideEventStreamer = options.sseStreamer as IServerSideEventStreamer
+         const chatId = options.chatId
+
          const credentialData = await getCredentialData(nodeData.credential ?? '', options)
          const accessToken = getCredentialParam('accessToken', credentialData, nodeData)

@@ -123,7 +129,6 @@ class Airtable_Agents implements INode {
          let base64String = Buffer.from(JSON.stringify(airtableData)).toString('base64')

          const loggerHandler = new ConsoleCallbackHandler(options.logger)
-         const handler = new CustomChainHandler(options.socketIO, options.socketIOClientId)
          const callbacks = await additionalCallbacks(nodeData, options)

          const pyodide = await LoadPyodide()

@@ -194,7 +199,8 @@ json.dumps(my_dict)`
              answer: finalResult
          }

-         if (options.socketIO && options.socketIOClientId) {
+         if (options.shouldStreamResponse) {
+             const handler = new CustomChainHandler(shouldStreamResponse ? sseStreamer : undefined, chatId)
              const result = await chain.call(inputs, [loggerHandler, handler, ...callbacks])
              return result?.text
          } else {
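
CustomChainHandler's constructor changes in this commit from (socketIO, socketIOClientId) to (sseStreamer, chatId), which is why every node below now passes the streamer and the chat id. The handler's body is not part of this excerpt; the sketch below is a hypothetical stand-in that assumes it forwards tokens from LangChain callbacks to the streamer, using the IServerSideEventStreamer shape sketched near the top.

// Hypothetical stand-in for the reworked handler; the real CustomChainHandler
// lives in packages/components/src/handler.ts and is not shown in this diff.
import { BaseCallbackHandler } from '@langchain/core/callbacks/base'

class SseChainHandlerSketch extends BaseCallbackHandler {
    name = 'sse_chain_handler_sketch'

    constructor(private sseStreamer?: IServerSideEventStreamer, private chatId = '') {
        super()
    }

    // Forward each generated token to the connected client when streaming is enabled
    handleLLMNewToken(token: string) {
        if (this.sseStreamer) this.sseStreamer.streamTokenEvent(this.chatId, token)
    }
}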

packages/components/nodes/agents/AutoGPT/AutoGPT.ts (+3 -1)

@@ -113,7 +113,9 @@ class AutoGPT_Agents implements INode {
                  input = await checkInputs(moderations, input)
              } catch (e) {
                  await new Promise((resolve) => setTimeout(resolve, 500))
-                 //streamResponse(options.socketIO && options.socketIOClientId, e.message, options.socketIO, options.socketIOClientId)
+                 // if (options.shouldStreamResponse) {
+                 //     streamResponse(options.sseStreamer, options.chatId, e.message)
+                 // }
                  return formatResponse(e.message)
              }
          }

packages/components/nodes/agents/BabyAGI/BabyAGI.ts (+3 -1)

@@ -73,7 +73,9 @@ class BabyAGI_Agents implements INode {
                  input = await checkInputs(moderations, input)
              } catch (e) {
                  await new Promise((resolve) => setTimeout(resolve, 500))
-                 //streamResponse(options.socketIO && options.socketIOClientId, e.message, options.socketIO, options.socketIOClientId)
+                 // if (options.shouldStreamResponse) {
+                 //     streamResponse(options.sseStreamer, options.chatId, e.message)
+                 // }
                  return formatResponse(e.message)
              }
          }

packages/components/nodes/agents/CSVAgent/CSVAgent.ts (+10 -4)

@@ -2,7 +2,7 @@ import { BaseLanguageModel } from '@langchain/core/language_models/base'
  import { AgentExecutor } from 'langchain/agents'
  import { LLMChain } from 'langchain/chains'
  import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler'
- import { ICommonObject, INode, INodeData, INodeParams, PromptTemplate } from '../../../src/Interface'
+ import { ICommonObject, INode, INodeData, INodeParams, IServerSideEventStreamer, PromptTemplate } from '../../../src/Interface'
  import { getBaseClasses } from '../../../src/utils'
  import { LoadPyodide, finalSystemPrompt, systemPrompt } from './core'
  import { checkInputs, Moderation } from '../../moderation/Moderation'

@@ -90,13 +90,18 @@ class CSV_Agents implements INode {
                  input = await checkInputs(moderations, input)
              } catch (e) {
                  await new Promise((resolve) => setTimeout(resolve, 500))
-                 //streamResponse(options.socketIO && options.socketIOClientId, e.message, options.socketIO, options.socketIOClientId)
+                 // if (options.shouldStreamResponse) {
+                 //     streamResponse(options.sseStreamer, options.chatId, e.message)
+                 // }
                  return formatResponse(e.message)
              }
          }

          const loggerHandler = new ConsoleCallbackHandler(options.logger)
-         const handler = new CustomChainHandler(options.socketIO, options.socketIOClientId)
+         const shouldStreamResponse = options.shouldStreamResponse
+         const sseStreamer: IServerSideEventStreamer = options.sseStreamer as IServerSideEventStreamer
+         const chatId = options.chatId
+
          const callbacks = await additionalCallbacks(nodeData, options)

          let files: string[] = []

@@ -203,7 +208,8 @@ json.dumps(my_dict)`
              answer: finalResult
          }

-         if (options.socketIO && options.socketIOClientId) {
+         if (options.shouldStreamResponse) {
+             const handler = new CustomChainHandler(shouldStreamResponse ? sseStreamer : undefined, chatId)
              const result = await chain.call(inputs, [loggerHandler, handler, ...callbacks])
              return result?.text
          } else {

packages/components/nodes/agents/ConversationalAgent/ConversationalAgent.ts (+27 -7)

@@ -9,7 +9,16 @@ import { RunnableSequence } from '@langchain/core/runnables'
  import { ChatConversationalAgent } from 'langchain/agents'
  import { getBaseClasses } from '../../../src/utils'
  import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler'
- import { IVisionChatModal, FlowiseMemory, ICommonObject, INode, INodeData, INodeParams, IUsedTool } from '../../../src/Interface'
+ import {
+     IVisionChatModal,
+     FlowiseMemory,
+     ICommonObject,
+     INode,
+     INodeData,
+     INodeParams,
+     IUsedTool,
+     IServerSideEventStreamer
+ } from '../../../src/Interface'
  import { AgentExecutor } from '../../../src/agents'
  import { addImagesToMessages, llmSupportsVision } from '../../../src/multiModalUtils'
  import { checkInputs, Moderation } from '../../moderation/Moderation'

@@ -106,12 +115,18 @@ class ConversationalAgent_Agents implements INode {
          const memory = nodeData.inputs?.memory as FlowiseMemory
          const moderations = nodeData.inputs?.inputModeration as Moderation[]

+         const shouldStreamResponse = options.shouldStreamResponse
+         const sseStreamer: IServerSideEventStreamer = options.sseStreamer as IServerSideEventStreamer
+         const chatId = options.chatId
          if (moderations && moderations.length > 0) {
              try {
                  // Use the output of the moderation chain as input for the BabyAGI agent
                  input = await checkInputs(moderations, input)
              } catch (e) {
                  await new Promise((resolve) => setTimeout(resolve, 500))
+                 // if (options.shouldStreamResponse) {
+                 //     streamResponse(options.sseStreamer, options.chatId, e.message)
+                 // }
                  //streamResponse(options.socketIO && options.socketIOClientId, e.message, options.socketIO, options.socketIOClientId)
                  return formatResponse(e.message)
              }

@@ -125,15 +140,17 @@ class ConversationalAgent_Agents implements INode {
          let sourceDocuments: ICommonObject[] = []
          let usedTools: IUsedTool[] = []

-         if (options.socketIO && options.socketIOClientId) {
-             const handler = new CustomChainHandler(options.socketIO, options.socketIOClientId)
+         if (options.shouldStreamResponse) {
+             const handler = new CustomChainHandler(shouldStreamResponse ? sseStreamer : undefined, chatId)
              res = await executor.invoke({ input }, { callbacks: [loggerHandler, handler, ...callbacks] })
              if (res.sourceDocuments) {
-                 options.socketIO.to(options.socketIOClientId).emit('sourceDocuments', flatten(res.sourceDocuments))
+                 if (options.sseStreamer) {
+                     sseStreamer.streamSourceDocumentsEvent(options.chatId, flatten(res.sourceDocuments))
+                 }
                  sourceDocuments = res.sourceDocuments
              }
              if (res.usedTools) {
-                 options.socketIO.to(options.socketIOClientId).emit('usedTools', res.usedTools)
+                 sseStreamer.streamUsedToolsEvent(options.chatId, res.usedTools)
                  usedTools = res.usedTools
              }
              // If the tool is set to returnDirect, stream the output to the client

@@ -142,11 +159,14 @@ class ConversationalAgent_Agents implements INode {
                  inputTools = flatten(inputTools)
                  for (const tool of res.usedTools) {
                      const inputTool = inputTools.find((inputTool: Tool) => inputTool.name === tool.tool)
-                     if (inputTool && inputTool.returnDirect) {
-                         options.socketIO.to(options.socketIOClientId).emit('token', tool.toolOutput)
+                     if (inputTool && inputTool.returnDirect && options.sseStreamer) {
+                         sseStreamer.streamTokenEvent(options.chatId, tool.toolOutput)
                      }
                  }
              }
+             if (sseStreamer) {
+                 sseStreamer.streamEndEvent(options.chatId)
+             }
          } else {
              res = await executor.invoke({ input }, { callbacks: [loggerHandler, ...callbacks] })
              if (res.sourceDocuments) {

packages/components/nodes/agents/ConversationalRetrievalToolAgent/ConversationalRetrievalToolAgent.ts (+20 -8)

@@ -7,7 +7,16 @@ import { ChatPromptTemplate, MessagesPlaceholder, HumanMessagePromptTemplate, Pr
  import { formatToOpenAIToolMessages } from 'langchain/agents/format_scratchpad/openai_tools'
  import { getBaseClasses } from '../../../src/utils'
  import { type ToolsAgentStep } from 'langchain/agents/openai/output_parser'
- import { FlowiseMemory, ICommonObject, INode, INodeData, INodeParams, IUsedTool, IVisionChatModal } from '../../../src/Interface'
+ import {
+     FlowiseMemory,
+     ICommonObject,
+     INode,
+     INodeData,
+     INodeParams,
+     IServerSideEventStreamer,
+     IUsedTool,
+     IVisionChatModal
+ } from '../../../src/Interface'
  import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler'
  import { AgentExecutor, ToolCallingAgentOutputParser } from '../../../src/agents'
  import { Moderation, checkInputs, streamResponse } from '../../moderation/Moderation'

@@ -104,16 +113,19 @@ class ConversationalRetrievalToolAgent_Agents implements INode {
          const memory = nodeData.inputs?.memory as FlowiseMemory
          const moderations = nodeData.inputs?.inputModeration as Moderation[]

-         const isStreamable = options.socketIO && options.socketIOClientId
+         const shouldStreamResponse = options.shouldStreamResponse
+         const sseStreamer: IServerSideEventStreamer = options.sseStreamer as IServerSideEventStreamer
+         const chatId = options.chatId

          if (moderations && moderations.length > 0) {
              try {
                  // Use the output of the moderation chain as input for the OpenAI Function Agent
                  input = await checkInputs(moderations, input)
              } catch (e) {
                  await new Promise((resolve) => setTimeout(resolve, 500))
-                 if (isStreamable)
-                     streamResponse(options.socketIO && options.socketIOClientId, e.message, options.socketIO, options.socketIOClientId)
+                 if (shouldStreamResponse) {
+                     streamResponse(sseStreamer, chatId, e.message)
+                 }
                  return formatResponse(e.message)
              }
          }

@@ -127,15 +139,15 @@ class ConversationalRetrievalToolAgent_Agents implements INode {
          let sourceDocuments: ICommonObject[] = []
          let usedTools: IUsedTool[] = []

-         if (isStreamable) {
-             const handler = new CustomChainHandler(options.socketIO, options.socketIOClientId)
+         if (shouldStreamResponse) {
+             const handler = new CustomChainHandler(sseStreamer, chatId)
              res = await executor.invoke({ input }, { callbacks: [loggerHandler, handler, ...callbacks] })
              if (res.sourceDocuments) {
-                 options.socketIO.to(options.socketIOClientId).emit('sourceDocuments', flatten(res.sourceDocuments))
+                 sseStreamer.streamSourceDocumentsEvent(chatId, flatten(res.sourceDocuments))
                  sourceDocuments = res.sourceDocuments
              }
              if (res.usedTools) {
-                 options.socketIO.to(options.socketIOClientId).emit('usedTools', res.usedTools)
+                 sseStreamer.streamUsedToolsEvent(chatId, res.usedTools)
                  usedTools = res.usedTools
              }
          } else {
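
Note the changed call signature for the moderation helper used in this file: the old call passed the message together with the socket.io client and client id, while the new call is streamResponse(sseStreamer, chatId, e.message). The helper's new body is not part of this excerpt; a minimal sketch, assuming it simply pushes the moderation error text and closes the stream, might look like the following. The function name is suffixed here to mark it as a stand-in.

// Minimal sketch of the reworked moderation streamResponse helper; the actual
// implementation in packages/components/nodes/moderation/Moderation.ts may differ.
export const streamResponseSketch = (sseStreamer: IServerSideEventStreamer, chatId: string, message: string) => {
    sseStreamer.streamStartEvent(chatId, message)
    sseStreamer.streamTokenEvent(chatId, message)
    sseStreamer.streamEndEvent(chatId)
}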

packages/components/nodes/agents/LlamaIndexAgents/OpenAIToolAgent/OpenAIToolAgent_LlamaIndex.ts (+23 -7)

@@ -1,7 +1,16 @@
  import { flatten } from 'lodash'
  import { ChatMessage, OpenAI, OpenAIAgent } from 'llamaindex'
  import { getBaseClasses } from '../../../../src/utils'
- import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams, IUsedTool } from '../../../../src/Interface'
+ import {
+     FlowiseMemory,
+     ICommonObject,
+     IMessage,
+     INode,
+     INodeData,
+     INodeParams,
+     IServerSideEventStreamer,
+     IUsedTool
+ } from '../../../../src/Interface'

  class OpenAIFunctionAgent_LlamaIndex_Agents implements INode {
      label: string

@@ -67,7 +76,9 @@ class OpenAIFunctionAgent_LlamaIndex_Agents implements INode {
          let tools = nodeData.inputs?.tools
          tools = flatten(tools)

-         const isStreamingEnabled = options.socketIO && options.socketIOClientId
+         const shouldStreamResponse = options.shouldStreamResponse
+         const sseStreamer: IServerSideEventStreamer = options.sseStreamer as IServerSideEventStreamer
+         const chatId = options.chatId

          const chatHistory = [] as ChatMessage[]

@@ -104,7 +115,7 @@ class OpenAIFunctionAgent_LlamaIndex_Agents implements INode {
          let isStreamingStarted = false
          const usedTools: IUsedTool[] = []

-         if (isStreamingEnabled) {
+         if (shouldStreamResponse) {
              const stream = await agent.chat({
                  message: input,
                  chatHistory,

@@ -116,7 +127,9 @@ class OpenAIFunctionAgent_LlamaIndex_Agents implements INode {
                  text += chunk.response.delta
                  if (!isStreamingStarted) {
                      isStreamingStarted = true
-                     options.socketIO.to(options.socketIOClientId).emit('start', chunk.response.delta)
+                     if (sseStreamer) {
+                         sseStreamer.streamStartEvent(chatId, chunk.response.delta)
+                     }
                      if (chunk.sources.length) {
                          for (const sourceTool of chunk.sources) {
                              usedTools.push({

@@ -125,11 +138,14 @@ class OpenAIFunctionAgent_LlamaIndex_Agents implements INode {
                                  toolOutput: sourceTool.output as any
                              })
                          }
-                         options.socketIO.to(options.socketIOClientId).emit('usedTools', usedTools)
+                         if (sseStreamer) {
+                             sseStreamer.streamUsedToolsEvent(chatId, usedTools)
+                         }
                      }
                  }
-
-                 options.socketIO.to(options.socketIOClientId).emit('token', chunk.response.delta)
+                 if (sseStreamer) {
+                     sseStreamer.streamTokenEvent(chatId, chunk.response.delta)
+                 }
              }
          } else {
              const response = await agent.chat({ message: input, chatHistory, verbose: process.env.DEBUG === 'true' ? true : false })
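
On the consuming side, which is not shown in this commit excerpt, a browser client could read these events with the standard EventSource API. The endpoint path and the JSON framing below are placeholders (the commit message only notes "JSON formatting for internal events"), so treat this as an illustration of SSE consumption in general rather than the Flowise UI's actual code.

// Illustrative SSE consumer; the URL and message shape are placeholders,
// not the endpoint added by this commit.
let streamedText = ''
const source = new EventSource('/api/v1/prediction/<chatflowid>/stream') // hypothetical URL

source.onmessage = (ev: MessageEvent) => {
    const payload = JSON.parse(ev.data) // assumes an { event: string, data: any } framing
    if (payload.event === 'token') {
        streamedText += payload.data // accumulate the streamed answer text
    } else if (payload.event === 'usedTools' || payload.event === 'sourceDocuments') {
        console.log(payload.event, payload.data) // metadata events
    } else if (payload.event === 'end') {
        source.close() // server signalled completion
    }
}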
