From 11aa2cb1d6e63b7e550ce5bf7df564d893c0455c Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Sat, 8 Mar 2025 15:44:34 +0530 Subject: [PATCH 1/3] Change AsyncFileReader trait for u64 Signed-off-by: Arpit Bandejiya --- parquet/examples/read_with_rowgroup.rs | 2 +- parquet/src/arrow/arrow_reader/selection.rs | 10 ++--- parquet/src/arrow/async_reader/metadata.rs | 2 +- parquet/src/arrow/async_reader/mod.rs | 49 +++++++++++---------- parquet/src/arrow/async_reader/store.rs | 15 +++++-- 5 files changed, 43 insertions(+), 35 deletions(-) diff --git a/parquet/examples/read_with_rowgroup.rs b/parquet/examples/read_with_rowgroup.rs index 8cccc7fe14ac..23dd0d92ea43 100644 --- a/parquet/examples/read_with_rowgroup.rs +++ b/parquet/examples/read_with_rowgroup.rs @@ -167,7 +167,7 @@ impl InMemoryRowGroup { if self.mask.leaf_included(leaf_idx) { let (start, len) = meta.byte_range(); let data = reader - .get_bytes(start as usize..(start + len) as usize) + .get_bytes(start..(start + len)) .await?; vs[leaf_idx] = Some(Arc::new(ColumnChunkData { diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 378d2253f19a..dd4032bcda37 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -162,8 +162,8 @@ impl RowSelection { /// Note: this method does not make any effort to combine consecutive ranges, nor coalesce /// ranges that are close together. This is instead delegated to the IO subsystem to optimise, /// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges) - pub fn scan_ranges(&self, page_locations: &[crate::format::PageLocation]) -> Vec> { - let mut ranges = vec![]; + pub fn scan_ranges(&self, page_locations: &[crate::format::PageLocation]) -> Vec> { + let mut ranges:Vec> = vec![]; let mut row_offset = 0; let mut pages = page_locations.iter().peekable(); @@ -175,8 +175,8 @@ impl RowSelection { while let Some((selector, page)) = current_selector.as_mut().zip(current_page) { if !(selector.skip || current_page_included) { - let start = page.offset as usize; - let end = start + page.compressed_page_size as usize; + let start = page.offset as u64; + let end = start + page.compressed_page_size as u64; ranges.push(start..end); current_page_included = true; } @@ -202,7 +202,7 @@ impl RowSelection { if !(selector.skip || current_page_included) { let start = page.offset as usize; let end = start + page.compressed_page_size as usize; - ranges.push(start..end); + ranges.push(start as u64..end as u64); } current_selector = selectors.next() } diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index 71d2e57ddd50..332f7c67bf4a 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -71,7 +71,7 @@ pub trait MetadataFetch { impl MetadataFetch for &mut T { fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { - self.get_bytes(range) + self.get_bytes(range.start as u64..range.end as u64) } } diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index fd49ad22934d..da576f4d7db9 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -86,10 +86,10 @@ pub use store::*; /// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html pub trait AsyncFileReader: Send { /// Retrieve the bytes in `range` - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result>; + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result>; /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially - fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { + fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { async move { let mut result = Vec::with_capacity(ranges.len()); @@ -124,11 +124,11 @@ pub trait AsyncFileReader: Send { /// This allows Box to be used as an AsyncFileReader, impl AsyncFileReader for Box { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { self.as_mut().get_bytes(range) } - fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { + fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { self.as_mut().get_byte_ranges(ranges) } @@ -145,14 +145,14 @@ impl AsyncFileReader for Box { } impl AsyncFileReader for T { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { async move { - self.seek(SeekFrom::Start(range.start as u64)).await?; + self.seek(SeekFrom::Start(range.start)).await?; let to_read = range.end - range.start; - let mut buffer = Vec::with_capacity(to_read); - let read = self.take(to_read as u64).read_to_end(&mut buffer).await?; - if read != to_read { + let mut buffer = Vec::with_capacity(to_read as usize); + let read = self.take(to_read).read_to_end(&mut buffer).await?; + if read != to_read as usize { return Err(eof_err!("expected to read {} bytes, got {}", to_read, read)); } @@ -482,7 +482,7 @@ impl ParquetRecordBatchStreamBuilder { let metadata = self.metadata.row_group(row_group_idx); let column_metadata = metadata.column(column_idx); - let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() { + let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() { offset .try_into() .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))? @@ -491,16 +491,16 @@ impl ParquetRecordBatchStreamBuilder { }; let buffer = match column_metadata.bloom_filter_length() { - Some(length) => self.input.0.get_bytes(offset..offset + length as usize), + Some(length) => self.input.0.get_bytes(offset..offset + length as u64), None => self .input .0 - .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE), + .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64), } .await?; let (header, bitset_offset) = - chunk_read_bloom_filter_header_and_offset(offset as u64, buffer.clone())?; + chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?; match header.algorithm { BloomFilterAlgorithm::BLOCK(_) => { @@ -519,14 +519,14 @@ impl ParquetRecordBatchStreamBuilder { } let bitset = match column_metadata.bloom_filter_length() { - Some(_) => buffer.slice((bitset_offset as usize - offset)..), + Some(_) => buffer.slice((bitset_offset as usize - offset as usize)..), None => { - let bitset_length: usize = header.num_bytes.try_into().map_err(|_| { + let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| { ParquetError::General("Bloom filter length is invalid".to_string()) })?; self.input .0 - .get_bytes(bitset_offset as usize..bitset_offset as usize + bitset_length) + .get_bytes(bitset_offset ..bitset_offset + bitset_length) .await? } }; @@ -939,7 +939,7 @@ impl InMemoryRowGroup<'_> { if let Some((selection, offset_index)) = selection.zip(self.offset_index) { // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the // `RowSelection` - let mut page_start_offsets: Vec> = vec![]; + let mut page_start_offsets: Vec> = vec![]; let fetch_ranges = self .column_chunks @@ -952,11 +952,11 @@ impl InMemoryRowGroup<'_> { .flat_map(|(idx, (_chunk, chunk_meta))| { // If the first page does not start at the beginning of the column, // then we need to also fetch a dictionary page. - let mut ranges = vec![]; + let mut ranges: Vec> = vec![]; let (start, _len) = chunk_meta.byte_range(); match offset_index[idx].page_locations.first() { Some(first) if first.offset as u64 != start => { - ranges.push(start as usize..first.offset as usize); + ranges.push(start..first.offset as u64); } _ => (), } @@ -984,7 +984,7 @@ impl InMemoryRowGroup<'_> { *chunk = Some(Arc::new(ColumnChunkData::Sparse { length: metadata.column(idx).byte_range().1 as usize, - data: offsets.into_iter().zip(chunks.into_iter()).collect(), + data: offsets.into_iter().map(|x| {x as usize}).zip(chunks.into_iter()).collect(), })) } } @@ -997,7 +997,7 @@ impl InMemoryRowGroup<'_> { .map(|(idx, _chunk)| { let column = metadata.column(idx); let (start, length) = column.byte_range(); - start as usize..(start + length) as usize + start..(start + length) }) .collect(); @@ -1193,9 +1193,10 @@ mod tests { } impl AsyncFileReader for TestReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { - self.requests.lock().unwrap().push(range.clone()); - futures::future::ready(Ok(self.data.slice(range))).boxed() + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + let range = range.clone(); + self.requests.lock().unwrap().push(range.start as usize..range.end as usize); + futures::future::ready(Ok(self.data.slice(range.start as usize..range.end as usize))).boxed() } fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index a1e94efd1451..6d93ab2b91e6 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -146,15 +146,22 @@ impl ParquetObjectReader { } impl AsyncFileReader for ParquetObjectReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { - self.spawn(|store, path| store.get_range(path, range)) + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + self.spawn(move |store, path| { + let usize_range = (range.start as usize)..(range.end as usize); + store.get_range(path, usize_range) + }) + } - fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> + fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> where Self: Send, { - self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed()) + self.spawn(|store, path| async move { + let ranges:Vec> = ranges.into_iter().map(|r| r.start as usize..r.end as usize).collect(); + store.get_ranges(path, &ranges).await + }.boxed()) } // This method doesn't directly call `self.spawn` because all of the IO that is done down the From fef371960c0da155466cae493bb0b9f3e61cd0c6 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Sat, 15 Mar 2025 13:44:54 +0530 Subject: [PATCH 2/3] update metadatafetch trait Signed-off-by: Arpit Bandejiya --- parquet/src/arrow/async_reader/metadata.rs | 16 ++++++++-------- parquet/src/file/metadata/reader.rs | 16 ++++++++-------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index 332f7c67bf4a..5d427d5f05d4 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -66,12 +66,12 @@ pub trait MetadataFetch { /// /// Note the returned type is a boxed future, often created by /// [FutureExt::boxed]. See the trait documentation for an example - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result>; + fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result>; } impl MetadataFetch for &mut T { - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { - self.get_bytes(range.start as u64..range.end as u64) + fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { + self.get_bytes(range.start..range.end) } } @@ -107,7 +107,7 @@ impl MetadataLoader { file_size - FOOTER_SIZE }; - let suffix = fetch.fetch(footer_start..file_size).await?; + let suffix = fetch.fetch(footer_start as u64..file_size as u64).await?; let suffix_len = suffix.len(); let mut footer = [0; FOOTER_SIZE]; @@ -127,7 +127,7 @@ impl MetadataLoader { // Did not fetch the entire file metadata in the initial read, need to make a second request let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE { let metadata_start = file_size - length - FOOTER_SIZE; - let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?; + let meta = fetch.fetch(metadata_start as u64..(file_size - FOOTER_SIZE) as u64).await?; (ParquetMetaDataReader::decode_metadata(&meta)?, None) } else { let metadata_start = file_size - length - FOOTER_SIZE - footer_start; @@ -182,7 +182,7 @@ impl MetadataLoader { remainder.slice(offset..range.end - *remainder_start + offset) } // Note: this will potentially fetch data already in remainder, this keeps things simple - _ => self.fetch.fetch(range.start..range.end).await?, + _ => self.fetch.fetch(range.start as u64..range.end as u64).await?, }; // Sanity check @@ -246,8 +246,8 @@ where F: FnMut(Range) -> Fut + Send, Fut: Future> + Send, { - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { - async move { self.0(range).await }.boxed() + fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { + async move { self.0(range.start as usize..range.end as usize).await }.boxed() } } diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index b80e76d7929a..a0b8d5b08f74 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -460,7 +460,7 @@ impl ParquetMetaDataReader { remainder.slice(offset..end) } // Note: this will potentially fetch data already in remainder, this keeps things simple - _ => fetch.fetch(range.start..range.end).await?, + _ => fetch.fetch(range.start as u64..range.end as u64).await?, }; // Sanity check @@ -598,7 +598,7 @@ impl ParquetMetaDataReader { // Note: prefetch > file_size is ok since we're using saturating_sub. let footer_start = file_size.saturating_sub(prefetch); - let suffix = fetch.fetch(footer_start..file_size).await?; + let suffix = fetch.fetch(footer_start as u64..file_size as u64).await?; let suffix_len = suffix.len(); let fetch_len = file_size - footer_start; if suffix_len < fetch_len { @@ -626,7 +626,7 @@ impl ParquetMetaDataReader { // Did not fetch the entire file metadata in the initial read, need to make a second request if length > suffix_len - FOOTER_SIZE { let metadata_start = file_size - length - FOOTER_SIZE; - let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?; + let meta = fetch.fetch(metadata_start as u64..(file_size - FOOTER_SIZE) as u64).await?; Ok((self.decode_footer_metadata(&meta, &footer)?, None)) } else { let metadata_start = file_size - length - FOOTER_SIZE - footer_start; @@ -1112,18 +1112,18 @@ mod async_tests { impl MetadataFetch for MetadataFetchFn where - F: FnMut(Range) -> Fut + Send, + F: FnMut(Range) -> Fut + Send, Fut: Future> + Send, { - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { async move { self.0(range).await }.boxed() } } - fn read_range(file: &mut File, range: Range) -> Result { - file.seek(SeekFrom::Start(range.start as _))?; + fn read_range(file: &mut File, range: Range) -> Result { + file.seek(SeekFrom::Start(range.start))?; let len = range.end - range.start; - let mut buf = Vec::with_capacity(len); + let mut buf = Vec::with_capacity(len as usize); file.take(len as _).read_to_end(&mut buf)?; Ok(buf.into()) } From 917b0e673beafbb9b92735aae07ec245bab6f932 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Sat, 22 Mar 2025 15:18:44 +0530 Subject: [PATCH 3/3] Fix lint issue --- parquet/examples/read_with_rowgroup.rs | 4 +--- parquet/src/arrow/arrow_reader/selection.rs | 2 +- parquet/src/arrow/async_reader/metadata.rs | 17 ++++++++++++----- parquet/src/arrow/async_reader/mod.rs | 20 +++++++++++++++----- parquet/src/arrow/async_reader/store.rs | 15 ++++++++++----- parquet/src/file/metadata/reader.rs | 4 +++- 6 files changed, 42 insertions(+), 20 deletions(-) diff --git a/parquet/examples/read_with_rowgroup.rs b/parquet/examples/read_with_rowgroup.rs index 23dd0d92ea43..a6b7ec858314 100644 --- a/parquet/examples/read_with_rowgroup.rs +++ b/parquet/examples/read_with_rowgroup.rs @@ -166,9 +166,7 @@ impl InMemoryRowGroup { for (leaf_idx, meta) in self.metadata.columns().iter().enumerate() { if self.mask.leaf_included(leaf_idx) { let (start, len) = meta.byte_range(); - let data = reader - .get_bytes(start..(start + len)) - .await?; + let data = reader.get_bytes(start..(start + len)).await?; vs[leaf_idx] = Some(Arc::new(ColumnChunkData { offset: start as usize, diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index dd4032bcda37..ca850bc81fe2 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -163,7 +163,7 @@ impl RowSelection { /// ranges that are close together. This is instead delegated to the IO subsystem to optimise, /// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges) pub fn scan_ranges(&self, page_locations: &[crate::format::PageLocation]) -> Vec> { - let mut ranges:Vec> = vec![]; + let mut ranges: Vec> = vec![]; let mut row_offset = 0; let mut pages = page_locations.iter().peekable(); diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index 5d427d5f05d4..53efa08f88f0 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -48,12 +48,13 @@ use std::ops::Range; /// file: tokio::fs::File, /// } /// impl MetadataFetch for TokioFileMetadata { -/// fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { +/// fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { /// // return a future that fetches data in range /// async move { -/// let mut buf = vec![0; range.len()]; // target buffer +/// let len = (range.end - range.start) as usize; +/// let mut buf = vec![0; len]; // target buffer /// // seek to the start of the range and read the data -/// self.file.seek(SeekFrom::Start(range.start as u64)).await?; +/// self.file.seek(SeekFrom::Start(range.start)).await?; /// self.file.read_exact(&mut buf).await?; /// Ok(Bytes::from(buf)) // convert to Bytes /// } @@ -127,7 +128,9 @@ impl MetadataLoader { // Did not fetch the entire file metadata in the initial read, need to make a second request let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE { let metadata_start = file_size - length - FOOTER_SIZE; - let meta = fetch.fetch(metadata_start as u64..(file_size - FOOTER_SIZE) as u64).await?; + let meta = fetch + .fetch(metadata_start as u64..(file_size - FOOTER_SIZE) as u64) + .await?; (ParquetMetaDataReader::decode_metadata(&meta)?, None) } else { let metadata_start = file_size - length - FOOTER_SIZE - footer_start; @@ -182,7 +185,11 @@ impl MetadataLoader { remainder.slice(offset..range.end - *remainder_start + offset) } // Note: this will potentially fetch data already in remainder, this keeps things simple - _ => self.fetch.fetch(range.start as u64..range.end as u64).await?, + _ => { + self.fetch + .fetch(range.start as u64..range.end as u64) + .await? + } }; // Sanity check diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index da576f4d7db9..637c94764f8e 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -519,14 +519,14 @@ impl ParquetRecordBatchStreamBuilder { } let bitset = match column_metadata.bloom_filter_length() { - Some(_) => buffer.slice((bitset_offset as usize - offset as usize)..), + Some(_) => buffer.slice((bitset_offset as usize - offset as usize)..), None => { let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| { ParquetError::General("Bloom filter length is invalid".to_string()) })?; self.input .0 - .get_bytes(bitset_offset ..bitset_offset + bitset_length) + .get_bytes(bitset_offset..bitset_offset + bitset_length) .await? } }; @@ -984,7 +984,11 @@ impl InMemoryRowGroup<'_> { *chunk = Some(Arc::new(ColumnChunkData::Sparse { length: metadata.column(idx).byte_range().1 as usize, - data: offsets.into_iter().map(|x| {x as usize}).zip(chunks.into_iter()).collect(), + data: offsets + .into_iter() + .map(|x| x as usize) + .zip(chunks.into_iter()) + .collect(), })) } } @@ -1195,8 +1199,14 @@ mod tests { impl AsyncFileReader for TestReader { fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { let range = range.clone(); - self.requests.lock().unwrap().push(range.start as usize..range.end as usize); - futures::future::ready(Ok(self.data.slice(range.start as usize..range.end as usize))).boxed() + self.requests + .lock() + .unwrap() + .push(range.start as usize..range.end as usize); + futures::future::ready(Ok(self + .data + .slice(range.start as usize..range.end as usize))) + .boxed() } fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 6d93ab2b91e6..cc0a5dd7c54c 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -151,17 +151,22 @@ impl AsyncFileReader for ParquetObjectReader { let usize_range = (range.start as usize)..(range.end as usize); store.get_range(path, usize_range) }) - } fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> where Self: Send, { - self.spawn(|store, path| async move { - let ranges:Vec> = ranges.into_iter().map(|r| r.start as usize..r.end as usize).collect(); - store.get_ranges(path, &ranges).await - }.boxed()) + self.spawn(|store, path| { + async move { + let ranges: Vec> = ranges + .into_iter() + .map(|r| r.start as usize..r.end as usize) + .collect(); + store.get_ranges(path, &ranges).await + } + .boxed() + }) } // This method doesn't directly call `self.spawn` because all of the IO that is done down the diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index a0b8d5b08f74..f0b5a714c132 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -626,7 +626,9 @@ impl ParquetMetaDataReader { // Did not fetch the entire file metadata in the initial read, need to make a second request if length > suffix_len - FOOTER_SIZE { let metadata_start = file_size - length - FOOTER_SIZE; - let meta = fetch.fetch(metadata_start as u64..(file_size - FOOTER_SIZE) as u64).await?; + let meta = fetch + .fetch(metadata_start as u64..(file_size - FOOTER_SIZE) as u64) + .await?; Ok((self.decode_footer_metadata(&meta, &footer)?, None)) } else { let metadata_start = file_size - length - FOOTER_SIZE - footer_start;