-
Notifications
You must be signed in to change notification settings - Fork 39
replace fs implementation with v3 client #70
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
const flatted = require('flatted') | ||
const _ = require('lodash') | ||
const path = require('path') | ||
import { Readable } from 'stream' | ||
import { | ||
S3Client, | ||
PutObjectCommand, | ||
GetObjectCommand, | ||
ListObjectsCommand, | ||
ListObjectsCommandInput, | ||
DeleteObjectCommand, | ||
HeadObjectCommand, | ||
} from '@aws-sdk/client-s3' | ||
|
||
const payloadBucketPhysicalName = process.env.KLOTHO_S3_PREFIX + '{{.PayloadsBucketName}}' | ||
const targetRegion = process.env['AWS_TARGET_REGION'] | ||
|
||
const userBucketPath = '/files' | ||
|
||
const s3Client = new S3Client({ region: targetRegion }) | ||
|
||
const streamToString = (stream: Readable): Promise<string> => | ||
new Promise((resolve, reject) => { | ||
const chunks: Uint8Array[] = [] | ||
stream.on('data', (chunk) => chunks.push(chunk)) | ||
stream.on('error', reject) | ||
stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))) | ||
}) | ||
|
||
async function getCallParameters(paramKey, dispatcherMode) { | ||
let isEmitter = dispatcherMode === 'emitter' ? true : false | ||
try { | ||
const bucketParams = { | ||
Bucket: payloadBucketPhysicalName, | ||
Key: paramKey, | ||
} | ||
const result = await s3Client.send(new GetObjectCommand(bucketParams)) | ||
|
||
let parameters = '' | ||
if (result.Body) { | ||
parameters = await streamToString(result.Body as Readable) | ||
} | ||
|
||
if (isEmitter && Array.isArray(parameters)) { | ||
// Emitters only have 1 parameter - the runtime saves an array, so we | ||
// normalize the parameter | ||
parameters = _.get(parameters, '[0]') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How is this different from just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. a lot of this was copy paste from the old implementation, going to try to remove some of the more odd things like this |
||
if (Array.isArray(parameters)) { | ||
let paramPairs = _.toPairs(parameters) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
paramPairs = paramPairs.map((x) => { | ||
if (_.get(x, '[1].type') == 'Buffer') { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. similarly, if this is for index bounds, I think typescript can do it as just |
||
return [x[0], Buffer.from(_.get(x, '[1].data'))] | ||
} else { | ||
return x | ||
} | ||
}) | ||
|
||
parameters = _.fromPairs(paramPairs) | ||
} | ||
} | ||
|
||
return parameters || {} | ||
} catch (e) { | ||
console.error(e) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't we want to rethrow? (Ditto for similar lines below) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good call, we should ill add that in, was my miss |
||
return | ||
} | ||
} | ||
exports.getCallParameters = getCallParameters | ||
|
||
async function saveParametersToS3(paramsS3Key, params) { | ||
try { | ||
const bucketParams = { | ||
Bucket: payloadBucketPhysicalName, | ||
Key: paramsS3Key, | ||
Body: flatted.stringify(params), | ||
} | ||
await s3Client.send(new PutObjectCommand(bucketParams)) | ||
} catch (e) { | ||
console.error(e) | ||
return | ||
} | ||
} | ||
exports.saveParametersToS3 = saveParametersToS3 | ||
|
||
async function s3_writeFile(...args) { | ||
const bucketParams = { | ||
Bucket: payloadBucketPhysicalName, | ||
Key: `${userBucketPath}/${args[0]}`, | ||
Body: args[1], | ||
} | ||
try { | ||
await s3Client.send(new PutObjectCommand(bucketParams)) | ||
console.log('Successfully uploaded object: ' + bucketParams.Bucket + '/' + bucketParams.Key) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if this should be debug? I'd be worried about spamming the service logs with what is essentially an implementation detail (they should be able to assume that if the s3 didn't log a failure, that it succeeded). Similar for other log lines, other than the ones within catches. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hm, yeah i can add them all as debug |
||
} catch (err) { | ||
console.log('Error', err) | ||
} | ||
} | ||
|
||
async function s3_readFile(...args) { | ||
const bucketParams = { | ||
Bucket: payloadBucketPhysicalName, | ||
Key: `${userBucketPath}/${args[0]}`, | ||
} | ||
try { | ||
// Get the object from the Amazon S3 bucket. It is returned as a ReadableStream. | ||
const data = await s3Client.send(new GetObjectCommand(bucketParams)) | ||
if (data.Body) { | ||
return await streamToString(data.Body as Readable) | ||
} | ||
return '' | ||
} catch (err) { | ||
console.log('Error', err) | ||
} | ||
} | ||
|
||
async function s3_readdir(path) { | ||
const bucketParams: ListObjectsCommandInput = { | ||
Bucket: payloadBucketPhysicalName, | ||
Prefix: `${userBucketPath}/${path}`, | ||
} | ||
|
||
try { | ||
const data = await s3Client.send(new ListObjectsCommand(bucketParams)) | ||
if (data.Contents) { | ||
const objectKeys: string[] = data.Contents.map((c) => c.Key!) | ||
console.log('Success', objectKeys) | ||
return objectKeys | ||
} | ||
} catch (err) { | ||
console.log('Error', err) | ||
} | ||
} | ||
|
||
async function s3_exists(fpath) { | ||
const bucketParams = { Bucket: payloadBucketPhysicalName, Key: `${userBucketPath}/${path}` } | ||
try { | ||
const data = await s3Client.send(new HeadObjectCommand(bucketParams)) | ||
console.log('Success. Object deleted.', data) | ||
return data // For unit tests. | ||
} catch (err) { | ||
console.log('Error', err) | ||
} | ||
} | ||
|
||
async function s3_deleteFile(fpath) { | ||
const bucketParams = { Bucket: payloadBucketPhysicalName, Key: `${userBucketPath}/${path}` } | ||
try { | ||
const data = await s3Client.send(new DeleteObjectCommand(bucketParams)) | ||
console.log('Success. Object deleted.', data) | ||
return data // For unit tests. | ||
} catch (err) { | ||
console.log('Error', err) | ||
} | ||
} | ||
|
||
exports.fs = { | ||
writeFile: s3_writeFile, | ||
readFile: s3_readFile, | ||
readdir: s3_readdir, | ||
access: s3_exists, | ||
unlink: s3_deleteFile, | ||
} | ||
exports.fs.promises = exports.fs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: