diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 9b343923f0145..b7d66e4f27898 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -134,7 +134,7 @@ mod tests { use datafusion_datasource::{ListingTableUrl, PartitionedFile}; use datafusion_datasource_parquet::{ fetch_parquet_metadata, fetch_statistics, statistics_from_parquet_meta_calc, - ParquetFormat, ParquetFormatFactory, ParquetSink, + ObjectStoreFetch, ParquetFormat, ParquetFormatFactory, ParquetSink, }; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::runtime_env::RuntimeEnv; @@ -180,8 +180,8 @@ mod tests { let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); - let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap(); - let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap(); + let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())])?; + let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)])?; let store = Arc::new(LocalFileSystem::new()) as _; let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?; @@ -193,11 +193,17 @@ mod tests { ForceViews::No => false, }; let format = ParquetFormat::default().with_force_view_types(force_views); - let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap(); + let schema = format.infer_schema(&ctx, &store, &meta).await?; - let stats = - fetch_statistics(store.as_ref(), schema.clone(), &meta[0], None, None) - .await?; + let stats = fetch_statistics( + store.as_ref(), + schema.clone(), + &meta[0], + None, + None, + Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), + ) + .await?; assert_eq!(stats.num_rows, Precision::Exact(3)); let c1_stats = &stats.column_statistics[0]; @@ -205,8 +211,15 @@ mod tests { assert_eq!(c1_stats.null_count, Precision::Exact(1)); assert_eq!(c2_stats.null_count, Precision::Exact(3)); - let stats = - fetch_statistics(store.as_ref(), schema, &meta[1], None, None).await?; + let stats = fetch_statistics( + store.as_ref(), + schema, + &meta[1], + None, + None, + Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), + ) + .await?; assert_eq!(stats.num_rows, Precision::Exact(3)); let c1_stats = &stats.column_statistics[0]; let c2_stats = &stats.column_statistics[1]; @@ -240,11 +253,9 @@ mod tests { let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); let batch1 = - RecordBatch::try_from_iter(vec![("a", c1.clone()), ("b", c1.clone())]) - .unwrap(); + RecordBatch::try_from_iter(vec![("a", c1.clone()), ("b", c1.clone())])?; let batch2 = - RecordBatch::try_from_iter(vec![("c", c2.clone()), ("d", c2.clone())]) - .unwrap(); + RecordBatch::try_from_iter(vec![("c", c2.clone()), ("d", c2.clone())])?; let store = Arc::new(LocalFileSystem::new()) as _; let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?; @@ -252,7 +263,7 @@ mod tests { let session = SessionContext::new(); let ctx = session.state(); let format = ParquetFormat::default(); - let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap(); + let schema = format.infer_schema(&ctx, &store, &meta).await?; let order: Vec<_> = ["a", "b", "c", "d"] .into_iter() @@ -368,29 +379,66 @@ mod tests { let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); - let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap(); - let batch2 = 
RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap(); + let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())])?; + let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)])?; let store = Arc::new(RequestCountingObjectStore::new(Arc::new( LocalFileSystem::new(), ))); let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?; + let session = SessionContext::new(); + let ctx = session.state(); + // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch // for the remaining metadata fetch_parquet_metadata( - store.as_ref() as &dyn ObjectStore, + ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]), &meta[0], Some(9), None, + None, ) .await .expect("error reading metadata with hint"); - assert_eq!(store.request_count(), 2); - let session = SessionContext::new(); - let ctx = session.state(); + // Increases by 3 because cache has no entries yet + fetch_parquet_metadata( + ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]), + &meta[0], + Some(9), + None, + Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), + ) + .await + .expect("error reading metadata with hint"); + assert_eq!(store.request_count(), 5); + + // No increase because cache has an entry + fetch_parquet_metadata( + ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]), + &meta[0], + Some(9), + None, + Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), + ) + .await + .expect("error reading metadata with hint"); + assert_eq!(store.request_count(), 5); + + // Increase by 2 because `get_file_metadata_cache()` is None + fetch_parquet_metadata( + ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]), + &meta[0], + Some(9), + None, + None, + ) + .await + .expect("error reading metadata with hint"); + assert_eq!(store.request_count(), 7); + let force_views = match force_views { ForceViews::Yes => true, ForceViews::No => false, @@ -398,19 +446,24 @@ mod tests { let format = ParquetFormat::default() .with_metadata_size_hint(Some(9)) .with_force_view_types(force_views); - let schema = format - .infer_schema(&ctx, &store.upcast(), &meta) - .await - .unwrap(); - + // Increase by 3, partial cache being used. + let _schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?; + assert_eq!(store.request_count(), 10); + // No increase, full cache being used. 
+ let schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?; + assert_eq!(store.request_count(), 10); + + // No increase, cache being used let stats = fetch_statistics( store.upcast().as_ref(), schema.clone(), &meta[0], Some(9), None, + Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), ) .await?; + assert_eq!(store.request_count(), 10); assert_eq!(stats.num_rows, Precision::Exact(3)); let c1_stats = &stats.column_statistics[0]; @@ -425,28 +478,76 @@ mod tests { // Use the file size as the hint so we can get the full metadata from the first fetch let size_hint = meta[0].size as usize; - fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint), None) - .await - .expect("error reading metadata with hint"); - + fetch_parquet_metadata( + ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), + &meta[0], + Some(size_hint), + None, + None, + ) + .await + .expect("error reading metadata with hint"); // ensure the requests were coalesced into a single request assert_eq!(store.request_count(), 1); + let session = SessionContext::new(); + let ctx = session.state(); + // Increases by 1 because cache has no entries yet and new session context + fetch_parquet_metadata( + ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), + &meta[0], + Some(size_hint), + None, + Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), + ) + .await + .expect("error reading metadata with hint"); + assert_eq!(store.request_count(), 2); + + // No increase because cache has an entry + fetch_parquet_metadata( + ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), + &meta[0], + Some(size_hint), + None, + Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), + ) + .await + .expect("error reading metadata with hint"); + assert_eq!(store.request_count(), 2); + + // Increase by 1 because `get_file_metadata_cache` is None + fetch_parquet_metadata( + ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), + &meta[0], + Some(size_hint), + None, + None, + ) + .await + .expect("error reading metadata with hint"); + assert_eq!(store.request_count(), 3); + let format = ParquetFormat::default() .with_metadata_size_hint(Some(size_hint)) .with_force_view_types(force_views); - let schema = format - .infer_schema(&ctx, &store.upcast(), &meta) - .await - .unwrap(); + // Increase by 1, partial cache being used. + let _schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?; + assert_eq!(store.request_count(), 4); + // No increase, full cache being used. 
+ let schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?; + assert_eq!(store.request_count(), 4); + + // No increase, cache being used let stats = fetch_statistics( store.upcast().as_ref(), schema.clone(), &meta[0], Some(size_hint), None, + Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), ) .await?; + assert_eq!(store.request_count(), 4); assert_eq!(stats.num_rows, Precision::Exact(3)); let c1_stats = &stats.column_statistics[0]; @@ -460,11 +561,27 @@ mod tests { // Use the a size hint larger than the file size to make sure we don't panic let size_hint = (meta[0].size + 100) as usize; + fetch_parquet_metadata( + ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), + &meta[0], + Some(size_hint), + None, + None, + ) + .await + .expect("error reading metadata with hint"); + assert_eq!(store.request_count(), 1); - fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint), None) - .await - .expect("error reading metadata with hint"); - + // No increase because cache has an entry + fetch_parquet_metadata( + ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), + &meta[0], + Some(size_hint), + None, + Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), + ) + .await + .expect("error reading metadata with hint"); assert_eq!(store.request_count(), 1); Ok(()) @@ -483,25 +600,37 @@ mod tests { // Data for column c_dic: ["a", "b", "c", "d"] let values = StringArray::from_iter_values(["a", "b", "c", "d"]); let keys = Int32Array::from_iter_values([0, 1, 2, 3]); - let dic_array = - DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap(); + let dic_array = DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values))?; let c_dic: ArrayRef = Arc::new(dic_array); - let batch1 = RecordBatch::try_from_iter(vec![("c_dic", c_dic)]).unwrap(); + let batch1 = RecordBatch::try_from_iter(vec![("c_dic", c_dic)])?; // Use store_parquet to write each batch to its own file // . batch1 written into first file and includes: // - column c_dic that has 4 rows with no null. Stats min and max of dictionary column is available. - let store = Arc::new(LocalFileSystem::new()) as _; + let store = Arc::new(RequestCountingObjectStore::new(Arc::new( + LocalFileSystem::new(), + ))); let (files, _file_names) = store_parquet(vec![batch1], false).await?; let state = SessionContext::new().state(); let format = ParquetFormat::default(); - let schema = format.infer_schema(&state, &store, &files).await.unwrap(); - - // Fetch statistics for first file - let pq_meta = - fetch_parquet_metadata(store.as_ref(), &files[0], None, None).await?; + let _schema = format.infer_schema(&state, &store.upcast(), &files).await?; + assert_eq!(store.request_count(), 3); + // No increase, cache being used.
+ let schema = format.infer_schema(&state, &store.upcast(), &files).await?; + assert_eq!(store.request_count(), 3); + + // No increase in request count because cache is not empty + let pq_meta = fetch_parquet_metadata( + ObjectStoreFetch::new(store.as_ref(), &files[0]), + &files[0], + None, + None, + Some(state.runtime_env().cache_manager.get_file_metadata_cache()), + ) + .await?; + assert_eq!(store.request_count(), 3); let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?; assert_eq!(stats.num_rows, Precision::Exact(4)); @@ -527,18 +656,20 @@ mod tests { // Data for column c1: ["Foo", null, "bar"] let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); - let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap(); + let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())])?; // Data for column c2: [1, 2, null] let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); - let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap(); + let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)])?; // Use store_parquet to write each batch to its own file // . batch1 written into first file and includes: // - column c1 that has 3 rows with one null. Stats min and max of string column is missing for this test even the column has values // . batch2 written into second file and includes: // - column c2 that has 3 rows with one null. Stats min and max of int are available and 1 and 2 respectively - let store = Arc::new(LocalFileSystem::new()) as _; + let store = Arc::new(RequestCountingObjectStore::new(Arc::new( + LocalFileSystem::new(), + ))); let (files, _file_names) = store_parquet(vec![batch1, batch2], false).await?; let force_views = match force_views { @@ -549,7 +680,8 @@ mod tests { let mut state = SessionContext::new().state(); state = set_view_state(state, force_views); let format = ParquetFormat::default().with_force_view_types(force_views); - let schema = format.infer_schema(&state, &store, &files).await.unwrap(); + let schema = format.infer_schema(&state, &store.upcast(), &files).await?; + assert_eq!(store.request_count(), 6); let null_i64 = ScalarValue::Int64(None); let null_utf8 = if force_views { @@ -558,9 +690,16 @@ mod tests { Utf8(None) }; - // Fetch statistics for first file - let pq_meta = - fetch_parquet_metadata(store.as_ref(), &files[0], None, None).await?; + // No increase in request count because cache is not empty + let pq_meta = fetch_parquet_metadata( + ObjectStoreFetch::new(store.as_ref(), &files[0]), + &files[0], + None, + None, + Some(state.runtime_env().cache_manager.get_file_metadata_cache()), + ) + .await?; + assert_eq!(store.request_count(), 6); let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?; assert_eq!(stats.num_rows, Precision::Exact(3)); // column c1 @@ -585,9 +724,16 @@ mod tests { assert_eq!(c2_stats.max_value, Precision::Exact(null_i64.clone())); assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone())); - // Fetch statistics for second file - let pq_meta = - fetch_parquet_metadata(store.as_ref(), &files[1], None, None).await?; + // No increase in request count because cache is not empty + let pq_meta = fetch_parquet_metadata( + ObjectStoreFetch::new(store.as_ref(), &files[1]), + &files[1], + None, + None, + Some(state.runtime_env().cache_manager.get_file_metadata_cache()), + ) + .await?; + assert_eq!(store.request_count(), 6); let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?; assert_eq!(stats.num_rows, 
Precision::Exact(3)); // column c1: missing from the file so the table treats all 3 rows as null @@ -1018,22 +1164,20 @@ mod tests { async fn test_read_parquet_page_index() -> Result<()> { let testdata = datafusion_common::test_util::parquet_test_data(); let path = format!("{testdata}/alltypes_tiny_pages.parquet"); - let file = File::open(path).await.unwrap(); + let file = File::open(path).await?; let options = ArrowReaderOptions::new().with_page_index(true); let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options.clone()) - .await - .unwrap() + .await? .metadata() .clone(); check_page_index_validation(builder.column_index(), builder.offset_index()); let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet"); - let file = File::open(path).await.unwrap(); + let file = File::open(path).await?; let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options) - .await - .unwrap() + .await? .metadata() .clone(); check_page_index_validation(builder.column_index(), builder.offset_index()); @@ -1115,7 +1259,7 @@ mod tests { /// Test that 0-byte files don't break while reading #[tokio::test] async fn test_read_empty_parquet() -> Result<()> { - let tmp_dir = tempfile::TempDir::new().unwrap(); + let tmp_dir = tempfile::TempDir::new()?; let path = format!("{}/empty.parquet", tmp_dir.path().to_string_lossy()); File::create(&path).await?; @@ -1139,12 +1283,10 @@ mod tests { /// Test that 0-byte files don't break while reading #[tokio::test] async fn test_read_partitioned_empty_parquet() -> Result<()> { - let tmp_dir = tempfile::TempDir::new().unwrap(); + let tmp_dir = tempfile::TempDir::new()?; let partition_dir = tmp_dir.path().join("col1=a"); - std::fs::create_dir(&partition_dir).unwrap(); - File::create(partition_dir.join("empty.parquet")) - .await - .unwrap(); + std::fs::create_dir(&partition_dir)?; + File::create(partition_dir.join("empty.parquet")).await?; let ctx = SessionContext::new(); @@ -1270,7 +1412,7 @@ mod tests { ) .expect("Failed to create empty RecordBatch"); - let tmp_dir = tempfile::TempDir::new().unwrap(); + let tmp_dir = tempfile::TempDir::new()?; let path = format!("{}/empty2.parquet", tmp_dir.path().to_string_lossy()); let ctx = SessionContext::new(); @@ -1465,7 +1607,7 @@ mod tests { // create data let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"])); let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"])); - let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap(); + let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)])?; // write stream FileSink::write_all( @@ -1544,7 +1686,7 @@ mod tests { // create data with 2 partitions let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"])); let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"])); - let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap(); + let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)])?; // write stream FileSink::write_all( @@ -1637,8 +1779,7 @@ mod tests { // create data let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"])); let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"])); - let batch = - RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap(); + let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)])?; // create task context let task_context = build_ctx(object_store_url.as_ref()); diff --git a/datafusion/core/tests/parquet/custom_reader.rs 
b/datafusion/core/tests/parquet/custom_reader.rs index 5fc3513ff745b..f7e48fa9cb910 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -38,6 +38,7 @@ use datafusion_common::Result; use bytes::Bytes; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource_parquet::ObjectStoreFetch; use futures::future::BoxFuture; use futures::{FutureExt, TryFutureExt}; use insta::assert_snapshot; @@ -237,11 +238,13 @@ impl AsyncFileReader for ParquetFileReader { _options: Option<&ArrowReaderOptions>, ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> { Box::pin(async move { + let fetch = ObjectStoreFetch::new(self.store.as_ref(), &self.meta); let metadata = fetch_parquet_metadata( - self.store.as_ref(), + fetch, &self.meta, self.metadata_size_hint, None, + None, ) .await .map_err(|e| { @@ -249,7 +252,7 @@ "AsyncChunkReader::get_metadata error: {e}" )) })?; - Ok(Arc::new(metadata)) + Ok(metadata) }) } } diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index f851c199eb4d6..27bee10234b57 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -904,7 +904,7 @@ async fn without_pushdown_filter() { .unwrap(); // Same amount of bytes are scanned when defaulting to cache parquet metadata - assert!(bytes_scanned_with_filter == bytes_scanned_without_filter); + assert_eq!(bytes_scanned_with_filter, bytes_scanned_without_filter); } #[tokio::test] diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index e7c4494685435..ab4d84ee368e4 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -63,7 +63,7 @@ use datafusion_physical_plan::Accumulator; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; -use crate::reader::CachedParquetFileReaderFactory; +use crate::reader::{CachedParquetFileReaderFactory, CachedParquetMetaData}; use crate::source::{parse_coerce_int96_string, ParquetSource}; use async_trait::async_trait; use bytes::Bytes; @@ -84,6 +84,7 @@ use parquet::arrow::async_reader::MetadataFetch; use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter}; use parquet::basic::Type; +use datafusion_execution::cache::cache_manager::FileMetadataCache; use parquet::errors::ParquetError; use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; @@ -312,6 +313,7 @@ async fn fetch_schema_with_location( file: &ObjectMeta, metadata_size_hint: Option<usize>, coerce_int96: Option<TimeUnit>, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, ) -> Result<(Path, Schema)> { let file_decryption_properties = get_file_decryption_properties(state, options, &file.location)?; @@ -322,6 +324,7 @@ metadata_size_hint, file_decryption_properties.as_ref(), coerce_int96, + file_metadata_cache, ) .await?; Ok((loc_path, schema)) @@ -405,6 +408,7 @@ impl FileFormat for ParquetFormat { object, self.metadata_size_hint(), coerce_int96, + Some(state.runtime_env().cache_manager.get_file_metadata_cache()), ) }) .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 @@ -461,6 +465,7 @@ impl FileFormat for ParquetFormat {
object, self.metadata_size_hint(), file_decryption_properties.as_ref(), + Some(state.runtime_env().cache_manager.get_file_metadata_cache()), ) .await?; Ok(stats) @@ -1004,13 +1009,13 @@ pub fn transform_binary_to_string(schema: &Schema) -> Schema { } /// [`MetadataFetch`] adapter for reading bytes from an [`ObjectStore`] -struct ObjectStoreFetch<'a> { +pub struct ObjectStoreFetch<'a> { store: &'a dyn ObjectStore, meta: &'a ObjectMeta, } impl<'a> ObjectStoreFetch<'a> { - fn new(store: &'a dyn ObjectStore, meta: &'a ObjectMeta) -> Self { + pub fn new(store: &'a dyn ObjectStore, meta: &'a ObjectMeta) -> Self { Self { store, meta } } } @@ -1033,24 +1038,62 @@ impl MetadataFetch for ObjectStoreFetch<'_> { /// through [`ParquetFileReaderFactory`]. /// /// [`ParquetFileReaderFactory`]: crate::ParquetFileReaderFactory -pub async fn fetch_parquet_metadata( - store: &dyn ObjectStore, - meta: &ObjectMeta, +pub async fn fetch_parquet_metadata<F: MetadataFetch>( + fetch: F, + object_meta: &ObjectMeta, size_hint: Option<usize>, #[allow(unused)] decryption_properties: Option<&FileDecryptionProperties>, -) -> Result<ParquetMetaData> { - let file_size = meta.size; - let fetch = ObjectStoreFetch::new(store, meta); + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, +) -> Result<Arc<ParquetMetaData>> { + let cache_metadata = + !cfg!(feature = "parquet_encryption") || decryption_properties.is_none(); + + if cache_metadata { + if let Some(parquet_metadata) = file_metadata_cache + .as_ref() + .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta)) + .and_then(|file_metadata| { + file_metadata + .as_any() + .downcast_ref::<CachedParquetMetaData>() + .map(|cached_parquet_metadata| { + Arc::clone(cached_parquet_metadata.parquet_metadata()) + }) + }) + { + return Ok(parquet_metadata); + } + } - let reader = ParquetMetaDataReader::new().with_prefetch_hint(size_hint); + let mut reader = ParquetMetaDataReader::new().with_prefetch_hint(size_hint); #[cfg(feature = "parquet_encryption")] - let reader = reader.with_decryption_properties(decryption_properties); + if let Some(decryption_properties) = decryption_properties { + reader = reader.with_decryption_properties(Some(decryption_properties)); + } - reader - .load_and_finish(fetch, file_size) - .await - .map_err(DataFusionError::from) + if cache_metadata && file_metadata_cache.is_some() { + // Need to retrieve the entire metadata for the caching to be effective.
+ reader = reader.with_page_indexes(true); + } + + let metadata = Arc::new( + reader + .load_and_finish(fetch, object_meta.size) + .await + .map_err(DataFusionError::from)?, + ); + + if cache_metadata { + if let Some(file_metadata_cache) = file_metadata_cache { + file_metadata_cache.put( + object_meta, + Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))), + ); + } + } + + Ok(metadata) } /// Read and parse the schema of the Parquet file at location `path` @@ -1060,12 +1103,15 @@ async fn fetch_schema( metadata_size_hint: Option<usize>, file_decryption_properties: Option<&FileDecryptionProperties>, coerce_int96: Option<TimeUnit>, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, ) -> Result<Schema> { + let fetch = ObjectStoreFetch::new(store, file); let metadata = fetch_parquet_metadata( - store, + fetch, file, metadata_size_hint, file_decryption_properties, + file_metadata_cache, ) .await?; let file_metadata = metadata.file_metadata(); @@ -1090,10 +1136,17 @@ pub async fn fetch_statistics( file: &ObjectMeta, metadata_size_hint: Option<usize>, decryption_properties: Option<&FileDecryptionProperties>, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, ) -> Result<Statistics> { - let metadata = - fetch_parquet_metadata(store, file, metadata_size_hint, decryption_properties) - .await?; + let fetch = ObjectStoreFetch::new(store, file); + let metadata = fetch_parquet_metadata( + fetch, + file, + metadata_size_hint, + decryption_properties, + file_metadata_cache, + ) + .await?; statistics_from_parquet_meta_calc(&metadata, table_schema) } diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index 0b4e862403837..ad59e7261cba3 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -34,7 +34,7 @@ pub use access_plan::{ParquetAccessPlan, RowGroupAccess}; pub use file_format::*; pub use metrics::ParquetFileMetrics; pub use page_filter::PagePruningAccessPlanFilter; -pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory}; +pub use reader::*; // Expose so downstream crates can use it pub use row_filter::build_row_filter; pub use row_filter::can_expr_be_pushed_down_with_schemas; pub use row_group_filter::RowGroupAccessPlanFilter; diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs index 648ed7c0bc6b4..df375818689ca 100644 --- a/datafusion/datasource-parquet/src/reader.rs +++ b/datafusion/datasource-parquet/src/reader.rs @@ -18,7 +18,7 @@ //! [`ParquetFileReaderFactory`] and [`DefaultParquetFileReaderFactory`] for //! low level control of parquet file readers -use crate::ParquetFileMetrics; +use crate::{fetch_parquet_metadata, ParquetFileMetrics}; use bytes::Bytes; use datafusion_datasource::file_meta::FileMeta; use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache}; @@ -28,7 +28,7 @@ use futures::FutureExt; use object_store::ObjectStore; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; -use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; +use parquet::file::metadata::ParquetMetaData; use std::any::Any; use std::fmt::Debug; use std::ops::Range; @@ -91,7 +91,7 @@ impl DefaultParquetFileReaderFactory { /// This implementation does not coalesce I/O operations or cache bytes. Such /// optimizations can be done either at the object store level or by providing a /// custom implementation of [`ParquetFileReaderFactory`].
-pub(crate) struct ParquetFileReader { +pub struct ParquetFileReader { pub file_metrics: ParquetFileMetrics, pub inner: ParquetObjectReader, } @@ -212,7 +212,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { /// Implements [`AsyncFileReader`] for a Parquet file in object storage. Reads the file metadata /// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and then /// updates the cache. -pub(crate) struct CachedParquetFileReader { +pub struct CachedParquetFileReader { pub file_metrics: ParquetFileMetrics, pub inner: ParquetObjectReader, file_meta: FileMeta, @@ -249,40 +249,44 @@ impl AsyncFileReader for CachedParquetFileReader { let metadata_cache = Arc::clone(&self.metadata_cache); async move { - let object_meta = &file_meta.object_meta; + #[cfg(feature = "parquet_encryption")] + let file_decryption_properties = + options.and_then(|o| o.file_decryption_properties()); - // lookup if the metadata is already cached - if let Some(metadata) = metadata_cache.get(object_meta) { - if let Some(parquet_metadata) = - metadata.as_any().downcast_ref::<CachedParquetMetaData>() - { - return Ok(Arc::clone(&parquet_metadata.0)); - } - } + #[cfg(not(feature = "parquet_encryption"))] + let file_decryption_properties = None; - let mut reader = ParquetMetaDataReader::new(); - // the page index can only be loaded with unencrypted files - if let Some(file_decryption_properties) = - options.and_then(|o| o.file_decryption_properties()) - { - reader = - reader.with_decryption_properties(Some(file_decryption_properties)); - } else { - reader = reader.with_page_indexes(true); - } - reader.try_load(&mut self.inner, object_meta.size).await?; - let metadata = Arc::new(reader.finish()?); - let cached_metadata = Arc::new(CachedParquetMetaData(Arc::clone(&metadata))); - - metadata_cache.put(object_meta, cached_metadata); - Ok(metadata) + fetch_parquet_metadata( + &mut self.inner, + &file_meta.object_meta, + None, + file_decryption_properties, + Some(metadata_cache), + ) + .await + .map_err(|e| { + parquet::errors::ParquetError::General(format!( + "Failed to fetch metadata for file {}: {e}", + file_meta.object_meta.location, + )) + }) } .boxed() } } /// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`]. -struct CachedParquetMetaData(Arc<ParquetMetaData>); +pub struct CachedParquetMetaData(Arc<ParquetMetaData>); + +impl CachedParquetMetaData { + pub fn new(metadata: Arc<ParquetMetaData>) -> Self { + Self(metadata) + } + + pub fn parquet_metadata(&self) -> &Arc<ParquetMetaData> { + &self.0 + } +} impl FileMetadata for CachedParquetMetaData { fn as_any(&self) -> &dyn Any {
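
Usage note for reviewers: with `ObjectStoreFetch`, `fetch_parquet_metadata`, and `CachedParquetMetaData` now exported from `datafusion-datasource-parquet`, downstream code can reuse the session's file metadata cache the same way the updated tests do. Below is a minimal sketch, not part of the patch; the helper name `load_metadata_twice` and the use of `LocalFileSystem`/`SessionContext` here are illustrative assumptions.

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use datafusion_common::Result;
use datafusion_datasource_parquet::{fetch_parquet_metadata, ObjectStoreFetch};
use object_store::{local::LocalFileSystem, ObjectMeta, ObjectStore};

// Fetch the metadata for one parquet object twice, passing the session's
// file metadata cache so the second call does not re-read the footer.
async fn load_metadata_twice(meta: &ObjectMeta) -> Result<()> {
    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
    let state = SessionContext::new().state();
    let cache = state.runtime_env().cache_manager.get_file_metadata_cache();

    // First call reads the footer (and, for unencrypted files, the page
    // indexes, so the cached entry is complete) and populates the cache.
    let first = fetch_parquet_metadata(
        ObjectStoreFetch::new(store.as_ref(), meta),
        meta,
        None,                     // no metadata size hint
        None,                     // no decryption properties
        Some(Arc::clone(&cache)),
    )
    .await?;

    // Second call is answered from the cache; no extra object-store requests.
    let second = fetch_parquet_metadata(
        ObjectStoreFetch::new(store.as_ref(), meta),
        meta,
        None,
        None,
        Some(cache),
    )
    .await?;

    assert_eq!(
        first.file_metadata().num_rows(),
        second.file_metadata().num_rows()
    );
    Ok(())
}

Passing `None` for the cache argument keeps the old behavior of reading the footer from the object store on every call, which is what the `request_count()` assertions in the modified tests exercise.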