diff --git a/CHANGELOG-old.md b/CHANGELOG-old.md
index 92494ac66c00..6cb0b9113265 100644
--- a/CHANGELOG-old.md
+++ b/CHANGELOG-old.md
@@ -19,6 +19,18 @@
 
 # Historical Changelog
 
+## [54.3.1](https://github.com/apache/arrow-rs/tree/54.3.1) (2025-03-26)
+
+[Full Changelog](https://github.com/apache/arrow-rs/compare/54.3.0...54.3.1)
+
+**Fixed bugs:**
+
+- Round trip encoding of list of fixed list fails when offset is not zero [\#7315](https://github.com/apache/arrow-rs/issues/7315)
+
+**Merged pull requests:**
+
+- Add missing type annotation [\#7326](https://github.com/apache/arrow-rs/pull/7326) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([mbrobbel](https://github.com/mbrobbel))
+- bugfix: correct offsets when serializing a list of fixed sized list and non-zero start offset [\#7318](https://github.com/apache/arrow-rs/pull/7318) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([timsaucer](https://github.com/timsaucer))
 ## [54.3.0](https://github.com/apache/arrow-rs/tree/54.3.0) (2025-03-17)
 
 [Full Changelog](https://github.com/apache/arrow-rs/compare/53.4.1...54.3.0)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1a72d99ffa38..26156f76bdea 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,18 +19,81 @@
 
 # Changelog
 
-## [54.3.1](https://github.com/apache/arrow-rs/tree/54.3.1) (2025-03-26)
+## [55.0.0](https://github.com/apache/arrow-rs/tree/55.0.0) (2025-04-07)
 
-[Full Changelog](https://github.com/apache/arrow-rs/compare/54.3.0...54.3.1)
+[Full Changelog](https://github.com/apache/arrow-rs/compare/54.3.1...55.0.0)
+
+**Breaking changes:**
+
+- Remove `AsyncFileReader::get_metadata_with_options`, add `options` to `AsyncFileReader::get_metadata` [\#7342](https://github.com/apache/arrow-rs/pull/7342) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([corwinjoy](https://github.com/corwinjoy))
+- Parquet: Support reading Parquet metadata via suffix range requests [\#7334](https://github.com/apache/arrow-rs/pull/7334) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([kylebarron](https://github.com/kylebarron))
+- Upgrade `object_store` to `0.12.0` [\#7328](https://github.com/apache/arrow-rs/pull/7328) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([mbrobbel](https://github.com/mbrobbel))
+- Upgrade `pyo3` to `0.24` [\#7324](https://github.com/apache/arrow-rs/pull/7324) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([mbrobbel](https://github.com/mbrobbel))
+- Reapply Box `FlightError::tonic` to reduce size \(fixes nightly clippy\) [\#7277](https://github.com/apache/arrow-rs/pull/7277) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] [[arrow-flight](https://github.com/apache/arrow-rs/labels/arrow-flight)] ([alamb](https://github.com/alamb))
+- Improve parquet gzip compression performance using zlib-rs [\#7200](https://github.com/apache/arrow-rs/pull/7200) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([psvri](https://github.com/psvri))
+- Fix: `date_part` to extract only the requested part \(not the overall interval\) [\#7189](https://github.com/apache/arrow-rs/pull/7189) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([delamarch3](https://github.com/delamarch3))
+- chore: upgrade flatbuffer version to `25.2.10` [\#7134](https://github.com/apache/arrow-rs/pull/7134) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([tisonkun](https://github.com/tisonkun))
+- Add hooks to json encoder to override default encoding or add support for unsupported types [\#7015](https://github.com/apache/arrow-rs/pull/7015) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([adriangb](https://github.com/adriangb))
+
+**Implemented enhancements:**
+
+- Improve the performance of `concat` [\#7357](https://github.com/apache/arrow-rs/issues/7357) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)]
+- Pushdown predicates to Parquet in-memory row group fetches [\#7348](https://github.com/apache/arrow-rs/issues/7348) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)]
+- Improve CSV parsing errors: Print the row that makes csv parsing fail [\#7344](https://github.com/apache/arrow-rs/issues/7344) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)]
+- Support ColumnMetaData `encoding_stats` in Parquet Writing [\#7341](https://github.com/apache/arrow-rs/issues/7341) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)]
+- Support writing Parquet with modular encryption [\#7327](https://github.com/apache/arrow-rs/issues/7327) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)]
+- Parquet reader: option to pass INT96 as bytes instead of as Timestamp [\#7220](https://github.com/apache/arrow-rs/issues/7220) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)]
 
 **Fixed bugs:**
 
-- Round trip encoding of list of fixed list fails when offset is not zero [\#7315](https://github.com/apache/arrow-rs/issues/7315)
+- New clippy failures in code base with release of rustc 1.86 [\#7381](https://github.com/apache/arrow-rs/issues/7381) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] [[arrow](https://github.com/apache/arrow-rs/labels/arrow)]
+- Fix bug in `ParquetMetaDataReader` and add test of suffix metadata reads with encryption [\#7372](https://github.com/apache/arrow-rs/pull/7372) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([etseidl](https://github.com/etseidl))
+
+**Documentation updates:**
+
+- Improve documentation on `ArrayData::offset` [\#7385](https://github.com/apache/arrow-rs/pull/7385) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([alamb](https://github.com/alamb))
+- Improve documentation for `AsyncFileReader::get_metadata` [\#7380](https://github.com/apache/arrow-rs/pull/7380) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([alamb](https://github.com/alamb))
+- Improve documentation on implementing Parquet predicate pushdown [\#7370](https://github.com/apache/arrow-rs/pull/7370) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([alamb](https://github.com/alamb))
+- Add documentation and examples for pretty printing, make `pretty_format_columns_with_options` pub [\#7346](https://github.com/apache/arrow-rs/pull/7346) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([alamb](https://github.com/alamb))
+- Improve documentation on writing parquet, including multiple threads [\#7321](https://github.com/apache/arrow-rs/pull/7321) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([alamb](https://github.com/alamb))
 
 **Merged pull requests:**
 
+- chore: apply clippy suggestions newly introduced in rust 1.86 [\#7382](https://github.com/apache/arrow-rs/pull/7382) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([westonpace](https://github.com/westonpace))
+- bench: add more {boolean, string, int} benchmarks for concat kernel [\#7376](https://github.com/apache/arrow-rs/pull/7376) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([rluvaton](https://github.com/rluvaton))
+- Add more examples of using Parquet encryption [\#7374](https://github.com/apache/arrow-rs/pull/7374) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([adamreeve](https://github.com/adamreeve))
+- Clean up `ArrowReaderMetadata::load_async` [\#7369](https://github.com/apache/arrow-rs/pull/7369) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([etseidl](https://github.com/etseidl))
+- bump pyo3 for RUSTSEC-2025-0020 [\#7368](https://github.com/apache/arrow-rs/pull/7368) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([onursatici](https://github.com/onursatici))
+- Test int96 Parquet file from Spark [\#7367](https://github.com/apache/arrow-rs/pull/7367) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([mbutrovich](https://github.com/mbutrovich))
+- fix: respect offset/length when converting ArrayData to StructArray [\#7366](https://github.com/apache/arrow-rs/pull/7366) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([westonpace](https://github.com/westonpace))
+- Print row, data present, expected type, and row number in error messages for arrow-csv [\#7361](https://github.com/apache/arrow-rs/pull/7361) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([psiayn](https://github.com/psiayn))
+- Use rust builtins for round\_upto\_multiple\_of\_64 and ceil [\#7358](https://github.com/apache/arrow-rs/pull/7358) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([psvri](https://github.com/psvri))
+- Write parquet PageEncodingStats [\#7354](https://github.com/apache/arrow-rs/pull/7354) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([jhorstmann](https://github.com/jhorstmann))
+- Move `sysinfo` to `dev-dependencies` [\#7353](https://github.com/apache/arrow-rs/pull/7353) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([mbrobbel](https://github.com/mbrobbel))
+- chore\(deps\): update sysinfo requirement from 0.33.0 to 0.34.0 [\#7352](https://github.com/apache/arrow-rs/pull/7352) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([dependabot[bot]](https://github.com/apps/dependabot))
+- Add additional benchmarks for utf8view comparison kernels [\#7351](https://github.com/apache/arrow-rs/pull/7351) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([zhuqi-lucas](https://github.com/zhuqi-lucas))
+- Upgrade to twox-hash 2.0 [\#7347](https://github.com/apache/arrow-rs/pull/7347) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([alamb](https://github.com/alamb))
+- refactor: apply borrowed chunk reader to Sbbf::read\_from\_column\_chunk [\#7345](https://github.com/apache/arrow-rs/pull/7345) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([ethe](https://github.com/ethe))
+- Merge changelog and version from 54.3.1 into main [\#7340](https://github.com/apache/arrow-rs/pull/7340) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([timsaucer](https://github.com/timsaucer))
+- Remove `object-store` label from `.asf.yaml` [\#7339](https://github.com/apache/arrow-rs/pull/7339) ([mbrobbel](https://github.com/mbrobbel))
+- Encapsulate encryption code more in readers [\#7337](https://github.com/apache/arrow-rs/pull/7337) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([alamb](https://github.com/alamb))
+- Bump MSRV to 1.81 [\#7336](https://github.com/apache/arrow-rs/pull/7336) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] [[arrow-flight](https://github.com/apache/arrow-rs/labels/arrow-flight)] ([mbrobbel](https://github.com/mbrobbel))
+- Add an option to show column type [\#7335](https://github.com/apache/arrow-rs/pull/7335) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([blaginin](https://github.com/blaginin))
 - Add missing type annotation [\#7326](https://github.com/apache/arrow-rs/pull/7326) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([mbrobbel](https://github.com/mbrobbel))
+- Minor: Improve parallel parquet encoding example [\#7323](https://github.com/apache/arrow-rs/pull/7323) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([alamb](https://github.com/alamb))
+- feat: allow if expressions for fallbacks in downcast macro [\#7322](https://github.com/apache/arrow-rs/pull/7322) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([rluvaton](https://github.com/rluvaton))
+- Minor: rename `ParquetRecordBatchStream::reader` to `ParquetRecordBatchStream::reader_factory` [\#7319](https://github.com/apache/arrow-rs/pull/7319) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([alamb](https://github.com/alamb))
 - bugfix: correct offsets when serializing a list of fixed sized list and non-zero start offset [\#7318](https://github.com/apache/arrow-rs/pull/7318) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([timsaucer](https://github.com/timsaucer))
+- Remove object\_store references in Readme.md [\#7317](https://github.com/apache/arrow-rs/pull/7317) ([alamb](https://github.com/alamb))
+- Adopt MSRV policy [\#7314](https://github.com/apache/arrow-rs/pull/7314) ([psvri](https://github.com/psvri))
+- fix: correct array length validation error message [\#7313](https://github.com/apache/arrow-rs/pull/7313) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([wkalt](https://github.com/wkalt))
+- chore: remove trailing space in debug print [\#7311](https://github.com/apache/arrow-rs/pull/7311) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([xxchan](https://github.com/xxchan))
+- Improve `concat` performance, and add `append_array` for some array builder implementations [\#7309](https://github.com/apache/arrow-rs/pull/7309) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([rluvaton](https://github.com/rluvaton))
+- feat: add `append_buffer` for `NullBufferBuilder` [\#7308](https://github.com/apache/arrow-rs/pull/7308) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([rluvaton](https://github.com/rluvaton))
+- MINOR: fix incorrect method name in deprecate note [\#7306](https://github.com/apache/arrow-rs/pull/7306) [[arrow](https://github.com/apache/arrow-rs/labels/arrow)] ([waynexia](https://github.com/waynexia))
+- Allow retrieving Parquet decryption keys using the key metadata [\#7286](https://github.com/apache/arrow-rs/pull/7286) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([adamreeve](https://github.com/adamreeve))
+- Support different TimeUnits and timezones when reading Timestamps from INT96 [\#7285](https://github.com/apache/arrow-rs/pull/7285) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([mbutrovich](https://github.com/mbutrovich))
+- Add Parquet Modular encryption support \(write\) [\#7111](https://github.com/apache/arrow-rs/pull/7111) [[parquet](https://github.com/apache/arrow-rs/labels/parquet)] ([rok](https://github.com/rok))
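For readers upgrading, the first three breaking changes above combine in a typical object-store read path: `ObjectMeta::size` is `u64` in `object_store` 0.12, `with_file_size` accepts it directly, and `get_metadata` now takes `Option<&ArrowReaderOptions>`. A minimal sketch of the resulting call pattern, assuming the post-#7342 signature shown in this changeset (the in-memory store and file path are illustrative only):

```rust
use std::sync::Arc;

use object_store::{memory::InMemory, path::Path, ObjectStore};
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};

async fn example() -> parquet::errors::Result<()> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    // meta.size is u64 in object_store 0.12; no try_into() needed anymore
    let meta = store.head(&Path::from("data.parquet")).await?;
    let mut reader =
        ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
    // get_metadata now takes Option<&ArrowReaderOptions> instead of a
    // separate get_metadata_with_options method
    let metadata = reader.get_metadata(None).await?;
    println!("row groups: {}", metadata.num_row_groups());
    Ok(())
}
```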
diff --git a/Cargo.toml b/Cargo.toml
index 5ae05b3add07..7e7cae206a3f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -60,7 +60,7 @@ exclude = [
 ]
 
 [workspace.package]
-version = "54.3.1"
+version = "55.0.0"
 homepage = "https://github.com/apache/arrow-rs"
 repository = "https://github.com/apache/arrow-rs"
 authors = ["Apache Arrow <dev@arrow.apache.org>"]
@@ -77,21 +77,21 @@ edition = "2021"
 rust-version = "1.81"
 
 [workspace.dependencies]
-arrow = { version = "54.3.1", path = "./arrow", default-features = false }
-arrow-arith = { version = "54.3.1", path = "./arrow-arith" }
-arrow-array = { version = "54.3.1", path = "./arrow-array" }
-arrow-buffer = { version = "54.3.1", path = "./arrow-buffer" }
-arrow-cast = { version = "54.3.1", path = "./arrow-cast" }
-arrow-csv = { version = "54.3.1", path = "./arrow-csv" }
-arrow-data = { version = "54.3.1", path = "./arrow-data" }
-arrow-ipc = { version = "54.3.1", path = "./arrow-ipc" }
-arrow-json = { version = "54.3.1", path = "./arrow-json" }
-arrow-ord = { version = "54.3.1", path = "./arrow-ord" }
-arrow-row = { version = "54.3.1", path = "./arrow-row" }
-arrow-schema = { version = "54.3.1", path = "./arrow-schema" }
-arrow-select = { version = "54.3.1", path = "./arrow-select" }
-arrow-string = { version = "54.3.1", path = "./arrow-string" }
-parquet = { version = "54.3.1", path = "./parquet", default-features = false }
+arrow = { version = "55.0.0", path = "./arrow", default-features = false }
+arrow-arith = { version = "55.0.0", path = "./arrow-arith" }
+arrow-array = { version = "55.0.0", path = "./arrow-array" }
+arrow-buffer = { version = "55.0.0", path = "./arrow-buffer" }
+arrow-cast = { version = "55.0.0", path = "./arrow-cast" }
+arrow-csv = { version = "55.0.0", path = "./arrow-csv" }
+arrow-data = { version = "55.0.0", path = "./arrow-data" }
+arrow-ipc = { version = "55.0.0", path = "./arrow-ipc" }
+arrow-json = { version = "55.0.0", path = "./arrow-json" }
+arrow-ord = { version = "55.0.0", path = "./arrow-ord" }
+arrow-row = { version = "55.0.0", path = "./arrow-row" }
+arrow-schema = { version = "55.0.0", path = "./arrow-schema" }
+arrow-select = { version = "55.0.0", path = "./arrow-select" }
+arrow-string = { version = "55.0.0", path = "./arrow-string" }
+parquet = { version = "55.0.0", path = "./parquet", default-features = false }
 
 chrono = { version = "0.4.40", default-features = false, features = ["clock"] }
diff --git a/README.md b/README.md
index 00a12a7da420..6140f9e902ea 100644
--- a/README.md
+++ b/README.md
@@ -63,12 +63,17 @@ is described in the [contributing] guide.
 
 Planned Release Schedule
 
-| Approximate Date | Version  | Notes                                   |
-| ---------------- | -------- | --------------------------------------- |
-| Mar 2025         | `54.2.0` | Minor, NO breaking API changes          |
-| Apr 2025         | `55.0.0` | Major, potentially breaking API changes |
-| May 2025         | `55.1.0` | Minor, NO breaking API changes          |
-
+| Approximate Date | Version    | Notes                                   |
+| ---------------- | ---------- | --------------------------------------- |
+| Apr 2025         | [`55.0.0`] | Major, potentially breaking API changes |
+| May 2025         | [`55.1.0`] | Minor, NO breaking API changes          |
+| June 2025        | [`55.2.0`] | Minor, NO breaking API changes          |
+| July 2025        | [`56.0.0`] | Major, potentially breaking API changes |
+
+[`55.0.0`]: https://github.com/apache/arrow-rs/issues/7084
+[`55.1.0`]: https://github.com/apache/arrow-rs/issues/7393
+[`55.2.0`]: https://github.com/apache/arrow-rs/issues/7394
+[`56.0.0`]: https://github.com/apache/arrow-rs/issues/7395
 
 [ticket #5368]: https://github.com/apache/arrow-rs/issues/5368
 [semantic versioning]: https://semver.org/
diff --git a/dev/release/update_change_log.sh b/dev/release/update_change_log.sh
index a8d68fbf005e..346a828c4c35 100755
--- a/dev/release/update_change_log.sh
+++ b/dev/release/update_change_log.sh
@@ -29,8 +29,8 @@
 
 set -e
 
-SINCE_TAG="54.3.0"
-FUTURE_RELEASE="54.3.1"
+SINCE_TAG="54.3.1"
+FUTURE_RELEASE="55.0.0"
 
 SOURCE_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
 SOURCE_TOP_DIR="$(cd "${SOURCE_DIR}/../../" && pwd)"
diff --git a/parquet/examples/external_metadata.rs b/parquet/examples/external_metadata.rs
index 5ca322ee2890..2c3250782c0f 100644
--- a/parquet/examples/external_metadata.rs
+++ b/parquet/examples/external_metadata.rs
@@ -112,7 +112,7 @@ async fn get_metadata_from_remote_parquet_file(
     // tell the reader to read the page index
     ParquetMetaDataReader::new()
         .with_page_indexes(true)
-        .load_and_finish(remote_file, file_size as usize)
+        .load_and_finish(remote_file, file_size)
         .await
         .unwrap()
 }
diff --git a/parquet/examples/read_with_rowgroup.rs b/parquet/examples/read_with_rowgroup.rs
index 52b3d112274d..5d1ff0770f9e 100644
--- a/parquet/examples/read_with_rowgroup.rs
+++ b/parquet/examples/read_with_rowgroup.rs
@@ -166,9 +166,7 @@ impl InMemoryRowGroup {
         for (leaf_idx, meta) in self.metadata.columns().iter().enumerate() {
             if self.mask.leaf_included(leaf_idx) {
                 let (start, len) = meta.byte_range();
-                let data = reader
-                    .get_bytes(start as usize..(start + len) as usize)
-                    .await?;
+                let data = reader.get_bytes(start..(start + len)).await?;
 
                 vs[leaf_idx] = Some(Arc::new(ColumnChunkData {
                     offset: start as usize,
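The example change above is the caller side of the wider `u64` migration in this release: `ColumnChunkMetaData::byte_range` already returns `u64` offsets, and they now flow into `AsyncFileReader::get_bytes` with no casts in between. A small sketch under that assumption (`fetch_column_chunk` is a hypothetical helper, not part of this diff):

```rust
use bytes::Bytes;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::errors::Result;
use parquet::file::metadata::ColumnChunkMetaData;

// Hypothetical helper: fetch the raw bytes of one column chunk.
// byte_range() yields (start, len) as u64, matching get_bytes(Range<u64>).
async fn fetch_column_chunk<R: AsyncFileReader>(
    reader: &mut R,
    meta: &ColumnChunkMetaData,
) -> Result<Bytes> {
    let (start, len) = meta.byte_range();
    reader.get_bytes(start..start + len).await
}
```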
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index ffcf39df0e23..c53d47be2e56 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -162,8 +162,8 @@ impl RowSelection {
     /// Note: this method does not make any effort to combine consecutive ranges, nor coalesce
     /// ranges that are close together. This is instead delegated to the IO subsystem to optimise,
     /// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges)
-    pub fn scan_ranges(&self, page_locations: &[crate::format::PageLocation]) -> Vec<Range<usize>> {
-        let mut ranges = vec![];
+    pub fn scan_ranges(&self, page_locations: &[crate::format::PageLocation]) -> Vec<Range<u64>> {
+        let mut ranges: Vec<Range<u64>> = vec![];
         let mut row_offset = 0;
 
         let mut pages = page_locations.iter().peekable();
@@ -175,8 +175,8 @@ impl RowSelection {
 
         while let Some((selector, page)) = current_selector.as_mut().zip(current_page) {
             if !(selector.skip || current_page_included) {
-                let start = page.offset as usize;
-                let end = start + page.compressed_page_size as usize;
+                let start = page.offset as u64;
+                let end = start + page.compressed_page_size as u64;
                 ranges.push(start..end);
                 current_page_included = true;
             }
@@ -200,8 +200,8 @@
             }
         } else {
             if !(selector.skip || current_page_included) {
-                let start = page.offset as usize;
-                let end = start + page.compressed_page_size as usize;
+                let start = page.offset as u64;
+                let end = start + page.compressed_page_size as u64;
                 ranges.push(start..end);
             }
             current_selector = selectors.next()
diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs
index ff183f418538..e0f7bdbbe902 100644
--- a/parquet/src/arrow/async_reader/metadata.rs
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -48,12 +48,13 @@ use std::ops::Range;
 ///     file: tokio::fs::File,
 /// }
 /// impl MetadataFetch for TokioFileMetadata {
-///     fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+///     fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
 ///         // return a future that fetches data in range
 ///         async move {
-///             let mut buf = vec![0; range.len()]; // target buffer
+///             let len = (range.end - range.start).try_into().unwrap();
+///             let mut buf = vec![0; len]; // target buffer
 ///             // seek to the start of the range and read the data
-///             self.file.seek(SeekFrom::Start(range.start as u64)).await?;
+///             self.file.seek(SeekFrom::Start(range.start)).await?;
 ///             self.file.read_exact(&mut buf).await?;
 ///             Ok(Bytes::from(buf)) // convert to Bytes
 ///         }
@@ -66,11 +67,11 @@ pub trait MetadataFetch {
     ///
     /// Note the returned type is a boxed future, often created by
    /// [FutureExt::boxed]. See the trait documentation for an example
-    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
 }
 
 impl<T: AsyncFileReader> MetadataFetch for &mut T {
-    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
         self.get_bytes(range)
     }
 }
@@ -117,7 +118,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
             file_size - FOOTER_SIZE
         };
 
-        let suffix = fetch.fetch(footer_start..file_size).await?;
+        let suffix = fetch.fetch(footer_start as u64..file_size as u64).await?;
         let suffix_len = suffix.len();
 
         let mut footer = [0; FOOTER_SIZE];
@@ -137,7 +138,9 @@ impl<F: MetadataFetch> MetadataLoader<F> {
         // Did not fetch the entire file metadata in the initial read, need to make a second request
         let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE {
             let metadata_start = file_size - length - FOOTER_SIZE;
-            let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
+            let meta = fetch
+                .fetch(metadata_start as u64..(file_size - FOOTER_SIZE) as u64)
+                .await?;
             (ParquetMetaDataReader::decode_metadata(&meta)?, None)
         } else {
             let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
@@ -187,16 +190,18 @@ impl<F: MetadataFetch> MetadataLoader<F> {
         };
 
         let data = match &self.remainder {
-            Some((remainder_start, remainder)) if *remainder_start <= range.start => {
-                let offset = range.start - *remainder_start;
-                remainder.slice(offset..range.end - *remainder_start + offset)
+            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
+                let remainder_start = *remainder_start as u64;
+                let range_start = usize::try_from(range.start - remainder_start)?;
+                let range_end = usize::try_from(range.end - remainder_start)?;
+                remainder.slice(range_start..range_end)
             }
             // Note: this will potentially fetch data already in remainder, this keeps things simple
             _ => self.fetch.fetch(range.start..range.end).await?,
         };
 
         // Sanity check
-        assert_eq!(data.len(), range.end - range.start);
+        assert_eq!(data.len(), (range.end - range.start) as usize);
         let offset = range.start;
 
         if column_index {
@@ -208,10 +213,11 @@ impl<F: MetadataFetch> MetadataLoader<F> {
                     x.columns()
                         .iter()
                         .map(|c| match c.column_index_range() {
-                            Some(r) => decode_column_index(
-                                &data[r.start - offset..r.end - offset],
-                                c.column_type(),
-                            ),
+                            Some(r) => {
+                                let r_start = usize::try_from(r.start - offset)?;
+                                let r_end = usize::try_from(r.end - offset)?;
+                                decode_column_index(&data[r_start..r_end], c.column_type())
+                            }
                             None => Ok(Index::NONE),
                         })
                         .collect::<Result<Vec<_>>>()
@@ -230,7 +236,11 @@ impl<F: MetadataFetch> MetadataLoader<F> {
                     x.columns()
                         .iter()
                         .map(|c| match c.offset_index_range() {
-                            Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
+                            Some(r) => {
+                                let r_start = usize::try_from(r.start - offset)?;
+                                let r_end = usize::try_from(r.end - offset)?;
+                                decode_offset_index(&data[r_start..r_end])
+                            }
                             None => Err(general_err!("missing offset index")),
                         })
                         .collect::<Result<Vec<_>>>()
@@ -256,8 +266,8 @@ where
     F: FnMut(Range<usize>) -> Fut + Send,
     Fut: Future<Output = Result<Bytes>> + Send,
 {
-    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
-        async move { self.0(range).await }.boxed()
+    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
+        async move { self.0(range.start.try_into()?..range.end.try_into()?).await }.boxed()
     }
 }
 
@@ -287,6 +297,7 @@ where
     F: FnMut(Range<usize>) -> Fut + Send,
     Fut: Future<Output = Result<Bytes>> + Send,
 {
+    let file_size = u64::try_from(file_size)?;
     let fetch = MetadataFetchFn(fetch);
     ParquetMetaDataReader::new()
         .with_prefetch_hint(prefetch)
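With `MetadataFetch` now taking `Range<u64>`, the usual entry point is unchanged apart from the `u64` file size. A sketch of loading metadata through `ParquetMetaDataReader` with a `tokio::fs::File`, mirroring the tests later in this diff (the function name is illustrative; requires the `async` and `arrow` features):

```rust
use parquet::errors::Result;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

// Read footer metadata plus page indexes from a tokio file; &mut File
// implements MetadataFetch via the blanket impls above, and the file
// length stays a u64 end to end.
async fn read_file_metadata(path: &str) -> Result<ParquetMetaData> {
    let mut file = tokio::fs::File::open(path).await?;
    let file_size = file.metadata().await?.len(); // u64
    ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .load_and_finish(&mut file, file_size)
        .await
}
```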
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index cbbb6c415086..fc33608a4c21 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -80,10 +80,10 @@ pub use store::*;
 /// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
 pub trait AsyncFileReader: Send {
     /// Retrieve the bytes in `range`
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
 
     /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
-    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
+    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
         async move {
             let mut result = Vec::with_capacity(ranges.len());
 
@@ -121,11 +121,11 @@ pub trait AsyncFileReader: Send {
 
 /// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
 impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
         self.as_mut().get_bytes(range)
     }
 
-    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
+    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
         self.as_mut().get_byte_ranges(ranges)
     }
 
@@ -150,14 +150,14 @@
 }
 
 impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
         async move {
-            self.seek(SeekFrom::Start(range.start as u64)).await?;
+            self.seek(SeekFrom::Start(range.start)).await?;
 
             let to_read = range.end - range.start;
-            let mut buffer = Vec::with_capacity(to_read);
-            let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
-            if read != to_read {
+            let mut buffer = Vec::with_capacity(to_read.try_into()?);
+            let read = self.take(to_read).read_to_end(&mut buffer).await?;
+            if read as u64 != to_read {
                 return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
             }
 
@@ -424,7 +424,7 @@ impl ParquetRecordBatchStreamBuilder {
         let metadata = self.metadata.row_group(row_group_idx);
         let column_metadata = metadata.column(column_idx);
 
-        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
+        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
             offset
                 .try_into()
                 .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
@@ -433,16 +433,16 @@ impl ParquetRecordBatchStreamBuilder {
         };
 
         let buffer = match column_metadata.bloom_filter_length() {
-            Some(length) => self.input.0.get_bytes(offset..offset + length as usize),
+            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
             None => self
                 .input
                 .0
-                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE),
+                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
         }
         .await?;
 
         let (header, bitset_offset) =
-            chunk_read_bloom_filter_header_and_offset(offset as u64, buffer.clone())?;
+            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
 
         match header.algorithm {
             BloomFilterAlgorithm::BLOCK(_) => {
@@ -461,14 +461,17 @@ impl ParquetRecordBatchStreamBuilder {
         }
 
         let bitset = match column_metadata.bloom_filter_length() {
-            Some(_) => buffer.slice((bitset_offset as usize - offset)..),
+            Some(_) => buffer.slice(
+                (TryInto::<usize>::try_into(bitset_offset).unwrap()
+                    - TryInto::<usize>::try_into(offset).unwrap())..,
+            ),
             None => {
-                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
+                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
                     ParquetError::General("Bloom filter length is invalid".to_string())
                 })?;
                 self.input
                     .0
-                    .get_bytes(bitset_offset as usize..bitset_offset as usize + bitset_length)
+                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
                     .await?
             }
         };
@@ -880,7 +883,7 @@ impl InMemoryRowGroup<'_> {
         if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
             // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
             // `RowSelection`
-            let mut page_start_offsets: Vec<Vec<usize>> = vec![];
+            let mut page_start_offsets: Vec<Vec<u64>> = vec![];
 
             let fetch_ranges = self
                 .column_chunks
@@ -893,11 +896,11 @@ impl InMemoryRowGroup<'_> {
                 .flat_map(|(idx, (_chunk, chunk_meta))| {
                     // If the first page does not start at the beginning of the column,
                     // then we need to also fetch a dictionary page.
-                    let mut ranges = vec![];
+                    let mut ranges: Vec<Range<u64>> = vec![];
                     let (start, _len) = chunk_meta.byte_range();
                     match offset_index[idx].page_locations.first() {
                         Some(first) if first.offset as u64 != start => {
-                            ranges.push(start as usize..first.offset as usize);
+                            ranges.push(start..first.offset as u64);
                         }
                         _ => (),
                     }
@@ -925,7 +928,11 @@ impl InMemoryRowGroup<'_> {
 
                     *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                         length: metadata.column(idx).byte_range().1 as usize,
-                        data: offsets.into_iter().zip(chunks.into_iter()).collect(),
+                        data: offsets
+                            .into_iter()
+                            .map(|x| x as usize)
+                            .zip(chunks.into_iter())
+                            .collect(),
                     }))
                 }
             }
@@ -938,7 +945,7 @@ impl InMemoryRowGroup<'_> {
                 .map(|(idx, _chunk)| {
                     let column = metadata.column(idx);
                     let (start, length) = column.byte_range();
-                    start as usize..(start + length) as usize
+                    start..(start + length)
                 })
                 .collect();
 
@@ -1108,9 +1115,16 @@ mod tests {
     }
 
     impl AsyncFileReader for TestReader {
-        fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
-            self.requests.lock().unwrap().push(range.clone());
-            futures::future::ready(Ok(self.data.slice(range))).boxed()
+        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
+            let range = range.clone();
+            self.requests
+                .lock()
+                .unwrap()
+                .push(range.start as usize..range.end as usize);
+            futures::future::ready(Ok(self
+                .data
+                .slice(range.start as usize..range.end as usize)))
+            .boxed()
         }
 
         fn get_metadata<'a>(
@@ -2238,7 +2252,7 @@ mod tests {
         let file_size = file.metadata().await.unwrap().len();
         let mut metadata = ParquetMetaDataReader::new()
             .with_page_indexes(true)
-            .load_and_finish(&mut file, file_size as usize)
+            .load_and_finish(&mut file, file_size)
             .await
             .unwrap();
 
@@ -2263,7 +2277,7 @@ mod tests {
         let file_size = file.metadata().await.unwrap().len();
         let metadata = ParquetMetaDataReader::new()
             .with_page_indexes(true)
-            .load_and_finish(&mut file, file_size as usize)
+            .load_and_finish(&mut file, file_size)
             .await
             .unwrap();
 
@@ -2309,7 +2323,7 @@ mod tests {
         let file_size = file.metadata().await.unwrap().len();
         let metadata = ParquetMetaDataReader::new()
             .with_page_indexes(true)
-            .load_and_finish(&mut file, file_size as usize)
+            .load_and_finish(&mut file, file_size)
             .await
             .unwrap();
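The `Range<usize>` → `Range<u64>` change above is the core of the `AsyncFileReader` migration: implementations now convert to `usize` only at the point of slicing or buffering, as the `TestReader` in this diff does. A minimal sketch of a custom in-memory implementation, assuming the post-#7342 `get_metadata` signature (the `InMemoryReader` type is hypothetical):

```rust
use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaData;

// Hypothetical reader backed by a fully buffered file.
struct InMemoryReader {
    data: Bytes,
    metadata: Arc<ParquetMetaData>,
}

impl AsyncFileReader for InMemoryReader {
    // Ranges arrive as u64; convert to usize only when slicing.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        let slice = self.data.slice(range.start as usize..range.end as usize);
        async move { Ok(slice) }.boxed()
    }

    fn get_metadata<'a>(
        &'a mut self,
        _options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        let metadata = Arc::clone(&self.metadata);
        async move { Ok(metadata) }.boxed()
    }
}
```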
diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs
index d5595a83be6e..8eaf7183e822 100644
--- a/parquet/src/arrow/async_reader/store.rs
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -46,7 +46,7 @@ use tokio::runtime::Handle;
 ///     println!("Found Blob with {}B at {}", meta.size, meta.location);
 ///
 ///     // Show Parquet metadata
-///     let reader = ParquetObjectReader::new(storage_container, meta.location).with_file_size(meta.size.try_into().unwrap());
+///     let reader = ParquetObjectReader::new(storage_container, meta.location).with_file_size(meta.size);
 ///     let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
 ///     print_parquet_metadata(&mut stdout(), builder.metadata());
 /// # }
@@ -55,7 +55,7 @@ use tokio::runtime::Handle;
 pub struct ParquetObjectReader {
     store: Arc<dyn ObjectStore>,
     path: Path,
-    file_size: Option<usize>,
+    file_size: Option<u64>,
     metadata_size_hint: Option<usize>,
     preload_column_index: bool,
     preload_offset_index: bool,
@@ -94,7 +94,7 @@ impl ParquetObjectReader {
     /// underlying store does not support suffix range requests.
     ///
     /// The file size can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`].
-    pub fn with_file_size(self, file_size: usize) -> Self {
+    pub fn with_file_size(self, file_size: u64) -> Self {
         Self {
             file_size: Some(file_size),
             ..self
@@ -177,19 +177,14 @@ impl MetadataSuffixFetch for &mut ParquetObjectReader {
 }
 
 impl AsyncFileReader for ParquetObjectReader {
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
-        let range = range.start as u64..range.end as u64;
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
         self.spawn(|store, path| store.get_range(path, range))
     }
 
-    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
+    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
     where
         Self: Send,
     {
-        let ranges = ranges
-            .into_iter()
-            .map(|range| range.start as u64..range.end as u64)
-            .collect::<Vec<_>>();
         self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
     }
 
@@ -259,8 +254,8 @@ mod tests {
     #[tokio::test]
     async fn test_simple() {
         let (meta, store) = get_meta_store().await;
-        let object_reader = ParquetObjectReader::new(store, meta.location)
-            .with_file_size(meta.size.try_into().unwrap());
+        let object_reader =
+            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
 
         let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
             .await
@@ -290,8 +285,8 @@ mod tests {
         let (mut meta, store) = get_meta_store().await;
         meta.location = Path::from("I don't exist.parquet");
 
-        let object_reader = ParquetObjectReader::new(store, meta.location)
-            .with_file_size(meta.size.try_into().unwrap());
+        let object_reader =
+            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
         // Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug
         match ParquetRecordBatchStreamBuilder::new(object_reader).await {
             Ok(_) => panic!("expected failure"),
@@ -325,7 +320,7 @@ mod tests {
         let initial_actions = num_actions.load(Ordering::Relaxed);
 
         let reader = ParquetObjectReader::new(store, meta.location)
-            .with_file_size(meta.size.try_into().unwrap())
+            .with_file_size(meta.size)
             .with_runtime(rt.handle().clone());
 
         let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
@@ -353,7 +348,7 @@ mod tests {
         let (meta, store) = get_meta_store().await;
 
         let reader = ParquetObjectReader::new(store, meta.location)
-            .with_file_size(meta.size.try_into().unwrap())
+            .with_file_size(meta.size)
             .with_runtime(rt.handle().clone());
 
         let current_id = std::thread::current().id();
@@ -378,7 +373,7 @@ mod tests {
         let (meta, store) = get_meta_store().await;
 
         let mut reader = ParquetObjectReader::new(store, meta.location)
-            .with_file_size(meta.size.try_into().unwrap())
+            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());
 
         rt.shutdown_background();
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 233a55778721..ea9976f3a746 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -1103,9 +1103,9 @@ impl ColumnChunkMetaData {
     }
 
     /// Returns the range for the offset index if any
-    pub(crate) fn column_index_range(&self) -> Option<Range<usize>> {
-        let offset = usize::try_from(self.column_index_offset?).ok()?;
-        let length = usize::try_from(self.column_index_length?).ok()?;
+    pub(crate) fn column_index_range(&self) -> Option<Range<u64>> {
+        let offset = u64::try_from(self.column_index_offset?).ok()?;
+        let length = u64::try_from(self.column_index_length?).ok()?;
         Some(offset..(offset + length))
     }
 
@@ -1120,9 +1120,9 @@ impl ColumnChunkMetaData {
     }
 
     /// Returns the range for the offset index if any
-    pub(crate) fn offset_index_range(&self) -> Option<Range<usize>> {
-        let offset = usize::try_from(self.offset_index_offset?).ok()?;
-        let length = usize::try_from(self.offset_index_length?).ok()?;
+    pub(crate) fn offset_index_range(&self) -> Option<Range<u64>> {
+        let offset = u64::try_from(self.offset_index_offset?).ok()?;
+        let length = u64::try_from(self.offset_index_length?).ok()?;
         Some(offset..(offset + length))
     }
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index 7203c3a00522..aebf1a890621 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -209,7 +209,7 @@ impl ParquetMetaDataReader {
     /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
     /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
     pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
-        self.try_parse_sized(reader, reader.len() as usize)
+        self.try_parse_sized(reader, reader.len())
     }
 
     /// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader`
@@ -232,10 +232,10 @@ impl ParquetMetaDataReader {
     /// # use parquet::file::metadata::ParquetMetaDataReader;
     /// # use parquet::errors::ParquetError;
     /// # use crate::parquet::file::reader::Length;
-    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<usize>) -> bytes::Bytes { unimplemented!(); }
+    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
     /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
     /// let file = open_parquet_file("some_path.parquet");
-    /// let len = file.len() as usize;
+    /// let len = file.len();
     /// // Speculatively read 1 kilobyte from the end of the file
     /// let bytes = get_bytes(&file, len - 1024..len);
     /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
@@ -243,7 +243,7 @@ impl ParquetMetaDataReader {
     ///     Ok(_) => (),
     ///     Err(ParquetError::NeedMoreData(needed)) => {
     ///         // Read the needed number of bytes from the end of the file
-    ///         let bytes = get_bytes(&file, len - needed..len);
+    ///         let bytes = get_bytes(&file, len - needed as u64..len);
     ///         reader.try_parse_sized(&bytes, len).unwrap();
     ///     }
     ///     _ => panic!("unexpected error")
@@ -259,10 +259,10 @@ impl ParquetMetaDataReader {
     /// # use parquet::file::metadata::ParquetMetaDataReader;
     /// # use parquet::errors::ParquetError;
     /// # use crate::parquet::file::reader::Length;
-    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<usize>) -> bytes::Bytes { unimplemented!(); }
+    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
     /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
     /// let file = open_parquet_file("some_path.parquet");
-    /// let len = file.len() as usize;
+    /// let len = file.len();
     /// // Speculatively read 1 kilobyte from the end of the file
     /// let mut bytes = get_bytes(&file, len - 1024..len);
     /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
@@ -272,7 +272,7 @@ impl ParquetMetaDataReader {
     ///         Ok(_) => break,
     ///         Err(ParquetError::NeedMoreData(needed)) => {
     ///             // Read the needed number of bytes from the end of the file
-    ///             bytes = get_bytes(&file, len - needed..len);
+    ///             bytes = get_bytes(&file, len - needed as u64..len);
     ///             // If file metadata was read only read page indexes, otherwise continue loop
     ///             if reader.has_metadata() {
     ///                 reader.read_page_indexes_sized(&bytes, len);
@@ -284,13 +284,13 @@ impl ParquetMetaDataReader {
     ///     }
     /// }
     /// let metadata = reader.finish().unwrap();
     /// ```
-    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: usize) -> Result<()> {
+    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
         self.metadata = match self.parse_metadata(reader) {
             Ok(metadata) => Some(metadata),
             Err(ParquetError::NeedMoreData(needed)) => {
                 // If reader is the same length as `file_size` then presumably there is no more to
                 // read, so return an EOF error.
-                if file_size == reader.len() as usize || needed > file_size {
+                if file_size == reader.len() || needed as u64 > file_size {
                     return Err(eof_err!(
                         "Parquet file too small. Size is {} but need {}",
                         file_size,
@@ -315,7 +315,7 @@ impl ParquetMetaDataReader {
     /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
     /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`].
     pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
-        self.read_page_indexes_sized(reader, reader.len() as usize)
+        self.read_page_indexes_sized(reader, reader.len())
     }
 
     /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
@@ -326,7 +326,7 @@ impl ParquetMetaDataReader {
     pub fn read_page_indexes_sized<R: ChunkReader>(
         &mut self,
         reader: &R,
-        file_size: usize,
+        file_size: u64,
     ) -> Result<()> {
         if self.metadata.is_none() {
             return Err(general_err!(
@@ -350,7 +350,7 @@ impl ParquetMetaDataReader {
 
         // Check to see if needed range is within `file_range`. Checking `range.end` seems
         // redundant, but it guards against `range_for_page_index()` returning garbage.
-        let file_range = file_size.saturating_sub(reader.len() as usize)..file_size;
+        let file_range = file_size.saturating_sub(reader.len())..file_size;
         if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
             // Requested range starts beyond EOF
             if range.end > file_size {
@@ -360,14 +360,16 @@ impl ParquetMetaDataReader {
                 ));
             } else {
                 // Ask for a larger buffer
-                return Err(ParquetError::NeedMoreData(file_size - range.start));
+                return Err(ParquetError::NeedMoreData(
+                    (file_size - range.start).try_into()?,
+                ));
             }
         }
 
         // Perform extra sanity check to make sure `range` and the footer metadata don't
         // overlap.
         if let Some(metadata_size) = self.metadata_size {
-            let metadata_range = file_size.saturating_sub(metadata_size)..file_size;
+            let metadata_range = file_size.saturating_sub(metadata_size as u64)..file_size;
             if range.end > metadata_range.start {
                 return Err(eof_err!(
                     "Parquet file too small. Page index range {:?} overlaps with file metadata {:?}",
@@ -377,8 +379,8 @@ impl ParquetMetaDataReader {
             }
         }
 
-        let bytes_needed = range.end - range.start;
-        let bytes = reader.get_bytes((range.start - file_range.start) as u64, bytes_needed)?;
+        let bytes_needed = usize::try_from(range.end - range.start)?;
+        let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;
         let offset = range.start;
 
         self.parse_column_index(&bytes, offset)?;
@@ -397,7 +399,7 @@ impl ParquetMetaDataReader {
     pub async fn load_and_finish<F: MetadataFetch>(
         mut self,
         fetch: F,
-        file_size: usize,
+        file_size: u64,
     ) -> Result<ParquetMetaData> {
         self.try_load(fetch, file_size).await?;
         self.finish()
@@ -423,11 +425,7 @@ impl ParquetMetaDataReader {
     /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
     /// performed by this function.
     #[cfg(all(feature = "async", feature = "arrow"))]
-    pub async fn try_load<F: MetadataFetch>(
-        &mut self,
-        mut fetch: F,
-        file_size: usize,
-    ) -> Result<()> {
+    pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
         let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;
 
         self.metadata = Some(metadata);
@@ -487,9 +485,10 @@ impl ParquetMetaDataReader {
         };
 
         let bytes = match &remainder {
-            Some((remainder_start, remainder)) if *remainder_start <= range.start => {
-                let offset = range.start - *remainder_start;
-                let end = offset + range.end - range.start;
+            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
+                let remainder_start = *remainder_start as u64;
+                let offset = usize::try_from(range.start - remainder_start)?;
+                let end = usize::try_from(range.end - remainder_start)?;
                 assert!(end <= remainder.len());
                 remainder.slice(offset..end)
             }
@@ -498,16 +497,15 @@ impl ParquetMetaDataReader {
         };
 
         // Sanity check
-        assert_eq!(bytes.len(), range.end - range.start);
-        let offset = range.start;
+        assert_eq!(bytes.len() as u64, range.end - range.start);
 
-        self.parse_column_index(&bytes, offset)?;
-        self.parse_offset_index(&bytes, offset)?;
+        self.parse_column_index(&bytes, range.start)?;
+        self.parse_offset_index(&bytes, range.start)?;
 
         Ok(())
     }
 
-    fn parse_column_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
+    fn parse_column_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
         let metadata = self.metadata.as_mut().unwrap();
         if self.column_index {
             let index = metadata
@@ -517,10 +515,11 @@ impl ParquetMetaDataReader {
                     x.columns()
                         .iter()
                         .map(|c| match c.column_index_range() {
-                            Some(r) => decode_column_index(
-                                &bytes[r.start - start_offset..r.end - start_offset],
-                                c.column_type(),
-                            ),
+                            Some(r) => {
+                                let r_start = usize::try_from(r.start - start_offset)?;
+                                let r_end = usize::try_from(r.end - start_offset)?;
+                                decode_column_index(&bytes[r_start..r_end], c.column_type())
+                            }
                             None => Ok(Index::NONE),
                         })
                         .collect::<Result<Vec<_>>>()
@@ -531,7 +530,7 @@ impl ParquetMetaDataReader {
         Ok(())
     }
 
-    fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
+    fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
         let metadata = self.metadata.as_mut().unwrap();
         if self.offset_index {
             let index = metadata
@@ -541,9 +540,11 @@ impl ParquetMetaDataReader {
                     x.columns()
                         .iter()
                         .map(|c| match c.offset_index_range() {
-                            Some(r) => decode_offset_index(
-                                &bytes[r.start - start_offset..r.end - start_offset],
-                            ),
+                            Some(r) => {
+                                let r_start = usize::try_from(r.start - start_offset)?;
+                                let r_end = usize::try_from(r.end - start_offset)?;
+                                decode_offset_index(&bytes[r_start..r_end])
+                            }
                             None => Err(general_err!("missing offset index")),
                         })
                         .collect::<Result<Vec<_>>>()
@@ -555,7 +556,7 @@ impl ParquetMetaDataReader {
         Ok(())
     }
 
-    fn range_for_page_index(&self) -> Option<Range<usize>> {
+    fn range_for_page_index(&self) -> Option<Range<u64>> {
         // sanity check
         self.metadata.as_ref()?;
 
@@ -592,7 +593,7 @@ impl ParquetMetaDataReader {
         let footer_metadata_len = FOOTER_SIZE + metadata_len;
         self.metadata_size = Some(footer_metadata_len);
 
-        if footer_metadata_len > file_size as usize {
+        if footer_metadata_len as u64 > file_size {
             return Err(ParquetError::NeedMoreData(footer_metadata_len));
         }
 
@@ -620,11 +621,11 @@ impl ParquetMetaDataReader {
     async fn load_metadata<F: MetadataFetch>(
         &self,
         fetch: &mut F,
-        file_size: usize,
+        file_size: u64,
     ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
-        let prefetch = self.get_prefetch_size();
+        let prefetch = self.get_prefetch_size() as u64;
 
-        if file_size < FOOTER_SIZE {
+        if file_size < FOOTER_SIZE as u64 {
             return Err(eof_err!("file size of {} is less than footer", file_size));
         }
 
@@ -635,7 +636,9 @@ impl ParquetMetaDataReader {
 
         let suffix = fetch.fetch(footer_start..file_size).await?;
         let suffix_len = suffix.len();
-        let fetch_len = file_size - footer_start;
+        let fetch_len = (file_size - footer_start)
+            .try_into()
+            .expect("footer size should never be larger than u32");
         if suffix_len < fetch_len {
             return Err(eof_err!(
                 "metadata requires {} bytes, but could only read {}",
@@ -650,7 +653,7 @@ impl ParquetMetaDataReader {
         let footer = Self::decode_footer_tail(&footer)?;
         let length = footer.metadata_length();
 
-        if file_size < length + FOOTER_SIZE {
+        if file_size < (length + FOOTER_SIZE) as u64 {
             return Err(eof_err!(
                 "file size of {} is less than footer + metadata {}",
                 file_size,
@@ -660,15 +663,19 @@ impl ParquetMetaDataReader {
 
         // Did not fetch the entire file metadata in the initial read, need to make a second request
        if length > suffix_len - FOOTER_SIZE {
-            let metadata_start = file_size - length - FOOTER_SIZE;
-            let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
+            let metadata_start = file_size - (length + FOOTER_SIZE) as u64;
+            let meta = fetch
+                .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
+                .await?;
             Ok((self.decode_footer_metadata(&meta, &footer)?, None))
         } else {
-            let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
+            let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start)
+                .try_into()
+                .expect("metadata length should never be larger than u32");
             let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
             Ok((
                 self.decode_footer_metadata(slice, &footer)?,
-                Some((footer_start, suffix.slice(..metadata_start))),
+                Some((footer_start as usize, suffix.slice(..metadata_start))),
             ))
         }
     }
@@ -680,7 +687,7 @@ impl ParquetMetaDataReader {
     ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
         let prefetch = self.get_prefetch_size();
 
-        let suffix = fetch.fetch_suffix(prefetch).await?;
+        let suffix = fetch.fetch_suffix(prefetch as _).await?;
         let suffix_len = suffix.len();
 
         if suffix_len < FOOTER_SIZE {
@@ -1089,12 +1096,12 @@ mod tests {
     #[test]
     fn test_try_parse() {
         let file = get_test_file("alltypes_tiny_pages.parquet");
-        let len = file.len() as usize;
+        let len = file.len();
         let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
 
-        let bytes_for_range = |range: Range<usize>| {
-            file.get_bytes(range.start as u64, range.end - range.start)
+        let bytes_for_range = |range: Range<u64>| {
+            file.get_bytes(range.start, (range.end - range.start).try_into().unwrap())
                 .unwrap()
         };
 
@@ -1125,7 +1132,7 @@ mod tests {
         match reader.try_parse_sized(&bytes, len).unwrap_err() {
             // expected error, try again with provided bounds
             ParquetError::NeedMoreData(needed) => {
-                let bytes = bytes_for_range(len - needed..len);
+                let bytes = bytes_for_range(len - needed as u64..len);
                 reader.try_parse_sized(&bytes, len).unwrap();
                 let metadata = reader.finish().unwrap();
                 assert!(metadata.column_index.is_some());
@@ -1141,7 +1148,7 @@ mod tests {
             match reader.try_parse_sized(&bytes, len) {
                 Ok(_) => break,
                 Err(ParquetError::NeedMoreData(needed)) => {
-                    bytes = bytes_for_range(len - needed..len);
+                    bytes = bytes_for_range(len - needed as u64..len);
                    if reader.has_metadata() {
                         reader.read_page_indexes_sized(&bytes, len).unwrap();
                         break;
@@ -1169,7 +1176,7 @@ mod tests {
         match reader.try_parse_sized(&bytes, len).unwrap_err() {
             // expected error, try again with provided bounds
             ParquetError::NeedMoreData(needed) => {
-                let bytes = bytes_for_range(len - needed..len);
+                let bytes = bytes_for_range(len - needed as u64..len);
                 reader.try_parse_sized(&bytes, len).unwrap();
                 reader.finish().unwrap();
             }
@@ -1220,10 +1227,10 @@ mod async_tests {
 
     impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
     where
-        F: FnMut(Range<usize>) -> Fut + Send,
+        F: FnMut(Range<u64>) -> Fut + Send,
         Fut: Future<Output = Result<Bytes>> + Send,
     {
-        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
             async move { self.0(range).await }.boxed()
         }
     }
@@ -1232,18 +1239,18 @@ mod async_tests {
 
     impl<F1, F2, Fut> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
     where
-        F1: FnMut(Range<usize>) -> Fut + Send,
+        F1: FnMut(Range<u64>) -> Fut + Send,
         Fut: Future<Output = Result<Bytes>> + Send,
         F2: Send,
     {
-        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
             async move { self.0(range).await }.boxed()
         }
     }
 
     impl<F1, F2, Fut> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
     where
-        F1: FnMut(Range<usize>) -> Fut + Send,
+        F1: FnMut(Range<u64>) -> Fut + Send,
         F2: FnMut(usize) -> Fut + Send,
         Fut: Future<Output = Result<Bytes>> + Send,
     {
@@ -1252,10 +1259,10 @@ mod async_tests {
         }
     }
 
-    fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
+    fn read_range(file: &mut File, range: Range<u64>) -> Result<Bytes> {
         file.seek(SeekFrom::Start(range.start as _))?;
         let len = range.end - range.start;
-        let mut buf = Vec::with_capacity(len);
+        let mut buf = Vec::with_capacity(len.try_into().unwrap());
         file.take(len as _).read_to_end(&mut buf)?;
         Ok(buf.into())
     }
@@ -1263,7 +1270,7 @@ mod async_tests {
     fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
         let file_len = file.len();
         // Don't seek before beginning of file
-        file.seek(SeekFrom::End(0 - suffix.min(file_len as usize) as i64))?;
+        file.seek(SeekFrom::End(0 - suffix.min(file_len as _) as i64))?;
         let mut buf = Vec::with_capacity(suffix);
         file.take(suffix as _).read_to_end(&mut buf)?;
         Ok(buf.into())
@@ -1272,7 +1279,7 @@ mod async_tests {
     #[tokio::test]
     async fn test_simple() {
         let mut file = get_test_file("nulls.snappy.parquet");
-        let len = file.len() as usize;
+        let len = file.len();
 
         let expected = ParquetMetaDataReader::new()
             .parse_and_finish(&file)
@@ -1466,7 +1473,7 @@ mod async_tests {
    #[tokio::test]
     async fn test_page_index() {
         let mut file = get_test_file("alltypes_tiny_pages.parquet");
-        let len = file.len() as usize;
+        let len = file.len();
         let fetch_count = AtomicUsize::new(0);
         let mut fetch = |range| {
             fetch_count.fetch_add(1, Ordering::SeqCst);
@@ -1519,7 +1526,7 @@ mod async_tests {
         let f = MetadataFetchFn(&mut fetch);
         let metadata = ParquetMetaDataReader::new()
             .with_page_indexes(true)
-            .with_prefetch_hint(Some(len - 1000)) // prefetch entire file
+            .with_prefetch_hint(Some((len - 1000) as usize)) // prefetch entire file
             .load_and_finish(f, len)
             .await
             .unwrap();
@@ -1531,7 +1538,7 @@ mod async_tests {
         let f = MetadataFetchFn(&mut fetch);
         let metadata = ParquetMetaDataReader::new()
             .with_page_indexes(true)
-            .with_prefetch_hint(Some(len)) // prefetch entire file
+            .with_prefetch_hint(Some(len as usize)) // prefetch entire file
             .load_and_finish(f, len)
             .await
             .unwrap();
@@ -1543,7 +1550,7 @@ mod async_tests {
         let f = MetadataFetchFn(&mut fetch);
         let metadata = ParquetMetaDataReader::new()
             .with_page_indexes(true)
-            .with_prefetch_hint(Some(len + 1000)) // prefetch entire file
+            .with_prefetch_hint(Some((len + 1000) as usize)) // prefetch entire file
             .load_and_finish(f, len)
             .await
             .unwrap();
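The `NeedMoreData` handshake shown in the doc examples and tests above now mixes a `u64` file length with a `usize` needed-byte count. A condensed sketch of the retry loop (`read_tail` is a hypothetical helper returning the last `n` bytes of the file, stubbed in the same style as the doc examples):

```rust
use bytes::Bytes;
use parquet::errors::ParquetError;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

// Hypothetical: return the last `n` bytes of a file of total length `len`.
fn read_tail(len: u64, n: usize) -> Bytes {
    unimplemented!()
}

fn read_metadata(len: u64) -> Result<ParquetMetaData, ParquetError> {
    let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
    let bytes = read_tail(len, 1024); // speculative first read
    match reader.try_parse_sized(&bytes, len) {
        Ok(()) => {}
        Err(ParquetError::NeedMoreData(needed)) => {
            // `needed` is a usize byte count; the file offsets stay u64
            let bytes = read_tail(len, needed);
            reader.try_parse_sized(&bytes, len)?;
        }
        Err(e) => return Err(e),
    }
    reader.finish()
}
```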
diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs
index fd3639ac3069..c472ceb29128 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -31,7 +31,7 @@ use std::ops::Range;
 /// Computes the covering range of two optional ranges
 ///
 /// For example `acc_range(Some(7..9), Some(1..3)) = Some(1..9)`
-pub(crate) fn acc_range(a: Option<Range<usize>>, b: Option<Range<usize>>) -> Option<Range<usize>> {
+pub(crate) fn acc_range(a: Option<Range<u64>>, b: Option<Range<u64>>) -> Option<Range<u64>> {
     match (a, b) {
         (Some(a), Some(b)) => Some(a.start.min(b.start)..a.end.max(b.end)),
         (None, x) | (x, None) => x,
@@ -61,14 +61,17 @@ pub fn read_columns_indexes(
         None => return Ok(None),
     };
 
-    let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
-    let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
+    let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
 
     Some(
         chunks
             .iter()
            .map(|c| match c.column_index_range() {
-                Some(r) => decode_column_index(get(r), c.column_type()),
+                Some(r) => decode_column_index(
+                    &bytes[usize::try_from(r.start - fetch.start)?
+                        ..usize::try_from(r.end - fetch.start)?],
+                    c.column_type(),
+                ),
                 None => Ok(Index::NONE),
             })
             .collect(),
@@ -101,13 +104,15 @@ pub fn read_pages_locations(
         None => return Ok(vec![]),
     };
 
-    let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
-    let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
+    let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
 
     chunks
         .iter()
        .map(|c| match c.offset_index_range() {
-            Some(r) => decode_page_locations(get(r)),
+            Some(r) => decode_page_locations(
+                &bytes[usize::try_from(r.start - fetch.start)?
+                    ..usize::try_from(r.end - fetch.start)?],
+            ),
             None => Err(general_err!("missing offset index")),
         })
         .collect()
@@ -136,14 +141,16 @@ pub fn read_offset_indexes(
         None => return Ok(None),
     };
 
-    let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
-    let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
+    let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
 
     Some(
         chunks
             .iter()
            .map(|c| match c.offset_index_range() {
-                Some(r) => decode_offset_index(get(r)),
+                Some(r) => decode_offset_index(
+                    &bytes[usize::try_from(r.start - fetch.start)?
+                        ..usize::try_from(r.end - fetch.start)?],
+                ),
                 None => Err(general_err!("missing offset index")),
             })
             .collect(),
diff --git a/parquet/tests/arrow_reader/bad_data.rs b/parquet/tests/arrow_reader/bad_data.rs
index 7de5d7e346d6..b427bd4302e2 100644
--- a/parquet/tests/arrow_reader/bad_data.rs
+++ b/parquet/tests/arrow_reader/bad_data.rs
@@ -156,7 +156,7 @@ async fn bad_metadata_err() {
 
     let metadata_buffer = Bytes::from_static(include_bytes!("bad_raw_metadata.bin"));
 
-    let metadata_length = metadata_buffer.len();
+    let metadata_length = metadata_buffer.len() as u64;
 
     let mut reader = std::io::Cursor::new(&metadata_buffer);
     let mut loader = ParquetMetaDataReader::new();
diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs
index 9deadece9544..11448207c6fc 100644
--- a/parquet/tests/encryption/encryption_async.rs
+++ b/parquet/tests/encryption/encryption_async.rs
@@ -310,8 +310,7 @@ async fn test_read_encrypted_file_from_object_store() {
         .unwrap();
     let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
 
-    let mut reader = ParquetObjectReader::new(store, meta.location)
-        .with_file_size(meta.size.try_into().unwrap());
+    let mut reader = ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
     let metadata = reader.get_metadata(Some(&options)).await.unwrap();
     let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
         .await