From 9a25bf57a93f77074bdb084921f0591dbe0b44e1 Mon Sep 17 00:00:00 2001 From: Henry Date: Sun, 12 Jan 2025 01:14:11 +0000 Subject: [PATCH] update multer s3 implementation --- packages/components/src/storageUtils.ts | 103 ++++++++++++++++++ .../openai-assistants-vector-store/index.ts | 2 +- .../controllers/openai-assistants/index.ts | 2 +- packages/server/src/index.ts | 14 +++ .../server/src/routes/attachments/index.ts | 7 +- .../server/src/routes/documentstore/index.ts | 6 +- .../routes/openai-assistants-files/index.ts | 6 +- .../openai-assistants-vector-store/index.ts | 6 +- .../server/src/routes/predictions/index.ts | 12 +- packages/server/src/routes/vectors/index.ts | 9 +- .../src/services/documentstore/index.ts | 9 +- .../openai-assistants-vector-store/index.ts | 7 +- .../src/services/openai-assistants/index.ts | 7 +- packages/server/src/utils/buildChatflow.ts | 9 +- packages/server/src/utils/createAttachment.ts | 14 ++- packages/server/src/utils/index.ts | 21 ---- packages/server/src/utils/telemetry.ts | 3 +- packages/server/src/utils/upsertVector.ts | 15 ++- 18 files changed, 178 insertions(+), 74 deletions(-) diff --git a/packages/components/src/storageUtils.ts b/packages/components/src/storageUtils.ts index 2b7146cde02..8f72f9a590b 100644 --- a/packages/components/src/storageUtils.ts +++ b/packages/components/src/storageUtils.ts @@ -11,6 +11,64 @@ import { import { Readable } from 'node:stream' import { getUserHome } from './utils' import sanitize from 'sanitize-filename' +import multer from 'multer' +const multerS3 = require('multer-s3') + +/** + * Get user settings file + * TODO: move env variables to settings json file, easier configuration + */ +export const getUserSettingsFilePath = () => { + if (process.env.SECRETKEY_PATH) return path.join(process.env.SECRETKEY_PATH, 'settings.json') + const checkPaths = [path.join(getUserHome(), '.flowise', 'settings.json')] + for (const checkPath of checkPaths) { + if (fs.existsSync(checkPath)) { + return checkPath + } + } + return '' +} + +export const getOrgId = () => { + const settingsContent = fs.readFileSync(getUserSettingsFilePath(), 'utf8') + try { + const settings = JSON.parse(settingsContent) + return settings.instanceId + } catch (error) { + return '' + } +} + +const getUploadPath = (): string => { + return process.env.BLOB_STORAGE_PATH + ? path.join(process.env.BLOB_STORAGE_PATH, 'uploads', getOrgId()) + : path.join(getUserHome(), '.flowise', 'uploads', getOrgId()) +} + +export const getMulterStorage = () => { + const storageType = getStorageType() + + if (storageType === 's3') { + const s3Client = getS3Config().s3Client + const Bucket = getS3Config().Bucket + + const upload = multer({ + storage: multerS3({ + s3: s3Client, + bucket: Bucket, + metadata: function (req: Request, file: Express.Multer.File, cb: (error: any, metadata: any) => void) { + cb(null, { fieldName: file.fieldname, originalName: file.originalname, orgId: getOrgId() }) + }, + key: function (req: Request, file: Express.Multer.File, cb: (error: any, metadata: any) => void) { + cb(null, `${getOrgId()}/${Date.now().toString()}`) + } + }) + }) + return upload + } else { + return multer({ dest: getUploadPath() }) + } +} export const addBase64FilesToStorage = async (fileBase64: string, chatflowid: string, fileNames: string[]) => { const storageType = getStorageType() @@ -120,6 +178,37 @@ export const addSingleFileToStorage = async (mime: string, bf: Buffer, fileName: } } +export const getFileFromUpload = async (filePath: string): Promise => { + const storageType = getStorageType() + if (storageType === 's3') { + const { s3Client, Bucket } = getS3Config() + + let Key = filePath + // remove the first '/' if it exists + if (Key.startsWith('/')) { + Key = Key.substring(1) + } + const getParams = { + Bucket, + Key + } + + const response = await s3Client.send(new GetObjectCommand(getParams)) + const body = response.Body + if (body instanceof Readable) { + const streamToString = await body.transformToString('base64') + if (streamToString) { + return Buffer.from(streamToString, 'base64') + } + } + // @ts-ignore + const buffer = Buffer.concat(response.Body.toArray()) + return buffer + } else { + return fs.readFileSync(filePath) + } +} + export const getFileFromStorage = async (file: string, ...paths: string[]): Promise => { const storageType = getStorageType() const sanitizedFilename = _sanitizeFilename(file) @@ -183,6 +272,20 @@ export const removeFilesFromStorage = async (...paths: string[]) => { } } +export const removeSpecificFileFromUpload = async (filePath: string) => { + const storageType = getStorageType() + if (storageType === 's3') { + let Key = filePath + // remove the first '/' if it exists + if (Key.startsWith('/')) { + Key = Key.substring(1) + } + await _deleteS3Folder(Key) + } else { + fs.unlinkSync(filePath) + } +} + export const removeSpecificFileFromStorage = async (...paths: string[]) => { const storageType = getStorageType() if (storageType === 's3') { diff --git a/packages/server/src/controllers/openai-assistants-vector-store/index.ts b/packages/server/src/controllers/openai-assistants-vector-store/index.ts index 2b82ca0d078..f2216992260 100644 --- a/packages/server/src/controllers/openai-assistants-vector-store/index.ts +++ b/packages/server/src/controllers/openai-assistants-vector-store/index.ts @@ -143,7 +143,7 @@ const uploadFilesToAssistantVectorStore = async (req: Request, res: Response, ne // Address file name with special characters: https://github.com/expressjs/multer/issues/1104 file.originalname = Buffer.from(file.originalname, 'latin1').toString('utf8') uploadFiles.push({ - filePath: file.path, + filePath: file.path ?? file.key, fileName: file.originalname }) } diff --git a/packages/server/src/controllers/openai-assistants/index.ts b/packages/server/src/controllers/openai-assistants/index.ts index 94c2afe69e7..1b516af8c03 100644 --- a/packages/server/src/controllers/openai-assistants/index.ts +++ b/packages/server/src/controllers/openai-assistants/index.ts @@ -84,7 +84,7 @@ const uploadAssistantFiles = async (req: Request, res: Response, next: NextFunct // Address file name with special characters: https://github.com/expressjs/multer/issues/1104 file.originalname = Buffer.from(file.originalname, 'latin1').toString('utf8') uploadFiles.push({ - filePath: file.path, + filePath: file.path ?? file.key, fileName: file.originalname }) } diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 1ed6a749fb8..1d163d6b3bb 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -33,6 +33,20 @@ declare global { interface Request { io?: Server } + namespace Multer { + interface File { + bucket: string + key: string + acl: string + contentType: string + contentDisposition: null + storageClass: string + serverSideEncryption: null + metadata: any + location: string + etag: string + } + } } } diff --git a/packages/server/src/routes/attachments/index.ts b/packages/server/src/routes/attachments/index.ts index a4223781f72..99d07714c86 100644 --- a/packages/server/src/routes/attachments/index.ts +++ b/packages/server/src/routes/attachments/index.ts @@ -1,13 +1,10 @@ import express from 'express' -import multer from 'multer' import attachmentsController from '../../controllers/attachments' -import { getUploadPath } from '../../utils' +import { getMulterStorage } from 'flowise-components' const router = express.Router() -const upload = multer({ dest: getUploadPath() }) - // CREATE -router.post('/:chatflowId/:chatId', upload.array('files'), attachmentsController.createAttachment) +router.post('/:chatflowId/:chatId', getMulterStorage().array('files'), attachmentsController.createAttachment) export default router diff --git a/packages/server/src/routes/documentstore/index.ts b/packages/server/src/routes/documentstore/index.ts index b53ceb3c5cc..3e7e9b4f42d 100644 --- a/packages/server/src/routes/documentstore/index.ts +++ b/packages/server/src/routes/documentstore/index.ts @@ -1,12 +1,10 @@ import express from 'express' -import multer from 'multer' -import { getUploadPath } from '../../utils' import documentStoreController from '../../controllers/documentstore' +import { getMulterStorage } from 'flowise-components' const router = express.Router() -const upload = multer({ dest: getUploadPath() }) -router.post(['/upsert/', '/upsert/:id'], upload.array('files'), documentStoreController.upsertDocStoreMiddleware) +router.post(['/upsert/', '/upsert/:id'], getMulterStorage().array('files'), documentStoreController.upsertDocStoreMiddleware) router.post(['/refresh/', '/refresh/:id'], documentStoreController.refreshDocStoreMiddleware) diff --git a/packages/server/src/routes/openai-assistants-files/index.ts b/packages/server/src/routes/openai-assistants-files/index.ts index d717754715f..40ab30fce6d 100644 --- a/packages/server/src/routes/openai-assistants-files/index.ts +++ b/packages/server/src/routes/openai-assistants-files/index.ts @@ -1,12 +1,10 @@ import express from 'express' -import multer from 'multer' import openaiAssistantsController from '../../controllers/openai-assistants' -import { getUploadPath } from '../../utils' +import { getMulterStorage } from 'flowise-components' const router = express.Router() -const upload = multer({ dest: getUploadPath() }) router.post('/download/', openaiAssistantsController.getFileFromAssistant) -router.post('/upload/', upload.array('files'), openaiAssistantsController.uploadAssistantFiles) +router.post('/upload/', getMulterStorage().array('files'), openaiAssistantsController.uploadAssistantFiles) export default router diff --git a/packages/server/src/routes/openai-assistants-vector-store/index.ts b/packages/server/src/routes/openai-assistants-vector-store/index.ts index a6721566c12..0db457befb4 100644 --- a/packages/server/src/routes/openai-assistants-vector-store/index.ts +++ b/packages/server/src/routes/openai-assistants-vector-store/index.ts @@ -1,10 +1,8 @@ import express from 'express' -import multer from 'multer' import openaiAssistantsVectorStoreController from '../../controllers/openai-assistants-vector-store' -import { getUploadPath } from '../../utils' +import { getMulterStorage } from 'flowise-components' const router = express.Router() -const upload = multer({ dest: getUploadPath() }) // CREATE router.post('/', openaiAssistantsVectorStoreController.createAssistantVectorStore) @@ -22,7 +20,7 @@ router.put(['/', '/:id'], openaiAssistantsVectorStoreController.updateAssistantV router.delete(['/', '/:id'], openaiAssistantsVectorStoreController.deleteAssistantVectorStore) // POST -router.post('/:id', upload.array('files'), openaiAssistantsVectorStoreController.uploadFilesToAssistantVectorStore) +router.post('/:id', getMulterStorage().array('files'), openaiAssistantsVectorStoreController.uploadFilesToAssistantVectorStore) // DELETE router.patch(['/', '/:id'], openaiAssistantsVectorStoreController.deleteFilesFromAssistantVectorStore) diff --git a/packages/server/src/routes/predictions/index.ts b/packages/server/src/routes/predictions/index.ts index ca192c89fb2..e92843307c2 100644 --- a/packages/server/src/routes/predictions/index.ts +++ b/packages/server/src/routes/predictions/index.ts @@ -1,13 +1,15 @@ import express from 'express' -import multer from 'multer' import predictionsController from '../../controllers/predictions' -import { getUploadPath } from '../../utils' +import { getMulterStorage } from 'flowise-components' const router = express.Router() -const upload = multer({ dest: getUploadPath() }) - // CREATE -router.post(['/', '/:id'], upload.array('files'), predictionsController.getRateLimiterMiddleware, predictionsController.createPrediction) +router.post( + ['/', '/:id'], + getMulterStorage().array('files'), + predictionsController.getRateLimiterMiddleware, + predictionsController.createPrediction +) export default router diff --git a/packages/server/src/routes/vectors/index.ts b/packages/server/src/routes/vectors/index.ts index 060d7e4b34e..fa66fc27a57 100644 --- a/packages/server/src/routes/vectors/index.ts +++ b/packages/server/src/routes/vectors/index.ts @@ -1,19 +1,16 @@ import express from 'express' -import multer from 'multer' import vectorsController from '../../controllers/vectors' -import { getUploadPath } from '../../utils' +import { getMulterStorage } from 'flowise-components' const router = express.Router() -const upload = multer({ dest: getUploadPath() }) - // CREATE router.post( ['/upsert/', '/upsert/:id'], - upload.array('files'), + getMulterStorage().array('files'), vectorsController.getRateLimiterMiddleware, vectorsController.upsertVectorMiddleware ) -router.post(['/internal-upsert/', '/internal-upsert/:id'], upload.array('files'), vectorsController.createInternalUpsert) +router.post(['/internal-upsert/', '/internal-upsert/:id'], getMulterStorage().array('files'), vectorsController.createInternalUpsert) export default router diff --git a/packages/server/src/services/documentstore/index.ts b/packages/server/src/services/documentstore/index.ts index db2ecf322a2..f5bdf0e5ac5 100644 --- a/packages/server/src/services/documentstore/index.ts +++ b/packages/server/src/services/documentstore/index.ts @@ -1,17 +1,18 @@ import { getRunningExpressApp } from '../../utils/getRunningExpressApp' import { DocumentStore } from '../../database/entities/DocumentStore' -import * as fs from 'fs' import * as path from 'path' import { addArrayFilesToStorage, addSingleFileToStorage, getFileFromStorage, + getFileFromUpload, ICommonObject, IDocument, mapExtToInputField, mapMimeTypeToInputField, removeFilesFromStorage, - removeSpecificFileFromStorage + removeSpecificFileFromStorage, + removeSpecificFileFromUpload } from 'flowise-components' import { addLoaderSource, @@ -1441,7 +1442,7 @@ const upsertDocStoreMiddleware = async ( const filesLoaderConfig: ICommonObject = {} for (const file of files) { const fileNames: string[] = [] - const fileBuffer = fs.readFileSync(file.path) + const fileBuffer = await getFileFromUpload(file.path ?? file.key) // Address file name with special characters: https://github.com/expressjs/multer/issues/1104 file.originalname = Buffer.from(file.originalname, 'latin1').toString('utf8') @@ -1481,7 +1482,7 @@ const upsertDocStoreMiddleware = async ( filesLoaderConfig[fileInputField] = JSON.stringify([storagePath]) } - fs.unlinkSync(file.path) + await removeSpecificFileFromUpload(file.path ?? file.key) } loaderConfig = { diff --git a/packages/server/src/services/openai-assistants-vector-store/index.ts b/packages/server/src/services/openai-assistants-vector-store/index.ts index 46f9c183f8e..671e18d9098 100644 --- a/packages/server/src/services/openai-assistants-vector-store/index.ts +++ b/packages/server/src/services/openai-assistants-vector-store/index.ts @@ -1,11 +1,11 @@ import OpenAI from 'openai' import { StatusCodes } from 'http-status-codes' -import fs from 'fs' import { Credential } from '../../database/entities/Credential' import { InternalFlowiseError } from '../../errors/internalFlowiseError' import { getErrorMessage } from '../../errors/utils' import { getRunningExpressApp } from '../../utils/getRunningExpressApp' import { decryptCredentialData } from '../../utils' +import { getFileFromUpload, removeSpecificFileFromUpload } from 'flowise-components' const getAssistantVectorStore = async (credentialId: string, vectorStoreId: string) => { try { @@ -178,13 +178,14 @@ const uploadFilesToAssistantVectorStore = async ( const openai = new OpenAI({ apiKey: openAIApiKey }) const uploadedFiles = [] for (const file of files) { - const toFile = await OpenAI.toFile(fs.readFileSync(file.filePath), file.fileName) + const fileBuffer = await getFileFromUpload(file.filePath) + const toFile = await OpenAI.toFile(fileBuffer, file.fileName) const createdFile = await openai.files.create({ file: toFile, purpose: 'assistants' }) uploadedFiles.push(createdFile) - fs.unlinkSync(file.filePath) + await removeSpecificFileFromUpload(file.filePath) } const file_ids = [...uploadedFiles.map((file) => file.id)] diff --git a/packages/server/src/services/openai-assistants/index.ts b/packages/server/src/services/openai-assistants/index.ts index c908a546755..e842b04565b 100644 --- a/packages/server/src/services/openai-assistants/index.ts +++ b/packages/server/src/services/openai-assistants/index.ts @@ -1,11 +1,11 @@ import OpenAI from 'openai' -import fs from 'fs' import { StatusCodes } from 'http-status-codes' import { decryptCredentialData } from '../../utils' import { getRunningExpressApp } from '../../utils/getRunningExpressApp' import { Credential } from '../../database/entities/Credential' import { InternalFlowiseError } from '../../errors/internalFlowiseError' import { getErrorMessage } from '../../errors/utils' +import { getFileFromUpload, removeSpecificFileFromUpload } from 'flowise-components' // ---------------------------------------- // Assistants @@ -101,13 +101,14 @@ const uploadFilesToAssistant = async (credentialId: string, files: { filePath: s const uploadedFiles = [] for (const file of files) { - const toFile = await OpenAI.toFile(fs.readFileSync(file.filePath), file.fileName) + const fileBuffer = await getFileFromUpload(file.filePath) + const toFile = await OpenAI.toFile(fileBuffer, file.fileName) const createdFile = await openai.files.create({ file: toFile, purpose: 'assistants' }) uploadedFiles.push(createdFile) - fs.unlinkSync(file.filePath) + await removeSpecificFileFromUpload(file.filePath) } return uploadedFiles diff --git a/packages/server/src/utils/buildChatflow.ts b/packages/server/src/utils/buildChatflow.ts index 09cc847439a..6dc135f01b3 100644 --- a/packages/server/src/utils/buildChatflow.ts +++ b/packages/server/src/utils/buildChatflow.ts @@ -9,7 +9,9 @@ import { mapMimeTypeToInputField, mapExtToInputField, generateFollowUpPrompts, - IServerSideEventStreamer + IServerSideEventStreamer, + getFileFromUpload, + removeSpecificFileFromUpload } from 'flowise-components' import { StatusCodes } from 'http-status-codes' import { @@ -49,7 +51,6 @@ import { validateChatflowAPIKey } from './validateKey' import { databaseEntities } from '.' import { v4 as uuidv4 } from 'uuid' import { omit } from 'lodash' -import * as fs from 'fs' import logger from './logger' import { utilAddChatMessage } from './addChatMesage' import { buildAgentGraph } from './buildAgentGraph' @@ -162,7 +163,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals const overrideConfig: ICommonObject = { ...req.body } const fileNames: string[] = [] for (const file of files) { - const fileBuffer = fs.readFileSync(file.path) + const fileBuffer = await getFileFromUpload(file.path ?? file.key) // Address file name with special characters: https://github.com/expressjs/multer/issues/1104 file.originalname = Buffer.from(file.originalname, 'latin1').toString('utf8') const storagePath = await addArrayFilesToStorage(file.mimetype, fileBuffer, file.originalname, fileNames, chatflowid) @@ -195,7 +196,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals overrideConfig[fileInputField] = storagePath } - fs.unlinkSync(file.path) + await removeSpecificFileFromUpload(file.path ?? file.key) } if (overrideConfig.vars && typeof overrideConfig.vars === 'string') { overrideConfig.vars = JSON.parse(overrideConfig.vars) diff --git a/packages/server/src/utils/createAttachment.ts b/packages/server/src/utils/createAttachment.ts index 94b161ce3ca..3a2e691a80d 100644 --- a/packages/server/src/utils/createAttachment.ts +++ b/packages/server/src/utils/createAttachment.ts @@ -1,7 +1,13 @@ import { Request } from 'express' import * as path from 'path' -import * as fs from 'fs' -import { addArrayFilesToStorage, IDocument, mapExtToInputField, mapMimeTypeToInputField } from 'flowise-components' +import { + addArrayFilesToStorage, + getFileFromUpload, + IDocument, + mapExtToInputField, + mapMimeTypeToInputField, + removeSpecificFileFromUpload +} from 'flowise-components' import { getRunningExpressApp } from './getRunningExpressApp' import { getErrorMessage } from '../errors/utils' @@ -41,7 +47,7 @@ export const createFileAttachment = async (req: Request) => { if (files.length) { const isBase64 = req.body.base64 for (const file of files) { - const fileBuffer = fs.readFileSync(file.path) + const fileBuffer = await getFileFromUpload(file.path ?? file.key) const fileNames: string[] = [] // Address file name with special characters: https://github.com/expressjs/multer/issues/1104 @@ -63,7 +69,7 @@ export const createFileAttachment = async (req: Request) => { fileInputField = fileInputFieldFromExt } - fs.unlinkSync(file.path) + await removeSpecificFileFromUpload(file.path ?? file.key) try { const nodeData = { diff --git a/packages/server/src/utils/index.ts b/packages/server/src/utils/index.ts index 976ffb9e90d..d50012a6fd1 100644 --- a/packages/server/src/utils/index.ts +++ b/packages/server/src/utils/index.ts @@ -1690,21 +1690,6 @@ export const getTelemetryFlowObj = (nodes: IReactFlowNode[], edges: IReactFlowEd return { nodes: nodeData, edges: edgeData } } -/** - * Get user settings file - * TODO: move env variables to settings json file, easier configuration - */ -export const getUserSettingsFilePath = () => { - if (process.env.SECRETKEY_PATH) return path.join(process.env.SECRETKEY_PATH, 'settings.json') - const checkPaths = [path.join(getUserHome(), '.flowise', 'settings.json')] - for (const checkPath of checkPaths) { - if (fs.existsSync(checkPath)) { - return checkPath - } - } - return '' -} - /** * Get app current version */ @@ -1773,9 +1758,3 @@ export const getAPIOverrideConfig = (chatflow: IChatFlow) => { return { nodeOverrides: {}, variableOverrides: [], apiOverrideStatus: false } } } - -export const getUploadPath = (): string => { - return process.env.BLOB_STORAGE_PATH - ? path.join(process.env.BLOB_STORAGE_PATH, 'uploads') - : path.join(getUserHome(), '.flowise', 'uploads') -} diff --git a/packages/server/src/utils/telemetry.ts b/packages/server/src/utils/telemetry.ts index 4b033f2095b..07938f9f593 100644 --- a/packages/server/src/utils/telemetry.ts +++ b/packages/server/src/utils/telemetry.ts @@ -2,7 +2,8 @@ import { v4 as uuidv4 } from 'uuid' import { PostHog } from 'posthog-node' import path from 'path' import fs from 'fs' -import { getUserHome, getUserSettingsFilePath } from '.' +import { getUserHome } from '.' +import { getUserSettingsFilePath } from 'flowise-components' export class Telemetry { postHog?: PostHog diff --git a/packages/server/src/utils/upsertVector.ts b/packages/server/src/utils/upsertVector.ts index d92ab41d357..0bcee73ce0f 100644 --- a/packages/server/src/utils/upsertVector.ts +++ b/packages/server/src/utils/upsertVector.ts @@ -1,8 +1,15 @@ import { Request } from 'express' -import * as fs from 'fs' import * as path from 'path' import { cloneDeep, omit } from 'lodash' -import { ICommonObject, IMessage, addArrayFilesToStorage, mapMimeTypeToInputField, mapExtToInputField } from 'flowise-components' +import { + ICommonObject, + IMessage, + addArrayFilesToStorage, + mapMimeTypeToInputField, + mapExtToInputField, + getFileFromUpload, + removeSpecificFileFromUpload +} from 'flowise-components' import logger from '../utils/logger' import { buildFlow, @@ -57,7 +64,7 @@ export const upsertVector = async (req: Request, isInternal: boolean = false) => const overrideConfig: ICommonObject = { ...req.body } for (const file of files) { const fileNames: string[] = [] - const fileBuffer = fs.readFileSync(file.path) + const fileBuffer = await getFileFromUpload(file.path ?? file.key) // Address file name with special characters: https://github.com/expressjs/multer/issues/1104 file.originalname = Buffer.from(file.originalname, 'latin1').toString('utf8') const storagePath = await addArrayFilesToStorage(file.mimetype, fileBuffer, file.originalname, fileNames, chatflowid) @@ -90,7 +97,7 @@ export const upsertVector = async (req: Request, isInternal: boolean = false) => overrideConfig[fileInputField] = storagePath } - fs.unlinkSync(file.path) + await removeSpecificFileFromUpload(file.path ?? file.key) } if (overrideConfig.vars && typeof overrideConfig.vars === 'string') { overrideConfig.vars = JSON.parse(overrideConfig.vars)