diff --git a/parquet/examples/read_with_rowgroup.rs b/parquet/examples/read_with_rowgroup.rs index 8cccc7fe14ac..a6b7ec858314 100644 --- a/parquet/examples/read_with_rowgroup.rs +++ b/parquet/examples/read_with_rowgroup.rs @@ -166,9 +166,7 @@ impl InMemoryRowGroup { for (leaf_idx, meta) in self.metadata.columns().iter().enumerate() { if self.mask.leaf_included(leaf_idx) { let (start, len) = meta.byte_range(); - let data = reader - .get_bytes(start as usize..(start + len) as usize) - .await?; + let data = reader.get_bytes(start..(start + len)).await?; vs[leaf_idx] = Some(Arc::new(ColumnChunkData { offset: start as usize, diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 378d2253f19a..ca850bc81fe2 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -162,8 +162,8 @@ impl RowSelection { /// Note: this method does not make any effort to combine consecutive ranges, nor coalesce /// ranges that are close together. This is instead delegated to the IO subsystem to optimise, /// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges) - pub fn scan_ranges(&self, page_locations: &[crate::format::PageLocation]) -> Vec> { - let mut ranges = vec![]; + pub fn scan_ranges(&self, page_locations: &[crate::format::PageLocation]) -> Vec> { + let mut ranges: Vec> = vec![]; let mut row_offset = 0; let mut pages = page_locations.iter().peekable(); @@ -175,8 +175,8 @@ impl RowSelection { while let Some((selector, page)) = current_selector.as_mut().zip(current_page) { if !(selector.skip || current_page_included) { - let start = page.offset as usize; - let end = start + page.compressed_page_size as usize; + let start = page.offset as u64; + let end = start + page.compressed_page_size as u64; ranges.push(start..end); current_page_included = true; } @@ -202,7 +202,7 @@ impl RowSelection { if !(selector.skip || current_page_included) { let start = page.offset as usize; let end = start + page.compressed_page_size as usize; - ranges.push(start..end); + ranges.push(start as u64..end as u64); } current_selector = selectors.next() } diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index 71d2e57ddd50..53efa08f88f0 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -48,12 +48,13 @@ use std::ops::Range; /// file: tokio::fs::File, /// } /// impl MetadataFetch for TokioFileMetadata { -/// fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { +/// fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { /// // return a future that fetches data in range /// async move { -/// let mut buf = vec![0; range.len()]; // target buffer +/// let len = (range.end - range.start) as usize; +/// let mut buf = vec![0; len]; // target buffer /// // seek to the start of the range and read the data -/// self.file.seek(SeekFrom::Start(range.start as u64)).await?; +/// self.file.seek(SeekFrom::Start(range.start)).await?; /// self.file.read_exact(&mut buf).await?; /// Ok(Bytes::from(buf)) // convert to Bytes /// } @@ -66,12 +67,12 @@ pub trait MetadataFetch { /// /// Note the returned type is a boxed future, often created by /// [FutureExt::boxed]. See the trait documentation for an example - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result>; + fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result>; } impl MetadataFetch for &mut T { - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { - self.get_bytes(range) + fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { + self.get_bytes(range.start..range.end) } } @@ -107,7 +108,7 @@ impl MetadataLoader { file_size - FOOTER_SIZE }; - let suffix = fetch.fetch(footer_start..file_size).await?; + let suffix = fetch.fetch(footer_start as u64..file_size as u64).await?; let suffix_len = suffix.len(); let mut footer = [0; FOOTER_SIZE]; @@ -127,7 +128,9 @@ impl MetadataLoader { // Did not fetch the entire file metadata in the initial read, need to make a second request let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE { let metadata_start = file_size - length - FOOTER_SIZE; - let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?; + let meta = fetch + .fetch(metadata_start as u64..(file_size - FOOTER_SIZE) as u64) + .await?; (ParquetMetaDataReader::decode_metadata(&meta)?, None) } else { let metadata_start = file_size - length - FOOTER_SIZE - footer_start; @@ -182,7 +185,11 @@ impl MetadataLoader { remainder.slice(offset..range.end - *remainder_start + offset) } // Note: this will potentially fetch data already in remainder, this keeps things simple - _ => self.fetch.fetch(range.start..range.end).await?, + _ => { + self.fetch + .fetch(range.start as u64..range.end as u64) + .await? + } }; // Sanity check @@ -246,8 +253,8 @@ where F: FnMut(Range) -> Fut + Send, Fut: Future> + Send, { - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { - async move { self.0(range).await }.boxed() + fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { + async move { self.0(range.start as usize..range.end as usize).await }.boxed() } } diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index fd49ad22934d..637c94764f8e 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -86,10 +86,10 @@ pub use store::*; /// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html pub trait AsyncFileReader: Send { /// Retrieve the bytes in `range` - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result>; + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result>; /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially - fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { + fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { async move { let mut result = Vec::with_capacity(ranges.len()); @@ -124,11 +124,11 @@ pub trait AsyncFileReader: Send { /// This allows Box to be used as an AsyncFileReader, impl AsyncFileReader for Box { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { self.as_mut().get_bytes(range) } - fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { + fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> { self.as_mut().get_byte_ranges(ranges) } @@ -145,14 +145,14 @@ impl AsyncFileReader for Box { } impl AsyncFileReader for T { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { async move { - self.seek(SeekFrom::Start(range.start as u64)).await?; + self.seek(SeekFrom::Start(range.start)).await?; let to_read = range.end - range.start; - let mut buffer = Vec::with_capacity(to_read); - let read = self.take(to_read as u64).read_to_end(&mut buffer).await?; - if read != to_read { + let mut buffer = Vec::with_capacity(to_read as usize); + let read = self.take(to_read).read_to_end(&mut buffer).await?; + if read != to_read as usize { return Err(eof_err!("expected to read {} bytes, got {}", to_read, read)); } @@ -482,7 +482,7 @@ impl ParquetRecordBatchStreamBuilder { let metadata = self.metadata.row_group(row_group_idx); let column_metadata = metadata.column(column_idx); - let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() { + let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() { offset .try_into() .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))? @@ -491,16 +491,16 @@ impl ParquetRecordBatchStreamBuilder { }; let buffer = match column_metadata.bloom_filter_length() { - Some(length) => self.input.0.get_bytes(offset..offset + length as usize), + Some(length) => self.input.0.get_bytes(offset..offset + length as u64), None => self .input .0 - .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE), + .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64), } .await?; let (header, bitset_offset) = - chunk_read_bloom_filter_header_and_offset(offset as u64, buffer.clone())?; + chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?; match header.algorithm { BloomFilterAlgorithm::BLOCK(_) => { @@ -519,14 +519,14 @@ impl ParquetRecordBatchStreamBuilder { } let bitset = match column_metadata.bloom_filter_length() { - Some(_) => buffer.slice((bitset_offset as usize - offset)..), + Some(_) => buffer.slice((bitset_offset as usize - offset as usize)..), None => { - let bitset_length: usize = header.num_bytes.try_into().map_err(|_| { + let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| { ParquetError::General("Bloom filter length is invalid".to_string()) })?; self.input .0 - .get_bytes(bitset_offset as usize..bitset_offset as usize + bitset_length) + .get_bytes(bitset_offset..bitset_offset + bitset_length) .await? } }; @@ -939,7 +939,7 @@ impl InMemoryRowGroup<'_> { if let Some((selection, offset_index)) = selection.zip(self.offset_index) { // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the // `RowSelection` - let mut page_start_offsets: Vec> = vec![]; + let mut page_start_offsets: Vec> = vec![]; let fetch_ranges = self .column_chunks @@ -952,11 +952,11 @@ impl InMemoryRowGroup<'_> { .flat_map(|(idx, (_chunk, chunk_meta))| { // If the first page does not start at the beginning of the column, // then we need to also fetch a dictionary page. - let mut ranges = vec![]; + let mut ranges: Vec> = vec![]; let (start, _len) = chunk_meta.byte_range(); match offset_index[idx].page_locations.first() { Some(first) if first.offset as u64 != start => { - ranges.push(start as usize..first.offset as usize); + ranges.push(start..first.offset as u64); } _ => (), } @@ -984,7 +984,11 @@ impl InMemoryRowGroup<'_> { *chunk = Some(Arc::new(ColumnChunkData::Sparse { length: metadata.column(idx).byte_range().1 as usize, - data: offsets.into_iter().zip(chunks.into_iter()).collect(), + data: offsets + .into_iter() + .map(|x| x as usize) + .zip(chunks.into_iter()) + .collect(), })) } } @@ -997,7 +1001,7 @@ impl InMemoryRowGroup<'_> { .map(|(idx, _chunk)| { let column = metadata.column(idx); let (start, length) = column.byte_range(); - start as usize..(start + length) as usize + start..(start + length) }) .collect(); @@ -1193,9 +1197,16 @@ mod tests { } impl AsyncFileReader for TestReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { - self.requests.lock().unwrap().push(range.clone()); - futures::future::ready(Ok(self.data.slice(range))).boxed() + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + let range = range.clone(); + self.requests + .lock() + .unwrap() + .push(range.start as usize..range.end as usize); + futures::future::ready(Ok(self + .data + .slice(range.start as usize..range.end as usize))) + .boxed() } fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index a1e94efd1451..cc0a5dd7c54c 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -146,15 +146,27 @@ impl ParquetObjectReader { } impl AsyncFileReader for ParquetObjectReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { - self.spawn(|store, path| store.get_range(path, range)) + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { + self.spawn(move |store, path| { + let usize_range = (range.start as usize)..(range.end as usize); + store.get_range(path, usize_range) + }) } - fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> + fn get_byte_ranges(&mut self, ranges: Vec>) -> BoxFuture<'_, Result>> where Self: Send, { - self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed()) + self.spawn(|store, path| { + async move { + let ranges: Vec> = ranges + .into_iter() + .map(|r| r.start as usize..r.end as usize) + .collect(); + store.get_ranges(path, &ranges).await + } + .boxed() + }) } // This method doesn't directly call `self.spawn` because all of the IO that is done down the diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index b80e76d7929a..f0b5a714c132 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -460,7 +460,7 @@ impl ParquetMetaDataReader { remainder.slice(offset..end) } // Note: this will potentially fetch data already in remainder, this keeps things simple - _ => fetch.fetch(range.start..range.end).await?, + _ => fetch.fetch(range.start as u64..range.end as u64).await?, }; // Sanity check @@ -598,7 +598,7 @@ impl ParquetMetaDataReader { // Note: prefetch > file_size is ok since we're using saturating_sub. let footer_start = file_size.saturating_sub(prefetch); - let suffix = fetch.fetch(footer_start..file_size).await?; + let suffix = fetch.fetch(footer_start as u64..file_size as u64).await?; let suffix_len = suffix.len(); let fetch_len = file_size - footer_start; if suffix_len < fetch_len { @@ -626,7 +626,9 @@ impl ParquetMetaDataReader { // Did not fetch the entire file metadata in the initial read, need to make a second request if length > suffix_len - FOOTER_SIZE { let metadata_start = file_size - length - FOOTER_SIZE; - let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?; + let meta = fetch + .fetch(metadata_start as u64..(file_size - FOOTER_SIZE) as u64) + .await?; Ok((self.decode_footer_metadata(&meta, &footer)?, None)) } else { let metadata_start = file_size - length - FOOTER_SIZE - footer_start; @@ -1112,18 +1114,18 @@ mod async_tests { impl MetadataFetch for MetadataFetchFn where - F: FnMut(Range) -> Fut + Send, + F: FnMut(Range) -> Fut + Send, Fut: Future> + Send, { - fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { + fn fetch(&mut self, range: Range) -> BoxFuture<'_, Result> { async move { self.0(range).await }.boxed() } } - fn read_range(file: &mut File, range: Range) -> Result { - file.seek(SeekFrom::Start(range.start as _))?; + fn read_range(file: &mut File, range: Range) -> Result { + file.seek(SeekFrom::Start(range.start))?; let len = range.end - range.start; - let mut buf = Vec::with_capacity(len); + let mut buf = Vec::with_capacity(len as usize); file.take(len as _).read_to_end(&mut buf)?; Ok(buf.into()) }