feat(9493): provide access to FileMetaData for files written with ParquetSink #9548

Merged
merged 8 commits on Mar 12, 2024
251 changes: 228 additions & 23 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -19,13 +19,14 @@

use arrow_array::RecordBatch;
use async_trait::async_trait;
-use datafusion_common::stats::Precision;
+use datafusion_common::{internal_datafusion_err, stats::Precision};
use datafusion_physical_plan::metrics::MetricsSet;
use parquet::arrow::arrow_writer::{
compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
ArrowLeafColumn,
};
use parquet::file::writer::SerializedFileWriter;
use parquet::format::FileMetaData;
use std::any::Any;
use std::fmt;
use std::fmt::Debug;
@@ -541,6 +542,8 @@ async fn fetch_statistics(
pub struct ParquetSink {
/// Config options for writing data
config: FileSinkConfig,
/// File metadata from successfully produced parquet files.
wiedld marked this conversation as resolved.

Contributor:

Suggested change:
-/// File metadata from successfully produced parquet files.
+/// File metadata from successfully produced parquet files. The Mutex is only used to allow inserting to HashMap from behind borrowed reference in DataSink::write_all.

The use of a Mutex here is confusing without the context of this PR, so I think it would be a good idea to leave a comment explaining.

I think this is a fine temporary workaround, but I'm sure we can find a way to return FileMetaData in a new public interface without breaking changes to DataSink or using locks.

Contributor (Author):

Absolutely agreed.

written: Arc<parking_lot::Mutex<HashMap<Path, FileMetaData>>>,
}
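
To make the tradeoff discussed above concrete, here is a minimal, self-contained sketch (hypothetical names, std::sync::Mutex instead of parking_lot, and not the real DataSink trait): write_all receives &self, so recording per-file results requires interior mutability.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in for DataSink::write_all: note the shared &self receiver.
trait Sink {
    fn write_all(&self, files: Vec<(String, u64)>);
}

struct RecordingSink {
    // The lock provides the interior mutability that &self requires.
    written: Arc<Mutex<HashMap<String, u64>>>,
}

impl Sink for RecordingSink {
    fn write_all(&self, files: Vec<(String, u64)>) {
        let mut written = self.written.lock().unwrap();
        for (path, rows) in files {
            written.insert(path, rows);
        }
    }
}

fn main() {
    let sink = RecordingSink { written: Default::default() };
    sink.write_all(vec![("a=foo/file.parquet".into(), 1)]);
    assert_eq!(sink.written.lock().unwrap().len(), 1);
}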

impl Debug for ParquetSink {
@@ -564,13 +567,23 @@ impl DisplayAs for ParquetSink {
impl ParquetSink {
/// Create from config.
pub fn new(config: FileSinkConfig) -> Self {
-Self { config }
+Self {
+config,
+written: Default::default(),
+}
}

/// Retrieve the inner [`FileSinkConfig`].
pub fn config(&self) -> &FileSinkConfig {
&self.config
}

/// Retrieve the file metadata for the written files, keyed to the path
/// which may be partitioned (in the case of hive style partitioning).
pub fn written(&self) -> HashMap<Path, FileMetaData> {
self.written.lock().clone()
}
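
A hedged usage sketch of the new accessor (the helper below is hypothetical; it assumes write_all has already completed):

use std::collections::HashMap;

use object_store::path::Path;
use parquet::format::FileMetaData;

// Hypothetical helper: report what a ParquetSink recorded, keyed by path.
fn report_written(written: HashMap<Path, FileMetaData>) {
    for (path, meta) in written {
        println!("{path}: {} rows", meta.num_rows);
    }
}
// e.g. report_written(parquet_sink.written());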

/// Converts table schema to writer schema, which may differ in the case
/// of hive style partitioning where some columns are removed from the
/// underlying files.
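
Conceptually (a hedged sketch under assumed behavior, not the PR's implementation), the writer schema is the table schema minus the partition columns, since their values are encoded in the file path:

use std::sync::Arc;

use arrow_schema::Schema;

// Sketch: drop hive-partition columns from the schema written to each file.
fn writer_schema(table: &Schema, partition_cols: &[&str]) -> Arc<Schema> {
    let fields: Vec<_> = table
        .fields()
        .iter()
        .filter(|f| !partition_cols.contains(&f.name().as_str()))
        .cloned()
        .collect();
    Arc::new(Schema::new(fields))
}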
@@ -668,8 +681,10 @@ impl DataSink for ParquetSink {
"parquet".into(),
);

-let mut file_write_tasks: JoinSet<std::result::Result<usize, DataFusionError>> =
-JoinSet::new();
+let mut file_write_tasks: JoinSet<
+std::result::Result<(Path, FileMetaData), DataFusionError>,
+> = JoinSet::new();

while let Some((path, mut rx)) = file_stream_rx.recv().await {
if !allow_single_file_parallelism {
let mut writer = self
@@ -680,13 +695,14 @@
)
.await?;
file_write_tasks.spawn(async move {
-let mut row_count = 0;
while let Some(batch) = rx.recv().await {
-row_count += batch.num_rows();
writer.write(&batch).await?;
}
-writer.close().await?;
-Ok(row_count)
+let file_metadata = writer
+.close()
+.await
+.map_err(DataFusionError::ParquetError)?;
+Ok((path, file_metadata))
});
} else {
let writer = create_writer(
@@ -701,14 +717,15 @@
let props = parquet_props.clone();
let parallel_options_clone = parallel_options.clone();
file_write_tasks.spawn(async move {
-output_single_parquet_file_parallelized(
+let file_metadata = output_single_parquet_file_parallelized(
writer,
rx,
schema,
&props,
parallel_options_clone,
)
-.await
+.await?;
+Ok((path, file_metadata))
});
}
}
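
In miniature, the spawn-then-join pattern used above (a self-contained sketch with made-up paths and row counts; the real tasks return (Path, FileMetaData)):

use std::collections::HashMap;

use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut tasks: JoinSet<(String, u64)> = JoinSet::new();
    for (path, rows) in [("a=foo/f.parquet", 1u64), ("a=bar/f.parquet", 1)] {
        tasks.spawn(async move { (path.to_string(), rows) });
    }
    // Join in completion order and key the results by path.
    let mut written = HashMap::new();
    while let Some(joined) = tasks.join_next().await {
        let (path, rows) = joined.expect("write task panicked");
        written.insert(path, rows);
    }
    assert_eq!(written.len(), 2);
}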
@@ -717,7 +734,13 @@
while let Some(result) = file_write_tasks.join_next().await {
match result {
Ok(r) => {
-row_count += r?;
+let (path, file_metadata) = r?;
+row_count += file_metadata.num_rows;
+let mut written_files = self.written.lock();
+written_files
+.try_insert(path.clone(), file_metadata)
+.map_err(|e| internal_datafusion_err!("duplicate entry detected for partitioned file {path}: {e}"))?;
+drop(written_files);
}
Err(e) => {
if e.is_panic() {
@@ -919,7 +942,7 @@ async fn concatenate_parallel_row_groups(
schema: Arc<Schema>,
writer_props: Arc<WriterProperties>,
mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
-) -> Result<usize> {
+) -> Result<FileMetaData> {
let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);

let schema_desc = arrow_to_parquet_schema(schema.as_ref())?;
@@ -929,13 +952,10 @@
writer_props,
)?;

-let mut row_count = 0;
-
while let Some(task) = serialize_rx.recv().await {
let result = task.join_unwind().await;
let mut rg_out = parquet_writer.next_row_group()?;
-let (serialized_columns, cnt) = result?;
-row_count += cnt;
+let (serialized_columns, _cnt) = result?;
for chunk in serialized_columns {
chunk.append_to_row_group(&mut rg_out)?;
let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
@@ -949,13 +969,13 @@
rg_out.close()?;
}

-let inner_writer = parquet_writer.into_inner()?;
-let final_buff = inner_writer.buffer.try_lock().unwrap();
+let file_metadata = parquet_writer.close()?;
+let final_buff = merged_buff.buffer.try_lock().unwrap();

object_store_writer.write_all(final_buff.as_slice()).await?;
object_store_writer.shutdown().await?;

-Ok(row_count)
+Ok(file_metadata)
}
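
The API this relies on, in isolation (a minimal sketch; assumes the parquet crate version used here, where SerializedFileWriter::close returns format::FileMetaData):

use std::sync::Arc;

use parquet::errors::Result;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::parser::parse_message_type;

fn close_returns_metadata() -> Result<()> {
    let schema = Arc::new(parse_message_type("message m { required int32 a; }")?);
    let props = Arc::new(WriterProperties::builder().build());
    let mut buf: Vec<u8> = Vec::new();
    let writer = SerializedFileWriter::new(&mut buf, schema, props)?;
    // Closing the writer finalizes the footer and yields the file metadata,
    // so callers can read num_rows instead of counting rows themselves.
    let metadata = writer.close()?;
    assert_eq!(metadata.num_rows, 0); // no row groups were appended
    Ok(())
}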

/// Parallelizes the serialization of a single parquet file, by first serializing N
Expand All @@ -968,7 +988,7 @@ async fn output_single_parquet_file_parallelized(
output_schema: Arc<Schema>,
parquet_props: &WriterProperties,
parallel_options: ParallelParquetWriterOptions,
-) -> Result<usize> {
+) -> Result<FileMetaData> {
let max_rowgroups = parallel_options.max_parallel_row_groups;
// Buffer size of this channel limits maximum number of RowGroups being worked on in parallel
let (serialize_tx, serialize_rx) =
Expand All @@ -982,7 +1002,7 @@ async fn output_single_parquet_file_parallelized(
arc_props.clone(),
parallel_options,
);
-let row_count = concatenate_parallel_row_groups(
+let file_metadata = concatenate_parallel_row_groups(
serialize_rx,
output_schema.clone(),
arc_props.clone(),
Expand All @@ -991,7 +1011,7 @@ async fn output_single_parquet_file_parallelized(
.await?;

launch_serialization_task.join_unwind().await?;
-Ok(row_count)
+Ok(file_metadata)
}

#[cfg(test)]
Expand Down Expand Up @@ -1077,6 +1097,7 @@ pub(crate) mod test_util {
#[cfg(test)]
mod tests {
use super::super::test_util::scan_format;
use crate::datasource::listing::{ListingTableUrl, PartitionedFile};
use crate::physical_plan::collect;
use std::fmt::{Display, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -1088,13 +1109,19 @@
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{Array, ArrayRef, StringArray};
use arrow::record_batch::RecordBatch;
use arrow_schema::Field;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_common::cast::{
as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
as_int32_array, as_timestamp_nanosecond_array,
};
-use datafusion_common::ScalarValue;
+use datafusion_common::config::ParquetOptions;
+use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
+use datafusion_common::{FileTypeWriterOptions, ScalarValue};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream::BoxStream;
use futures::StreamExt;
use log::error;
@@ -1789,4 +1816,182 @@ mod tests {
let format = ParquetFormat::default();
scan_format(state, &format, &testdata, file_name, projection, limit).await
}

fn build_ctx(store_url: &url::Url) -> Arc<TaskContext> {
let tmp_dir = tempfile::TempDir::new().unwrap();
let local = Arc::new(
LocalFileSystem::new_with_prefix(&tmp_dir)
.expect("should create object store"),
);

let mut session = SessionConfig::default();
let mut parquet_opts = ParquetOptions::default();
parquet_opts.allow_single_file_parallelism = true;
session.options_mut().execution.parquet = parquet_opts;

let runtime = RuntimeEnv::default();
runtime
.object_store_registry
.register_store(store_url, local);

Arc::new(
TaskContext::default()
.with_session_config(session)
.with_runtime(Arc::new(runtime)),
)
}

#[tokio::test]
async fn parquet_sink_write() -> Result<()> {
let field_a = Field::new("a", DataType::Utf8, false);
let field_b = Field::new("b", DataType::Utf8, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
let object_store_url = ObjectStoreUrl::local_filesystem();

let file_sink_config = FileSinkConfig {
object_store_url: object_store_url.clone(),
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![],
overwrite: true,
file_type_writer_options: FileTypeWriterOptions::Parquet(
ParquetWriterOptions::new(WriterProperties::default()),
),
};
let parquet_sink = Arc::new(ParquetSink::new(file_sink_config));

// create data
let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();

// write stream
parquet_sink
.write_all(
Box::pin(RecordBatchStreamAdapter::new(
schema,
futures::stream::iter(vec![Ok(batch)]),
)),
&build_ctx(object_store_url.as_ref()),
)
.await
.unwrap();

// assert written
let mut written = parquet_sink.written();
let written = written.drain();
assert_eq!(
written.len(),
1,
"expected a single parquet files to be written, instead found {}",
written.len()
);

// check the file metadata
for (
path,
FileMetaData {
num_rows, schema, ..
},
) in written.take(1)
{
let path_parts = path.parts().collect::<Vec<_>>();
assert_eq!(path_parts.len(), 1, "should not have path prefix");

assert_eq!(num_rows, 2, "expected file metadata to have 2 rows");
assert!(
schema.iter().any(|col_schema| col_schema.name == "a"),
"output file metadata should contain col a"
);
assert!(
schema.iter().any(|col_schema| col_schema.name == "b"),
"output file metadata should contain col b"
);
}

Ok(())
}

#[tokio::test]
async fn parquet_sink_write_partitions() -> Result<()> {
let field_a = Field::new("a", DataType::Utf8, false);
let field_b = Field::new("b", DataType::Utf8, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
let object_store_url = ObjectStoreUrl::local_filesystem();

// set file config to include partitioning on field_a
let file_sink_config = FileSinkConfig {
object_store_url: object_store_url.clone(),
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![("a".to_string(), DataType::Utf8)], // add partitioning
overwrite: true,
file_type_writer_options: FileTypeWriterOptions::Parquet(
ParquetWriterOptions::new(WriterProperties::default()),
),
};
let parquet_sink = Arc::new(ParquetSink::new(file_sink_config));

// create data with 2 partitions
let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();

// write stream
parquet_sink
.write_all(
Box::pin(RecordBatchStreamAdapter::new(
schema,
futures::stream::iter(vec![Ok(batch)]),
)),
&build_ctx(object_store_url.as_ref()),
)
.await
.unwrap();

// assert written
let mut written = parquet_sink.written();
let written = written.drain();
assert_eq!(
written.len(),
2,
"expected two parquet files to be written, instead found {}",
written.len()
);

// check the file metadata includes partitions
let mut expected_partitions = std::collections::HashSet::from(["a=foo", "a=bar"]);
for (
path,
FileMetaData {
num_rows, schema, ..
},
) in written.take(2)
{
let path_parts = path.parts().collect::<Vec<_>>();
assert_eq!(path_parts.len(), 2, "should have path prefix");

let prefix = path_parts[0].as_ref();
assert!(
expected_partitions.contains(prefix),
"expected path prefix to match partition, instead found {:?}",
prefix
);
expected_partitions.remove(prefix);

assert_eq!(num_rows, 1, "expected file metadata to have 1 row");
assert!(
!schema.iter().any(|col_schema| col_schema.name == "a"),
"output file metadata will not contain partitioned col a"
);
assert!(
schema.iter().any(|col_schema| col_schema.name == "b"),
"output file metadata should contain col b"
);
}

Ok(())
}
}