diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index ac7c39bbdb94..f0328098b406 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -15,66 +15,55 @@ // specific language governing permissions and limitations // under the License. -//! Execution plan for reading Parquet files +//! [`ParquetExec`] Execution plan for reading Parquet files use std::any::Any; use std::fmt::Debug; -use std::ops::Range; use std::sync::Arc; use crate::datasource::listing::PartitionedFile; -use crate::datasource::physical_plan::file_stream::{ - FileOpenFuture, FileOpener, FileStream, -}; +use crate::datasource::physical_plan::file_stream::FileStream; use crate::datasource::physical_plan::{ parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner, - FileMeta, FileScanConfig, + FileScanConfig, }; use crate::{ config::{ConfigOptions, TableParquetOptions}, - datasource::listing::ListingTableUrl, - error::{DataFusionError, Result}, + error::Result, execution::context::TaskContext, physical_optimizer::pruning::PruningPredicate, physical_plan::{ metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, - DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties, - Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, + DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, + SendableRecordBatchStream, Statistics, }, }; use arrow::datatypes::{DataType, SchemaRef}; -use arrow::error::ArrowError; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr}; -use bytes::Bytes; -use futures::future::BoxFuture; -use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use log::debug; -use object_store::buffered::BufWriter; -use object_store::path::Path; -use object_store::ObjectStore; -use parquet::arrow::arrow_reader::ArrowReaderOptions; -use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; -use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::basic::{ConvertedType, LogicalType}; -use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties}; use parquet::schema::types::ColumnDescriptor; -use tokio::task::JoinSet; mod metrics; +mod opener; mod page_filter; +mod reader; mod row_filter; mod row_groups; mod statistics; +mod writer; -use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet; use crate::datasource::schema_adapter::{ DefaultSchemaAdapterFactory, SchemaAdapterFactory, }; pub use metrics::ParquetFileMetrics; +use opener::ParquetOpener; +pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory}; pub use statistics::{RequestedStatistics, StatisticsConverter}; +pub use writer::plan_to_parquet; /// Execution plan for reading one or more Parquet files. /// @@ -201,7 +190,7 @@ pub struct ParquetExec { schema_adapter_factory: Option>, } -/// [`ParquetExecBuilder`]`, builder for [`ParquetExec`]. +/// [`ParquetExecBuilder`], builder for [`ParquetExec`]. /// /// See example on [`ParquetExec`]. pub struct ParquetExecBuilder { @@ -279,7 +268,9 @@ impl ParquetExecBuilder { /// instance using individual I/O operations for the footer and each page. /// /// If a custom `ParquetFileReaderFactory` is provided, then data access - /// operations will be routed to this factory instead of `ObjectStore`. + /// operations will be routed to this factory instead of [`ObjectStore`]. + /// + /// [`ObjectStore`]: object_store::ObjectStore pub fn with_parquet_file_reader_factory( mut self, parquet_file_reader_factory: Arc, @@ -698,175 +689,6 @@ impl ExecutionPlan for ParquetExec { } } -/// Implements [`FileOpener`] for a parquet file -struct ParquetOpener { - partition_index: usize, - projection: Arc<[usize]>, - batch_size: usize, - limit: Option, - predicate: Option>, - pruning_predicate: Option>, - page_pruning_predicate: Option>, - table_schema: SchemaRef, - metadata_size_hint: Option, - metrics: ExecutionPlanMetricsSet, - parquet_file_reader_factory: Arc, - pushdown_filters: bool, - reorder_filters: bool, - enable_page_index: bool, - enable_bloom_filter: bool, - schema_adapter_factory: Arc, -} - -impl FileOpener for ParquetOpener { - fn open(&self, file_meta: FileMeta) -> Result { - let file_range = file_meta.range.clone(); - let file_metrics = ParquetFileMetrics::new( - self.partition_index, - file_meta.location().as_ref(), - &self.metrics, - ); - - let reader: Box = - self.parquet_file_reader_factory.create_reader( - self.partition_index, - file_meta, - self.metadata_size_hint, - &self.metrics, - )?; - - let batch_size = self.batch_size; - let projection = self.projection.clone(); - let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?); - let schema_adapter = self.schema_adapter_factory.create(projected_schema); - let predicate = self.predicate.clone(); - let pruning_predicate = self.pruning_predicate.clone(); - let page_pruning_predicate = self.page_pruning_predicate.clone(); - let table_schema = self.table_schema.clone(); - let reorder_predicates = self.reorder_filters; - let pushdown_filters = self.pushdown_filters; - let enable_page_index = should_enable_page_index( - self.enable_page_index, - &self.page_pruning_predicate, - ); - let enable_bloom_filter = self.enable_bloom_filter; - let limit = self.limit; - - Ok(Box::pin(async move { - let options = ArrowReaderOptions::new().with_page_index(enable_page_index); - let mut builder = - ParquetRecordBatchStreamBuilder::new_with_options(reader, options) - .await?; - - let file_schema = builder.schema().clone(); - - let (schema_mapping, adapted_projections) = - schema_adapter.map_schema(&file_schema)?; - // let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?; - - let mask = ProjectionMask::roots( - builder.parquet_schema(), - adapted_projections.iter().cloned(), - ); - - // Filter pushdown: evaluate predicates during scan - if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { - let row_filter = row_filter::build_row_filter( - &predicate, - &file_schema, - &table_schema, - builder.metadata(), - reorder_predicates, - &file_metrics, - ); - - match row_filter { - Ok(Some(filter)) => { - builder = builder.with_row_filter(filter); - } - Ok(None) => {} - Err(e) => { - debug!( - "Ignoring error building row filter for '{:?}': {}", - predicate, e - ); - } - }; - }; - - // Determine which row groups to actually read. The idea is to skip - // as many row groups as possible based on the metadata and query - let file_metadata = builder.metadata().clone(); - let predicate = pruning_predicate.as_ref().map(|p| p.as_ref()); - let rg_metadata = file_metadata.row_groups(); - // track which row groups to actually read - let mut row_groups = RowGroupSet::new(rg_metadata.len()); - // if there is a range restricting what parts of the file to read - if let Some(range) = file_range.as_ref() { - row_groups.prune_by_range(rg_metadata, range); - } - // If there is a predicate that can be evaluated against the metadata - if let Some(predicate) = predicate.as_ref() { - row_groups.prune_by_statistics( - &file_schema, - builder.parquet_schema(), - rg_metadata, - predicate, - &file_metrics, - ); - - if enable_bloom_filter && !row_groups.is_empty() { - row_groups - .prune_by_bloom_filters( - &file_schema, - &mut builder, - predicate, - &file_metrics, - ) - .await; - } - } - - // page index pruning: if all data on individual pages can - // be ruled using page metadata, rows from other columns - // with that range can be skipped as well - if enable_page_index && !row_groups.is_empty() { - if let Some(p) = page_pruning_predicate { - let pruned = p.prune( - &file_schema, - builder.parquet_schema(), - &row_groups, - file_metadata.as_ref(), - &file_metrics, - )?; - if let Some(row_selection) = pruned { - builder = builder.with_row_selection(row_selection); - } - } - } - - if let Some(limit) = limit { - builder = builder.with_limit(limit) - } - - let stream = builder - .with_projection(mask) - .with_batch_size(batch_size) - .with_row_groups(row_groups.indexes()) - .build()?; - - let adapted = stream - .map_err(|e| ArrowError::ExternalError(Box::new(e))) - .map(move |maybe_batch| { - maybe_batch - .and_then(|b| schema_mapping.map_batch(b).map_err(Into::into)) - }); - - Ok(adapted.boxed()) - })) - } -} - fn should_enable_page_index( enable_page_index: bool, page_pruning_predicate: &Option>, @@ -879,168 +701,6 @@ fn should_enable_page_index( .unwrap_or(false) } -/// Interface for reading parquet files. -/// -/// The combined implementations of [`ParquetFileReaderFactory`] and -/// [`AsyncFileReader`] can be used to provide custom data access operations -/// such as pre-cached data, I/O coalescing, etc. -/// -/// See [`DefaultParquetFileReaderFactory`] for a simple implementation. -pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static { - /// Provides an `AsyncFileReader` for reading data from a parquet file specified - /// - /// # Arguments - /// * partition_index - Index of the partition (for reporting metrics) - /// * file_meta - The file to be read - /// * metadata_size_hint - If specified, the first IO reads this many bytes from the footer - /// * metrics - Execution metrics - fn create_reader( - &self, - partition_index: usize, - file_meta: FileMeta, - metadata_size_hint: Option, - metrics: &ExecutionPlanMetricsSet, - ) -> Result>; -} - -/// Default implementation of [`ParquetFileReaderFactory`] -/// -/// This implementation: -/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance. -/// 2. Reads the footer and page metadata on demand. -/// 3. Does not cache metadata or coalesce I/O operations. -#[derive(Debug)] -pub struct DefaultParquetFileReaderFactory { - store: Arc, -} - -impl DefaultParquetFileReaderFactory { - /// Create a new `DefaultParquetFileReaderFactory`. - pub fn new(store: Arc) -> Self { - Self { store } - } -} - -/// Implements [`AsyncFileReader`] for a parquet file in object storage. -/// -/// This implementation uses the [`ParquetObjectReader`] to read data from the -/// object store on demand, as required, tracking the number of bytes read. -/// -/// This implementation does not coalesce I/O operations or cache bytes. Such -/// optimizations can be done either at the object store level or by providing a -/// custom implementation of [`ParquetFileReaderFactory`]. -pub(crate) struct ParquetFileReader { - file_metrics: ParquetFileMetrics, - inner: ParquetObjectReader, -} - -impl AsyncFileReader for ParquetFileReader { - fn get_bytes( - &mut self, - range: Range, - ) -> BoxFuture<'_, parquet::errors::Result> { - self.file_metrics.bytes_scanned.add(range.end - range.start); - self.inner.get_bytes(range) - } - - fn get_byte_ranges( - &mut self, - ranges: Vec>, - ) -> BoxFuture<'_, parquet::errors::Result>> - where - Self: Send, - { - let total = ranges.iter().map(|r| r.end - r.start).sum(); - self.file_metrics.bytes_scanned.add(total); - self.inner.get_byte_ranges(ranges) - } - - fn get_metadata( - &mut self, - ) -> BoxFuture<'_, parquet::errors::Result>> { - self.inner.get_metadata() - } -} - -impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { - fn create_reader( - &self, - partition_index: usize, - file_meta: FileMeta, - metadata_size_hint: Option, - metrics: &ExecutionPlanMetricsSet, - ) -> Result> { - let file_metrics = ParquetFileMetrics::new( - partition_index, - file_meta.location().as_ref(), - metrics, - ); - let store = Arc::clone(&self.store); - let mut inner = ParquetObjectReader::new(store, file_meta.object_meta); - - if let Some(hint) = metadata_size_hint { - inner = inner.with_footer_size_hint(hint) - }; - - Ok(Box::new(ParquetFileReader { - inner, - file_metrics, - })) - } -} - -/// Executes a query and writes the results to a partitioned Parquet file. -pub async fn plan_to_parquet( - task_ctx: Arc, - plan: Arc, - path: impl AsRef, - writer_properties: Option, -) -> Result<()> { - let path = path.as_ref(); - let parsed = ListingTableUrl::parse(path)?; - let object_store_url = parsed.object_store(); - let store = task_ctx.runtime_env().object_store(&object_store_url)?; - let mut join_set = JoinSet::new(); - for i in 0..plan.output_partitioning().partition_count() { - let plan: Arc = plan.clone(); - let filename = format!("{}/part-{i}.parquet", parsed.prefix()); - let file = Path::parse(filename)?; - let propclone = writer_properties.clone(); - - let storeref = store.clone(); - let buf_writer = BufWriter::new(storeref, file.clone()); - let mut stream = plan.execute(i, task_ctx.clone())?; - join_set.spawn(async move { - let mut writer = - AsyncArrowWriter::try_new(buf_writer, plan.schema(), propclone)?; - while let Some(next_batch) = stream.next().await { - let batch = next_batch?; - writer.write(&batch).await?; - } - writer - .close() - .await - .map_err(DataFusionError::from) - .map(|_| ()) - }); - } - - while let Some(result) = join_set.join_next().await { - match result { - Ok(res) => res?, - Err(e) => { - if e.is_panic() { - std::panic::resume_unwind(e.into_panic()); - } else { - unreachable!(); - } - } - } - } - - Ok(()) -} - // Convert parquet column schema to arrow data type, and just consider the // decimal data type. pub(crate) fn parquet_to_arrow_decimal_type( @@ -1098,9 +758,13 @@ mod tests { use datafusion_physical_expr::create_physical_expr; use chrono::{TimeZone, Utc}; + use datafusion_physical_plan::ExecutionPlanProperties; + use futures::StreamExt; use object_store::local::LocalFileSystem; + use object_store::path::Path; use object_store::ObjectMeta; use parquet::arrow::ArrowWriter; + use parquet::file::properties::WriterProperties; use tempfile::TempDir; use url::Url; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs new file mode 100644 index 000000000000..3aec1e1d2037 --- /dev/null +++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs @@ -0,0 +1,204 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ParquetOpener`] for opening Parquet files + +use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate; +use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet; +use crate::datasource::physical_plan::parquet::{row_filter, should_enable_page_index}; +use crate::datasource::physical_plan::{ + FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory, +}; +use crate::datasource::schema_adapter::SchemaAdapterFactory; +use crate::physical_optimizer::pruning::PruningPredicate; +use arrow_schema::{ArrowError, SchemaRef}; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use futures::{StreamExt, TryStreamExt}; +use log::debug; +use parquet::arrow::arrow_reader::ArrowReaderOptions; +use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; +use std::sync::Arc; + +/// Implements [`FileOpener`] for a parquet file +pub(super) struct ParquetOpener { + pub partition_index: usize, + pub projection: Arc<[usize]>, + pub batch_size: usize, + pub limit: Option, + pub predicate: Option>, + pub pruning_predicate: Option>, + pub page_pruning_predicate: Option>, + pub table_schema: SchemaRef, + pub metadata_size_hint: Option, + pub metrics: ExecutionPlanMetricsSet, + pub parquet_file_reader_factory: Arc, + pub pushdown_filters: bool, + pub reorder_filters: bool, + pub enable_page_index: bool, + pub enable_bloom_filter: bool, + pub schema_adapter_factory: Arc, +} + +impl FileOpener for ParquetOpener { + fn open(&self, file_meta: FileMeta) -> datafusion_common::Result { + let file_range = file_meta.range.clone(); + let file_metrics = ParquetFileMetrics::new( + self.partition_index, + file_meta.location().as_ref(), + &self.metrics, + ); + + let reader: Box = + self.parquet_file_reader_factory.create_reader( + self.partition_index, + file_meta, + self.metadata_size_hint, + &self.metrics, + )?; + + let batch_size = self.batch_size; + let projection = self.projection.clone(); + let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?); + let schema_adapter = self.schema_adapter_factory.create(projected_schema); + let predicate = self.predicate.clone(); + let pruning_predicate = self.pruning_predicate.clone(); + let page_pruning_predicate = self.page_pruning_predicate.clone(); + let table_schema = self.table_schema.clone(); + let reorder_predicates = self.reorder_filters; + let pushdown_filters = self.pushdown_filters; + let enable_page_index = should_enable_page_index( + self.enable_page_index, + &self.page_pruning_predicate, + ); + let enable_bloom_filter = self.enable_bloom_filter; + let limit = self.limit; + + Ok(Box::pin(async move { + let options = ArrowReaderOptions::new().with_page_index(enable_page_index); + let mut builder = + ParquetRecordBatchStreamBuilder::new_with_options(reader, options) + .await?; + + let file_schema = builder.schema().clone(); + + let (schema_mapping, adapted_projections) = + schema_adapter.map_schema(&file_schema)?; + + let mask = ProjectionMask::roots( + builder.parquet_schema(), + adapted_projections.iter().cloned(), + ); + + // Filter pushdown: evaluate predicates during scan + if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { + let row_filter = row_filter::build_row_filter( + &predicate, + &file_schema, + &table_schema, + builder.metadata(), + reorder_predicates, + &file_metrics, + ); + + match row_filter { + Ok(Some(filter)) => { + builder = builder.with_row_filter(filter); + } + Ok(None) => {} + Err(e) => { + debug!( + "Ignoring error building row filter for '{:?}': {}", + predicate, e + ); + } + }; + }; + + // Determine which row groups to actually read. The idea is to skip + // as many row groups as possible based on the metadata and query + let file_metadata = builder.metadata().clone(); + let predicate = pruning_predicate.as_ref().map(|p| p.as_ref()); + let rg_metadata = file_metadata.row_groups(); + // track which row groups to actually read + let mut row_groups = RowGroupSet::new(rg_metadata.len()); + // if there is a range restricting what parts of the file to read + if let Some(range) = file_range.as_ref() { + row_groups.prune_by_range(rg_metadata, range); + } + // If there is a predicate that can be evaluated against the metadata + if let Some(predicate) = predicate.as_ref() { + row_groups.prune_by_statistics( + &file_schema, + builder.parquet_schema(), + rg_metadata, + predicate, + &file_metrics, + ); + + if enable_bloom_filter && !row_groups.is_empty() { + row_groups + .prune_by_bloom_filters( + &file_schema, + &mut builder, + predicate, + &file_metrics, + ) + .await; + } + } + + // page index pruning: if all data on individual pages can + // be ruled using page metadata, rows from other columns + // with that range can be skipped as well + if enable_page_index && !row_groups.is_empty() { + if let Some(p) = page_pruning_predicate { + let pruned = p.prune( + &file_schema, + builder.parquet_schema(), + &row_groups, + file_metadata.as_ref(), + &file_metrics, + )?; + if let Some(row_selection) = pruned { + builder = builder.with_row_selection(row_selection); + } + } + } + + if let Some(limit) = limit { + builder = builder.with_limit(limit) + } + + let stream = builder + .with_projection(mask) + .with_batch_size(batch_size) + .with_row_groups(row_groups.indexes()) + .build()?; + + let adapted = stream + .map_err(|e| ArrowError::ExternalError(Box::new(e))) + .map(move |maybe_batch| { + maybe_batch + .and_then(|b| schema_mapping.map_batch(b).map_err(Into::into)) + }); + + Ok(adapted.boxed()) + })) + } +} diff --git a/datafusion/core/src/datasource/physical_plan/parquet/reader.rs b/datafusion/core/src/datasource/physical_plan/parquet/reader.rs new file mode 100644 index 000000000000..265fb9d570cc --- /dev/null +++ b/datafusion/core/src/datasource/physical_plan/parquet/reader.rs @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ParquetFileReaderFactory`] and [`DefaultParquetFileReaderFactory`] for +//! creating parquet file readers + +use crate::datasource::physical_plan::{FileMeta, ParquetFileMetrics}; +use bytes::Bytes; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use futures::future::BoxFuture; +use object_store::ObjectStore; +use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; +use parquet::file::metadata::ParquetMetaData; +use std::fmt::Debug; +use std::ops::Range; +use std::sync::Arc; + +/// Interface for reading parquet files. +/// +/// The combined implementations of [`ParquetFileReaderFactory`] and +/// [`AsyncFileReader`] can be used to provide custom data access operations +/// such as pre-cached data, I/O coalescing, etc. +/// +/// See [`DefaultParquetFileReaderFactory`] for a simple implementation. +pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static { + /// Provides an `AsyncFileReader` for reading data from a parquet file specified + /// + /// # Arguments + /// * partition_index - Index of the partition (for reporting metrics) + /// * file_meta - The file to be read + /// * metadata_size_hint - If specified, the first IO reads this many bytes from the footer + /// * metrics - Execution metrics + fn create_reader( + &self, + partition_index: usize, + file_meta: FileMeta, + metadata_size_hint: Option, + metrics: &ExecutionPlanMetricsSet, + ) -> datafusion_common::Result>; +} + +/// Default implementation of [`ParquetFileReaderFactory`] +/// +/// This implementation: +/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance. +/// 2. Reads the footer and page metadata on demand. +/// 3. Does not cache metadata or coalesce I/O operations. +#[derive(Debug)] +pub struct DefaultParquetFileReaderFactory { + store: Arc, +} + +impl DefaultParquetFileReaderFactory { + /// Create a new `DefaultParquetFileReaderFactory`. + pub fn new(store: Arc) -> Self { + Self { store } + } +} + +/// Implements [`AsyncFileReader`] for a parquet file in object storage. +/// +/// This implementation uses the [`ParquetObjectReader`] to read data from the +/// object store on demand, as required, tracking the number of bytes read. +/// +/// This implementation does not coalesce I/O operations or cache bytes. Such +/// optimizations can be done either at the object store level or by providing a +/// custom implementation of [`ParquetFileReaderFactory`]. +pub(crate) struct ParquetFileReader { + pub file_metrics: ParquetFileMetrics, + pub inner: ParquetObjectReader, +} + +impl AsyncFileReader for ParquetFileReader { + fn get_bytes( + &mut self, + range: Range, + ) -> BoxFuture<'_, parquet::errors::Result> { + self.file_metrics.bytes_scanned.add(range.end - range.start); + self.inner.get_bytes(range) + } + + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, parquet::errors::Result>> + where + Self: Send, + { + let total = ranges.iter().map(|r| r.end - r.start).sum(); + self.file_metrics.bytes_scanned.add(total); + self.inner.get_byte_ranges(ranges) + } + + fn get_metadata( + &mut self, + ) -> BoxFuture<'_, parquet::errors::Result>> { + self.inner.get_metadata() + } +} + +impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { + fn create_reader( + &self, + partition_index: usize, + file_meta: FileMeta, + metadata_size_hint: Option, + metrics: &ExecutionPlanMetricsSet, + ) -> datafusion_common::Result> { + let file_metrics = ParquetFileMetrics::new( + partition_index, + file_meta.location().as_ref(), + metrics, + ); + let store = Arc::clone(&self.store); + let mut inner = ParquetObjectReader::new(store, file_meta.object_meta); + + if let Some(hint) = metadata_size_hint { + inner = inner.with_footer_size_hint(hint) + }; + + Ok(Box::new(ParquetFileReader { + inner, + file_metrics, + })) + } +} diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 0a0ca4369d27..7dd91d3d4e4b 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -417,7 +417,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { #[cfg(test)] mod tests { use super::*; - use crate::datasource::physical_plan::parquet::ParquetFileReader; + use crate::datasource::physical_plan::parquet::reader::ParquetFileReader; use crate::physical_plan::metrics::ExecutionPlanMetricsSet; use arrow::datatypes::DataType::Decimal128; use arrow::datatypes::{DataType, Field}; diff --git a/datafusion/core/src/datasource/physical_plan/parquet/writer.rs b/datafusion/core/src/datasource/physical_plan/parquet/writer.rs new file mode 100644 index 000000000000..0c0c54691068 --- /dev/null +++ b/datafusion/core/src/datasource/physical_plan/parquet/writer.rs @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::datasource::listing::ListingTableUrl; +use datafusion_common::DataFusionError; +use datafusion_execution::TaskContext; +use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; +use futures::StreamExt; +use object_store::buffered::BufWriter; +use object_store::path::Path; +use parquet::arrow::AsyncArrowWriter; +use parquet::file::properties::WriterProperties; +use std::sync::Arc; +use tokio::task::JoinSet; + +/// Executes a query and writes the results to a partitioned Parquet file. +pub async fn plan_to_parquet( + task_ctx: Arc, + plan: Arc, + path: impl AsRef, + writer_properties: Option, +) -> datafusion_common::Result<()> { + let path = path.as_ref(); + let parsed = ListingTableUrl::parse(path)?; + let object_store_url = parsed.object_store(); + let store = task_ctx.runtime_env().object_store(&object_store_url)?; + let mut join_set = JoinSet::new(); + for i in 0..plan.output_partitioning().partition_count() { + let plan: Arc = plan.clone(); + let filename = format!("{}/part-{i}.parquet", parsed.prefix()); + let file = Path::parse(filename)?; + let propclone = writer_properties.clone(); + + let storeref = store.clone(); + let buf_writer = BufWriter::new(storeref, file.clone()); + let mut stream = plan.execute(i, task_ctx.clone())?; + join_set.spawn(async move { + let mut writer = + AsyncArrowWriter::try_new(buf_writer, plan.schema(), propclone)?; + while let Some(next_batch) = stream.next().await { + let batch = next_batch?; + writer.write(&batch).await?; + } + writer + .close() + .await + .map_err(DataFusionError::from) + .map(|_| ()) + }); + } + + while let Some(result) = join_set.join_next().await { + match result { + Ok(res) => res?, + Err(e) => { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } else { + unreachable!(); + } + } + } + } + + Ok(()) +}