Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes fs.js s3 read/write issues introduced by switching clients #282

Merged
merged 5 commits into from
Feb 23, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions pkg/lang/javascript/aws_runtime/_fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ const targetRegion = process.env['AWS_TARGET_REGION']

const s3Client = new S3Client({ region: targetRegion })

const streamToString = (stream: Readable): Promise<string> =>
new Promise((resolve, reject) => {
const chunks: Uint8Array[] = []
// Drain `stream` completely and decode the collected bytes as text.
// `encoding` is a Node BufferEncoding (e.g. 'utf-8', 'base64'); typed as such
// so it is accepted by Buffer.prototype.toString (a plain `string` is not).
const streamToString = async (stream: Readable, encoding: BufferEncoding): Promise<string> =>
    (await streamToBuffer(stream)).toString(encoding)
// Drain `stream` completely and concatenate every emitted chunk into one Buffer.
// Resolves on 'end'; rejects with the stream's error if one is emitted first.
const streamToBuffer = (stream: Readable) =>
    new Promise<Buffer>((resolve, reject) => {
        // Typed explicitly: a bare [] would be an implicit any[] under noImplicitAny.
        const chunks: Uint8Array[] = []
        stream.on('data', (chunk) => chunks.push(chunk))
        stream.on('error', reject)
        stream.on('end', () => resolve(Buffer.concat(chunks)))
    })

async function getCallParameters(paramKey, dispatcherMode) {
Expand All @@ -36,7 +38,7 @@ async function getCallParameters(paramKey, dispatcherMode) {

let parameters: any = ''
if (result.Body) {
parameters = await streamToString(result.Body as Readable)
parameters = await streamToString(result.Body as Readable, 'utf-8')
console.log(parameters)
}
if (parameters != '') {
Expand Down Expand Up @@ -88,7 +90,7 @@ exports.saveParametersToS3 = saveParametersToS3
async function s3_writeFile(...args) {
const bucketParams = {
Bucket: bucketName,
Key: `${args[0]}`,
Key: stripLeadingSlashes(`${args[0]}`),
Body: args[1],
}
try {
Expand All @@ -111,9 +113,11 @@ async function s3_readFile(...args) {
// Get the object from the Amazon S3 bucket. It is returned as a ReadableStream.
const data = await s3Client.send(new GetObjectCommand(bucketParams))
if (data.Body) {
return await streamToString(data.Body as Readable)
if (args[1]?.encoding) {
return await streamToString(data.Body, args[1].encoding)
}
return streamToBuffer(data.Body)
}
return ''
} catch (err) {
console.log('Error', err)
throw err
Expand All @@ -123,7 +127,7 @@ async function s3_readFile(...args) {
async function s3_readdir(path) {
const bucketParams: ListObjectsCommandInput = {
Bucket: bucketName,
Prefix: `${path}`,
Prefix: stripLeadingSlashes(`${path}`),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this really use to be the behaviour? If so, we broke backwards compatibility once, and we'll be breaking it again by returning to it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous behavior was actually quite a bit more complex, as seen here:
https://github.com/hasnat/s3fs/blob/master/lib/utils.js

The key bit is that all the keys were normalized across host operating systems to match a specific format. Removing that from our FS breaks the expectation that you can use the Cloud FS the same as your local FS.

}

try {
Expand All @@ -140,7 +144,10 @@ async function s3_readdir(path) {
}

async function s3_exists(fpath) {
const bucketParams = { Bucket: bucketName, Key: `${path}` }
const bucketParams = {
Bucket: bucketName,
Key: stripLeadingSlashes(`${path}`),
}
try {
const data = await s3Client.send(new HeadObjectCommand(bucketParams))
console.debug('Success. Object deleted.', data)
Expand All @@ -152,7 +159,10 @@ async function s3_exists(fpath) {
}

async function s3_deleteFile(fpath) {
const bucketParams = { Bucket: bucketName, Key: `${path}` }
const bucketParams = {
Bucket: bucketName,
Key: stripLeadingSlashes(`${path}`),
}
try {
const data = await s3Client.send(new DeleteObjectCommand(bucketParams))
console.debug('Success. Object deleted.', data)
Expand All @@ -162,6 +172,9 @@ async function s3_deleteFile(fpath) {
throw err
}
}
// Remove every leading '/' so keys/prefixes match S3's flat namespace,
// where object keys do not begin with a slash.
function stripLeadingSlashes(path: string): string {
    let firstKeep = 0
    while (firstKeep < path.length && path.charAt(firstKeep) === '/') {
        firstKeep += 1
    }
    return path.slice(firstKeep)
}

exports.fs = {
writeFile: s3_writeFile,
Expand Down
32 changes: 22 additions & 10 deletions pkg/lang/javascript/aws_runtime/fs.js.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ const bucketEnvVar = '{{.BucketNameEnvVar}}';
const bucketName = process.env[bucketEnvVar];
const targetRegion = process.env['AWS_TARGET_REGION'];
const s3Client = new client_s3_1.S3Client({ region: targetRegion });
const streamToString = (stream) => new Promise((resolve, reject) => {
// Read the whole stream and decode the collected bytes using `encoding`.
const streamToString = async (stream, encoding) => {
    const whole = await streamToBuffer(stream);
    return whole.toString(encoding);
};
// Accumulate every chunk the stream emits and join them into a single Buffer.
// Resolves on 'end'; rejects if the stream errors first.
const streamToBuffer = (stream) => new Promise((resolve, reject) => {
    const pieces = [];
    stream.on('data', (piece) => {
        pieces.push(piece);
    });
    stream.on('error', (err) => reject(err));
    stream.on('end', () => {
        resolve(Buffer.concat(pieces));
    });
});
async function getCallParameters(paramKey, dispatcherMode) {
let isEmitter = dispatcherMode === 'emitter' ? true : false;
Expand All @@ -23,7 +24,7 @@ async function getCallParameters(paramKey, dispatcherMode) {
const result = await s3Client.send(new client_s3_1.GetObjectCommand(bucketParams));
let parameters = '';
if (result.Body) {
parameters = await streamToString(result.Body);
parameters = await streamToString(result.Body, "utf-8");
console.log(parameters);
}
if (parameters != '') {
Expand Down Expand Up @@ -73,7 +74,7 @@ exports.saveParametersToS3 = saveParametersToS3;
async function s3_writeFile(...args) {
const bucketParams = {
Bucket: bucketName,
Key: `${args[0]}`,
Key: stripLeadingSlashes(`${args[0]}`),
Body: args[1],
};
try {
Expand All @@ -88,15 +89,17 @@ async function s3_writeFile(...args) {
async function s3_readFile(...args) {
const bucketParams = {
Bucket: bucketName,
Key: `${args[0]}`,
Key: stripLeadingSlashes(`${args[0]}`),
};
try {
// Get the object from the Amazon S3 bucket. It is returned as a ReadableStream.
const data = await s3Client.send(new client_s3_1.GetObjectCommand(bucketParams));
if (data.Body) {
return await streamToString(data.Body);
if (args[1]?.encoding) {
return await streamToString(data.Body, args[1].encoding);
}
return streamToBuffer(data.Body);
}
return '';
}
catch (err) {
console.log('Error', err);
Expand All @@ -106,7 +109,7 @@ async function s3_readFile(...args) {
async function s3_readdir(path) {
const bucketParams = {
Bucket: bucketName,
Prefix: `${path}`,
Prefix: stripLeadingSlashes(`${path}`),
};
try {
const data = await s3Client.send(new client_s3_1.ListObjectsCommand(bucketParams));
Expand All @@ -122,7 +125,10 @@ async function s3_readdir(path) {
}
}
async function s3_exists(fpath) {
const bucketParams = { Bucket: bucketName, Key: `${path}` };
const bucketParams = {
Bucket: bucketName,
Key: stripLeadingSlashes(`${path}`)
};
try {
const data = await s3Client.send(new client_s3_1.HeadObjectCommand(bucketParams));
console.debug('Success. Object deleted.', data);
Expand All @@ -134,7 +140,10 @@ async function s3_exists(fpath) {
}
}
async function s3_deleteFile(fpath) {
const bucketParams = { Bucket: bucketName, Key: `${path}` };
const bucketParams = {
Bucket: bucketName,
Key: stripLeadingSlashes(`${path}`)
};
try {
const data = await s3Client.send(new client_s3_1.DeleteObjectCommand(bucketParams));
console.debug('Success. Object deleted.', data);
Expand All @@ -145,6 +154,9 @@ async function s3_deleteFile(fpath) {
throw err;
}
}
// Remove every leading '/' so keys/prefixes match S3's flat namespace,
// where object keys do not begin with a slash.
function stripLeadingSlashes(path) {
    let firstKeep = 0;
    while (firstKeep < path.length && path.charAt(firstKeep) === "/") {
        firstKeep += 1;
    }
    return path.slice(firstKeep);
}
exports.fs = {
writeFile: s3_writeFile,
readFile: s3_readFile,
Expand Down