Skip to content

Commit

Permalink
update multer s3 implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
HenryHengZJ committed Jan 12, 2025
1 parent 208dfc1 commit 9a25bf5
Show file tree
Hide file tree
Showing 18 changed files with 178 additions and 74 deletions.
103 changes: 103 additions & 0 deletions packages/components/src/storageUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,64 @@ import {
import { Readable } from 'node:stream'
import { getUserHome } from './utils'
import sanitize from 'sanitize-filename'
import multer from 'multer'
const multerS3 = require('multer-s3')

/**
* Get user settings file
* TODO: move env variables to settings json file, easier configuration
*/
export const getUserSettingsFilePath = () => {
if (process.env.SECRETKEY_PATH) return path.join(process.env.SECRETKEY_PATH, 'settings.json')
const checkPaths = [path.join(getUserHome(), '.flowise', 'settings.json')]
for (const checkPath of checkPaths) {
if (fs.existsSync(checkPath)) {
return checkPath
}
}
return ''
}

export const getOrgId = () => {
const settingsContent = fs.readFileSync(getUserSettingsFilePath(), 'utf8')
try {
const settings = JSON.parse(settingsContent)
return settings.instanceId
} catch (error) {
return ''
}
}

const getUploadPath = (): string => {
return process.env.BLOB_STORAGE_PATH
? path.join(process.env.BLOB_STORAGE_PATH, 'uploads', getOrgId())
: path.join(getUserHome(), '.flowise', 'uploads', getOrgId())
}

export const getMulterStorage = () => {
const storageType = getStorageType()

if (storageType === 's3') {
const s3Client = getS3Config().s3Client
const Bucket = getS3Config().Bucket

const upload = multer({
storage: multerS3({
s3: s3Client,
bucket: Bucket,
metadata: function (req: Request, file: Express.Multer.File, cb: (error: any, metadata: any) => void) {
cb(null, { fieldName: file.fieldname, originalName: file.originalname, orgId: getOrgId() })
},
key: function (req: Request, file: Express.Multer.File, cb: (error: any, metadata: any) => void) {
cb(null, `${getOrgId()}/${Date.now().toString()}`)
}
})
})
return upload
} else {
return multer({ dest: getUploadPath() })
}
}

export const addBase64FilesToStorage = async (fileBase64: string, chatflowid: string, fileNames: string[]) => {
const storageType = getStorageType()
Expand Down Expand Up @@ -120,6 +178,37 @@ export const addSingleFileToStorage = async (mime: string, bf: Buffer, fileName:
}
}

export const getFileFromUpload = async (filePath: string): Promise<Buffer> => {
const storageType = getStorageType()
if (storageType === 's3') {
const { s3Client, Bucket } = getS3Config()

let Key = filePath
// remove the first '/' if it exists
if (Key.startsWith('/')) {
Key = Key.substring(1)
}
const getParams = {
Bucket,
Key
}

const response = await s3Client.send(new GetObjectCommand(getParams))
const body = response.Body
if (body instanceof Readable) {
const streamToString = await body.transformToString('base64')
if (streamToString) {
return Buffer.from(streamToString, 'base64')
}
}
// @ts-ignore
const buffer = Buffer.concat(response.Body.toArray())
return buffer
} else {
return fs.readFileSync(filePath)
}
}

export const getFileFromStorage = async (file: string, ...paths: string[]): Promise<Buffer> => {
const storageType = getStorageType()
const sanitizedFilename = _sanitizeFilename(file)
Expand Down Expand Up @@ -183,6 +272,20 @@ export const removeFilesFromStorage = async (...paths: string[]) => {
}
}

export const removeSpecificFileFromUpload = async (filePath: string) => {
const storageType = getStorageType()
if (storageType === 's3') {
let Key = filePath
// remove the first '/' if it exists
if (Key.startsWith('/')) {
Key = Key.substring(1)
}
await _deleteS3Folder(Key)
} else {
fs.unlinkSync(filePath)
}
}

export const removeSpecificFileFromStorage = async (...paths: string[]) => {
const storageType = getStorageType()
if (storageType === 's3') {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ const uploadFilesToAssistantVectorStore = async (req: Request, res: Response, ne
// Address file name with special characters: https://github.com/expressjs/multer/issues/1104
file.originalname = Buffer.from(file.originalname, 'latin1').toString('utf8')
uploadFiles.push({
filePath: file.path,
filePath: file.path ?? file.key,
fileName: file.originalname
})
}
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/controllers/openai-assistants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ const uploadAssistantFiles = async (req: Request, res: Response, next: NextFunct
// Address file name with special characters: https://github.com/expressjs/multer/issues/1104
file.originalname = Buffer.from(file.originalname, 'latin1').toString('utf8')
uploadFiles.push({
filePath: file.path,
filePath: file.path ?? file.key,
fileName: file.originalname
})
}
Expand Down
14 changes: 14 additions & 0 deletions packages/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ declare global {
interface Request {
io?: Server
}
namespace Multer {
interface File {
bucket: string
key: string
acl: string
contentType: string
contentDisposition: null
storageClass: string
serverSideEncryption: null
metadata: any
location: string
etag: string
}
}
}
}

Expand Down
7 changes: 2 additions & 5 deletions packages/server/src/routes/attachments/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import express from 'express'
import multer from 'multer'
import attachmentsController from '../../controllers/attachments'
import { getUploadPath } from '../../utils'
import { getMulterStorage } from 'flowise-components'

const router = express.Router()

const upload = multer({ dest: getUploadPath() })

// CREATE
router.post('/:chatflowId/:chatId', upload.array('files'), attachmentsController.createAttachment)
router.post('/:chatflowId/:chatId', getMulterStorage().array('files'), attachmentsController.createAttachment)

export default router
6 changes: 2 additions & 4 deletions packages/server/src/routes/documentstore/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import express from 'express'
import multer from 'multer'
import { getUploadPath } from '../../utils'
import documentStoreController from '../../controllers/documentstore'
import { getMulterStorage } from 'flowise-components'

const router = express.Router()
const upload = multer({ dest: getUploadPath() })

router.post(['/upsert/', '/upsert/:id'], upload.array('files'), documentStoreController.upsertDocStoreMiddleware)
router.post(['/upsert/', '/upsert/:id'], getMulterStorage().array('files'), documentStoreController.upsertDocStoreMiddleware)

router.post(['/refresh/', '/refresh/:id'], documentStoreController.refreshDocStoreMiddleware)

Expand Down
6 changes: 2 additions & 4 deletions packages/server/src/routes/openai-assistants-files/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import express from 'express'
import multer from 'multer'
import openaiAssistantsController from '../../controllers/openai-assistants'
import { getUploadPath } from '../../utils'
import { getMulterStorage } from 'flowise-components'

const router = express.Router()
const upload = multer({ dest: getUploadPath() })

router.post('/download/', openaiAssistantsController.getFileFromAssistant)
router.post('/upload/', upload.array('files'), openaiAssistantsController.uploadAssistantFiles)
router.post('/upload/', getMulterStorage().array('files'), openaiAssistantsController.uploadAssistantFiles)

export default router
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import express from 'express'
import multer from 'multer'
import openaiAssistantsVectorStoreController from '../../controllers/openai-assistants-vector-store'
import { getUploadPath } from '../../utils'
import { getMulterStorage } from 'flowise-components'

const router = express.Router()
const upload = multer({ dest: getUploadPath() })

// CREATE
router.post('/', openaiAssistantsVectorStoreController.createAssistantVectorStore)
Expand All @@ -22,7 +20,7 @@ router.put(['/', '/:id'], openaiAssistantsVectorStoreController.updateAssistantV
router.delete(['/', '/:id'], openaiAssistantsVectorStoreController.deleteAssistantVectorStore)

// POST
router.post('/:id', upload.array('files'), openaiAssistantsVectorStoreController.uploadFilesToAssistantVectorStore)
router.post('/:id', getMulterStorage().array('files'), openaiAssistantsVectorStoreController.uploadFilesToAssistantVectorStore)

// DELETE
router.patch(['/', '/:id'], openaiAssistantsVectorStoreController.deleteFilesFromAssistantVectorStore)
Expand Down
12 changes: 7 additions & 5 deletions packages/server/src/routes/predictions/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import express from 'express'
import multer from 'multer'
import predictionsController from '../../controllers/predictions'
import { getUploadPath } from '../../utils'
import { getMulterStorage } from 'flowise-components'

const router = express.Router()

const upload = multer({ dest: getUploadPath() })

// CREATE
router.post(['/', '/:id'], upload.array('files'), predictionsController.getRateLimiterMiddleware, predictionsController.createPrediction)
router.post(
['/', '/:id'],
getMulterStorage().array('files'),
predictionsController.getRateLimiterMiddleware,
predictionsController.createPrediction
)

export default router
9 changes: 3 additions & 6 deletions packages/server/src/routes/vectors/index.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import express from 'express'
import multer from 'multer'
import vectorsController from '../../controllers/vectors'
import { getUploadPath } from '../../utils'
import { getMulterStorage } from 'flowise-components'

const router = express.Router()

const upload = multer({ dest: getUploadPath() })

// CREATE
router.post(
['/upsert/', '/upsert/:id'],
upload.array('files'),
getMulterStorage().array('files'),
vectorsController.getRateLimiterMiddleware,
vectorsController.upsertVectorMiddleware
)
router.post(['/internal-upsert/', '/internal-upsert/:id'], upload.array('files'), vectorsController.createInternalUpsert)
router.post(['/internal-upsert/', '/internal-upsert/:id'], getMulterStorage().array('files'), vectorsController.createInternalUpsert)

export default router
9 changes: 5 additions & 4 deletions packages/server/src/services/documentstore/index.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
import { DocumentStore } from '../../database/entities/DocumentStore'
import * as fs from 'fs'
import * as path from 'path'
import {
addArrayFilesToStorage,
addSingleFileToStorage,
getFileFromStorage,
getFileFromUpload,
ICommonObject,
IDocument,
mapExtToInputField,
mapMimeTypeToInputField,
removeFilesFromStorage,
removeSpecificFileFromStorage
removeSpecificFileFromStorage,
removeSpecificFileFromUpload
} from 'flowise-components'
import {
addLoaderSource,
Expand Down Expand Up @@ -1441,7 +1442,7 @@ const upsertDocStoreMiddleware = async (
const filesLoaderConfig: ICommonObject = {}
for (const file of files) {
const fileNames: string[] = []
const fileBuffer = fs.readFileSync(file.path)
const fileBuffer = await getFileFromUpload(file.path ?? file.key)
// Address file name with special characters: https://github.com/expressjs/multer/issues/1104
file.originalname = Buffer.from(file.originalname, 'latin1').toString('utf8')

Expand Down Expand Up @@ -1481,7 +1482,7 @@ const upsertDocStoreMiddleware = async (
filesLoaderConfig[fileInputField] = JSON.stringify([storagePath])
}

fs.unlinkSync(file.path)
await removeSpecificFileFromUpload(file.path ?? file.key)
}

loaderConfig = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import OpenAI from 'openai'
import { StatusCodes } from 'http-status-codes'
import fs from 'fs'
import { Credential } from '../../database/entities/Credential'
import { InternalFlowiseError } from '../../errors/internalFlowiseError'
import { getErrorMessage } from '../../errors/utils'
import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
import { decryptCredentialData } from '../../utils'
import { getFileFromUpload, removeSpecificFileFromUpload } from 'flowise-components'

const getAssistantVectorStore = async (credentialId: string, vectorStoreId: string) => {
try {
Expand Down Expand Up @@ -178,13 +178,14 @@ const uploadFilesToAssistantVectorStore = async (
const openai = new OpenAI({ apiKey: openAIApiKey })
const uploadedFiles = []
for (const file of files) {
const toFile = await OpenAI.toFile(fs.readFileSync(file.filePath), file.fileName)
const fileBuffer = await getFileFromUpload(file.filePath)
const toFile = await OpenAI.toFile(fileBuffer, file.fileName)
const createdFile = await openai.files.create({
file: toFile,
purpose: 'assistants'
})
uploadedFiles.push(createdFile)
fs.unlinkSync(file.filePath)
await removeSpecificFileFromUpload(file.filePath)
}

const file_ids = [...uploadedFiles.map((file) => file.id)]
Expand Down
7 changes: 4 additions & 3 deletions packages/server/src/services/openai-assistants/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import OpenAI from 'openai'
import fs from 'fs'
import { StatusCodes } from 'http-status-codes'
import { decryptCredentialData } from '../../utils'
import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
import { Credential } from '../../database/entities/Credential'
import { InternalFlowiseError } from '../../errors/internalFlowiseError'
import { getErrorMessage } from '../../errors/utils'
import { getFileFromUpload, removeSpecificFileFromUpload } from 'flowise-components'

// ----------------------------------------
// Assistants
Expand Down Expand Up @@ -101,13 +101,14 @@ const uploadFilesToAssistant = async (credentialId: string, files: { filePath: s
const uploadedFiles = []

for (const file of files) {
const toFile = await OpenAI.toFile(fs.readFileSync(file.filePath), file.fileName)
const fileBuffer = await getFileFromUpload(file.filePath)
const toFile = await OpenAI.toFile(fileBuffer, file.fileName)
const createdFile = await openai.files.create({
file: toFile,
purpose: 'assistants'
})
uploadedFiles.push(createdFile)
fs.unlinkSync(file.filePath)
await removeSpecificFileFromUpload(file.filePath)
}

return uploadedFiles
Expand Down
Loading

0 comments on commit 9a25bf5

Please sign in to comment.