diff --git a/.eslintrc.js b/.eslintrc.js index 8857284fc7..d900c33930 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -131,7 +131,13 @@ module.exports = { 'function-call-argument-newline': 'off', - 'space-unary-ops': ['error', { words: false, nonwords: false }], + 'space-unary-ops': ['error', { + words: true, + nonwords: false, + overrides: { + typeof: false, + }, + }], // max line length is 80 by default, allow some slack // TODO eslint max-len for code lines should be error and reduced to ~100 instead of 140 diff --git a/Makefile b/Makefile index d1e04e5496..3c8eb6d2a7 100644 --- a/Makefile +++ b/Makefile @@ -74,6 +74,12 @@ noobaa: base @echo "\033[1;32mNooBaa done.\033[0m" .PHONY: noobaa +nbdev: + $(CONTAINER_ENGINE) build $(CPUSET) -f src/deploy/NVA_build/dev.Dockerfile $(CACHE_FLAG) -t nbdev --build-arg GIT_COMMIT=$(GIT_COMMIT) . $(REDIRECT_STDOUT) + @echo "\033[1;32mImage 'nbdev' is ready.\033[0m" + @echo "Usage: docker run -it nbdev" +.PHONY: nbdev + clean: @echo Stopping and Deleting containers @$(CONTAINER_ENGINE) ps -a | grep noobaa_ | awk '{print $1}' | xargs $(CONTAINER_ENGINE) stop &> /dev/null diff --git a/src/agent/block_store_services/block_store_base.js b/src/agent/block_store_services/block_store_base.js index 0cdb769319..f3c6b8bf1a 100644 --- a/src/agent/block_store_services/block_store_base.js +++ b/src/agent/block_store_services/block_store_base.js @@ -382,7 +382,7 @@ class BlockStoreBase { const reply = {}; const delay_ms = 200; const data = crypto.randomBytes(1024); - const digest_type = config.CHUNK_CODER_FRAG_DIGEST_TYPE; + const digest_type = config.CHUNK_CODER_FRAG_DIGEST_TYPE || 'sha1'; const block_md = { id: '_test_store_perf', digest_type, diff --git a/src/api/bucket_api.js b/src/api/bucket_api.js index d37ba3045e..ab97c41b82 100644 --- a/src/api/bucket_api.js +++ b/src/api/bucket_api.js @@ -14,36 +14,6 @@ module.exports = { methods: { - put_object_lock_configuration: { - method: 'PUT', - params: { - type: 'object', - required: ['name', 'object_lock_configuration'], - properties: { - name: { $ref: 'common_api#/definitions/bucket_name' }, - object_lock_configuration: { $ref: '#/definitions/object_lock_configuration' }, - }, - }, - auth: { - system: 'admin' - }, - }, - get_object_lock_configuration: { - method: 'GET', - params: { - type: 'object', - required: ['name'], - properties: { - name: { $ref: 'common_api#/definitions/bucket_name' }, - }, - }, - reply: { - $ref: '#/definitions/object_lock_configuration' - }, - auth: { - system: 'admin' - }, - }, create_bucket: { method: 'POST', params: { @@ -916,7 +886,40 @@ module.exports = { auth: { system: 'admin' } - } + }, + + put_object_lock_configuration: { + method: 'PUT', + params: { + type: 'object', + required: ['name', 'object_lock_configuration'], + properties: { + name: { $ref: 'common_api#/definitions/bucket_name' }, + object_lock_configuration: { $ref: '#/definitions/object_lock_configuration' }, + }, + }, + auth: { + system: 'admin' + }, + }, + + get_object_lock_configuration: { + method: 'GET', + params: { + type: 'object', + required: ['name'], + properties: { + name: { $ref: 'common_api#/definitions/bucket_name' }, + }, + }, + reply: { + $ref: '#/definitions/object_lock_configuration' + }, + auth: { + system: 'admin' + }, + }, + }, definitions: { diff --git a/src/sdk/namespace_fs.js b/src/sdk/namespace_fs.js new file mode 100644 index 0000000000..345fcc9101 --- /dev/null +++ b/src/sdk/namespace_fs.js @@ -0,0 +1,445 @@ +/* Copyright (C) 2016 NooBaa */ +'use strict'; + +const fs = require('fs'); +const 
path = require('path'); +const util = require('util'); +const mime = require('mime'); +const uuid = require('uuid'); +const events = require('events'); +const crypto = require('crypto'); + +const fs_utils = require('../util/fs_utils'); +// const fs_xattr = require('fs-xattr'); +// const s3_utils = require('../endpoint/s3/s3_utils'); +// const stream = require('stream'); + +// const XATTR_KEY = 'xattr.noobaa.io'; + +class NamespaceFS { + + constructor({ data_path }) { + this.data_path = data_path; + } + + get_write_resource() { + return this; + } + + ///////////////// + // OBJECT LIST // + ///////////////// + + async list_objects(params, object_sdk) { + + if (params.delimiter && params.delimiter !== '/') { + throw new Error('NamespaceFS: Invalid delimiter ' + params.delimiter); + } + + await this._load_bucket(params); + + return params.delimiter === '/' ? + this._list_objects_shallow(params) : + this._list_objects_deep(params); + } + + /** + * @param {object} params + * @param {string} params.bucket + * @param {string} params.prefix + * @param {string} params.delimiter + * @param {string} params.key_marker + * @param {number} params.limit + */ + async _list_objects_shallow({ bucket, prefix = '', delimiter = '/', key_marker = '', limit = 1000 }) { + const bucket_root_path = path.join(this.data_path, bucket); + const prefix_last_pos = prefix.lastIndexOf('/'); + const dir_key = prefix.slice(0, prefix_last_pos + 1); + const dir_path = path.join(bucket_root_path, dir_key); + const entry_prefix = prefix.slice(prefix_last_pos + 1); + const is_marker_in_dir = key_marker.startsWith(dir_key); + const is_marker_after_dir = (!is_marker_in_dir && key_marker > dir_key); + const entry_marker = is_marker_in_dir ? key_marker.slice(dir_key.length) : ''; + const entries = is_marker_after_dir ? [] : + await fs_utils.read_dir_sorted_limit({ + dir_path, + prefix: entry_prefix, + marker: entry_marker, + limit: limit + 1, + }); + const is_truncated = entries.length > limit; + const next_marker = is_truncated ? 
+ dir_key + entries[limit].name : + undefined; + const res = { + objects: [], + common_prefixes: [], + is_truncated, + next_marker, + }; + let count = 0; + for (const entry of entries) { + if (count >= limit) break; + count += 1; + const key = dir_key + entry.name; + if (entry.isDirectory()) { + res.common_prefixes.push(key + '/'); + } else { + const stats = await fs.promises.stat(path.join(dir_path, entry.name)); + const obj_info = this._get_object_info(bucket, key, stats); + res.objects.push(obj_info); + } + } + return res; + } + + + async _list_objects_deep(params) { + // TODO deep listing + return this._list_objects_shallow(params); + } + + // for now we do not support versioning, so returning the same as list objects + async list_object_versions(params, object_sdk) { + // return { + // objects: [], + // common_prefixes: [], + // is_truncated: false, + // next_marker: undefined, + // next_version_id_marker: undefined, + // }; + return this.list_objects(params, object_sdk); + } + + ///////////////// + // OBJECT READ // + ///////////////// + + async read_object_md(params, object_sdk) { + try { + await this._load_bucket(params); + const file_path = this._get_file_path(params); + const stat = await fs.promises.stat(file_path); + console.log(file_path, stat); + return this._get_object_info(params.bucket, params.key, stat); + } catch (err) { + throw this._translate_object_error_codes(err); + } + } + + async read_object_stream(params, object_sdk) { + try { + await this._load_bucket(params); + const file_path = this._get_file_path(params); + return fs.createReadStream(file_path, { + start: Number.isInteger(params.start) ? params.start : undefined, + // end offset for files is inclusive, so need to adjust our exclusive end + end: Number.isInteger(params.end) ? params.end - 1 : undefined, + }); + } catch (err) { + throw this._translate_object_error_codes(err); + } + } + + /////////////////// + // OBJECT UPLOAD // + /////////////////// + + async upload_object(params, object_sdk) { + try { + await this._load_bucket(params); + const file_path = this._get_file_path(params); + const upload_id = uuid.v4(); + const upload_path = path.join(this.data_path, params.bucket, '.noobaa', 'uploads', upload_id); + await Promise.all([this._make_path_dirs(file_path), this._make_path_dirs(upload_path)]); + await this._upload_stream(params.source_stream, upload_path); + // TODO use file xattr to store md5_b64 xattr, etc. 
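+            // note (illustrative, assuming upload_path and file_path live on the
+            // same filesystem): fs.promises.rename() below replaces the target
+            // atomically, so concurrent readers see either the old file or the new one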
+            const stat = await fs.promises.stat(upload_path);
+            await fs.promises.rename(upload_path, file_path);
+            return { etag: this._get_etag(stat) };
+        } catch (err) {
+            throw this._translate_object_error_codes(err);
+        }
+    }
+
+    async _upload_stream(source_stream, upload_path, write_options) {
+        return new Promise((resolve, reject) =>
+            source_stream
+                .once('error', reject)
+                .pipe(
+                    fs.createWriteStream(upload_path, write_options)
+                        .once('error', reject)
+                        .once('finish', resolve)
+                )
+        );
+    }
+
+    //////////////////////
+    // MULTIPART UPLOAD //
+    //////////////////////
+
+    async list_uploads(params, object_sdk) {
+        // for now we do not support listing of multipart uploads
+        return {
+            objects: [],
+            common_prefixes: [],
+            is_truncated: false,
+            next_marker: undefined,
+            next_upload_id_marker: undefined,
+        };
+    }
+
+    async create_object_upload(params, object_sdk) {
+        try {
+            const create_params = JSON.stringify({ ...params, source_stream: null });
+            await this._load_multipart(params);
+            await fs.promises.writeFile(path.join(params.mpu_path, 'create_object_upload'), create_params);
+            return { obj_id: params.obj_id };
+        } catch (err) {
+            throw this._translate_object_error_codes(err);
+        }
+    }
+
+    async upload_multipart(params, object_sdk) {
+        try {
+            await this._load_multipart(params);
+            const upload_path = path.join(params.mpu_path, `part-${params.num}`);
+            await this._upload_stream(params.source_stream, upload_path);
+            const stat = await fs.promises.stat(upload_path);
+            return { etag: this._get_etag(stat) };
+        } catch (err) {
+            throw this._translate_object_error_codes(err);
+        }
+    }
+
+    async list_multiparts(params, object_sdk) {
+        await this._load_multipart(params);
+        const entries = await fs.promises.readdir(params.mpu_path);
+        const multiparts = await Promise.all(
+            entries
+                .filter(e => e.startsWith('part-'))
+                .map(async e => {
+                    const num = Number(e.slice('part-'.length));
+                    const part_path = path.join(params.mpu_path, e);
+                    const stat = await fs.promises.stat(part_path);
+                    return {
+                        num,
+                        size: stat.size,
+                        etag: this._get_etag(stat),
+                        last_modified: new Date(stat.mtime),
+                    };
+                })
+        );
+        return {
+            is_truncated: false,
+            next_num_marker: undefined,
+            multiparts,
+        };
+    }
+
+    async complete_object_upload(params, object_sdk) {
+        try {
+            params.multiparts.sort((a, b) => a.num - b.num);
+            await this._load_multipart(params);
+            const file_path = this._get_file_path(params);
+            const upload_path = path.join(params.mpu_path, 'final');
+            const upload_stream = fs.createWriteStream(upload_path);
+            for (const { num, etag } of params.multiparts) {
+                const part_path = path.join(params.mpu_path, `part-${num}`);
+                const part_stat = await fs.promises.stat(part_path);
+                if (etag !== this._get_etag(part_stat)) {
+                    throw new Error('mismatch part etag: ' + util.inspect({ num, etag, part_path, part_stat, params }));
+                }
+                for await (const data of fs.createReadStream(part_path)) {
+                    if (!upload_stream.write(data)) {
+                        await events.once(upload_stream, 'drain');
+                    }
+                }
+            }
+            upload_stream.end();
+            // wait for the stream to flush before stat/rename
+            await events.once(upload_stream, 'finish');
+            const stat = await fs.promises.stat(upload_path);
+            await fs.promises.rename(upload_path, file_path);
+            await fs_utils.folder_delete(params.mpu_path);
+            return { etag: this._get_etag(stat) };
+        } catch (err) {
+            throw this._translate_object_error_codes(err);
+        }
+    }
+
+    async abort_object_upload(params, object_sdk) {
+        await this._load_multipart(params);
+        await fs_utils.folder_delete(params.mpu_path);
+    }
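+
+    // Illustrative summary of the on-disk multipart layout used above
+    // (derived from the code in this class, not an additional API):
+    //
+    //   <data_path>/<bucket>/.noobaa/multipart-uploads/<obj_id>/
+    //       create_object_upload   - JSON snapshot of the create params
+    //       part-<num>             - one file per uploaded part
+    //       final                  - concatenation target, renamed to the key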
+
+    ///////////////////
+    // OBJECT DELETE //
+    ///////////////////
+
+    async delete_object(params, object_sdk) {
+        try {
+            await this._load_bucket(params);
+            const file_path = this._get_file_path(params);
+            await fs.promises.unlink(file_path);
+            return {};
+        } catch (err) {
+            throw this._translate_object_error_codes(err);
+        }
+    }
+
+    async delete_multiple_objects(params, object_sdk) {
+        try {
+            await this._load_bucket(params);
+            for (const { key } of params.objects) {
+                const file_path = this._get_file_path({ bucket: params.bucket, key });
+                await fs.promises.unlink(file_path);
+            }
+            return params.objects.map(() => ({}));
+        } catch (err) {
+            throw this._translate_object_error_codes(err);
+        }
+    }
+
+    ////////////////////
+    // OBJECT TAGGING //
+    ////////////////////
+
+    async get_object_tagging(params, object_sdk) {
+        // TODO
+        return { tagging: [] };
+    }
+    async delete_object_tagging(params, object_sdk) {
+        // TODO
+        return {};
+    }
+    async put_object_tagging(params, object_sdk) {
+        // TODO
+        return { tagging: [] };
+    }
+
+    //////////////////////////
+    // AZURE BLOB MULTIPART //
+    //////////////////////////
+
+    async upload_blob_block(params, object_sdk) {
+        throw new Error('TODO');
+    }
+    async commit_blob_block_list(params, object_sdk) {
+        throw new Error('TODO');
+    }
+    async get_blob_block_lists(params, object_sdk) {
+        throw new Error('TODO');
+    }
+
+
+    //////////////
+    // INTERNAL //
+    //////////////
+
+    _get_file_path({ bucket, key }) {
+        return path.join(this.data_path, bucket, key);
+    }
+
+    async _make_path_dirs(file_path) {
+        const last_dir_pos = file_path.lastIndexOf('/');
+        if (last_dir_pos > 0) return fs_utils.create_path(file_path.slice(0, last_dir_pos));
+    }
+
+    _get_etag(stat) {
+        const ident_str = 'inode-' + stat.ino.toString() + '-mtime-' + stat.mtime.getTime().toString();
+        return crypto.createHash('md5').update(ident_str).digest('hex');
+    }
+
+    /**
+     * @param {string} bucket
+     * @param {string} key
+     * @param {fs.Stats} stat
+     */
+    _get_object_info(bucket, key, stat) {
+        const etag = this._get_etag(stat);
+        return {
+            obj_id: etag,
+            bucket,
+            key,
+            etag,
+            size: stat.size,
+            create_time: stat.mtime.getTime(),
+            content_type: mime.getType(key) || 'application/octet-stream',
+            // temp:
+            version_id: 1,
+            is_latest: true,
+            tag_count: 0,
+            delete_marker: false,
+            xattr: {},
+        };
+    }
+
+    _translate_object_error_codes(err) {
+        if (err.code === 'ENOENT') err.rpc_code = 'NO_SUCH_OBJECT';
+        return err;
+    }
+
+    async _load_bucket(params) {
+        try {
+            params.bucket_path = path.join(this.data_path, params.bucket);
+            await fs.promises.stat(params.bucket_path);
+        } catch (err) {
+            if (err.code === 'ENOENT') err.rpc_code = 'NO_SUCH_BUCKET';
+            throw err;
+        }
+    }
+
+    async _load_multipart(params) {
+        await this._load_bucket(params);
+        const is_new = !params.obj_id;
+        if (is_new) {
+            params.obj_id = uuid.v4();
+        }
+        if (!params.mpu_path) {
+            params.mpu_path = path.join(params.bucket_path, '.noobaa', 'multipart-uploads', params.obj_id);
+        }
+        if (is_new) {
+            await fs_utils.create_path(params.mpu_path);
+        } else {
+            try {
+                await fs.promises.stat(params.mpu_path);
+            } catch (err) {
+                if (err.code === 'ENOENT') err.rpc_code = 'NO_SUCH_UPLOAD';
+                throw err;
+            }
+        }
+    }
+
+
+
+}
+
+
+module.exports = NamespaceFS;
+
+
+/**
+ * main implements a simple cli that calls a method of namespace fs
+ * and prints the result returned from the async method.
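+ *
+ * Supported ops are the NamespaceFS method names, e.g. list_objects,
+ * read_object_md, upload_object, delete_object (dispatched as nsfs[op](argv) below).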
+ * + * Usage: + */ +async function main() { + // eslint-disable-next-line global-require + require('../util/console_wrapper').original_console(); + // eslint-disable-next-line global-require + const argv = require('minimist')(process.argv.slice(2)); + const inspect = x => util.inspect(x, { depth: null, breakLength: 10, colors: true }); + argv.data_path = argv.data_path || '.'; + argv.bucket = argv.bucket || 'src'; + argv.prefix = argv.prefix || ''; + argv.key_marker = argv.key_marker || ''; + argv.delimiter = argv.delimiter || '/'; + argv.limit = isNaN(Number(argv.limit)) ? 1000 : Number(argv.limit); + const op = argv._[0] || 'list_objects'; + console.log(`${op}:\n${inspect(argv)}`); + const nsfs = new NamespaceFS({ data_path: argv.data_path }); + const res = await (nsfs[op](argv)); + console.log(`Result:\n${inspect(res)}`); +} + +if (require.main === module) main(); diff --git a/src/sdk/namespace_s3.js b/src/sdk/namespace_s3.js index d54b7620d0..9132fc5f5a 100644 --- a/src/sdk/namespace_s3.js +++ b/src/sdk/namespace_s3.js @@ -47,6 +47,7 @@ class NamespaceS3 { dbg.log0('NamespaceS3.list_objects:', this.bucket, inspect(params)); const res = await this.s3.listObjects({ + Bucket: this.bucket, Prefix: params.prefix, Delimiter: params.delimiter, Marker: params.key_marker, @@ -68,6 +69,7 @@ class NamespaceS3 { dbg.log0('NamespaceS3.list_uploads:', this.bucket, inspect(params)); const res = await this.s3.listMultipartUploads({ + Bucket: this.bucket, Prefix: params.prefix, Delimiter: params.delimiter, KeyMarker: params.key_marker, @@ -82,7 +84,7 @@ class NamespaceS3 { objects: _.map(res.Uploads, obj => this._get_s3_object_info(obj, params.bucket)), common_prefixes: _.map(res.CommonPrefixes, 'Prefix'), is_truncated: res.IsTruncated, - next_marker: res.NextMarker, + next_marker: res.NextKeyMarker, next_upload_id_marker: res.UploadIdMarker, }; } @@ -91,6 +93,7 @@ class NamespaceS3 { dbg.log0('NamespaceS3.list_object_versions:', this.bucket, inspect(params)); const res = await this.s3.listObjectVersions({ + Bucket: this.bucket, Prefix: params.prefix, Delimiter: params.delimiter, KeyMarker: params.key_marker, @@ -109,7 +112,7 @@ class NamespaceS3 { ), common_prefixes: _.map(res.CommonPrefixes, 'Prefix'), is_truncated: res.IsTruncated, - next_marker: res.NextMarker, + next_marker: res.NextKeyMarker, next_version_id_marker: res.NextVersionIdMarker, }; } @@ -122,7 +125,11 @@ class NamespaceS3 { async read_object_md(params, object_sdk) { try { dbg.log0('NamespaceS3.read_object_md:', this.bucket, inspect(params)); - const request = { Key: params.key, Range: `bytes=0-${config.INLINE_MAX_SIZE - 1}` }; + const request = { + Bucket: this.bucket, + Key: params.key, + Range: `bytes=0-${config.INLINE_MAX_SIZE - 1}`, + }; this._assign_encryption_to_request(params, request); const res = await this.s3.getObject(request).promise(); dbg.log0('NamespaceS3.read_object_md:', this.bucket, inspect(params), 'res', inspect(res)); @@ -138,6 +145,7 @@ class NamespaceS3 { dbg.log0('NamespaceS3.read_object_stream:', this.bucket, inspect(_.omit(params, 'object_md.ns'))); return new P((resolve, reject) => { const request = { + Bucket: this.bucket, Key: params.key, Range: params.end ? 
`bytes=${params.start}-${params.end - 1}` : undefined, }; @@ -168,7 +176,8 @@ class NamespaceS3 { // clear count for next updates count = 0; }); - const read_stream = res.httpResponse.createUnbufferedStream(); + const read_stream = /** @type {import('stream').Readable} */ + (res.httpResponse.createUnbufferedStream()); return resolve(read_stream.pipe(count_stream)); }); req.send(); @@ -191,6 +200,7 @@ class NamespaceS3 { } const request = { + Bucket: this.bucket, Key: params.key, CopySource: copy_source, ContentType: params.content_type, @@ -216,6 +226,7 @@ class NamespaceS3 { }); const request = { + Bucket: this.bucket, Key: params.key, Body: params.source_stream.pipe(count_stream), ContentLength: params.size, @@ -258,6 +269,7 @@ class NamespaceS3 { dbg.log0('NamespaceS3.create_object_upload:', this.bucket, inspect(params)); const Tagging = params.tagging && params.tagging.map(tag => tag.key + '=' + tag.value).join('&'); const request = { + Bucket: this.bucket, Key: params.key, ContentType: params.content_type, Metadata: params.xattr, @@ -276,6 +288,7 @@ class NamespaceS3 { if (params.copy_source) { const { copy_source, copy_source_range } = s3_utils.format_copy_source(params.copy_source); const request = { + Bucket: this.bucket, Key: params.key, UploadId: params.obj_id, PartNumber: params.num, @@ -299,6 +312,7 @@ class NamespaceS3 { }); const request = { + Bucket: this.bucket, Key: params.key, UploadId: params.obj_id, PartNumber: params.num, @@ -319,6 +333,7 @@ class NamespaceS3 { async list_multiparts(params, object_sdk) { dbg.log0('NamespaceS3.list_multiparts:', this.bucket, inspect(params)); const res = await this.s3.listParts({ + Bucket: this.bucket, Key: params.key, UploadId: params.obj_id, MaxParts: params.max, @@ -341,6 +356,7 @@ class NamespaceS3 { async complete_object_upload(params, object_sdk) { dbg.log0('NamespaceS3.complete_object_upload:', this.bucket, inspect(params)); const res = await this.s3.completeMultipartUpload({ + Bucket: this.bucket, Key: params.key, UploadId: params.obj_id, MultipartUpload: { @@ -359,6 +375,7 @@ class NamespaceS3 { async abort_object_upload(params, object_sdk) { dbg.log0('NamespaceS3.abort_object_upload:', this.bucket, inspect(params)); const res = await this.s3.abortMultipartUpload({ + Bucket: this.bucket, Key: params.key, UploadId: params.obj_id, }).promise(); @@ -379,6 +396,7 @@ class NamespaceS3 { })); const res = await this.s3.putObjectTagging({ + Bucket: this.bucket, Key: params.key, VersionId: params.version_id, Tagging: { TagSet } @@ -394,6 +412,7 @@ class NamespaceS3 { async delete_object_tagging(params, object_sdk) { dbg.log0('NamespaceS3.delete_object_tagging:', this.bucket, inspect(params)); const res = await this.s3.deleteObjectTagging({ + Bucket: this.bucket, Key: params.key, VersionId: params.version_id }).promise(); @@ -408,6 +427,7 @@ class NamespaceS3 { async get_object_tagging(params, object_sdk) { dbg.log0('NamespaceS3.get_object_tagging:', this.bucket, inspect(params)); const res = await this.s3.getObjectTagging({ + Bucket: this.bucket, Key: params.key, VersionId: params.version_id }).promise(); @@ -434,6 +454,7 @@ class NamespaceS3 { dbg.log0('NamespaceS3.delete_object:', this.bucket, inspect(params)); const res = await this.s3.deleteObject({ + Bucket: this.bucket, Key: params.key, VersionId: params.version_id, }).promise(); @@ -460,6 +481,7 @@ class NamespaceS3 { dbg.log0('NamespaceS3.delete_multiple_objects:', this.bucket, inspect(params)); const res = await this.s3.deleteObjects({ + Bucket: this.bucket, Delete: { Objects: 
_.map(params.objects, obj => ({ Key: obj.key, diff --git a/src/sdk/nb.d.ts b/src/sdk/nb.d.ts index 0de9726fe8..3c63d4a838 100644 --- a/src/sdk/nb.d.ts +++ b/src/sdk/nb.d.ts @@ -1,6 +1,6 @@ export as namespace nb; -import { ObjectId as MongoID, Binary as MongoBinary } from 'mongodb'; +import { Binary as MongoBinary, ObjectId as MongoID } from 'mongodb'; type Semaphore = import('../util/semaphore'); type KeysSemaphore = import('../util/keys_semaphore'); @@ -20,10 +20,13 @@ type NodeType = 'BLOCK_STORE_GOOGLE' | 'BLOCK_STORE_FS' | 'ENDPOINT_S3'; -type MapByID = { [id: string]: T }; + +interface MapByID { + [id: string]: T; +} interface Base { - toJSON?(): Object | string; + toJSON?(): object | string; toString?(): string; } @@ -51,11 +54,11 @@ interface Account extends Base { allowed_buckets: { full_permission: boolean; permission_list: Bucket[]; - }, - access_keys: { + }; + access_keys: Array<{ access_key: SensitiveString; secret_key: SensitiveString; - }[]; + }>; } interface NodeAPI extends Base { @@ -75,14 +78,16 @@ interface NodeAPI extends Base { heartbeat: number; os_info: { hostname: string, - }, + }; drive: { mount: string, - }, + }; // incomplete... } -type NodesById = { [node_id: string]: NodeAPI }; +interface NodesById { + [node_id: string]: NodeAPI; +} interface Pool extends Base { _id: ID; @@ -126,12 +131,12 @@ interface Tiering extends Base { avg_chunk: number; delta_chunk: number; }; - tiers: { + tiers: Array<{ order: number; tier: Tier; spillover?: boolean; disabled?: boolean; - }[]; + }>; } interface TierStatus { @@ -140,7 +145,7 @@ interface TierStatus { } interface TieringStatus { - [tier_id: string]: TierStatus + [tier_id: string]: TierStatus; } interface PoolsStatus { @@ -148,7 +153,7 @@ interface PoolsStatus { valid_for_allocation: boolean; num_nodes: number; resource_type: ResourceType; - } + }; } interface MirrorStatus { @@ -170,12 +175,12 @@ interface Bucket extends Base { read_resources: NamespaceResource[]; write_resource: NamespaceResource; }; - quota?: Object; + quota?: object; storage_stats: { last_update: number; }; - lifecycle_configuration_rules?: Object; - lambda_triggers?: Object; + lifecycle_configuration_rules?: object; + lambda_triggers?: object; } interface NamespaceResource { @@ -183,7 +188,7 @@ interface NamespaceResource { name: string; system: System; account: Account; - connection: Object; + connection: object; } interface ChunkConfig extends Base { @@ -365,7 +370,7 @@ interface ObjectMD { xattr: {}; stats: { reads: number; last_read: Date; }; encryption: { algorithm: string; kms_key_id: string; context_b64: string; key_md5_b64: string; key_b64: string; }; - tagging: { key: string; value: string; }[], + tagging: Array<{ key: string; value: string; }>; lock_settings: { retention: { mode: string; retain_until_date: Date; }, legal_hold: { status: string } }; } @@ -390,7 +395,7 @@ interface ObjectInfo { xattr: {}; stats: { reads: number; last_read: number; }; encryption: { algorithm: string; kms_key_id: string; context_b64: string; key_md5_b64: string; key_b64: string; }; - tagging: { key: string; value: string; }[], + tagging: Array<{ key: string; value: string; }>; tag_count: number; s3_signed_url?: string; capacity_size?: number; @@ -564,35 +569,53 @@ interface PartSchemaDB { * **********************************************************/ +type APIMethod = (params: object, options?: object) => Promise; + +interface APIGroup { + [key: string]: APIMethod; +} + interface APIClient { - RPC_BUFFERS: Symbol; + readonly auth: APIGroup; + readonly account: 
APIGroup; + readonly system: APIGroup; + readonly tier: APIGroup; + readonly node: APIGroup; + readonly host: APIGroup; + readonly bucket: APIGroup; + readonly events: APIGroup; + readonly object: APIGroup; + readonly agent: APIGroup; + readonly block_store: APIGroup; + readonly stats: APIGroup; + readonly scrubber: APIGroup; + readonly debug: APIGroup; + readonly redirector: APIGroup; + readonly tiering_policy: APIGroup; + readonly pool: APIGroup; + readonly cluster_server: APIGroup; + readonly cluster_internal: APIGroup; + readonly server_inter_process: APIGroup; + readonly hosted_agents: APIGroup; + readonly frontend_notifications: APIGroup; + readonly func: APIGroup; + readonly func_node: APIGroup; + + RPC_BUFFERS: symbol; create_auth_token(params: object): Promise; create_access_key_auth(params: object): Promise; create_k8s_auth(params: object): Promise; - - readonly auth: object; - readonly account: object; - readonly system: object; - readonly tier: object; - readonly node: object; - readonly host: object; - readonly bucket: object; - readonly events: object; - readonly object: object; - readonly agent: object; - readonly block_store: object; - readonly stats: object; - readonly scrubber: object; - readonly debug: object; - readonly redirector: object; - readonly tiering_policy: object; - readonly pool: object; - readonly cluster_server: object; - readonly cluster_internal: object; - readonly server_inter_process: object; - readonly hosted_agents: object; - readonly frontend_notifications: object; - readonly func: object; - readonly func_node: object; } + + +/********************************************************** + * + * SDK / NAMESPACE + * + **********************************************************/ + +interface ObjectNamespace { + get_write_resource(): ObjectNamespace; + is_same_namespace(other: ObjectNamespace): boolean; +} \ No newline at end of file diff --git a/src/sdk/object_sdk.js b/src/sdk/object_sdk.js index 65736fc603..4dcf7999ee 100644 --- a/src/sdk/object_sdk.js +++ b/src/sdk/object_sdk.js @@ -13,6 +13,7 @@ const http_utils = require('../util/http_utils'); const size_utils = require('../util/size_utils'); const signature_utils = require('../util/signature_utils'); const NamespaceNB = require('./namespace_nb'); +const NamespaceFS = require('./namespace_fs'); const NamespaceS3 = require('./namespace_s3'); const NamespaceBlob = require('./namespace_blob'); const NamespaceMerge = require('./namespace_merge'); @@ -54,6 +55,9 @@ class ObjectSDK { this.rpc_client = rpc_client; this.internal_rpc_client = internal_rpc_client; this.object_io = object_io; + if (process.env.NAMESPACE_FS) { + this.namespace_fs = new NamespaceFS({ data_path: process.env.NAMESPACE_FS }); + } this.namespace_nb = new NamespaceNB(); this.accountspace_nb = new AccountSpaceNB({ rpc_client @@ -161,9 +165,17 @@ class ObjectSDK { const time = Date.now(); dbg.log0('_load_bucket_namespace', util.inspect(bucket, true, null, true)); try { - if (bucket.namespace && bucket.namespace.read_resources && bucket.namespace.write_resource) { - dbg.log0('_setup_bucket_namespace', bucket.namespace); + if (this.namespace_fs) { + return { + ns: this.namespace_fs, + bucket, + valid_until: time + NAMESPACE_CACHE_EXPIRY, + }; + } + + if (bucket.namespace) { + if (bucket.namespace.caching) { return { ns: new NamespaceCache({ @@ -184,9 +196,11 @@ class ObjectSDK { valid_until: time + NAMESPACE_CACHE_EXPIRY, }; } + } catch (err) { dbg.error('Failed to setup bucket namespace (fallback to no namespace)', err); } + 
this.namespace_nb.set_triggers_for_bucket(bucket.name.unwrap(), bucket.active_triggers); return { ns: this.namespace_nb, diff --git a/src/server/utils/chunk_config_utils.js b/src/server/utils/chunk_config_utils.js index 4bae335929..c517219a6d 100644 --- a/src/server/utils/chunk_config_utils.js +++ b/src/server/utils/chunk_config_utils.js @@ -33,12 +33,9 @@ function new_chunk_code_config_defaults(chunk_coder_config) { function resolve_chunk_config(chunk_coder_config, account, system) { // Default config can be specified in the account / system level too - // It will only be used if no specific config was requested - const global_chunk_config = chunk_coder_config ? - undefined : - account.default_chunk_config || system.default_chunk_config; - - if (global_chunk_config) return global_chunk_config; + // Only used if no specific config was requested + const global_chunk_config = account.default_chunk_config || system.default_chunk_config; + if (!chunk_coder_config && global_chunk_config) return global_chunk_config; // Fill the config with default values we assume the caller // to send only the values that it want to change from the default diff --git a/src/test/unit_tests/index.js b/src/test/unit_tests/index.js index 6cb18f4b1e..032669d51f 100644 --- a/src/test/unit_tests/index.js +++ b/src/test/unit_tests/index.js @@ -19,11 +19,12 @@ require('./test_prefetch'); require('./test_promise_utils'); require('./test_rpc'); require('./test_semaphore'); +require('./test_sort_utils'); require('./test_fs_utils'); require('./test_signature_utils'); require('./test_http_utils'); require('./test_v8_optimizations'); -// require('./test_ssl_utils'); +require('./test_ssl_utils'); require('./test_zip_utils'); require('./test_wait_queue'); require('./test_kmeans'); diff --git a/src/test/unit_tests/test_fs_utils.js b/src/test/unit_tests/test_fs_utils.js index 5f219a053d..eaab26b711 100644 --- a/src/test/unit_tests/test_fs_utils.js +++ b/src/test/unit_tests/test_fs_utils.js @@ -5,7 +5,6 @@ var mocha = require('mocha'); var assert = require('assert'); -var P = require('../../util/promise'); var fs_utils = require('../../util/fs_utils'); function log(...args) { @@ -21,41 +20,50 @@ mocha.describe('fs_utils', function() { mocha.describe('disk_usage', function() { - mocha.it('should work on the src', function() { - return P.join( - fs_utils.disk_usage('src/server'), - fs_utils.disk_usage('src/test') - ).spread((server_usage, test_usage) => { - log('disk_usage of src:', server_usage); - log('disk_usage of src/test:', test_usage); - assert(test_usage.size / server_usage.size > 0.50, - 'disk usage size of src/test is less than 50% of src/server,', - 'what about some quality :)'); - assert(test_usage.count / server_usage.count > 0.50, - 'disk usage count of src/test is less than 50% of src/server,', - 'what about some quality :)'); - }); + mocha.it('should work on the src', async function() { + const server_usage = await fs_utils.disk_usage('src/server'); + const test_usage = await fs_utils.disk_usage('src/test'); + log('disk_usage of src:', server_usage); + log('disk_usage of src/test:', test_usage); + assert(test_usage.size / server_usage.size > 0.50, + 'disk usage size of src/test is less than 50% of src/server,', + 'what about some quality :)'); + assert(test_usage.count / server_usage.count > 0.50, + 'disk usage count of src/test is less than 50% of src/server,', + 'what about some quality :)'); }); }); mocha.describe('read_dir_recursive', function() { - mocha.it('should find this entry in source dir', function() { + 
mocha.it('should find this entry in source dir', async function() {
             let found = false;
-            return fs_utils.read_dir_recursive({
-                root: 'src/test',
-                on_entry: entry => {
-                    if (entry.path.endsWith('test_fs_utils.js')) {
-                        found = true;
-                    }
+            await fs_utils.read_dir_recursive({
+                root: 'src/test',
+                on_entry: entry => {
+                    if (entry.path.endsWith('test_fs_utils.js')) {
+                        found = true;
                     }
-                })
-                .then(() => {
-                    assert(found, 'Failed to find this test file in the src/test');
-                });
+                }
+            });
+            assert(found, 'Failed to find this test file in the src/test');
         });
     });
+
+    mocha.describe('read_dir_sorted_limit', function() {
+        mocha.it('should find this entry in source dir', async function() {
+            const res = await fs_utils.read_dir_sorted_limit({
+                dir_path: 'src/test/unit_tests',
+                prefix: 'test_',
+                marker: 'test_fs',
+                limit: 3,
+            });
+            console.log(res);
+            assert.strict.equal(res[0].name, 'test_fs_utils.js');
+        });
+    });
+
 });
diff --git a/src/test/unit_tests/test_namespace_fs.js b/src/test/unit_tests/test_namespace_fs.js
new file mode 100644
index 0000000000..bf0d59c92c
--- /dev/null
+++ b/src/test/unit_tests/test_namespace_fs.js
@@ -0,0 +1,129 @@
+/* Copyright (C) 2016 NooBaa */
+'use strict';
+
+const mocha = require('mocha');
+const util = require('util');
+const assert = require('assert');
+const buffer_utils = require('../../util/buffer_utils');
+const NamespaceFS = require('../../sdk/namespace_fs');
+
+const inspect = (x, max_arr = 5) => util.inspect(x, { colors: true, depth: null, maxArrayLength: max_arr });
+
+mocha.describe('namespace_fs', function() {
+
+    const nsfs = new NamespaceFS({ data_path: '.' });
+
+    mocha.it('list_objects', async function() {
+        const res = await nsfs.list_objects({
+            bucket: 'src',
+            prefix: '',
+            key_marker: '',
+            delimiter: '/',
+            limit: 5,
+        });
+        console.log(inspect(res, res.length));
+        let prev_key = '';
+        for (const { key } of res.objects) {
+            if (res.next_marker) {
+                assert(key < res.next_marker, 'bad next_marker at key ' + key);
+            }
+            assert(prev_key <= key, 'objects not sorted at key ' + key);
+            prev_key = key;
+        }
+        prev_key = '';
+        for (const key of res.common_prefixes) {
+            if (res.next_marker) {
+                assert(key < res.next_marker, 'next_marker at key ' + key);
+            }
+            assert(prev_key <= key, 'prefixes not sorted at key ' + key);
+            prev_key = key;
+        }
+    });
+
+    mocha.it('list_uploads', async function() {
+        const res = await nsfs.list_uploads({
+            bucket: 'src',
+            prefix: '',
+            key_marker: '',
+            delimiter: '/',
+            limit: 5,
+        });
+        console.log(inspect(res, res.length));
+    });
+
+    mocha.it('list_object_versions', async function() {
+        const res = await nsfs.list_object_versions({
+            bucket: 'src',
+            prefix: '',
+            key_marker: '',
+            delimiter: '/',
+            limit: 5,
+        });
+        console.log(inspect(res, res.length));
+    });
+
+    mocha.it('read_object_md', async function() {
+        const res = await nsfs.read_object_md({
+            bucket: 'src',
+            key: 'test/unit_tests/test_namespace_fs.js',
+        });
+        console.log(inspect(res));
+    });
+
+    mocha.it('read_object_stream full', async function() {
+        const stream = await nsfs.read_object_stream({
+            bucket: 'src',
+            key: 'test/unit_tests/test_namespace_fs.js',
+        });
+        const res = (await buffer_utils.read_stream_join(stream)).toString();
+        assert.strict.equal(res.slice(13, 28), '(C) 2016 NooBaa');
+        assert.strict.equal(res.slice(37, 43), 'strict');
+    });
+
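+    // Note on ranges (mirrors the comment in namespace_fs.read_object_stream):
+    // params.end is an exclusive offset in the S3 style, mapped to
+    // fs.createReadStream's inclusive `end` by subtracting 1,
+    // so start=13, end=28 reads exactly 15 bytes.
+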
+    mocha.it('read_object_stream range', async function() {
+        const stream = await nsfs.read_object_stream({
+            bucket: 'src',
+            key: 'test/unit_tests/test_namespace_fs.js',
+            start: 13,
+            end: 28,
+        });
+        const res = (await buffer_utils.read_stream_join(stream)).toString();
+        assert.strict.equal(res, '(C) 2016 NooBaa');
+    });
+
+    mocha.it('read_object_stream range above size', async function() {
+        const too_high = 1000000000;
+        const stream = await nsfs.read_object_stream({
+            bucket: 'src',
+            key: 'test/unit_tests/test_namespace_fs.js',
+            start: too_high,
+            end: too_high + 10,
+        });
+        const res = (await buffer_utils.read_stream_join(stream)).toString();
+        assert.strict.equal(res, '');
+    });
+
+    mocha.it('upload_object', async function() {
+        const upload_res = await nsfs.upload_object({
+            bucket: 'src',
+            key: 'test/unit_tests/test_namespace_fs.upload_object',
+            source_stream: buffer_utils.buffer_to_read_stream(Buffer.from('abc'))
+        });
+        console.log(inspect(upload_res));
+
+        const stream = await nsfs.read_object_stream({
+            bucket: 'src',
+            key: 'test/unit_tests/test_namespace_fs.upload_object',
+        });
+        const res = (await buffer_utils.read_stream_join(stream)).toString();
+        assert.strict.equal(res, 'abc');
+
+        const delete_res = await nsfs.delete_object({
+            bucket: 'src',
+            key: 'test/unit_tests/test_namespace_fs.upload_object',
+        });
+        console.log(inspect(delete_res));
+    });
+
+});
diff --git a/src/test/unit_tests/test_sort_utils.js b/src/test/unit_tests/test_sort_utils.js
new file mode 100644
index 0000000000..fc57808a0c
--- /dev/null
+++ b/src/test/unit_tests/test_sort_utils.js
@@ -0,0 +1,76 @@
+/* Copyright (C) 2016 NooBaa */
+'use strict';
+
+const _ = require('lodash');
+// const util = require('util');
+const mocha = require('mocha');
+const assert = require('assert');
+const crypto = require('crypto');
+const sort_utils = require('../../util/sort_utils');
+
+// const inspect = (x, max_arr = 5) => util.inspect(x, { colors: true, depth: null, maxArrayLength: max_arr, sorted: true });
+
+mocha.describe('sort_utils', function() {
+
+    mocha.it(`compare_by_key(String)`, function() {
+        const arr = _.shuffle(_.times(100, i => Math.floor(100 * Math.random())));
+        const a = arr.slice().sort(sort_utils.compare_by_key(String));
+        const b = _.sortBy(arr, String);
+        assert.strict.deepEqual(a, b);
+    });
+
+    mocha.it(`compare_by_key(it => it.x)`, function() {
+        const key_getter = it => it.x;
+        const arr = _.shuffle(_.times(100, i => Math.floor(100 * Math.random())).map(x => ({ x })));
+        const a = arr.slice().sort(sort_utils.compare_by_key(key_getter));
+        const b = _.sortBy(arr, key_getter);
+        assert.strict.deepEqual(a, b);
+    });
+
+    mocha.describe('SortedLimit*', function() {
+
+        const ctors = [
+            sort_utils.SortedLimitEveryPush,
+            sort_utils.SortedLimitEveryBatch,
+            sort_utils.SortedLimitSplice,
+            sort_utils.SortedLimitShift,
+        ];
+
+        function test_all(limit, count, prefix = 'prefix-long-enough/and-sub-dir/') {
+            mocha.it(`limit=${limit} count=${count}`, function() {
+                const impls = ctors.map(Ctor => new Ctor(limit));
+                const gen = crypto.createCipheriv('aes-128-gcm', crypto.randomBytes(16), crypto.randomBytes(12));
+                for (let j = 0; j < count; ++j) {
+                    const val = prefix + gen.update(Buffer.alloc(8)).toString('base64');
+                    for (const impl of impls) impl.push(val);
+                }
+                let results;
+                for (const impl of impls) {
+                    const r = impl.end();
+                    console.log(`limit=${limit} count=${count} ctor=${impl.constructor.name} last=${r[r.length - 1]}`);
+                    if (results) {
+                        assert.strict.deepEqual(r, results);
+                    } else {
+                        results = r;
+                    }
+                }
+            });
+        }
+
+        test_all(1, 0);
+        test_all(1, 1);
+        test_all(1, 2);
+        test_all(1, 10);
+
test_all(1, 100); + + test_all(2, 0); + test_all(2, 1); + test_all(2, 2); + test_all(2, 10); + + test_all(1000, 1000); + test_all(1000, 2000); + test_all(1000, 10000); + }); + +}); diff --git a/src/test/unrelated/measure_bind_perf.js b/src/test/unrelated/benchmark_self_bind.js similarity index 93% rename from src/test/unrelated/measure_bind_perf.js rename to src/test/unrelated/benchmark_self_bind.js index d37ff5a372..1f21ad3290 100644 --- a/src/test/unrelated/measure_bind_perf.js +++ b/src/test/unrelated/benchmark_self_bind.js @@ -2,7 +2,7 @@ 'use strict'; var _ = require('lodash'); -var js_utils = require('../util/js_utils'); +var js_utils = require('../../util/js_utils'); function Clazz() { /* Clazz? */ } @@ -17,7 +17,7 @@ Clazz.prototype.measure = function() { var count = 0; var run = true; while (run) { - for (var i = 0; i < 100000; ++i) { + for (var i = 0; i < 1000000; ++i) { if (self.func() !== self) { throw new Error('HUH'); } diff --git a/src/test/unrelated/benchmark_sorted_limit.js b/src/test/unrelated/benchmark_sorted_limit.js new file mode 100644 index 0000000000..87a65e094d --- /dev/null +++ b/src/test/unrelated/benchmark_sorted_limit.js @@ -0,0 +1,50 @@ +/* Copyright (C) 2016 NooBaa */ +'use strict'; + +const argv = require('minimist')(process.argv.slice(2)); +// const util = require('util'); +// const assert = require('assert'); +const crypto = require('crypto'); + +const sort_utils = require('../../util/sort_utils'); + +const gen = crypto.createCipheriv('aes-128-gcm', crypto.randomBytes(16), crypto.randomBytes(12)); + +function bench(Ctor, limit, compare, count, time) { + const start = Date.now(); + let loops = 0; + while (Date.now() - start < time) { + const impl = new Ctor(limit, compare); + for (let j = 0; j < count; ++j) { + const val = argv.prefix + gen.update(Buffer.alloc(8)).toString('base64'); + impl.push(val); + } + impl.end(); + loops += 1; + } + const end = Date.now(); + return loops / (end - start) * 1000; +} + +function main() { + argv.time = argv.time || 200; + argv.limit = argv.limit || 1000; + argv.prefix = argv.prefix || ''; + console.log(argv); + const ctors = [ + sort_utils.SortedLimitSplice, + sort_utils.SortedLimitShift, + sort_utils.SortedLimitEveryBatch, + sort_utils.SortedLimitEveryPush, + ]; + for (let count = 1; count < 200000; count *= 2) { + process.stdout.write(`count: ${count} loops/sec: { `); + for (const ctor of ctors) { + const loops_per_sec = bench(ctor, argv.limit, undefined, count, argv.time); + process.stdout.write(`${ctor.name}: ${loops_per_sec.toFixed(1)} `); + } + process.stdout.write('}\n'); + } +} + +main(); diff --git a/src/tools/mongo_md_stats.js b/src/tools/mongo_md_stats.js index 49fc6e33e0..80859af467 100644 --- a/src/tools/mongo_md_stats.js +++ b/src/tools/mongo_md_stats.js @@ -32,7 +32,7 @@ function stat_collection(name) { total_index_size += stats.totalIndexSize; print(`Collection ${name}:`); print(` Count : ${stats.count}`); - print(` Count per Object : ${count_per_object.toFixed(2)}`); + print(` Average per Object : ${count_per_object.toFixed(2)}`); print(` Average Storage Size : ${human_bytes(storage_size)}`); print(` Average Index Size : ${human_bytes(index_size)}`); print(); diff --git a/src/util/fips.js b/src/util/fips.js index feeb9b1c76..1e5d7abb3c 100644 --- a/src/util/fips.js +++ b/src/util/fips.js @@ -47,6 +47,24 @@ class HashWrap extends stream.Transform { } } +/** + * @param {string} algorithm + * @param {object} options + * @returns {crypto.Hash} + */ +function create_hash_fips_wrapper(algorithm, options) { + 
switch (algorithm) {
+        case 'md5': {
+            const MD5_MB = nb_native().MD5_MB;
+            const wrap = new HashWrap(new MD5_MB());
+            // @ts-ignore
+            return wrap;
+        }
+        default:
+            return original_crypto.createHash(algorithm, options);
+    }
+}
+
 /**
  * @param {Boolean} mode
  */
@@ -55,15 +73,7 @@ function set_fips_mode(mode = detect_fips_mode()) {
     nb_native().set_fips_mode(mode);
     if (mode) {
         // monkey-patch the crypto.createHash() function to provide a non-crypto md5 flow
-        crypto.createHash = function(algorithm, options) {
-            switch (algorithm) {
-                case 'md5': {
-                    return new HashWrap(new(nb_native().MD5_MB)());
-                }
-                default:
-                    return original_crypto.createHash(algorithm, options);
-            }
-        };
+        crypto.createHash = create_hash_fips_wrapper;
     } else if (crypto.createHash !== original_crypto.createHash) {
         crypto.createHash = original_crypto.createHash;
     }
@@ -94,3 +104,4 @@ exports.get_fips_mode = get_fips_mode;
 exports.set_fips_mode = set_fips_mode;
 exports.detect_fips_mode = detect_fips_mode;
 exports.original_crypto = original_crypto;
+exports.create_hash_fips_wrapper = create_hash_fips_wrapper;
diff --git a/src/util/fs_utils.js b/src/util/fs_utils.js
index 29068b07ba..e90d7d5dee 100644
--- a/src/util/fs_utils.js
+++ b/src/util/fs_utils.js
@@ -11,6 +11,7 @@ const crypto = require('crypto');
 
 const P = require('./promise');
 const Semaphore = require('./semaphore');
+const sort_utils = require('./sort_utils');
 const promise_utils = require('./promise_utils');
 const get_folder_size = P.promisify(require('get-folder-size'));
 
@@ -307,6 +308,41 @@ function ignore_enoent(err) {
     if (err.code !== 'ENOENT') throw err;
 }
 
+
+/**
+ * @param {fs.Dirent} entry
+ * @returns {string}
+ */
+function get_dirent_name(entry) {
+    return entry.name;
+}
+
+/**
+ * @param {object} params
+ * @param {string} [params.dir_path]
+ * @param {string} [params.prefix]
+ * @param {string} [params.marker]
+ * @param {number} [params.limit]
+ * @param {number} [params.buffer]
+ * @returns {Promise<fs.Dirent[]>}
+ */
+async function read_dir_sorted_limit({
+    dir_path = '.',
+    prefix = '',
+    marker = '',
+    limit = 1000,
+    buffer = 1000,
+}) {
+    const dir = await fs.promises.opendir(dir_path, { bufferSize: buffer });
+    const sorter = new sort_utils.SortedLimitSplice(limit, get_dirent_name);
+    for await (const entry of dir) {
+        if (!entry.name.startsWith(prefix)) continue;
+        if (entry.name < marker) continue;
+        sorter.push(entry);
+    }
+    return sorter.end();
+}
+
 // EXPORTS
 exports.file_must_not_exist = file_must_not_exist;
 exports.file_must_exist = file_must_exist;
@@ -329,3 +365,4 @@ exports.ignore_enoent = ignore_enoent;
 exports.PRIVATE_DIR_PERMISSIONS = PRIVATE_DIR_PERMISSIONS;
 exports.get_folder_size = get_folder_size;
 exports.file_exists = file_exists;
+exports.read_dir_sorted_limit = read_dir_sorted_limit;
diff --git a/src/util/sort_utils.js b/src/util/sort_utils.js
new file mode 100644
index 0000000000..257dcdb8d4
--- /dev/null
+++ b/src/util/sort_utils.js
@@ -0,0 +1,262 @@
+/* Copyright (C) 2016 NooBaa */
+'use strict';
+
+const _ = require('lodash');
+
+/**
+ * @template T
+ * @typedef {(a: T, b: T) => -1|1|0} CompareFunc
+ */
+
+/**
+ * @template T
+ * @template K
+ * @typedef {(a: T) => K} KeyGetter
+ */
+
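+// The SortedLimit* classes below are interchangeable accumulators: push()
+// takes values in any order and end() returns the smallest `limit` values,
+// sorted. They differ only in insertion strategy (binary-search splice,
+// insertion-sort shift, batched sort, sort-on-every-push); see
+// src/test/unrelated/benchmark_sorted_limit.js in this patch for a comparison.
+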
+/**
+ * Compare function for arrays with comparable values
+ * such as numbers, booleans, dates, which do not sort well with the default string sort.
+ * @template T
+ * @param {T} a
+ * @param {T} b
+ * @returns {-1|1|0}
+ * @see https://www.ecma-international.org/ecma-262/#sec-sortcompare
+ * @example
+ * > [3,2020,1].sort(sort_utils.compare_by_value)
+ * [ 1, 3, 2020 ]
+ * > [3,2020,1].sort()
+ * [ 1, 2020, 3 ]
+ */
+function compare_by_value(a, b) {
+    if (a < b) return -1;
+    if (a > b) return 1;
+    return 0;
+}
+
+/**
+ * Compare function for arrays with comparable values
+ * such as numbers, booleans, dates, which do not sort well with the default string sort.
+ * @template T
+ * @param {T} a
+ * @param {T} b
+ * @returns {-1|1|0}
+ * @example
+ * > [3,2020,1].sort(sort_utils.compare_by_value_reverse)
+ * [ 2020, 3, 1 ]
+ */
+function compare_by_value_reverse(a, b) {
+    if (a < b) return 1;
+    if (a > b) return -1;
+    return 0;
+}
+
+
+/**
+ * Returns a compare function from given key_getter function
+ * @template T The array item type
+ * @template K A comparable key type
+ * @param {(item: T) => K} key_getter takes array item and returns a comparable key
+ * @param {number} order 1 for ascending order, -1 for descending
+ * @returns {CompareFunc}
+ * @example
+ * > [{a:1,b:9},{a:2,b:1}].sort(sort_utils.compare_by_key(item => item.b))
+ * [ { a: 2, b: 1 }, { a: 1, b: 9 } ]
+ */
+function compare_by_key(key_getter = undefined, order = 1) {
+    if (key_getter) {
+        return order >= 0 ?
+            function compare_by_key_func(a, b) { return compare_by_value(key_getter(a), key_getter(b)); } :
+            function compare_by_key_func(a, b) { return compare_by_value_reverse(key_getter(a), key_getter(b)); };
+    }
+    return order >= 0 ?
+        function compare_by_key_func(a, b) { return compare_by_value(a, b); } :
+        function compare_by_key_func(a, b) { return compare_by_value_reverse(a, b); };
+}
+
+/**
+ * @template T
+ * @template K
+ */
+class SortedLimitSplice {
+
+    /**
+     * @param {number} limit > 0
+     * @param {KeyGetter} key_getter
+     */
+    constructor(limit, key_getter = undefined) {
+        this._limit = limit;
+        this._key_getter = key_getter;
+        this._arr = [];
+    }
+
+    /**
+     * @param {T} val
+     */
+    push(val) {
+        const arr = this._arr;
+        const pos = _.sortedLastIndexBy(arr, val, this._key_getter);
+        if (pos < this._limit) {
+            arr.splice(pos, 0, val);
+            if (arr.length >= this._limit * 2) {
+                arr.length = this._limit;
+            }
+        }
+    }
+
+    /**
+     * @returns {T[]}
+     */
+    end() {
+        const arr = this._arr;
+        if (arr.length > this._limit) {
+            arr.length = this._limit;
+        }
+        return arr;
+    }
+}
+
+/**
+ * @template T
+ * @template K
+ */
+class SortedLimitShift {
+
+    /**
+     * @param {number} limit > 0
+     * @param {KeyGetter} key_getter
+     */
+    constructor(limit, key_getter = undefined) {
+        this._limit = limit;
+        this._compare = compare_by_key(key_getter);
+        this._pos = 0;
+        this._arr = new Array(limit);
+    }
+
+    /**
+     * @param {T} val
+     */
+    push(val) {
+        const arr = this._arr;
+        let i;
+        if (this._pos < arr.length) {
+            i = this._pos;
+            this._pos += 1;
+        } else {
+            i = arr.length - 1;
+            if (this._compare(val, arr[i]) >= 0) return;
+        }
+        while (i > 0 && this._compare(val, arr[i - 1]) < 0) {
+            arr[i] = arr[i - 1];
+            i -= 1;
+        }
+        arr[i] = val;
+    }
+
+    /**
+     * @returns {T[]}
+     */
+    end() {
+        return this._pos < this._arr.length ?
+            this._arr.slice(0, this._pos) :
+            this._arr;
+    }
+}
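+
+// Example (illustrative, same observable behavior for all four classes):
+//   const sorter = new SortedLimitShift(3);
+//   for (const name of ['d', 'b', 'e', 'a', 'c']) sorter.push(name);
+//   sorter.end(); // => [ 'a', 'b', 'c' ]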
+
+/**
+ * @template T
+ * @template K
+ */
+class SortedLimitEveryBatch {
+
+    /**
+     * @param {number} limit > 0
+     * @param {KeyGetter} key_getter
+     */
+    constructor(limit, key_getter = undefined) {
+        this._limit = limit;
+        this._compare = compare_by_key(key_getter);
+        this._pos = 0;
+        this._arr = new Array(limit * 2);
+    }
+
+    /**
+     * @param {T} val
+     */
+    push(val) {
+        const arr = this._arr;
+        arr[this._pos] = val;
+        this._pos += 1;
+        if (this._pos < this._limit) {
+            return;
+        }
+        if (this._pos === this._limit) {
+            arr.sort(this._compare);
+        } else if (this._pos >= arr.length) {
+            arr.sort(this._compare);
+            this._pos = this._limit;
+        } else if (this._compare(val, arr[this._limit - 1]) >= 0) {
+            this._pos -= 1;
+        }
+    }
+
+    /**
+     * @returns {T[]}
+     */
+    end() {
+        const arr = this._arr;
+        arr.length = this._pos;
+        arr.sort(this._compare);
+        if (arr.length > this._limit) {
+            arr.length = this._limit;
+        }
+        return arr;
+    }
+}
+
+/**
+ * Simplest and slowest.
+ * @template T
+ * @template K
+ */
+class SortedLimitEveryPush {
+
+    /**
+     * @param {number} limit > 0
+     * @param {KeyGetter} key_getter
+     */
+    constructor(limit, key_getter = undefined) {
+        this._limit = limit;
+        this._compare = compare_by_key(key_getter);
+        this._pos = 0;
+        this._arr = new Array(limit + 1);
+    }
+
+    /**
+     * @param {T} val
+     */
+    push(val) {
+        this._arr[this._pos] = val;
+        this._arr.sort(this._compare);
+        if (this._pos < this._arr.length - 1) {
+            this._pos += 1;
+        }
+    }
+
+    /**
+     * @returns {T[]}
+     */
+    end() {
+        this._arr.length = this._pos;
+        return this._arr;
+    }
+}
+
+exports.compare_by_value = compare_by_value;
+exports.compare_by_value_reverse = compare_by_value_reverse;
+exports.compare_by_key = compare_by_key;
+
+exports.SortedLimitSplice = SortedLimitSplice;
+exports.SortedLimitShift = SortedLimitShift;
+exports.SortedLimitEveryBatch = SortedLimitEveryBatch;
+exports.SortedLimitEveryPush = SortedLimitEveryPush;