Skip to content

Commit

Permalink
Merge pull request #8385 from romayalon/romy-backport-5.16
Browse files Browse the repository at this point in the history
Backport copy object issue to 5.16
  • Loading branch information
nimrod-becker committed Sep 19, 2024
2 parents 1c40d7b + 449cdeb commit b1de141
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 62 deletions.
77 changes: 47 additions & 30 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ function _get_version_id_by_stat({ino, mtimeNsBigint}) {
return 'mtime-' + mtimeNsBigint.toString(36) + '-ino-' + ino.toString(36);
}

function _is_version_object_including_null_version(filename) {
function _is_version_or_null_in_file_name(filename) {
const is_version_object = _is_version_object(filename);
if (!is_version_object) {
return _is_version_null_version(filename);
Expand Down Expand Up @@ -479,7 +479,7 @@ class NamespaceFS {
}

/**
* @param {nb.ObjectSDK} object_sdk
* @param {nb.ObjectSDK} object_sdk
* @returns {nb.NativeFSContext}
*/
prepare_fs_context(object_sdk) {
Expand Down Expand Up @@ -626,6 +626,7 @@ class NamespaceFS {
* key: string,
* common_prefix: boolean,
* stat?: nb.NativeFSStats,
* is_latest: boolean,
* }} Result
*/

Expand Down Expand Up @@ -714,15 +715,17 @@ class NamespaceFS {
const isDir = await is_directory_or_symlink_to_directory(ent, fs_context, path.join(dir_path, ent.name));

let r;
if (list_versions && _is_version_object_including_null_version(ent.name)) {
if (list_versions && _is_version_or_null_in_file_name(ent.name)) {
r = {
key: this._get_version_entry_key(dir_key, ent),
common_prefix: isDir,
is_latest: false
};
} else {
r = {
key: this._get_entry_key(dir_key, ent, isDir),
common_prefix: isDir,
is_latest: true
};
}
await insert_entry_to_results_arr(r);
Expand Down Expand Up @@ -837,6 +840,20 @@ class NamespaceFS {
}
};

let previous_key;
/**
 * delete markers are always in the .versions folder, so we need a special case to determine
 * whether they are delete markers. Since the result list is ordered with the latest entries first,
 * the first entry of every key is the latest
* TODO need different way to check for isLatest in case of unordered list object versions
* @param {Object} obj_info
*/
const set_latest_delete_marker = obj_info => {
if (obj_info.delete_marker && previous_key !== obj_info.key) {
obj_info.is_latest = true;
}
};

const prefix_dir_key = prefix.slice(0, prefix.lastIndexOf('/') + 1);
await process_dir(prefix_dir_key);
await Promise.all(results.map(async r => {
Expand All @@ -858,15 +875,17 @@ class NamespaceFS {
if (r.common_prefix) {
res.common_prefixes.push(r.key);
} else {
obj_info = this._get_object_info(bucket, r.key, r.stat, 'null', false, true);
obj_info = this._get_object_info(bucket, r.key, r.stat, false, r.is_latest);
if (!list_versions && obj_info.delete_marker) {
continue;
}
if (this._is_hidden_version_path(obj_info.key)) {
obj_info.key = path.normalize(obj_info.key.replace(HIDDEN_VERSIONS_PATH + '/', ''));
obj_info.key = _get_filename(obj_info.key);
set_latest_delete_marker(obj_info);
}
res.objects.push(obj_info);
previous_key = obj_info.key;
}
if (res.is_truncated) {
if (list_versions && _is_version_object(r.key)) {
Expand Down Expand Up @@ -910,7 +929,7 @@ class NamespaceFS {
}
}
this._throw_if_delete_marker(stat);
return this._get_object_info(params.bucket, params.key, stat, params.version_id || 'null', isDir);
return this._get_object_info(params.bucket, params.key, stat, isDir);
} catch (err) {
if (this._should_update_issues_report(params, file_path, err)) {
this.run_update_issues_report(object_sdk, err);
Expand Down Expand Up @@ -1056,7 +1075,9 @@ class NamespaceFS {
// end the stream
res.end();

await stream_utils.wait_finished(res, { signal: object_sdk.abort_controller.signal });
// in case of transform streams such as ChunkFS there is also a readable part. Since we expect a write stream
// and don't care about the readable part, set readable: false
await stream_utils.wait_finished(res, { readable: false, signal: object_sdk.abort_controller.signal });
object_sdk.throw_if_aborted();

dbg.log0('NamespaceFS: read_object_stream completed file', file_path, {
Expand Down Expand Up @@ -1174,9 +1195,7 @@ class NamespaceFS {
}

if (copy_res) {
if (copy_res === copy_status_enum.FALLBACK) {
params.copy_source.nsfs_copy_fallback();
} else {
if (copy_res !== copy_status_enum.FALLBACK) {
// open file after copy link/same inode should use read open mode
open_mode = config.NSFS_OPEN_READ_MODE;
if (copy_res === copy_status_enum.SAME_INODE) open_path = file_path;
Expand Down Expand Up @@ -1259,10 +1278,8 @@ class NamespaceFS {
let stat = await target_file.stat({ ...fs_context, disable_ctime_check: part_upload });
this._verify_encryption(params.encryption, this._get_encryption_info(stat));

// handle xattr
// assign user xattr on non copy / copy with xattr_copy header provided
const copy_xattr = params.copy_source && params.xattr_copy;
let fs_xattr = copy_xattr ? undefined : to_fs_xattr(params.xattr);
let fs_xattr = to_fs_xattr(params.xattr);

// assign noobaa internal xattr - content type, md5, versioning xattr
if (params.content_type) {
Expand Down Expand Up @@ -1305,7 +1322,6 @@ class NamespaceFS {

// when object is a dir, xattr are set on the folder itself and the content is in .folder file
if (is_dir_content) {
if (params.copy_source) fs_xattr = await this._get_copy_source_xattr(params, fs_context, fs_xattr);
await this._assign_dir_content_to_xattr(fs_context, fs_xattr, { ...params, size: stat.size }, copy_xattr);
}
stat = await nb_native().fs.stat({ ...fs_context, disable_ctime_check: part_upload }, file_path);
Expand All @@ -1317,12 +1333,11 @@ class NamespaceFS {
await native_fs_utils._make_path_dirs(file_path, fs_context);
const copy_xattr = params.copy_source && params.xattr_copy;

let fs_xattr = copy_xattr ? {} : to_fs_xattr(params.xattr) || {};
let fs_xattr = to_fs_xattr(params.xattr) || {};
if (params.content_type) {
fs_xattr = fs_xattr || {};
fs_xattr[XATTR_CONTENT_TYPE] = params.content_type;
}
if (params.copy_source) fs_xattr = await this._get_copy_source_xattr(params, fs_context, fs_xattr);

await this._assign_dir_content_to_xattr(fs_context, fs_xattr, params, copy_xattr);
// when .folder exist and it's no upload flow - .folder should be deleted if it exists
Expand All @@ -1338,13 +1353,6 @@ class NamespaceFS {
return upload_info;
}

async _get_copy_source_xattr(params, fs_context, fs_xattr) {
const is_source_dir = params.copy_source.key.endsWith('/');
const source_file_md_path = await this._find_version_path(fs_context, params.copy_source, is_source_dir);
const source_stat = await nb_native().fs.stat(fs_context, source_file_md_path);
return { ...source_stat.xattr, ...fs_xattr };
}

// move to dest GPFS (wt) / POSIX (w / undefined) - non part upload
async _move_to_dest(fs_context, source_path, dest_path, target_file, open_mode, key) {
let retries = config.NSFS_RENAME_RETRIES;
Expand Down Expand Up @@ -1475,7 +1483,7 @@ class NamespaceFS {
// Can be finetuned further on if needed and inserting the Semaphore logic inside
// Instead of wrapping the whole _upload_stream function (q_buffers lives outside of the data scope of the stream)
async _upload_stream({ fs_context, params, target_file, object_sdk, offset }) {
const { source_stream } = params;
const { source_stream, copy_source } = params;
try {
// Not using async iterators with ReadableStreams due to unsettled promises issues on abort/destroy
const md5_enabled = this._is_force_md5_enabled(object_sdk);
Expand All @@ -1490,8 +1498,14 @@ class NamespaceFS {
large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size
});
chunk_fs.on('error', err1 => dbg.error('namespace_fs._upload_stream: error occured on stream ChunkFS: ', err1));
await stream_utils.pipeline([source_stream, chunk_fs]);
await stream_utils.wait_finished(chunk_fs);
if (copy_source) {
await this.read_object_stream(copy_source, object_sdk, chunk_fs);
} else if (params.source_params) {
await params.source_ns.read_object_stream(params.source_params, object_sdk, chunk_fs);
} else {
await stream_utils.pipeline([source_stream, chunk_fs]);
await stream_utils.wait_finished(chunk_fs);
}
return { digest: chunk_fs.digest, total_bytes: chunk_fs.total_bytes };
} catch (error) {
dbg.error('_upload_stream had error: ', error);
Expand Down Expand Up @@ -1777,6 +1791,7 @@ class NamespaceFS {
upload_params.params.xattr = create_params_parsed.xattr;
upload_params.params.storage_class = create_params_parsed.storage_class;
upload_params.digest = MD5Async && (((await MD5Async.digest()).toString('hex')) + '-' + multiparts.length);
upload_params.params.content_type = create_params_parsed.content_type;

const upload_info = await this._finish_upload(upload_params);

Expand Down Expand Up @@ -2276,16 +2291,18 @@ class NamespaceFS {
}

/**
* @param {string} bucket
* @param {string} key
* @param {nb.NativeFSStats} stat
* @param {string} bucket
* @param {string} key
* @param {nb.NativeFSStats} stat
* @param {Boolean} isDir
* @param {boolean} [is_latest=true]
* @returns {nb.ObjectInfo}
*/
_get_object_info(bucket, key, stat, return_version_id, isDir, is_latest = true) {
_get_object_info(bucket, key, stat, isDir, is_latest = true) {
const etag = this._get_etag(stat);
const create_time = stat.mtime.getTime();
const encryption = this._get_encryption_info(stat);
const version_id = return_version_id && this._is_versioning_enabled() && this._get_version_id_by_xattr(stat);
const version_id = (this._is_versioning_enabled() || this._is_versioning_suspended()) && this._get_version_id_by_xattr(stat);
const delete_marker = stat.xattr?.[XATTR_DELETE_MARKER] === 'true';
const dir_content_type = stat.xattr?.[XATTR_DIR_CONTENT] && ((Number(stat.xattr?.[XATTR_DIR_CONTENT]) > 0 && 'application/octet-stream') || 'application/x-directory');
const content_type = stat.xattr?.[XATTR_CONTENT_TYPE] ||
Expand Down
33 changes: 18 additions & 15 deletions src/sdk/object_sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ class ObjectSDK {
* in order to handle aborting requests gracefully. The `abort_controller` member will
* be used to signal async flows that abort was detected.
* @see {@link https://nodejs.org/docs/latest/api/globals.html#class-abortcontroller}
* @param {import('http').IncomingMessage} req
* @param {import('http').ServerResponse} res
* @param {import('http').IncomingMessage} req
* @param {import('http').ServerResponse} res
*/
setup_abort_controller(req, res) {
res.once('error', err => {
Expand Down Expand Up @@ -157,7 +157,7 @@ class ObjectSDK {
}

/**
* @param {string} name
* @param {string} name
* @returns {Promise<nb.Namespace>}
*/
async _get_bucket_namespace(name) {
Expand Down Expand Up @@ -268,7 +268,7 @@ class ObjectSDK {
return Boolean(fs_root_path || fs_root_path === '');
}

// validates requests for non nsfs buckets from accounts which are nsfs_only
// validates requests for non nsfs buckets from accounts which are nsfs_only
has_non_nsfs_bucket_access(account, ns) {
dbg.log1('validate_non_nsfs_bucket: ', account, ns?.write_resource?.resource);
if (!account) return false;
Expand Down Expand Up @@ -524,7 +524,7 @@ class ObjectSDK {
/**
* Calls the op and report time and error to stats collector.
* on_success can be added to update read/write stats (but on_success shouln't throw)
*
*
* @template T
* @param {{
* op_name: string;
Expand Down Expand Up @@ -642,7 +642,9 @@ class ObjectSDK {
params.content_type = source_md.content_type;
}
try {
if (params.xattr) params.xattr = _.omitBy(params.xattr, (val, name) => name.startsWith('noobaa-namespace'));
// omitBy iterates over all xattr keys, calling startsWith on each. The keys can include symbols such as XATTR_SORT_SYMBOL;
// in that case startsWith is not defined, so the optional call is skipped
if (params.xattr) params.xattr = _.omitBy(params.xattr, (val, name) => name.startsWith?.('noobaa-namespace'));
} catch (e) {
dbg.log3("Got an error while trying to omitBy param.xattr:", params.xattr, "error:", e);
}
Expand All @@ -658,19 +660,14 @@ class ObjectSDK {
params.copy_source.bucket = actual_source_ns.get_bucket(bucket);
params.copy_source.obj_id = source_md.obj_id;
params.copy_source.version_id = source_md.version_id;
if (source_ns instanceof NamespaceFS) {
params.copy_source.nsfs_copy_fallback = () => {
this._populate_nsfs_copy_fallback({ source_params, source_ns, params });
params.copy_source = null;
};
}
} else {
// source cannot be copied directly (different plaforms, accounts, etc.)
// set the source_stream to read from the copy source
// Source params need these for read operations
source_params.object_md = source_md;
source_params.obj_id = source_md.obj_id;
source_params.version_id = source_md.version_id;
source_params.bucket = actual_source_ns.get_bucket(bucket);
// param size is needed when doing an upload. Can be overrided during ranged writes
params.size = source_md.size;

Expand All @@ -684,7 +681,13 @@ class ObjectSDK {

// if the source namespace is NSFS then we need to pass the read_object_stream the read_stream
if (source_ns instanceof NamespaceFS) {
this._populate_nsfs_copy_fallback({ source_params, source_ns, params });
if (target_ns instanceof NamespaceFS) {
params.source_ns = actual_source_ns;
params.source_params = source_params;
} else {
//this._populate_nsfs_copy_fallback({ source_params, source_ns, params });
throw new Error('TODO fix _populate_nsfs_copy_fallback');
}
} else {
params.source_stream = await source_ns.read_object_stream(source_params, this);
}
Expand All @@ -701,9 +704,9 @@ class ObjectSDK {
}
}

// nsfs copy_object & server side copy consisted of link and a fallback to
// nsfs copy_object & server side copy consisted of link and a fallback to
// read stream and then upload stream
// nsfs copy object when can't server side copy - fallback directly
// nsfs copy object when can't server side copy - fallback directly
_populate_nsfs_copy_fallback({ source_ns, params, source_params }) {
const read_stream = new stream.PassThrough();
source_ns.read_object_stream(source_params, this, read_stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,7 @@ s3tests_boto3/functional/test_s3.py::test_put_object_ifmatch_failed
s3tests_boto3/functional/test_s3.py::test_put_object_ifnonmatch_failed
s3tests_boto3/functional/test_s3.py::test_put_object_ifnonmatch_overwrite_existed_failed
s3tests_boto3/functional/test_s3.py::test_object_raw_authenticated_bucket_gone
s3tests_boto3/functional/test_s3.py::test_object_copy_to_itself_with_metadata
s3tests_boto3/functional/test_s3.py::test_object_copy_canned_acl
s3tests_boto3/functional/test_s3.py::test_object_copy_retaining_metadata
s3tests_boto3/functional/test_s3.py::test_object_copy_replacing_metadata
s3tests_boto3/functional/test_s3.py::test_object_copy_versioning_multipart_upload
s3tests_boto3/functional/test_s3.py::test_list_multipart_upload
s3tests_boto3/functional/test_s3.py::test_multipart_upload_missing_part
s3tests_boto3/functional/test_s3.py::test_multipart_upload_incorrect_etag
s3tests_boto3/functional/test_s3.py::test_set_bucket_tagging
Expand Down
Loading

0 comments on commit b1de141

Please sign in to comment.