diff --git a/.vscode/settings.json b/.vscode/settings.json index e1b3178f354c..679b359629fd 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,6 +1,7 @@ { "rust-analyzer.cargo.allTargets": true, "rust-analyzer.cargo.features": "all", + "rust-analyzer.procMacro.ignored": { "napi-derive": ["napi"] }, "rust-analyzer.linkedProjects": [ "${workspaceFolder}/core/Cargo.toml", "${workspaceFolder}/bindings/python/Cargo.toml", diff --git a/bindings/nodejs/generated.d.ts b/bindings/nodejs/generated.d.ts index 6f7db8c76cad..aabb1b7f303e 100644 --- a/bindings/nodejs/generated.d.ts +++ b/bindings/nodejs/generated.d.ts @@ -74,6 +74,158 @@ export interface StatOptions { */ overrideContentDisposition?: string } +export interface ReadOptions { + /** + * Set `version` for this operation. + * + * This option can be used to retrieve the data of a specified version of the given path. + */ + version?: string + /** + * Set `concurrent` for the operation. + * + * OpenDAL by default to read file without concurrent. This is not efficient for cases when users + * read large chunks of data. By setting `concurrent`, opendal will reading files concurrently + * on support storage services. + * + * By setting `concurrent`, opendal will fetch chunks concurrently with + * the give chunk size. + */ + concurrent?: number + /** + * Sets the chunk size for this operation. + * + * OpenDAL will use services' preferred chunk size by default. Users can set chunk based on their own needs. + */ + chunk?: number + /** + * Controls the optimization strategy for range reads in [`Reader::fetch`]. + * + * When performing range reads, if the gap between two requested ranges is smaller than + * the configured `gap` size, OpenDAL will merge these ranges into a single read request + * and discard the unrequested data in between. This helps reduce the number of API calls + * to remote storage services. + * + * This optimization is particularly useful when performing multiple small range reads + * that are close to each other, as it reduces the overhead of multiple network requests + * at the cost of transferring some additional data. + */ + gap?: bigint + /** + * Sets the offset (starting position) for range read operations. + * The read will start from this position in the file. + */ + offset?: bigint + /** + * Sets the size (length) for range read operations. + * The read will continue for this many bytes after the offset. + */ + size?: bigint + /** + * Sets if-match condition for this operation. + * If file exists and its etag doesn't match, an error will be returned. + */ + ifMatch?: string + /** + * Sets if-none-match condition for this operation. + * If file exists and its etag matches, an error will be returned. + */ + ifNoneMatch?: string + /** + * Sets if-modified-since condition for this operation. + * If file exists and hasn't been modified since the specified time, an error will be returned. + * ISO 8601 formatted date string + * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date/toISOString + */ + ifModifiedSince?: string + /** + * Sets if-unmodified-since condition for this operation. + * If file exists and has been modified since the specified time, an error will be returned. + * ISO 8601 formatted date string + * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date/toISOString + */ + ifUnmodifiedSince?: string + /** + * Specify the `content-type` header that should be sent back by the operation. + * + * This option is only meaningful when used along with presign. + */ + contentType?: string + /** + * Specify the `cache-control` header that should be sent back by the operation. + * + * This option is only meaningful when used along with presign. + */ + cacheControl?: string + /** + * Specify the `content-disposition` header that should be sent back by the operation. + * + * This option is only meaningful when used along with presign. + */ + contentDisposition?: string +} +export interface ReaderOptions { + /** + * Set `version` for this operation. + * + * This option can be used to retrieve the data of a specified version of the given path. + */ + version?: string + /** + * Set `concurrent` for the operation. + * + * OpenDAL by default to read file without concurrent. This is not efficient for cases when users + * read large chunks of data. By setting `concurrent`, opendal will reading files concurrently + * on support storage services. + * + * By setting `concurrent`, opendal will fetch chunks concurrently with + * the give chunk size. + */ + concurrent?: number + /** + * Sets the chunk size for this operation. + * + * OpenDAL will use services' preferred chunk size by default. Users can set chunk based on their own needs. + */ + chunk?: number + /** + * Controls the optimization strategy for range reads in [`Reader::fetch`]. + * + * When performing range reads, if the gap between two requested ranges is smaller than + * the configured `gap` size, OpenDAL will merge these ranges into a single read request + * and discard the unrequested data in between. This helps reduce the number of API calls + * to remote storage services. + * + * This optimization is particularly useful when performing multiple small range reads + * that are close to each other, as it reduces the overhead of multiple network requests + * at the cost of transferring some additional data. + */ + gap?: bigint + /** + * Sets if-match condition for this operation. + * If file exists and its etag doesn't match, an error will be returned. + */ + ifMatch?: string + /** + * Sets if-none-match condition for this operation. + * If file exists and its etag matches, an error will be returned. + */ + ifNoneMatch?: string + /** + * Sets if-modified-since condition for this operation. + * If file exists and hasn't been modified since the specified time, an error will be returned. + * ISO 8601 formatted date string + * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date/toISOString + */ + ifModifiedSince?: string + /** + * Sets if-unmodified-since condition for this operation. + * If file exists and has been modified since the specified time, an error will be returned. + * ISO 8601 formatted date string + * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date/toISOString + */ + ifUnmodifiedSince?: string +} export const enum EntryMode { /** FILE means the path has data to read. */ FILE = 0, @@ -157,6 +309,12 @@ export class Capability { get statWithOverrideContentDisposition(): boolean /** If operator supports read. */ get read(): boolean + /** If operator supports read with version. */ + get readWithVersion(): boolean + /** If operator supports read with range. */ + get readWithIfModifiedSince(): boolean + /** If operator supports read with if unmodified since. */ + get readWithIfUnmodifiedSince(): boolean /** If operator supports read with if matched. */ get readWithIfMatch(): boolean /** If operator supports read with if not match. */ @@ -332,13 +490,13 @@ export class Operator { * const buf = await op.read("path/to/file"); * ``` */ - read(path: string): Promise + read(path: string, options?: ReadOptions | undefined | null): Promise /** * Create a reader to read the given path. * * It could be used to read large file in a streaming way. */ - reader(path: string): Promise + reader(path: string, options?: ReaderOptions | undefined | null): Promise /** * Read the whole path into a buffer synchronously. * @@ -347,13 +505,13 @@ export class Operator { * const buf = op.readSync("path/to/file"); * ``` */ - readSync(path: string): Buffer + readSync(path: string, options?: ReadOptions | undefined | null): Buffer /** * Create a reader to read the given path synchronously. * * It could be used to read large file in a streaming way. */ - readerSync(path: string): BlockingReader + readerSync(path: string, options?: ReaderOptions | undefined | null): BlockingReader /** * Write bytes into a path. * diff --git a/bindings/nodejs/src/capability.rs b/bindings/nodejs/src/capability.rs index a0e904415d0d..6f50118b82a5 100644 --- a/bindings/nodejs/src/capability.rs +++ b/bindings/nodejs/src/capability.rs @@ -96,6 +96,24 @@ impl Capability { self.0.read } + /// If operator supports read with version. + #[napi(getter)] + pub fn read_with_version(&self) -> bool { + self.0.read_with_version + } + + /// If operator supports read with range. + #[napi(getter)] + pub fn read_with_if_modified_since(&self) -> bool { + self.0.read_with_if_modified_since + } + + /// If operator supports read with if unmodified since. + #[napi(getter)] + pub fn read_with_if_unmodified_since(&self) -> bool { + self.0.read_with_if_unmodified_since + } + /// If operator supports read with if matched. #[napi(getter)] pub fn read_with_if_match(&self) -> bool { diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs index 2a71d22381c7..392f66acd7a6 100644 --- a/bindings/nodejs/src/lib.rs +++ b/bindings/nodejs/src/lib.rs @@ -27,7 +27,7 @@ use std::time::Duration; use futures::AsyncReadExt; use futures::TryStreamExt; use napi::bindgen_prelude::*; -use opendal::options::StatOptions; +use opendal::options::{ReadOptions, ReaderOptions, StatOptions}; mod capability; mod options; @@ -214,10 +214,15 @@ impl Operator { /// const buf = await op.read("path/to/file"); /// ``` #[napi] - pub async fn read(&self, path: String) -> Result { + pub async fn read( + &self, + path: String, + options: Option, + ) -> Result { + let options = options.map_or_else(ReadOptions::default, ReadOptions::from); let res = self .async_op - .read(&path) + .read_options(&path, options) .await .map_err(format_napi_error)? .to_vec(); @@ -228,15 +233,20 @@ impl Operator { /// /// It could be used to read large file in a streaming way. #[napi] - pub async fn reader(&self, path: String) -> Result { + pub async fn reader( + &self, + path: String, + options: Option, + ) -> Result { + let options = options.map_or_else(ReaderOptions::default, ReaderOptions::from); let r = self .async_op - .reader(&path) + .reader_options(&path, options) .await .map_err(format_napi_error)?; Ok(Reader { inner: r - .into_futures_async_read(..) + .into_futures_async_read(std::ops::RangeFull) .await .map_err(format_napi_error)?, }) @@ -249,10 +259,11 @@ impl Operator { /// const buf = op.readSync("path/to/file"); /// ``` #[napi] - pub fn read_sync(&self, path: String) -> Result { + pub fn read_sync(&self, path: String, options: Option) -> Result { + let options = options.map_or_else(ReadOptions::default, ReadOptions::from); let res = self .blocking_op - .read(&path) + .read_options(&path, options) .map_err(format_napi_error)? .to_vec(); Ok(res.into()) @@ -262,8 +273,16 @@ impl Operator { /// /// It could be used to read large file in a streaming way. #[napi] - pub fn reader_sync(&self, path: String) -> Result { - let r = self.blocking_op.reader(&path).map_err(format_napi_error)?; + pub fn reader_sync( + &self, + path: String, + options: Option, + ) -> Result { + let options = options.map_or_else(ReaderOptions::default, ReaderOptions::from); + let r = self + .blocking_op + .reader_options(&path, options) + .map_err(format_napi_error)?; Ok(BlockingReader { inner: r.into_std_read(..).map_err(format_napi_error)?, }) diff --git a/bindings/nodejs/src/options.rs b/bindings/nodejs/src/options.rs index 93657d45a8a2..6b67546fd77a 100644 --- a/bindings/nodejs/src/options.rs +++ b/bindings/nodejs/src/options.rs @@ -15,7 +15,8 @@ // specific language governing permissions and limitations // under the License. -use opendal::raw::parse_datetime_from_rfc3339; +use napi::bindgen_prelude::BigInt; +use opendal::raw::{parse_datetime_from_rfc3339, BytesRange}; #[napi(object)] #[derive(Debug)] @@ -94,3 +95,237 @@ impl From for opendal::options::StatOptions { } } } + +#[napi(object)] +#[derive(Default, Debug)] +pub struct ReadOptions { + /** + * Set `version` for this operation. + * + * This option can be used to retrieve the data of a specified version of the given path. + */ + pub version: Option, + + /** + * Set `concurrent` for the operation. + * + * OpenDAL by default to read file without concurrent. This is not efficient for cases when users + * read large chunks of data. By setting `concurrent`, opendal will reading files concurrently + * on support storage services. + * + * By setting `concurrent`, opendal will fetch chunks concurrently with + * the give chunk size. + */ + pub concurrent: Option, + + /** + * Sets the chunk size for this operation. + * + * OpenDAL will use services' preferred chunk size by default. Users can set chunk based on their own needs. + */ + pub chunk: Option, + + /** + * Controls the optimization strategy for range reads in [`Reader::fetch`]. + * + * When performing range reads, if the gap between two requested ranges is smaller than + * the configured `gap` size, OpenDAL will merge these ranges into a single read request + * and discard the unrequested data in between. This helps reduce the number of API calls + * to remote storage services. + * + * This optimization is particularly useful when performing multiple small range reads + * that are close to each other, as it reduces the overhead of multiple network requests + * at the cost of transferring some additional data. + */ + pub gap: Option, + + /** + * Sets the offset (starting position) for range read operations. + * The read will start from this position in the file. + */ + pub offset: Option, + + /** + * Sets the size (length) for range read operations. + * The read will continue for this many bytes after the offset. + */ + pub size: Option, + + /** + * Sets if-match condition for this operation. + * If file exists and its etag doesn't match, an error will be returned. + */ + pub if_match: Option, + + /** + * Sets if-none-match condition for this operation. + * If file exists and its etag matches, an error will be returned. + */ + pub if_none_match: Option, + + /** + * Sets if-modified-since condition for this operation. + * If file exists and hasn't been modified since the specified time, an error will be returned. + * ISO 8601 formatted date string + * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date/toISOString + */ + pub if_modified_since: Option, + + /** + * Sets if-unmodified-since condition for this operation. + * If file exists and has been modified since the specified time, an error will be returned. + * ISO 8601 formatted date string + * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date/toISOString + */ + pub if_unmodified_since: Option, + + /** + * Specify the `content-type` header that should be sent back by the operation. + * + * This option is only meaningful when used along with presign. + */ + pub content_type: Option, + + /** + * Specify the `cache-control` header that should be sent back by the operation. + * + * This option is only meaningful when used along with presign. + */ + pub cache_control: Option, + + /** + * Specify the `content-disposition` header that should be sent back by the operation. + * + * This option is only meaningful when used along with presign. + */ + pub content_disposition: Option, +} + +impl ReadOptions { + pub fn make_range(&self) -> BytesRange { + let offset = self.offset.clone().map(|offset| offset.get_u64().1); + let size = self.size.clone().map(|size| size.get_u64().1); + BytesRange::new(offset.unwrap_or_default(), size) + } +} + +impl From for opendal::options::ReadOptions { + fn from(value: ReadOptions) -> Self { + let range = value.make_range(); + let if_modified_since = value + .if_modified_since + .and_then(|v| parse_datetime_from_rfc3339(&v).ok()); + let if_unmodified_since = value + .if_unmodified_since + .and_then(|v| parse_datetime_from_rfc3339(&v).ok()); + + Self { + version: value.version, + concurrent: value.concurrent.unwrap_or_default() as usize, + chunk: value.chunk.map(|chunk| chunk as usize), + gap: value.gap.map(|gap| gap.get_u64().1 as usize), + range, + if_match: value.if_match, + if_none_match: value.if_none_match, + if_modified_since, + if_unmodified_since, + override_content_type: value.content_type, + override_cache_control: value.cache_control, + override_content_disposition: value.content_disposition, + } + } +} + +#[napi(object)] +#[derive(Default)] +pub struct ReaderOptions { + /** + * Set `version` for this operation. + * + * This option can be used to retrieve the data of a specified version of the given path. + */ + pub version: Option, + + /** + * Set `concurrent` for the operation. + * + * OpenDAL by default to read file without concurrent. This is not efficient for cases when users + * read large chunks of data. By setting `concurrent`, opendal will reading files concurrently + * on support storage services. + * + * By setting `concurrent`, opendal will fetch chunks concurrently with + * the give chunk size. + */ + pub concurrent: Option, + + /** + * Sets the chunk size for this operation. + * + * OpenDAL will use services' preferred chunk size by default. Users can set chunk based on their own needs. + */ + pub chunk: Option, + + /** + * Controls the optimization strategy for range reads in [`Reader::fetch`]. + * + * When performing range reads, if the gap between two requested ranges is smaller than + * the configured `gap` size, OpenDAL will merge these ranges into a single read request + * and discard the unrequested data in between. This helps reduce the number of API calls + * to remote storage services. + * + * This optimization is particularly useful when performing multiple small range reads + * that are close to each other, as it reduces the overhead of multiple network requests + * at the cost of transferring some additional data. + */ + pub gap: Option, + + /** + * Sets if-match condition for this operation. + * If file exists and its etag doesn't match, an error will be returned. + */ + pub if_match: Option, + + /** + * Sets if-none-match condition for this operation. + * If file exists and its etag matches, an error will be returned. + */ + pub if_none_match: Option, + + /** + * Sets if-modified-since condition for this operation. + * If file exists and hasn't been modified since the specified time, an error will be returned. + * ISO 8601 formatted date string + * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date/toISOString + */ + pub if_modified_since: Option, + + /** + * Sets if-unmodified-since condition for this operation. + * If file exists and has been modified since the specified time, an error will be returned. + * ISO 8601 formatted date string + * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date/toISOString + */ + pub if_unmodified_since: Option, +} + +impl From for opendal::options::ReaderOptions { + fn from(value: ReaderOptions) -> Self { + let if_modified_since = value + .if_modified_since + .and_then(|v| parse_datetime_from_rfc3339(&v).ok()); + let if_unmodified_since = value + .if_unmodified_since + .and_then(|v| parse_datetime_from_rfc3339(&v).ok()); + + Self { + version: value.version, + concurrent: value.concurrent.unwrap_or_default() as usize, + chunk: value.chunk.map(|chunk| chunk as usize), + gap: value.gap.map(|gap| gap.get_u64().1 as usize), + if_match: value.if_match, + if_none_match: value.if_none_match, + if_modified_since, + if_unmodified_since, + } + } +} diff --git a/bindings/nodejs/tests/suites/asyncReadOptions.suite.mjs b/bindings/nodejs/tests/suites/asyncReadOptions.suite.mjs new file mode 100644 index 000000000000..0cb376fc364e --- /dev/null +++ b/bindings/nodejs/tests/suites/asyncReadOptions.suite.mjs @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { randomUUID } from 'node:crypto' +import { test, describe, expect, assert } from 'vitest' +import { Writable } from 'node:stream' +import { finished, pipeline } from 'node:stream/promises' + +import { generateBytes, generateFixedBytes, sleep } from '../utils.mjs' + +/** + * @param {import("../../index").Operator} op + */ +export function run(op) { + const capability = op.capability() + + describe.runIf(capability.read && capability.write)('async read options', () => { + test('read with range', async () => { + const size = 3 * 1024 * 1024 + const filename = `random_file_${randomUUID()}` + const content = generateFixedBytes(size) + const offset = Math.floor(Math.random() * (size - 1)) + const maxLen = size - offset + const length = Math.floor(Math.random() * (maxLen - 1)) + 1 + + await op.write(filename, content) + + const bs = await op.read(filename, { + offset: BigInt(offset), + size: BigInt(length), + }) + expect(bs.length).toBe(length) + assert.equal(Buffer.compare(bs, content.subarray(offset, offset + length)), 0) + + await op.delete(filename) + }) + + test.runIf(capability.readWithIfMatch)('read with if match', async () => { + const filename = `random_file_${randomUUID()}` + const content = generateBytes() + + await op.write(filename, content) + const meta = await op.stat(filename) + + const invalidOptions = { + ifMatch: '"invalid_etag"', + } + + await expect(op.read(filename, invalidOptions)).rejects.toThrowError('ConditionNotMatch') + + const bs = await op.read(filename, { + ifMatch: meta.etag, + }) + + assert.equal(Buffer.compare(bs, content), 0) + + await op.delete(filename) + }) + + test.runIf(capability.readWithIfNoneMatch)('read with if none match', async () => { + const filename = `random_file_${randomUUID()}` + const content = generateBytes() + + await op.write(filename, content) + const meta = await op.stat(filename) + + const invalidOptions = { + ifNoneMatch: meta.etag, + } + + await expect(op.read(filename, invalidOptions)).rejects.toThrowError('ConditionNotMatch') + + const bs = await op.read(filename, { + ifNoneMatch: '"invalid_etag"', + }) + + assert.equal(Buffer.compare(bs, content), 0) + + await op.delete(filename) + }) + + test.runIf(capability.readWithIfModifiedSince)('read with if modified since', async () => { + const size = 3 * 1024 * 1024 + const filename = `random_file_${randomUUID()}` + const content = generateBytes(size) + + await op.write(filename, content) + const meta = await op.stat(filename) + + const sinceMinus = new Date(meta.lastModified) + sinceMinus.setSeconds(sinceMinus.getSeconds() - 1) + const bs = await op.read(filename, { ifModifiedSince: sinceMinus.toISOString() }) + assert.equal(Buffer.compare(bs, content), 0) + + await sleep(1000) + + const sinceAdd = new Date(meta.lastModified) + sinceAdd.setSeconds(sinceAdd.getSeconds() + 1) + + await expect(op.read(filename, { ifModifiedSince: sinceAdd.toISOString() })).rejects.toThrowError( + 'ConditionNotMatch', + ) + await op.delete(filename) + }) + + test.runIf(capability.readWithIfUnmodifiedSince)('read with if unmodified since', async () => { + const size = 3 * 1024 * 1024 + const filename = `random_file_${randomUUID()}` + const content = generateBytes(size) + + await op.write(filename, content) + const meta = await op.stat(filename) + + const sinceMinus = new Date(meta.lastModified) + sinceMinus.setSeconds(sinceMinus.getSeconds() - 3600) + await expect(op.read(filename, { ifUnmodifiedSince: sinceMinus.toISOString() })).rejects.toThrowError( + 'ConditionNotMatch', + ) + + await sleep(1000) + + const sinceAdd = new Date(meta.lastModified) + sinceAdd.setSeconds(sinceAdd.getSeconds() + 1) + + const bs = await op.read(filename, { ifUnmodifiedSince: sinceAdd.toISOString() }) + assert.equal(Buffer.compare(bs, content), 0) + + await op.delete(filename) + }) + + test.runIf(capability.readWithVersion)('read with version', async () => { + const filename = `random_file_${randomUUID()}` + const content = generateBytes() + + await op.write(filename, content) + const meta = await op.stat(filename) + const data = await op.read(filename, { version: meta.version }) + assert.equal(Buffer.compare(data, content), 0) + + await op.write(filename, Buffer.from('1')) + // After writing new data, we can still read the first version data + const secondData = await op.read(filename, { version: meta.version }) + assert.equal(Buffer.compare(secondData, content), 0) + + await op.delete(filename) + }) + + test.runIf(capability.readWithVersion)('read with not existing version', async () => { + const filename = `random_file_${randomUUID()}` + const content = generateBytes() + + await op.write(filename, content) + const meta = await op.stat(filename) + + const filename2 = `random_file_${randomUUID()}` + const content2 = generateBytes() + await op.write(filename2, content2) + + await expect(op.read(filename2, { version: meta.version })).rejects.toThrowError('NotFound') + + await op.delete(filename) + await op.delete(filename2) + }) + }) + + describe.runIf(capability.read && capability.write)('async reader options', () => { + test.runIf(capability.readWithIfMatch)('reader with if match', async () => { + const size = 3 * 1024 * 1024 + const filename = `random_file_${randomUUID()}` + const content = generateFixedBytes(size) + + await op.write(filename, content) + const meta = await op.stat(filename) + + const invalidOptions = { + ifMatch: '"invalid_etag"', + } + + const reader = await op.reader(filename, invalidOptions) + const buf = Buffer.alloc(content.length) + await expect(reader.read(buf)).rejects.toThrowError('ConditionNotMatch') + + const r = await op.reader(filename, { ifMatch: meta.etag }) + const rs = r.createReadStream() + let chunks = [] + await pipeline( + rs, + new Writable({ + write(chunk, encoding, callback) { + chunks.push(chunk) + callback() + }, + }), + ) + + await finished(rs) + const bs = Buffer.concat(chunks) + + assert.equal(Buffer.compare(bs, content), 0) + await op.delete(filename) + }) + + test.runIf(capability.readWithIfNoneMatch)('reader with if none match', async () => { + const size = 3 * 1024 * 1024 + const filename = `random_file_${randomUUID()}` + const content = generateFixedBytes(size) + + await op.write(filename, content) + const meta = await op.stat(filename) + + const reader = await op.reader(filename, { ifNoneMatch: meta.etag }) + const buf = Buffer.alloc(content.length) + await expect(reader.read(buf)).rejects.toThrowError('ConditionNotMatch') + + const r = await op.reader(filename, { ifNoneMatch: '"invalid_etag"' }) + const rs = r.createReadStream() + let chunks = [] + await pipeline( + rs, + new Writable({ + write(chunk, encoding, callback) { + chunks.push(chunk) + callback() + }, + }), + ) + + await finished(rs) + const bs = Buffer.concat(chunks) + + assert.equal(Buffer.compare(bs, content), 0) + await op.delete(filename) + }) + + test.runIf(capability.readWithIfModifiedSince)('reader with if modified since', async () => { + const size = 3 * 1024 * 1024 + const filename = `random_file_${randomUUID()}` + const content = generateFixedBytes(size) + + await op.write(filename, content) + const meta = await op.stat(filename) + + const sinceMinus = new Date(meta.lastModified) + sinceMinus.setSeconds(sinceMinus.getSeconds() - 1) + const reader = await op.reader(filename, { ifModifiedSince: sinceMinus.toISOString() }) + const rs = reader.createReadStream() + let chunks = [] + await pipeline( + rs, + new Writable({ + write(chunk, encoding, callback) { + chunks.push(chunk) + callback() + }, + }), + ) + + await finished(rs) + const buf = Buffer.concat(chunks) + assert.equal(Buffer.compare(buf, content), 0) + + await sleep(1000) + + const sinceAdd = new Date(meta.lastModified) + sinceAdd.setSeconds(sinceAdd.getSeconds() + 1) + const r = await op.reader(filename, { ifModifiedSince: sinceAdd.toISOString() }) + const bs2 = Buffer.alloc(content.length) + await expect(r.read(bs2)).rejects.toThrowError('ConditionNotMatch') + + await op.delete(filename) + }) + + test.runIf(capability.readWithIfUnmodifiedSince)('reader with if unmodified since', async () => { + const size = 3 * 1024 * 1024 + const filename = `random_file_${randomUUID()}` + const content = generateFixedBytes(size) + + await op.write(filename, content) + const meta = await op.stat(filename) + + const sinceMinus = new Date(meta.lastModified) + sinceMinus.setSeconds(sinceMinus.getSeconds() - 1) + + const r = await op.reader(filename, { ifUnmodifiedSince: sinceMinus.toISOString() }) + const bs = Buffer.alloc(content.length) + await expect(r.read(bs)).rejects.toThrowError('ConditionNotMatch') + + await sleep(1000) + + const sinceAdd = new Date(meta.lastModified) + sinceAdd.setSeconds(sinceAdd.getSeconds() + 1) + + const reader = await op.reader(filename, { ifUnmodifiedSince: sinceAdd.toISOString() }) + const rs = reader.createReadStream() + let chunks = [] + await pipeline( + rs, + new Writable({ + write(chunk, encoding, callback) { + chunks.push(chunk) + callback() + }, + }), + ) + + await finished(rs) + const bs2 = Buffer.concat(chunks) + assert.equal(Buffer.compare(bs2, content), 0) + + await op.delete(filename) + }) + }) +} diff --git a/bindings/nodejs/tests/suites/index.mjs b/bindings/nodejs/tests/suites/index.mjs index dea86b72e682..a0bf4cda061b 100644 --- a/bindings/nodejs/tests/suites/index.mjs +++ b/bindings/nodejs/tests/suites/index.mjs @@ -26,6 +26,8 @@ import { run as ServicesTestRun } from './services.suite.mjs' import { run as SyncIOTestRun } from './sync.suite.mjs' import { run as AsyncStatOptionsTestRun } from './asyncStatOptions.suite.mjs' import { run as SyncStatOptionsTestRun } from './syncStatOptions.suite.mjs' +import { run as AsyncReadOptionsTestRun } from './asyncReadOptions.suite.mjs' +import { run as SyncReadOptionsTestRun } from './syncReadOptions.suite.mjs' export function runner(testName, scheme) { if (!scheme) { @@ -57,5 +59,7 @@ export function runner(testName, scheme) { SyncIOTestRun(operator) AsyncStatOptionsTestRun(operator) SyncStatOptionsTestRun(operator) + AsyncReadOptionsTestRun(operator) + SyncReadOptionsTestRun(operator) }) } diff --git a/bindings/nodejs/tests/suites/syncReadOptions.suite.mjs b/bindings/nodejs/tests/suites/syncReadOptions.suite.mjs new file mode 100644 index 000000000000..8843091793c1 --- /dev/null +++ b/bindings/nodejs/tests/suites/syncReadOptions.suite.mjs @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { randomUUID } from 'node:crypto' +import { test, describe, expect, assert } from 'vitest' + +import { generateFixedBytes, generateBytes } from '../utils.mjs' + +/** + * @param {import("../../index").Operator} op + */ +export function run(op) { + const capability = op.capability() + + describe.runIf(capability.read && capability.write)('sync read options', () => { + test('read with range', () => { + const size = 3 * 1024 * 1024 + const filename = `random_file_${randomUUID()}` + const content = generateFixedBytes(size) + const offset = Math.floor(Math.random() * (size - 1)) + const maxLen = size - offset + const length = Math.floor(Math.random() * (maxLen - 1)) + 1 + + op.writeSync(filename, content) + + const bs = op.readSync(filename, { + offset: BigInt(offset), + size: BigInt(length), + }) + expect(bs.length).toBe(length) + assert.equal(Buffer.compare(bs, content.subarray(offset, offset + length)), 0) + + op.deleteSync(filename) + }) + + test.runIf(capability.readWithIfMatch)('read with if match', () => { + const filename = `random_file_${randomUUID()}` + const content = generateBytes() + + op.writeSync(filename, content) + const meta = op.statSync(filename) + + const invalidOptions = { + ifMatch: '"invalid_etag"', + } + + expect(() => op.readSync(filename, invalidOptions)).toThrowError('ConditionNotMatch') + + const bs = op.readSync(filename, { + ifMatch: meta.etag, + }) + + assert.equal(Buffer.compare(bs, content), 0) + + op.deleteSync(filename) + }) + + test.runIf(capability.readWithIfNoneMatch)('read with if none match', () => { + const filename = `random_file_${randomUUID()}` + const content = generateBytes() + + op.writeSync(filename, content) + const meta = op.statSync(filename) + + const invalidOptions = { + ifNoneMatch: meta.etag, + } + + expect(() => op.readSync(filename, invalidOptions)).toThrowError('ConditionNotMatch') + + const bs = op.readSync(filename, { + ifNoneMatch: '"invalid_etag"', + }) + + assert.equal(Buffer.compare(bs, content), 0) + + op.deleteSync(filename) + }) + + test.runIf(capability.readWithIfModifiedSince)('read with if modified since', () => { + const size = 3 * 1024 * 1024 + const filename = `random_file_${randomUUID()}` + const content = generateBytes(size) + + op.writeSync(filename, content) + const meta = op.statSync(filename) + + const sinceMinus = new Date(meta.lastModified) + sinceMinus.setSeconds(sinceMinus.getSeconds() - 1) + const bs = op.readSync(filename, { ifModifiedSince: sinceMinus.toISOString() }) + assert.equal(Buffer.compare(bs, content), 0) + + setTimeout(() => { + const sinceAdd = new Date(meta.lastModified) + sinceAdd.setSeconds(sinceAdd.getSeconds() + 1) + + expect(() => op.readSync(filename, { ifModifiedSince: sinceAdd.toISOString() })).toThrowError( + 'ConditionNotMatch', + ) + op.deleteSync(filename) + }, 1000) + }) + + test.runIf(capability.readWithIfUnmodifiedSince)('read with if unmodified since', () => { + const size = 3 * 1024 * 1024 + const filename = `random_file_${randomUUID()}` + const content = generateBytes(size) + + op.writeSync(filename, content) + const meta = op.statSync(filename) + + const sinceMinus = new Date(meta.lastModified) + sinceMinus.setSeconds(sinceMinus.getSeconds() - 3600) + expect(() => op.readSync(filename, { ifUnmodifiedSince: sinceMinus.toISOString() })).toThrowError( + 'ConditionNotMatch', + ) + + setTimeout(() => { + const sinceAdd = new Date(meta.lastModified) + sinceAdd.setSeconds(sinceAdd.getSeconds() + 1) + + const bs = op.readSync(filename, { ifUnmodifiedSince: sinceAdd.toISOString() }) + assert.equal(Buffer.compare(bs, content), 0) + + op.deleteSync(filename) + }, 1000) + }) + + test.runIf(capability.readWithVersion)('read with version', () => { + const filename = `random_file_${randomUUID()}` + const content = generateBytes() + + op.writeSync(filename, content) + const meta = op.statSync(filename) + const data = op.readSync(filename, { version: meta.version }) + assert.equal(Buffer.compare(data, content), 0) + + op.writeSync(filename, Buffer.from('1')) + // After writing new data, we can still read the first version data + const secondData = op.readSync(filename, { version: meta.version }) + assert.equal(Buffer.compare(secondData, content), 0) + + op.deleteSync(filename) + }) + + test.runIf(capability.readWithVersion)('read with not existing version', () => { + const filename = `random_file_${randomUUID()}` + const content = generateBytes() + + op.writeSync(filename, content) + const meta = op.statSync(filename) + + const filename2 = `random_file_${randomUUID()}` + const content2 = generateBytes() + op.writeSync(filename2, content2) + + expect(() => op.readSync(filename2, { version: meta.version })).toThrowError('NotFound') + + op.deleteSync(filename) + op.deleteSync(filename2) + }) + }) + + describe.runIf(capability.read && capability.write)('sync reader options', () => { + test.runIf(capability.readWithIfMatch)('reader with if match', () => { + const size = 3 * 1024 * 1024 + const filename = `random_file_${randomUUID()}` + const content = generateFixedBytes(size) + + op.writeSync(filename, content) + const meta = op.statSync(filename) + + const invalidOptions = { + ifMatch: '"invalid_etag"', + } + + const reader = op.readerSync(filename, invalidOptions) + const buf = Buffer.alloc(content.length) + expect(() => reader.read(buf)).toThrowError('ConditionNotMatch') + + const r = op.readerSync(filename, { ifMatch: meta.etag }) + const rs = r.createReadStream() + + let chunks = [] + rs.on('data', (chunk) => { + chunks.push(chunk) + }) + + rs.on('end', () => { + const buf = Buffer.concat(chunks) + assert.equal(Buffer.compare(buf, content), 0) + + op.deleteSync(filename) + }) + }) + + test.runIf(capability.readWithIfNoneMatch)('reader with if none match', () => { + const size = 3 * 1024 * 1024 + const filename = `random_file_${randomUUID()}` + const content = generateFixedBytes(size) + + op.writeSync(filename, content) + const meta = op.statSync(filename) + + const reader = op.readerSync(filename, { ifNoneMatch: meta.etag }) + const buf = Buffer.alloc(content.length) + expect(() => reader.read(buf)).toThrowError('ConditionNotMatch') + + const r = op.readerSync(filename, { ifNoneMatch: '"invalid_etag"' }) + const rs = r.createReadStream() + + let chunks = [] + rs.on('data', (chunk) => { + chunks.push(chunk) + }) + + rs.on('end', () => { + const buf = Buffer.concat(chunks) + assert.equal(Buffer.compare(buf, content), 0) + + op.deleteSync(filename) + }) + }) + + test.runIf(capability.readWithIfModifiedSince)('reader with if modified since', () => { + const size = 3 * 1024 * 1024 + const filename = `random_file_${randomUUID()}` + const content = generateFixedBytes(size) + + op.writeSync(filename, content) + const meta = op.statSync(filename) + + const sinceMinus = new Date(meta.lastModified) + sinceMinus.setSeconds(sinceMinus.getSeconds() - 1) + const reader = op.readerSync(filename, { ifModifiedSince: sinceMinus.toISOString() }) + const rs = reader.createReadStream() + + let chunks = [] + rs.on('data', (chunk) => { + chunks.push(chunk) + }) + + rs.on('end', () => { + const buf = Buffer.concat(chunks) + assert.equal(Buffer.compare(buf, content), 0) + }) + + setTimeout(() => { + const sinceAdd = new Date(meta.lastModified) + sinceAdd.setSeconds(sinceAdd.getSeconds() + 1) + const r = op.readerSync(filename, { ifModifiedSince: sinceAdd.toISOString() }) + const bs2 = Buffer.alloc(content.length) + expect(() => r.read(bs2)).toThrowError('ConditionNotMatch') + + op.deleteSync(filename) + }, 1000) + }) + + test.runIf(capability.readWithIfUnmodifiedSince)('reader with if unmodified since', () => { + const size = 3 * 1024 * 1024 + const filename = `random_file_${randomUUID()}` + const content = generateFixedBytes(size) + + op.writeSync(filename, content) + const meta = op.statSync(filename) + + const sinceMinus = new Date(meta.lastModified) + sinceMinus.setSeconds(sinceMinus.getSeconds() - 1) + + const r = op.readerSync(filename, { ifUnmodifiedSince: sinceMinus.toISOString() }) + const bs = Buffer.alloc(content.length) + expect(() => r.read(bs)).toThrowError('ConditionNotMatch') + + setTimeout(() => { + const sinceAdd = new Date(meta.lastModified) + sinceAdd.setSeconds(sinceAdd.getSeconds() + 1) + + const reader = op.readerSync(filename, { ifUnmodifiedSince: sinceAdd.toISOString() }) + const rs = reader.createReadStream() + + let chunks = [] + rs.on('data', (chunk) => { + chunks.push(chunk) + }) + + rs.on('end', () => { + const buf = Buffer.concat(chunks) + assert.equal(Buffer.compare(buf, content), 0) + op.deleteSync(filename) + }) + }, 1000) + }) + }) +}