diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 5d5a7036eefb..6ce33c784e26 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -53,7 +53,6 @@ use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::reader::{ChunkReader, Length, SerializedPageReader}; -use crate::file::FOOTER_SIZE; use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; mod metadata; @@ -126,6 +125,18 @@ impl AsyncFileReader for Box { } } +impl MetadataSuffixFetch for T { + fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result> { + async move { + self.seek(SeekFrom::End(-(suffix as i64))).await?; + let mut buf = Vec::with_capacity(suffix); + self.take(suffix as _).read_to_end(&mut buf).await?; + Ok(buf.into()) + } + .boxed() + } +} + impl AsyncFileReader for T { fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, Result> { async move { @@ -147,31 +158,16 @@ impl AsyncFileReader for T { &'a mut self, options: Option<&'a ArrowReaderOptions>, ) -> BoxFuture<'a, Result>> { - const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64; async move { - self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?; - - let mut buf = [0_u8; FOOTER_SIZE]; - self.read_exact(&mut buf).await?; - - let footer = ParquetMetaDataReader::decode_footer_tail(&buf)?; - let metadata_len = footer.metadata_length(); - - self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64)) - .await?; - - let mut buf = Vec::with_capacity(metadata_len); - self.take(metadata_len as _).read_to_end(&mut buf).await?; - - let metadata_reader = ParquetMetaDataReader::new(); + let metadata_reader = ParquetMetaDataReader::new() + .with_page_indexes(options.is_some_and(|o| o.page_index)); #[cfg(feature = "encryption")] let metadata_reader = metadata_reader.with_decryption_properties( options.and_then(|o| o.file_decryption_properties.as_ref()), ); - let parquet_metadata = metadata_reader.decode_footer_metadata(&buf, &footer)?; - + let parquet_metadata = metadata_reader.load_via_suffix_and_finish(self).await?; Ok(Arc::new(parquet_metadata)) } .boxed() @@ -182,36 +178,11 @@ impl ArrowReaderMetadata { /// Returns a new [`ArrowReaderMetadata`] for this builder /// /// See [`ParquetRecordBatchStreamBuilder::new_with_metadata`] for how this can be used - /// - /// # Notes - /// - /// If `options` has [`ArrowReaderOptions::with_page_index`] true, but - /// `Self::metadata` is missing the page index, this function will attempt - /// to load the page index by making an object store request. pub async fn load_async( input: &mut T, options: ArrowReaderOptions, ) -> Result { - // TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata - // took an argument to fetch the page indexes. - let mut metadata = input.get_metadata(Some(&options)).await?; - - if options.page_index - && metadata.column_index().is_none() - && metadata.offset_index().is_none() - { - let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone()); - let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true); - - #[cfg(feature = "encryption")] - { - reader = - reader.with_decryption_properties(options.file_decryption_properties.as_ref()); - } - - reader.load_page_index(input).await?; - metadata = Arc::new(reader.finish()?) - } + let metadata = input.get_metadata(Some(&options)).await?; Self::try_new(metadata, options) } } @@ -1120,7 +1091,7 @@ mod tests { #[derive(Clone)] struct TestReader { data: Bytes, - metadata: Arc, + metadata: Option>, requests: Arc>>>, } @@ -1132,9 +1103,14 @@ mod tests { fn get_metadata<'a>( &'a mut self, - _options: Option<&'a ArrowReaderOptions>, + options: Option<&'a ArrowReaderOptions>, ) -> BoxFuture<'a, Result>> { - futures::future::ready(Ok(self.metadata.clone())).boxed() + let metadata_reader = ParquetMetaDataReader::new() + .with_page_indexes(options.is_some_and(|o| o.page_index)); + self.metadata = Some(Arc::new( + metadata_reader.parse_and_finish(&self.data).unwrap(), + )); + futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed() } } @@ -1144,16 +1120,9 @@ mod tests { let path = format!("{testdata}/alltypes_plain.parquet"); let data = Bytes::from(std::fs::read(path).unwrap()); - let metadata = ParquetMetaDataReader::new() - .parse_and_finish(&data) - .unwrap(); - let metadata = Arc::new(metadata); - - assert_eq!(metadata.num_row_groups(), 1); - let async_reader = TestReader { data: data.clone(), - metadata: metadata.clone(), + metadata: Default::default(), requests: Default::default(), }; @@ -1162,6 +1131,9 @@ mod tests { .await .unwrap(); + let metadata = builder.metadata().clone(); + assert_eq!(metadata.num_row_groups(), 1); + let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]); let stream = builder .with_projection(mask.clone()) @@ -1201,16 +1173,9 @@ mod tests { let path = format!("{testdata}/alltypes_plain.parquet"); let data = Bytes::from(std::fs::read(path).unwrap()); - let metadata = ParquetMetaDataReader::new() - .parse_and_finish(&data) - .unwrap(); - let metadata = Arc::new(metadata); - - assert_eq!(metadata.num_row_groups(), 1); - let async_reader = TestReader { data: data.clone(), - metadata: metadata.clone(), + metadata: Default::default(), requests: Default::default(), }; @@ -1219,6 +1184,9 @@ mod tests { .await .unwrap(); + let metadata = builder.metadata().clone(); + assert_eq!(metadata.num_row_groups(), 1); + let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]); let mut stream = builder .with_projection(mask.clone()) @@ -1266,16 +1234,9 @@ mod tests { let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet"); let data = Bytes::from(std::fs::read(path).unwrap()); - let metadata = ParquetMetaDataReader::new() - .parse_and_finish(&data) - .unwrap(); - let metadata = Arc::new(metadata); - - assert_eq!(metadata.num_row_groups(), 1); - let async_reader = TestReader { data: data.clone(), - metadata: metadata.clone(), + metadata: Default::default(), requests: Default::default(), }; @@ -1286,6 +1247,7 @@ mod tests { // The builder should have page and offset indexes loaded now let metadata_with_index = builder.metadata(); + assert_eq!(metadata_with_index.num_row_groups(), 1); // Check offset indexes are present for all columns let offset_index = metadata_with_index.offset_index().unwrap(); @@ -1343,7 +1305,7 @@ mod tests { let async_reader = TestReader { data: data.clone(), - metadata: metadata.clone(), + metadata: Default::default(), requests: Default::default(), }; @@ -1351,6 +1313,8 @@ mod tests { .await .unwrap(); + assert_eq!(builder.metadata().num_row_groups(), 1); + let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]); let stream = builder .with_projection(mask.clone()) @@ -1380,16 +1344,9 @@ mod tests { let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet"); let data = Bytes::from(std::fs::read(path).unwrap()); - let metadata = ParquetMetaDataReader::new() - .parse_and_finish(&data) - .unwrap(); - let metadata = Arc::new(metadata); - - assert_eq!(metadata.num_row_groups(), 1); - let async_reader = TestReader { data: data.clone(), - metadata: metadata.clone(), + metadata: Default::default(), requests: Default::default(), }; @@ -1398,6 +1355,8 @@ mod tests { .await .unwrap(); + assert_eq!(builder.metadata().num_row_groups(), 1); + let selection = RowSelection::from(vec![ RowSelector::skip(21), // Skip first page RowSelector::select(21), // Select page to boundary @@ -1438,13 +1397,6 @@ mod tests { let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet"); let data = Bytes::from(std::fs::read(path).unwrap()); - let metadata = ParquetMetaDataReader::new() - .parse_and_finish(&data) - .unwrap(); - let metadata = Arc::new(metadata); - - assert_eq!(metadata.num_row_groups(), 1); - let mut rand = rng(); for _ in 0..100 { @@ -1472,7 +1424,7 @@ mod tests { let async_reader = TestReader { data: data.clone(), - metadata: metadata.clone(), + metadata: Default::default(), requests: Default::default(), }; @@ -1481,6 +1433,8 @@ mod tests { .await .unwrap(); + assert_eq!(builder.metadata().num_row_groups(), 1); + let col_idx: usize = rand.random_range(0..13); let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]); @@ -1505,13 +1459,6 @@ mod tests { let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet"); let data = Bytes::from(std::fs::read(path).unwrap()); - let metadata = ParquetMetaDataReader::new() - .parse_and_finish(&data) - .unwrap(); - let metadata = Arc::new(metadata); - - assert_eq!(metadata.num_row_groups(), 1); - let mut rand = rng(); let mut expected_rows = 0; @@ -1543,7 +1490,7 @@ mod tests { let async_reader = TestReader { data: data.clone(), - metadata: metadata.clone(), + metadata: Default::default(), requests: Default::default(), }; @@ -1552,6 +1499,8 @@ mod tests { .await .unwrap(); + assert_eq!(builder.metadata().num_row_groups(), 1); + let col_idx: usize = rand.random_range(0..13); let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]); @@ -1593,7 +1542,7 @@ mod tests { let test = TestReader { data, - metadata: Arc::new(metadata), + metadata: Default::default(), requests: Default::default(), }; let requests = test.requests.clone(); @@ -1670,7 +1619,7 @@ mod tests { let test = TestReader { data, - metadata: Arc::new(metadata), + metadata: Default::default(), requests: Default::default(), }; @@ -1756,13 +1705,12 @@ mod tests { .parse_and_finish(&data) .unwrap(); let parquet_schema = metadata.file_metadata().schema_descr_ptr(); - let metadata = Arc::new(metadata); assert_eq!(metadata.num_row_groups(), 1); let async_reader = TestReader { data: data.clone(), - metadata: metadata.clone(), + metadata: Default::default(), requests: Default::default(), }; @@ -1830,7 +1778,7 @@ mod tests { let async_reader = TestReader { data: data.clone(), - metadata: metadata.clone(), + metadata: Default::default(), requests: Default::default(), }; @@ -1898,15 +1846,9 @@ mod tests { let path = format!("{testdata}/alltypes_plain.parquet"); let data = Bytes::from(std::fs::read(path).unwrap()); - let metadata = ParquetMetaDataReader::new() - .parse_and_finish(&data) - .unwrap(); - let file_rows = metadata.file_metadata().num_rows() as usize; - let metadata = Arc::new(metadata); - let async_reader = TestReader { data: data.clone(), - metadata: metadata.clone(), + metadata: Default::default(), requests: Default::default(), }; @@ -1914,6 +1856,8 @@ mod tests { .await .unwrap(); + let file_rows = builder.metadata().file_metadata().num_rows() as usize; + let stream = builder .with_projection(ProjectionMask::all()) .with_batch_size(1024) @@ -2045,13 +1989,9 @@ mod tests { let testdata = arrow::util::test_util::parquet_test_data(); let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet"); let data = Bytes::from(std::fs::read(path).unwrap()); - let metadata = ParquetMetaDataReader::new() - .parse_and_finish(&data) - .unwrap(); - let metadata = Arc::new(metadata); let async_reader = TestReader { data: data.clone(), - metadata: metadata.clone(), + metadata: Default::default(), requests: Default::default(), }; let builder = ParquetRecordBatchStreamBuilder::new(async_reader) @@ -2076,19 +2016,9 @@ mod tests { } async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) { - let metadata = ParquetMetaDataReader::new() - .parse_and_finish(&data) - .unwrap(); - let metadata = Arc::new(metadata); - - assert_eq!(metadata.num_row_groups(), 1); - let row_group = metadata.row_group(0); - let column = row_group.column(0); - assert_eq!(column.bloom_filter_length().is_some(), with_length); - let async_reader = TestReader { data: data.clone(), - metadata: metadata.clone(), + metadata: Default::default(), requests: Default::default(), }; @@ -2096,6 +2026,12 @@ mod tests { .await .unwrap(); + let metadata = builder.metadata(); + assert_eq!(metadata.num_row_groups(), 1); + let row_group = metadata.row_group(0); + let column = row_group.column(0); + assert_eq!(column.bloom_filter_length().is_some(), with_length); + let sbbf = builder .get_row_group_column_bloom_filter(0, 0) .await @@ -2225,7 +2161,7 @@ mod tests { let test = TestReader { data, - metadata: Arc::new(metadata), + metadata: Default::default(), requests: Default::default(), }; let requests = test.requests.clone();