From 55002e28b108a4b15a9865c4a0223e9b40a73ab5 Mon Sep 17 00:00:00 2001 From: nadav mizrahi Date: Thu, 12 Sep 2024 09:02:08 +0300 Subject: [PATCH 1/2] NSFS | fix copy_object issues Signed-off-by: nadav mizrahi (cherry picked from commit b744060e1682949d2710943648a4e315eb9ec32c) --- src/sdk/namespace_fs.js | 38 +++++++++---------- src/sdk/object_sdk.js | 33 ++++++++-------- .../nsfs_s3_tests_pending_list.txt | 5 --- src/test/unit_tests/test_namespace_fs.js | 26 ++++++++++--- 4 files changed, 56 insertions(+), 46 deletions(-) diff --git a/src/sdk/namespace_fs.js b/src/sdk/namespace_fs.js index 58d4a8df55..9f856a0ae0 100644 --- a/src/sdk/namespace_fs.js +++ b/src/sdk/namespace_fs.js @@ -479,7 +479,7 @@ class NamespaceFS { } /** - * @param {nb.ObjectSDK} object_sdk + * @param {nb.ObjectSDK} object_sdk * @returns {nb.NativeFSContext} */ prepare_fs_context(object_sdk) { @@ -1056,7 +1056,9 @@ class NamespaceFS { // end the stream res.end(); - await stream_utils.wait_finished(res, { signal: object_sdk.abort_controller.signal }); + // in case of transform streams such as ChunkFS there is also a readable part. since we expect write stream + // and don't care about the readable part, set readable: false + await stream_utils.wait_finished(res, { readable: false, signal: object_sdk.abort_controller.signal }); object_sdk.throw_if_aborted(); dbg.log0('NamespaceFS: read_object_stream completed file', file_path, { @@ -1174,9 +1176,7 @@ class NamespaceFS { } if (copy_res) { - if (copy_res === copy_status_enum.FALLBACK) { - params.copy_source.nsfs_copy_fallback(); - } else { + if (copy_res !== copy_status_enum.FALLBACK) { // open file after copy link/same inode should use read open mode open_mode = config.NSFS_OPEN_READ_MODE; if (copy_res === copy_status_enum.SAME_INODE) open_path = file_path; @@ -1259,10 +1259,8 @@ class NamespaceFS { let stat = await target_file.stat({ ...fs_context, disable_ctime_check: part_upload }); this._verify_encryption(params.encryption, this._get_encryption_info(stat)); - // handle xattr - // assign user xattr on non copy / copy with xattr_copy header provided const copy_xattr = params.copy_source && params.xattr_copy; - let fs_xattr = copy_xattr ? undefined : to_fs_xattr(params.xattr); + let fs_xattr = to_fs_xattr(params.xattr); // assign noobaa internal xattr - content type, md5, versioning xattr if (params.content_type) { @@ -1305,7 +1303,6 @@ class NamespaceFS { // when object is a dir, xattr are set on the folder itself and the content is in .folder file if (is_dir_content) { - if (params.copy_source) fs_xattr = await this._get_copy_source_xattr(params, fs_context, fs_xattr); await this._assign_dir_content_to_xattr(fs_context, fs_xattr, { ...params, size: stat.size }, copy_xattr); } stat = await nb_native().fs.stat({ ...fs_context, disable_ctime_check: part_upload }, file_path); @@ -1317,12 +1314,11 @@ class NamespaceFS { await native_fs_utils._make_path_dirs(file_path, fs_context); const copy_xattr = params.copy_source && params.xattr_copy; - let fs_xattr = copy_xattr ? {} : to_fs_xattr(params.xattr) || {}; + let fs_xattr = to_fs_xattr(params.xattr) || {}; if (params.content_type) { fs_xattr = fs_xattr || {}; fs_xattr[XATTR_CONTENT_TYPE] = params.content_type; } - if (params.copy_source) fs_xattr = await this._get_copy_source_xattr(params, fs_context, fs_xattr); await this._assign_dir_content_to_xattr(fs_context, fs_xattr, params, copy_xattr); // when .folder exist and it's no upload flow - .folder should be deleted if it exists @@ -1338,13 +1334,6 @@ class NamespaceFS { return upload_info; } - async _get_copy_source_xattr(params, fs_context, fs_xattr) { - const is_source_dir = params.copy_source.key.endsWith('/'); - const source_file_md_path = await this._find_version_path(fs_context, params.copy_source, is_source_dir); - const source_stat = await nb_native().fs.stat(fs_context, source_file_md_path); - return { ...source_stat.xattr, ...fs_xattr }; - } - // move to dest GPFS (wt) / POSIX (w / undefined) - non part upload async _move_to_dest(fs_context, source_path, dest_path, target_file, open_mode, key) { let retries = config.NSFS_RENAME_RETRIES; @@ -1475,7 +1464,7 @@ class NamespaceFS { // Can be finetuned further on if needed and inserting the Semaphore logic inside // Instead of wrapping the whole _upload_stream function (q_buffers lives outside of the data scope of the stream) async _upload_stream({ fs_context, params, target_file, object_sdk, offset }) { - const { source_stream } = params; + const { source_stream, copy_source } = params; try { // Not using async iterators with ReadableStreams due to unsettled promises issues on abort/destroy const md5_enabled = this._is_force_md5_enabled(object_sdk); @@ -1490,8 +1479,14 @@ class NamespaceFS { large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size }); chunk_fs.on('error', err1 => dbg.error('namespace_fs._upload_stream: error occured on stream ChunkFS: ', err1)); - await stream_utils.pipeline([source_stream, chunk_fs]); - await stream_utils.wait_finished(chunk_fs); + if (copy_source) { + await this.read_object_stream(copy_source, object_sdk, chunk_fs); + } else if (params.source_params) { + await params.source_ns.read_object_stream(params.source_params, object_sdk, chunk_fs); + } else { + await stream_utils.pipeline([source_stream, chunk_fs]); + await stream_utils.wait_finished(chunk_fs); + } return { digest: chunk_fs.digest, total_bytes: chunk_fs.total_bytes }; } catch (error) { dbg.error('_upload_stream had error: ', error); @@ -1777,6 +1772,7 @@ class NamespaceFS { upload_params.params.xattr = create_params_parsed.xattr; upload_params.params.storage_class = create_params_parsed.storage_class; upload_params.digest = MD5Async && (((await MD5Async.digest()).toString('hex')) + '-' + multiparts.length); + upload_params.params.content_type = create_params_parsed.content_type; const upload_info = await this._finish_upload(upload_params); diff --git a/src/sdk/object_sdk.js b/src/sdk/object_sdk.js index f94e392216..fdb1164354 100644 --- a/src/sdk/object_sdk.js +++ b/src/sdk/object_sdk.js @@ -105,8 +105,8 @@ class ObjectSDK { * in order to handle aborting requests gracefully. The `abort_controller` member will * be used to signal async flows that abort was detected. * @see {@link https://nodejs.org/docs/latest/api/globals.html#class-abortcontroller} - * @param {import('http').IncomingMessage} req - * @param {import('http').ServerResponse} res + * @param {import('http').IncomingMessage} req + * @param {import('http').ServerResponse} res */ setup_abort_controller(req, res) { res.once('error', err => { @@ -157,7 +157,7 @@ class ObjectSDK { } /** - * @param {string} name + * @param {string} name * @returns {Promise} */ async _get_bucket_namespace(name) { @@ -268,7 +268,7 @@ class ObjectSDK { return Boolean(fs_root_path || fs_root_path === ''); } - // validates requests for non nsfs buckets from accounts which are nsfs_only + // validates requests for non nsfs buckets from accounts which are nsfs_only has_non_nsfs_bucket_access(account, ns) { dbg.log1('validate_non_nsfs_bucket: ', account, ns?.write_resource?.resource); if (!account) return false; @@ -524,7 +524,7 @@ class ObjectSDK { /** * Calls the op and report time and error to stats collector. * on_success can be added to update read/write stats (but on_success shouln't throw) - * + * * @template T * @param {{ * op_name: string; @@ -642,7 +642,9 @@ class ObjectSDK { params.content_type = source_md.content_type; } try { - if (params.xattr) params.xattr = _.omitBy(params.xattr, (val, name) => name.startsWith('noobaa-namespace')); + //omitBy iterates all xattr calling startsWith on them. this can include symbols such as XATTR_SORT_SYMBOL. + //in that case startsWith will not apply + if (params.xattr) params.xattr = _.omitBy(params.xattr, (val, name) => name.startsWith?.('noobaa-namespace')); } catch (e) { dbg.log3("Got an error while trying to omitBy param.xattr:", params.xattr, "error:", e); } @@ -658,12 +660,6 @@ class ObjectSDK { params.copy_source.bucket = actual_source_ns.get_bucket(bucket); params.copy_source.obj_id = source_md.obj_id; params.copy_source.version_id = source_md.version_id; - if (source_ns instanceof NamespaceFS) { - params.copy_source.nsfs_copy_fallback = () => { - this._populate_nsfs_copy_fallback({ source_params, source_ns, params }); - params.copy_source = null; - }; - } } else { // source cannot be copied directly (different plaforms, accounts, etc.) // set the source_stream to read from the copy source @@ -671,6 +667,7 @@ class ObjectSDK { source_params.object_md = source_md; source_params.obj_id = source_md.obj_id; source_params.version_id = source_md.version_id; + source_params.bucket = actual_source_ns.get_bucket(bucket); // param size is needed when doing an upload. Can be overrided during ranged writes params.size = source_md.size; @@ -684,7 +681,13 @@ class ObjectSDK { // if the source namespace is NSFS then we need to pass the read_object_stream the read_stream if (source_ns instanceof NamespaceFS) { - this._populate_nsfs_copy_fallback({ source_params, source_ns, params }); + if (target_ns instanceof NamespaceFS) { + params.source_ns = actual_source_ns; + params.source_params = source_params; + } else { + //this._populate_nsfs_copy_fallback({ source_params, source_ns, params }); + throw new Error('TODO fix _populate_nsfs_copy_fallback'); + } } else { params.source_stream = await source_ns.read_object_stream(source_params, this); } @@ -701,9 +704,9 @@ class ObjectSDK { } } - // nsfs copy_object & server side copy consisted of link and a fallback to + // nsfs copy_object & server side copy consisted of link and a fallback to // read stream and then upload stream - // nsfs copy object when can't server side copy - fallback directly + // nsfs copy object when can't server side copy - fallback directly _populate_nsfs_copy_fallback({ source_ns, params, source_params }) { const read_stream = new stream.PassThrough(); source_ns.read_object_stream(source_params, this, read_stream) diff --git a/src/test/system_tests/ceph_s3_tests/s3-tests-lists/nsfs_s3_tests_pending_list.txt b/src/test/system_tests/ceph_s3_tests/s3-tests-lists/nsfs_s3_tests_pending_list.txt index a8b0b9e89f..d6afbf63d4 100644 --- a/src/test/system_tests/ceph_s3_tests/s3-tests-lists/nsfs_s3_tests_pending_list.txt +++ b/src/test/system_tests/ceph_s3_tests/s3-tests-lists/nsfs_s3_tests_pending_list.txt @@ -152,12 +152,7 @@ s3tests_boto3/functional/test_s3.py::test_put_object_ifmatch_failed s3tests_boto3/functional/test_s3.py::test_put_object_ifnonmatch_failed s3tests_boto3/functional/test_s3.py::test_put_object_ifnonmatch_overwrite_existed_failed s3tests_boto3/functional/test_s3.py::test_object_raw_authenticated_bucket_gone -s3tests_boto3/functional/test_s3.py::test_object_copy_to_itself_with_metadata s3tests_boto3/functional/test_s3.py::test_object_copy_canned_acl -s3tests_boto3/functional/test_s3.py::test_object_copy_retaining_metadata -s3tests_boto3/functional/test_s3.py::test_object_copy_replacing_metadata -s3tests_boto3/functional/test_s3.py::test_object_copy_versioning_multipart_upload -s3tests_boto3/functional/test_s3.py::test_list_multipart_upload s3tests_boto3/functional/test_s3.py::test_multipart_upload_missing_part s3tests_boto3/functional/test_s3.py::test_multipart_upload_incorrect_etag s3tests_boto3/functional/test_s3.py::test_set_bucket_tagging diff --git a/src/test/unit_tests/test_namespace_fs.js b/src/test/unit_tests/test_namespace_fs.js index 1db9bc4067..fb75305973 100644 --- a/src/test/unit_tests/test_namespace_fs.js +++ b/src/test/unit_tests/test_namespace_fs.js @@ -1575,12 +1575,14 @@ mocha.describe('namespace_fs copy object', function() { assert.deepStrictEqual(xattr, { ...add_user_prefix(read_md_res.xattr), [XATTR_DIR_CONTENT]: `${read_md_res.size}` }); assert.equal(stream_content_type, read_md_res.content_type); + const copy_source = { bucket: upload_bkt, key: key1 }; await ns_tmp.upload_object({ bucket: upload_bkt, key: key2, - copy_source: { bucket: upload_bkt, key: key1 }, + copy_source: copy_source, size: 100, - xattr_copy: true + xattr_copy: true, + xattr: await _get_source_copy_xattr(copy_source, ns_tmp, dummy_object_sdk) }, dummy_object_sdk); const file_path2 = ns_tmp_bucket_path + '/' + key2; xattr = await get_xattr(file_path2); @@ -1615,12 +1617,14 @@ mocha.describe('namespace_fs copy object', function() { assert.deepStrictEqual(xattr, { ...add_user_prefix(read_md_res.xattr) }); assert.equal(stream_content_type, read_md_res.content_type); + const copy_source = { bucket: upload_bkt, key: src_key }; await ns_tmp.upload_object({ bucket: upload_bkt, key: dst_key, - copy_source: { bucket: upload_bkt, key: src_key }, + copy_source: copy_source, size: 100, xattr_copy: true, + xattr: await _get_source_copy_xattr(copy_source, ns_tmp, dummy_object_sdk) }, dummy_object_sdk); const file_path2 = ns_tmp_bucket_path + '/' + dst_key; xattr = await get_xattr(file_path2); @@ -1656,12 +1660,14 @@ mocha.describe('namespace_fs copy object', function() { assert.deepStrictEqual(xattr, { ...add_user_prefix(read_md_res.xattr) }); assert.equal(stream_content_type, read_md_res.content_type); + const copy_source = { bucket: upload_bkt, key: src_key }; await ns_tmp.upload_object({ bucket: upload_bkt, key: dst_key, - copy_source: { bucket: upload_bkt, key: src_key }, + copy_source: copy_source, size: 0, - xattr_copy: true + xattr_copy: true, + xattr: await _get_source_copy_xattr(copy_source, ns_tmp, dummy_object_sdk) }, dummy_object_sdk); const file_path2 = ns_tmp_bucket_path + '/' + dst_key; xattr = await get_xattr(file_path2); @@ -1687,6 +1693,16 @@ mocha.describe('namespace_fs copy object', function() { }); +//simulates object_sdk.fix_copy_source_params filtering of source xattr for copy object tests +async function _get_source_copy_xattr(copy_source, source_ns, object_sdk) { + const read_md_res = await source_ns.read_object_md({ + bucket: copy_source.bucket, + key: copy_source.key + }, object_sdk); + const res = _.omitBy(read_md_res.xattr, (val, name) => name.startsWith?.('noobaa-namespace')); + return res; +} + async function list_objects(ns, bucket, delimiter, prefix, dummy_object_sdk) { const res = await ns.list_objects({ bucket: bucket, From 449cdeb5b2bce9baa29f37e37f98568a2f4e516f Mon Sep 17 00:00:00 2001 From: nadav mizrahi Date: Wed, 14 Aug 2024 15:30:53 +0300 Subject: [PATCH 2/2] NSFS | NC | fix list-objecs-versions issues Signed-off-by: nadav mizrahi (cherry picked from commit b21517a5256d089f6fc7a308fec3688ce9d740c9) --- src/sdk/namespace_fs.js | 39 +++++++++++---- .../unit_tests/test_bucketspace_versioning.js | 47 ++++++++++++++++--- 2 files changed, 70 insertions(+), 16 deletions(-) diff --git a/src/sdk/namespace_fs.js b/src/sdk/namespace_fs.js index 9f856a0ae0..dca97ab528 100644 --- a/src/sdk/namespace_fs.js +++ b/src/sdk/namespace_fs.js @@ -106,7 +106,7 @@ function _get_version_id_by_stat({ino, mtimeNsBigint}) { return 'mtime-' + mtimeNsBigint.toString(36) + '-ino-' + ino.toString(36); } -function _is_version_object_including_null_version(filename) { +function _is_version_or_null_in_file_name(filename) { const is_version_object = _is_version_object(filename); if (!is_version_object) { return _is_version_null_version(filename); @@ -626,6 +626,7 @@ class NamespaceFS { * key: string, * common_prefix: boolean, * stat?: nb.NativeFSStats, + * is_latest: boolean, * }} Result */ @@ -714,15 +715,17 @@ class NamespaceFS { const isDir = await is_directory_or_symlink_to_directory(ent, fs_context, path.join(dir_path, ent.name)); let r; - if (list_versions && _is_version_object_including_null_version(ent.name)) { + if (list_versions && _is_version_or_null_in_file_name(ent.name)) { r = { key: this._get_version_entry_key(dir_key, ent), common_prefix: isDir, + is_latest: false }; } else { r = { key: this._get_entry_key(dir_key, ent, isDir), common_prefix: isDir, + is_latest: true }; } await insert_entry_to_results_arr(r); @@ -837,6 +840,20 @@ class NamespaceFS { } }; + let previous_key; + /** + * delete markers are always in the .versions folder, so we need to have special case to determine + * if they are delete markers. since the result list is ordered by latest entries first, the first + * entry of every key is the latest + * TODO need different way to check for isLatest in case of unordered list object versions + * @param {Object} obj_info + */ + const set_latest_delete_marker = obj_info => { + if (obj_info.delete_marker && previous_key !== obj_info.key) { + obj_info.is_latest = true; + } + }; + const prefix_dir_key = prefix.slice(0, prefix.lastIndexOf('/') + 1); await process_dir(prefix_dir_key); await Promise.all(results.map(async r => { @@ -858,15 +875,17 @@ class NamespaceFS { if (r.common_prefix) { res.common_prefixes.push(r.key); } else { - obj_info = this._get_object_info(bucket, r.key, r.stat, 'null', false, true); + obj_info = this._get_object_info(bucket, r.key, r.stat, false, r.is_latest); if (!list_versions && obj_info.delete_marker) { continue; } if (this._is_hidden_version_path(obj_info.key)) { obj_info.key = path.normalize(obj_info.key.replace(HIDDEN_VERSIONS_PATH + '/', '')); obj_info.key = _get_filename(obj_info.key); + set_latest_delete_marker(obj_info); } res.objects.push(obj_info); + previous_key = obj_info.key; } if (res.is_truncated) { if (list_versions && _is_version_object(r.key)) { @@ -910,7 +929,7 @@ class NamespaceFS { } } this._throw_if_delete_marker(stat); - return this._get_object_info(params.bucket, params.key, stat, params.version_id || 'null', isDir); + return this._get_object_info(params.bucket, params.key, stat, isDir); } catch (err) { if (this._should_update_issues_report(params, file_path, err)) { this.run_update_issues_report(object_sdk, err); @@ -2272,16 +2291,18 @@ class NamespaceFS { } /** - * @param {string} bucket - * @param {string} key - * @param {nb.NativeFSStats} stat + * @param {string} bucket + * @param {string} key + * @param {nb.NativeFSStats} stat + * @param {Boolean} isDir + * @param {boolean} [is_latest=true] * @returns {nb.ObjectInfo} */ - _get_object_info(bucket, key, stat, return_version_id, isDir, is_latest = true) { + _get_object_info(bucket, key, stat, isDir, is_latest = true) { const etag = this._get_etag(stat); const create_time = stat.mtime.getTime(); const encryption = this._get_encryption_info(stat); - const version_id = return_version_id && this._is_versioning_enabled() && this._get_version_id_by_xattr(stat); + const version_id = (this._is_versioning_enabled() || this._is_versioning_suspended()) && this._get_version_id_by_xattr(stat); const delete_marker = stat.xattr?.[XATTR_DELETE_MARKER] === 'true'; const dir_content_type = stat.xattr?.[XATTR_DIR_CONTENT] && ((Number(stat.xattr?.[XATTR_DIR_CONTENT]) > 0 && 'application/octet-stream') || 'application/x-directory'); const content_type = stat.xattr?.[XATTR_CONTENT_TYPE] || diff --git a/src/test/unit_tests/test_bucketspace_versioning.js b/src/test/unit_tests/test_bucketspace_versioning.js index 8ce706c02a..d386b2c093 100644 --- a/src/test/unit_tests/test_bucketspace_versioning.js +++ b/src/test/unit_tests/test_bucketspace_versioning.js @@ -2846,6 +2846,7 @@ mocha.describe('List-objects', function() { const version_key_2 = 'search_key_mtime-crkfjum9883k-ino-guu7'; const version_key_3 = 'search_key_mtime-crkfjx1hui2o-ino-guu9'; const version_key_4 = 'search_key_mtime-crkfjx1hui2o-ino-guuh'; + const key_version = 'mtime-gffdt785k3k-ino-fgy'; const dir_key = 'dir1/delete_marker_key'; const dir_version_key_1 = 'delete_marker_key_mtime-crkfjknr7xmo-ino-guu4'; @@ -2910,13 +2911,13 @@ mocha.describe('List-objects', function() { await s3_client.putBucketVersioning({ Bucket: bucket_name, VersioningConfiguration: { MFADelete: 'Disabled', Status: 'Enabled' } }); const bucket_ver = await s3_client.getBucketVersioning({ Bucket: bucket_name }); assert.equal(bucket_ver.Status, 'Enabled'); - await create_object(`${full_path}/${key}`, body, 'null'); - await create_object(`${full_path_version_dir}/${version_key_1}`, version_body, 'null'); - await create_object(`${full_path_version_dir}/${version_key_2}`, version_body, 'null'); - await create_object(`${full_path_version_dir}/${version_key_3}`, version_body, 'null'); + await create_object(`${full_path}/${key}`, body, key_version); + await create_object(`${full_path_version_dir}/${version_key_1}`, version_body, 'mtime-crh3783sxk3k-ino-guty'); + await create_object(`${full_path_version_dir}/${version_key_2}`, version_body, 'mtime-crkfjum9883k-ino-guu7'); + await create_object(`${full_path_version_dir}/${version_key_3}`, version_body, 'mtime-crkfjx1hui2o-ino-guu9'); file_pointer = await create_object(`${full_path}/${dir_key}`, version_body, 'null', true); - await create_object(`${dir1_version_dir}/${dir_version_key_1}`, version_body, 'null'); - await create_object(`${dir1_version_dir}/${dir_version_key_2}`, version_body, 'null'); + await create_object(`${dir1_version_dir}/${dir_version_key_1}`, version_body, 'mtime-crkfjknr7xmo-ino-guu4'); + await create_object(`${dir1_version_dir}/${dir_version_key_2}`, version_body, 'mtime-crkfjr98uiv4-ino-guu6'); }); mocha.after(async () => { @@ -3048,7 +3049,7 @@ mocha.describe('List-objects', function() { file_pointer.replacexattr(DEFAULT_FS_CONFIG, xattr_delete_marker); const res = await s3_client.listObjectVersions({Bucket: bucket_name}); - res.Versions.forEach(val => { + res.DeleteMarkers.forEach(val => { if (val.Key === dir_key) { assert.equal(val.IsLatest, true); assert.equal(res.DeleteMarkers[0].IsLatest, true); @@ -3056,6 +3057,38 @@ mocha.describe('List-objects', function() { } }); }); + + mocha.it('list object versions - should show isLatest flag only for latest objects', async function() { + const res = await s3_client.listObjectVersions({Bucket: bucket_name}); + let count = 0; + res.Versions.forEach(val => { + if (val.IsLatest) { + if (val.Key === key) { + assert.equal(val.VersionId, key_version); + } + count += 1; + } + }); + res.DeleteMarkers.forEach(val => { + if (val.IsLatest) { + assert.equal(val.VersionId, 'null'); + count += 1; + } + }); + assert.equal(count, 2); + }); + + mocha.it('list object versions - should show version-id in suspention mode', async function() { + await s3_client.putBucketVersioning({ Bucket: bucket_name, VersioningConfiguration: { MFADelete: 'Disabled', Status: 'Suspended' } }); + const res = await s3_client.listObjectVersions({Bucket: bucket_name}); + let count = 0; + res.Versions.forEach(val => { + if (val.VersionId !== 'null') { + count += 1; + } + }); + assert.equal(count, 6); + }); }); async function create_object(object_path, data, version_id, return_fd) {