-
Notifications
You must be signed in to change notification settings - Fork 39
replace fs implementation with v3 client #70
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,170 @@ | ||||||
const _ = require('lodash') | ||||||
const path = require('path') | ||||||
import { Readable } from 'stream' | ||||||
import { | ||||||
S3Client, | ||||||
PutObjectCommand, | ||||||
GetObjectCommand, | ||||||
ListObjectsCommand, | ||||||
ListObjectsCommandInput, | ||||||
DeleteObjectCommand, | ||||||
HeadObjectCommand, | ||||||
} from '@aws-sdk/client-s3' | ||||||
|
||||||
const payloadBucketPhysicalName = process.env.KLOTHO_S3_PREFIX + '{{.PayloadsBucketName}}' | ||||||
const targetRegion = process.env['AWS_TARGET_REGION'] | ||||||
|
||||||
const userBucketPath = '/files' | ||||||
|
||||||
const s3Client = new S3Client({ region: targetRegion }) | ||||||
|
||||||
const streamToString = (stream: Readable): Promise<string> => | ||||||
new Promise((resolve, reject) => { | ||||||
const chunks: Uint8Array[] = [] | ||||||
stream.on('data', (chunk) => chunks.push(chunk)) | ||||||
stream.on('error', reject) | ||||||
stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))) | ||||||
}) | ||||||
|
||||||
async function getCallParameters(paramKey, dispatcherMode) { | ||||||
let isEmitter = dispatcherMode === 'emitter' ? true : false | ||||||
try { | ||||||
const bucketParams = { | ||||||
Bucket: payloadBucketPhysicalName, | ||||||
Key: paramKey, | ||||||
} | ||||||
const result = await s3Client.send(new GetObjectCommand(bucketParams)) | ||||||
|
||||||
let parameters = '' | ||||||
if (result.Body) { | ||||||
parameters = await streamToString(result.Body as Readable) | ||||||
} | ||||||
parameters = JSON.parse(parameters) | ||||||
|
||||||
if (isEmitter && Array.isArray(parameters)) { | ||||||
// Emitters only have 1 parameter - the runtime saves an array, so we | ||||||
// normalize the parameter | ||||||
parameters = parameters[0] | ||||||
if (Array.isArray(parameters)) { | ||||||
let paramPairs = _.toPairs(parameters) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||||||
paramPairs = paramPairs.map((x) => { | ||||||
if (x[1].type == 'Buffer') { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
return [x[0], Buffer.from(x[1].data)] | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this supposed to be a clone then? It's already a buffer, so this just reads it into another buffer, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks like it, i can just change it to return x[1].data |
||||||
} else { | ||||||
return x | ||||||
} | ||||||
}) | ||||||
|
||||||
parameters = _.toPairs(paramPairs) | ||||||
} | ||||||
} | ||||||
|
||||||
return parameters || {} | ||||||
} catch (e) { | ||||||
console.error(e) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't we want to rethrow? (Ditto for similar lines below) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good call, we should ill add that in, was my miss |
||||||
throw e | ||||||
} | ||||||
} | ||||||
exports.getCallParameters = getCallParameters | ||||||
|
||||||
async function saveParametersToS3(paramsS3Key, params) { | ||||||
try { | ||||||
const bucketParams = { | ||||||
Bucket: payloadBucketPhysicalName, | ||||||
Key: paramsS3Key, | ||||||
Body: JSON.stringify(params), | ||||||
} | ||||||
await s3Client.send(new PutObjectCommand(bucketParams)) | ||||||
} catch (e) { | ||||||
console.error(e) | ||||||
throw e | ||||||
} | ||||||
} | ||||||
exports.saveParametersToS3 = saveParametersToS3 | ||||||
|
||||||
async function s3_writeFile(...args) { | ||||||
const bucketParams = { | ||||||
Bucket: payloadBucketPhysicalName, | ||||||
Key: `${userBucketPath}/${args[0]}`, | ||||||
Body: args[1], | ||||||
} | ||||||
try { | ||||||
await s3Client.send(new PutObjectCommand(bucketParams)) | ||||||
console.debug( | ||||||
'Successfully uploaded object: ' + bucketParams.Bucket + '/' + bucketParams.Key | ||||||
) | ||||||
} catch (err) { | ||||||
console.log('Error', err) | ||||||
throw err | ||||||
} | ||||||
} | ||||||
|
||||||
async function s3_readFile(...args) { | ||||||
const bucketParams = { | ||||||
Bucket: payloadBucketPhysicalName, | ||||||
Key: `${userBucketPath}/${args[0]}`, | ||||||
} | ||||||
try { | ||||||
// Get the object from the Amazon S3 bucket. It is returned as a ReadableStream. | ||||||
const data = await s3Client.send(new GetObjectCommand(bucketParams)) | ||||||
if (data.Body) { | ||||||
return await streamToString(data.Body as Readable) | ||||||
} | ||||||
return '' | ||||||
} catch (err) { | ||||||
console.log('Error', err) | ||||||
throw err | ||||||
} | ||||||
} | ||||||
|
||||||
async function s3_readdir(path) { | ||||||
const bucketParams: ListObjectsCommandInput = { | ||||||
Bucket: payloadBucketPhysicalName, | ||||||
Prefix: `${userBucketPath}/${path}`, | ||||||
} | ||||||
|
||||||
try { | ||||||
const data = await s3Client.send(new ListObjectsCommand(bucketParams)) | ||||||
if (data.Contents) { | ||||||
const objectKeys: string[] = data.Contents.map((c) => c.Key!) | ||||||
console.debug('Success', objectKeys) | ||||||
return objectKeys | ||||||
} | ||||||
} catch (err) { | ||||||
console.log('Error', err) | ||||||
throw err | ||||||
} | ||||||
} | ||||||
|
||||||
async function s3_exists(fpath) { | ||||||
const bucketParams = { Bucket: payloadBucketPhysicalName, Key: `${userBucketPath}/${path}` } | ||||||
try { | ||||||
const data = await s3Client.send(new HeadObjectCommand(bucketParams)) | ||||||
console.debug('Success. Object deleted.', data) | ||||||
return data // For unit tests. | ||||||
} catch (err) { | ||||||
console.log('Error', err) | ||||||
throw err | ||||||
} | ||||||
} | ||||||
|
||||||
async function s3_deleteFile(fpath) { | ||||||
const bucketParams = { Bucket: payloadBucketPhysicalName, Key: `${userBucketPath}/${path}` } | ||||||
try { | ||||||
const data = await s3Client.send(new DeleteObjectCommand(bucketParams)) | ||||||
console.debug('Success. Object deleted.', data) | ||||||
return data // For unit tests. | ||||||
} catch (err) { | ||||||
console.log('Error', err) | ||||||
throw err | ||||||
} | ||||||
} | ||||||
|
||||||
exports.fs = { | ||||||
writeFile: s3_writeFile, | ||||||
readFile: s3_readFile, | ||||||
readdir: s3_readdir, | ||||||
access: s3_exists, | ||||||
rm: s3_deleteFile, | ||||||
} | ||||||
exports.fs.promises = exports.fs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: