Support Reading and Writing Extension FileTypes #8667

Closed
Changes from 3 commits
64 changes: 63 additions & 1 deletion datafusion/common/src/file_options/file_type.rs
@@ -18,9 +18,11 @@
//! File type abstraction

use crate::error::{DataFusionError, Result};
use crate::parsers::CompressionTypeVariant;

use core::fmt;
use std::fmt::Display;
use std::hash::Hasher;
use std::str::FromStr;

/// The default file extension of arrow files
@@ -41,7 +43,8 @@ pub trait GetExt {
}

/// Readable file type
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[allow(clippy::derived_hash_with_manual_eq)]
Contributor Author

I think the manual PartialEq and the derived Hash should be consistent (values that compare equal also hash equally), so silencing this lint is OK.

Contributor

You could probably also implement Hash manually pretty easily, but I agree that silencing the lint isn't a bad thing.
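
A manual `Hash` along the lines suggested here might look like the sketch below. It is not code from this PR: the enum is cut down to two built-in variants, the trait is a simplified stand-in, and `Tsv` and `hash_of` are hypothetical names used only for illustration. The point it shows is that hashing the variant plus the default extension stays consistent with the manual `PartialEq`.

```rust
use std::collections::hash_map::DefaultHasher;
use std::fmt::Debug;
use std::hash::{Hash, Hasher};

/// Simplified stand-in for the PR's `ExtensionFileType` trait.
pub trait ExtensionFileType: Debug {
    fn default_extension(&self) -> String;
}

/// Simplified stand-in for `FileType`, reduced to two built-in variants.
#[derive(Debug)]
pub enum FileType {
    CSV,
    JSON,
    Extension(Box<dyn ExtensionFileType>),
}

impl PartialEq for FileType {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (FileType::CSV, FileType::CSV) => true,
            (FileType::JSON, FileType::JSON) => true,
            // Extensions are identified by their default extension only.
            (FileType::Extension(a), FileType::Extension(b)) => {
                a.default_extension() == b.default_extension()
            }
            _ => false,
        }
    }
}

impl Eq for FileType {}

impl Hash for FileType {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Hash the variant first, then the same key that `eq` compares.
        std::mem::discriminant(self).hash(state);
        if let FileType::Extension(ext) = self {
            ext.default_extension().hash(state);
        }
    }
}

/// Hypothetical extension type; `revision` is deliberately ignored by eq/hash.
#[derive(Debug)]
pub struct Tsv {
    pub revision: u32,
}

impl ExtensionFileType for Tsv {
    fn default_extension(&self) -> String {
        ".tsv".to_owned()
    }
}

/// Hypothetical helper that hashes a value with the std default hasher.
pub fn hash_of(file_type: &FileType) -> u64 {
    let mut hasher = DefaultHasher::new();
    file_type.hash(&mut hasher);
    hasher.finish()
}
```

With this shape the lint allowance would no longer be needed, because both traits are written by hand against the same key.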

#[derive(Debug, Clone, Hash)]
pub enum FileType {
/// Apache Arrow file
ARROW,
@@ -54,8 +57,65 @@ pub enum FileType {
CSV,
/// JSON file
JSON,
/// FileType Implemented Outside of DataFusion
Extension(Box<dyn ExtensionFileType>),
}

/// A trait to enable externally implementing the functionality of a [FileType].
pub trait ExtensionFileType:
std::fmt::Debug + ExtensionFileTypeClone + Send + Sync
{
/// Returns the default file extension for this type, e.g. CSV would return ".csv".to_owned().
/// The default extension also uniquely identifies a specific FileType::Extension variant,
/// so ensure this String is distinct from every built-in FileType and from any other
/// ExtensionFileType defined.
fn default_extension(&self) -> String;

/// Returns the file extension when it is compressed with a given [CompressionTypeVariant]
fn extension_with_compression(
&self,
compression: CompressionTypeVariant,
) -> Result<String>;
}

pub trait ExtensionFileTypeClone {
fn clone_box(&self) -> Box<dyn ExtensionFileType>;
}

impl Clone for Box<dyn ExtensionFileType> {
fn clone(&self) -> Box<dyn ExtensionFileType> {
self.clone_box()
}
}

impl std::hash::Hash for Box<dyn ExtensionFileType> {
fn hash<H>(&self, state: &mut H)
where
H: Hasher,
{
self.default_extension().hash(state)
}
}

impl PartialEq for FileType {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(FileType::ARROW, FileType::ARROW) => true,
(FileType::AVRO, FileType::AVRO) => true,
#[cfg(feature = "parquet")]
(FileType::PARQUET, FileType::PARQUET) => true,
(FileType::CSV, FileType::CSV) => true,
(FileType::JSON, FileType::JSON) => true,
(FileType::Extension(ext_self), FileType::Extension(ext_other)) => {
ext_self.default_extension() == ext_other.default_extension()
}
_ => false,
}
}
}

impl Eq for FileType {}
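
For orientation, a downstream implementation of these traits might look like the following sketch. Several parts are assumptions rather than code from this diff: `CompressionTypeVariant` is replaced by a two-variant stand-in, the error type is a plain `String`, `TsvFileType` is a hypothetical format, and the blanket `ExtensionFileTypeClone` impl is the usual clone-box pattern, which the hunk above does not show.

```rust
use std::fmt::Debug;

/// Stand-in for DataFusion's `CompressionTypeVariant` (two variants only).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CompressionTypeVariant {
    GZIP,
    UNCOMPRESSED,
}

pub trait ExtensionFileType: Debug + ExtensionFileTypeClone + Send + Sync {
    fn default_extension(&self) -> String;

    fn extension_with_compression(
        &self,
        compression: CompressionTypeVariant,
    ) -> Result<String, String>;
}

pub trait ExtensionFileTypeClone {
    fn clone_box(&self) -> Box<dyn ExtensionFileType>;
}

// Assumed blanket impl: any `Clone` implementor gets `clone_box` for free.
impl<T> ExtensionFileTypeClone for T
where
    T: 'static + ExtensionFileType + Clone,
{
    fn clone_box(&self) -> Box<dyn ExtensionFileType> {
        Box::new(self.clone())
    }
}

impl Clone for Box<dyn ExtensionFileType> {
    fn clone(&self) -> Box<dyn ExtensionFileType> {
        self.clone_box()
    }
}

/// Hypothetical externally defined file type.
#[derive(Debug, Clone)]
pub struct TsvFileType;

impl ExtensionFileType for TsvFileType {
    fn default_extension(&self) -> String {
        ".tsv".to_owned()
    }

    fn extension_with_compression(
        &self,
        compression: CompressionTypeVariant,
    ) -> Result<String, String> {
        match compression {
            CompressionTypeVariant::UNCOMPRESSED => Ok(self.default_extension()),
            CompressionTypeVariant::GZIP => Ok(format!("{}.gz", self.default_extension())),
        }
    }
}
```

The implementor only derives `Clone`; the boxed trait object then becomes cloneable through `clone_box`, which is what lets the enum keep deriving `Clone`.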

impl GetExt for FileType {
fn get_ext(&self) -> String {
match self {
@@ -65,6 +125,7 @@ impl GetExt for FileType {
FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(),
FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(),
FileType::JSON => DEFAULT_JSON_EXTENSION.to_owned(),
FileType::Extension(ext) => ext.default_extension(),
}
}
}
@@ -78,6 +139,7 @@ impl Display for FileType {
FileType::PARQUET => "parquet",
FileType::AVRO => "avro",
FileType::ARROW => "arrow",
FileType::Extension(ext) => return ext.fmt(f),
};
write!(f, "{}", out)
}
11 changes: 10 additions & 1 deletion datafusion/common/src/file_options/mod.rs
@@ -154,6 +154,10 @@ pub enum FileTypeWriterOptions {
JSON(JsonWriterOptions),
Avro(AvroWriterOptions),
Arrow(ArrowWriterOptions),
/// For extension [FileType]s, FileTypeWriterOptions simply stores
/// any passed StatementOptions to be handled later by a custom
/// physical plan (such as one created by FileFormat::create_writer_physical_plan)
Extension(Option<StatementOptions>),
Contributor Author

FileTypeWriterOptions does not handle any parsing or validation for externally defined FileTypes. It simply acts as a container for the options passed in.
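
Because the options arrive unparsed, validation would fall to the external writer. A minimal sketch of what that could look like follows, assuming the options can be treated as raw string pairs. `StatementOptions` here is a stand-in struct, and `TsvWriterSettings`, `parse_tsv_settings`, and the option names `header` and `quote` are hypothetical.

```rust
/// Stand-in for DataFusion's `StatementOptions`, modeled as raw key/value pairs.
#[derive(Debug, Clone, PartialEq)]
pub struct StatementOptions(pub Vec<(String, String)>);

/// Hypothetical settings for an externally defined writer.
#[derive(Debug, PartialEq)]
pub struct TsvWriterSettings {
    pub header: bool,
    pub quote: char,
}

/// Parses and validates the options that the Extension variant merely carries.
/// `None` corresponds to the session-defaults-only case.
pub fn parse_tsv_settings(
    options: Option<&StatementOptions>,
) -> Result<TsvWriterSettings, String> {
    let mut settings = TsvWriterSettings {
        header: true,
        quote: '"',
    };
    let Some(options) = options else {
        return Ok(settings);
    };
    for (key, value) in &options.0 {
        match key.to_lowercase().as_str() {
            "header" => {
                settings.header = value
                    .parse::<bool>()
                    .map_err(|e| format!("invalid value for header: {e}"))?;
            }
            "quote" => {
                let mut chars = value.chars();
                match (chars.next(), chars.next()) {
                    (Some(c), None) => settings.quote = c,
                    _ => return Err(format!("quote must be one character, got {value:?}")),
                }
            }
            // Unknown keys are rejected here because nothing upstream checks them.
            other => return Err(format!("unknown option: {other}")),
        }
    }
    Ok(settings)
}
```

Rejecting unknown keys is a design choice of this sketch; an implementation could equally ignore them.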

}

impl FileTypeWriterOptions {
@@ -184,14 +188,17 @@ impl FileTypeWriterOptions {
FileType::ARROW => {
FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?)
}
FileType::Extension(_) => {
FileTypeWriterOptions::Extension(Some(statement_options.clone()))
}
};

Ok(file_type_write_options)
}

/// Constructs a FileTypeWriterOptions from session defaults only.
pub fn build_default(
-file_type: &FileType,
file_type: &mut FileType,
config_defaults: &ConfigOptions,
) -> Result<Self> {
let empty_statement = StatementOptions::new(vec![]);
@@ -214,6 +221,7 @@ impl FileTypeWriterOptions {
FileType::ARROW => {
FileTypeWriterOptions::Arrow(ArrowWriterOptions::try_from(options)?)
}
FileType::Extension(_) => FileTypeWriterOptions::Extension(None),
};

Ok(file_type_write_options)
@@ -290,6 +298,7 @@ impl Display for FileTypeWriterOptions {
FileTypeWriterOptions::JSON(_) => "JsonWriterOptions",
#[cfg(feature = "parquet")]
FileTypeWriterOptions::Parquet(_) => "ParquetWriterOptions",
FileTypeWriterOptions::Extension(_) => "ExtensionWriterOptions",
};
write!(f, "{}", name)
}
@@ -250,6 +250,7 @@ impl FileTypeExt for FileType {
"FileCompressionType can be specified for CSV/JSON FileType.".into(),
)),
},
FileType::Extension(ext) => ext.extension_with_compression(c.variant),
}
}
}
8 changes: 7 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
@@ -150,6 +150,9 @@ impl ListingTableConfig {
),
#[cfg(feature = "parquet")]
FileType::PARQUET => Arc::new(ParquetFormat::default()),
FileType::Extension(_) => {
unreachable!("FileType::from_str cannot return Extension variant!")
}
};

Ok((file_format, ext))
@@ -768,7 +771,7 @@ impl TableProvider for ListingTable {
let file_type_writer_options = match &self.options().file_type_write_options {
Some(opt) => opt.clone(),
None => FileTypeWriterOptions::build_default(
-&file_format.file_type(),
&mut file_format.file_type(),
state.config_options(),
)?,
};
@@ -1716,6 +1719,9 @@ mod tests {
)
.await?;
}
FileType::Extension(_) => {
panic!("Extension file type not implemented in write path.")
Contributor

Can we instead return a NotImplemented error?
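
A rough sketch of that suggestion, with stand-in types rather than the real DataFusion ones: `SketchError` imitates a not-implemented error variant, the `FileType` enum is reduced to three variants with the extension represented by its name, and `writer_for` is a hypothetical helper. In DataFusion itself the `not_impl_err!` macro used elsewhere in this PR would presumably play the same role.

```rust
use std::fmt;

/// Stand-in error type with a single not-implemented variant.
#[derive(Debug, PartialEq)]
pub enum SketchError {
    NotImplemented(String),
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SketchError::NotImplemented(msg) => {
                write!(f, "This feature is not implemented: {msg}")
            }
        }
    }
}

impl std::error::Error for SketchError {}

/// Simplified stand-in for `FileType`; the extension is reduced to a name.
#[derive(Debug)]
pub enum FileType {
    CSV,
    JSON,
    Extension(String),
}

/// Hypothetical helper for the write path: unsupported types yield an
/// error the caller can propagate with `?` instead of a panic.
pub fn writer_for(file_type: &FileType) -> Result<&'static str, SketchError> {
    match file_type {
        FileType::CSV => Ok("csv writer"),
        FileType::JSON => Ok("json writer"),
        FileType::Extension(name) => Err(SketchError::NotImplemented(format!(
            "Extension file type {name} not implemented in write path."
        ))),
    }
}
```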

}
}

// Create and register the source table with the provided schema and inserted data
6 changes: 6 additions & 0 deletions datafusion/core/src/datasource/listing_table_factory.rs
@@ -88,6 +88,9 @@ impl TableProviderFactory for ListingTableFactory {
JsonFormat::default().with_file_compression_type(file_compression_type),
),
FileType::ARROW => Arc::new(ArrowFormat),
FileType::Extension(_) => {
unreachable!("FileType::from_str cannot return Extension variant!")
}
};

let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
@@ -181,6 +184,9 @@ impl TableProviderFactory for ListingTableFactory {
FileType::PARQUET => file_type_writer_options,
FileType::ARROW => file_type_writer_options,
FileType::AVRO => file_type_writer_options,
FileType::Extension(_) => {
unreachable!("FileType::from_str cannot return Extension variant!")
}
};

let table_path = ListingTableUrl::parse(&cmd.location)?;
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/physical_plan/mod.rs
@@ -95,7 +95,9 @@ pub struct FileSinkConfig {
pub single_file_output: bool,
/// Controls whether existing data should be overwritten by this sink
pub overwrite: bool,
-/// Contains settings specific to writing a given FileType, e.g. parquet max_row_group_size
/// Contains settings specific to writing a given FileType, e.g. parquet max_row_group_size.
/// Note that for externally defined FileTypes, FileTypeWriterOptions contains arbitrary
/// config tuples that must be handled within the physical plan.
pub file_type_writer_options: FileTypeWriterOptions,
}

1 change: 1 addition & 0 deletions datafusion/core/src/physical_planner.rs
@@ -605,6 +605,7 @@ impl DefaultPhysicalPlanner {
FileType::JSON => Arc::new(JsonFormat::default()),
FileType::AVRO => Arc::new(AvroFormat {} ),
FileType::ARROW => Arc::new(ArrowFormat {}),
FileType::Extension(_ext) => return not_impl_err!("Extension FileTypes not supported in Copy To statements.")
Contributor Author

The built-in Copy plans can't support external types, since DataFusion has no way to initialize them. It should not be too difficult, though, to build a custom COPY PhysicalPlan that uses a custom FileFormat but is otherwise the same and relies on the built-in Copy logical plan.

Contributor

Maybe the ExtensionFileType trait could have a method that returns Arc<dyn FileFormat>?

Contributor Author

I tried this (as well as combining the ExtensionFileType trait into FileFormat), but that would require adding lots of dependencies between the DataFusion crates. FileType is in common and used in most of DataFusion, while FileFormat is only in core.

This could work, but I think the consequence would be that DataFusion becomes one big crate. I think the only way to keep the crates independent is to have at least two file-related abstractions: a simple one for logical planning in common and a heavy-duty one for execution plans in core.

Even linking them is a challenge because then common depends on core 🤔
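
One possible way to link the two abstractions without a common-to-core dependency is a lookup table that lives on the core side and is keyed by the unique default extension. This is only a sketch of that idea, not something proposed in this thread: both traits are simplified stand-ins, and `FileFormatRegistry`, `TsvFileType`, and `TsvFormat` are hypothetical names.

```rust
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

// --- would live in `common`: lightweight, no execution types ---

/// Simplified stand-in for the PR's `ExtensionFileType`.
pub trait ExtensionFileType: Debug + Send + Sync {
    fn default_extension(&self) -> String;
}

// --- would live in `core`, which already depends on `common` ---

/// Simplified stand-in for DataFusion's `FileFormat` trait.
pub trait FileFormat: Debug + Send + Sync {
    fn describe(&self) -> String;
}

/// Hypothetical registry mapping an extension's unique key to its format.
#[derive(Debug, Default)]
pub struct FileFormatRegistry {
    formats: HashMap<String, Arc<dyn FileFormat>>,
}

impl FileFormatRegistry {
    /// Associates a format with the file type's default extension.
    pub fn register(
        &mut self,
        file_type: &dyn ExtensionFileType,
        format: Arc<dyn FileFormat>,
    ) {
        self.formats.insert(file_type.default_extension(), format);
    }

    /// Resolves the format for a file type, if one was registered.
    pub fn lookup(&self, file_type: &dyn ExtensionFileType) -> Option<Arc<dyn FileFormat>> {
        self.formats.get(&file_type.default_extension()).cloned()
    }
}

// --- would live in a downstream crate ---

/// Hypothetical externally defined file type.
#[derive(Debug)]
pub struct TsvFileType;

impl ExtensionFileType for TsvFileType {
    fn default_extension(&self) -> String {
        ".tsv".to_owned()
    }
}

/// Hypothetical format implementation paired with `TsvFileType`.
#[derive(Debug)]
pub struct TsvFormat;

impl FileFormat for TsvFormat {
    fn describe(&self) -> String {
        "tab-separated values".to_owned()
    }
}
```

The dependency arrow stays core-to-common, at the cost of a runtime lookup that can fail when nothing was registered for an extension.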

};

sink_format.create_writer_physical_plan(input_exec, session_state, config, None).await
3 changes: 3 additions & 0 deletions datafusion/proto/src/physical_plan/to_proto.rs
@@ -891,6 +891,9 @@ impl TryFrom<&FileTypeWriterOptions> for protobuf::FileTypeWriterOptions {
FileTypeWriterOptions::Arrow(ArrowWriterOptions {}) => {
return not_impl_err!("Arrow file sink protobuf serialization")
}
FileTypeWriterOptions::Extension(_) => {
FileTypeWriterOptions::Extension(_) => {
Contributor Author

We should be able to support serializing this since it is ultimately just a Vec<(String, String)>

Contributor

We could also punt it to FileType::Extension (e.g. add a method like serialize(&self) -> Result<Vec<u8>>).
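
To illustrate why string pairs are easy to serialize, here is a small round-trip sketch using a hand-rolled, length-prefixed byte layout. It is an assumption-laden illustration only: a real implementation would presumably use protobuf message fields instead, and `encode_options` and `decode_options` are hypothetical helpers, not part of DataFusion.

```rust
/// Layout: u32 pair count, then for each string a u32 little-endian
/// byte length followed by its UTF-8 bytes.
pub fn encode_options(options: &[(String, String)]) -> Vec<u8> {
    let mut buf = Vec::new();
    buf.extend_from_slice(&(options.len() as u32).to_le_bytes());
    for (key, value) in options {
        write_string(&mut buf, key);
        write_string(&mut buf, value);
    }
    buf
}

fn write_string(buf: &mut Vec<u8>, s: &str) {
    buf.extend_from_slice(&(s.len() as u32).to_le_bytes());
    buf.extend_from_slice(s.as_bytes());
}

/// Inverse of `encode_options`; rejects truncated or oversized input.
pub fn decode_options(bytes: &[u8]) -> Result<Vec<(String, String)>, String> {
    let mut pos = 0usize;
    let count = read_u32(bytes, &mut pos)?;
    let mut options = Vec::new();
    for _ in 0..count {
        let key = read_string(bytes, &mut pos)?;
        let value = read_string(bytes, &mut pos)?;
        options.push((key, value));
    }
    if pos != bytes.len() {
        return Err("unexpected trailing bytes".to_owned());
    }
    Ok(options)
}

fn read_u32(bytes: &[u8], pos: &mut usize) -> Result<u32, String> {
    let chunk = take(bytes, pos, 4)?;
    let mut raw = [0u8; 4];
    raw.copy_from_slice(chunk);
    Ok(u32::from_le_bytes(raw))
}

fn read_string(bytes: &[u8], pos: &mut usize) -> Result<String, String> {
    let len = read_u32(bytes, pos)? as usize;
    let chunk = take(bytes, pos, len)?;
    String::from_utf8(chunk.to_vec()).map_err(|e| format!("invalid UTF-8: {e}"))
}

/// Returns the next `len` bytes and advances `pos`, or an error if too few remain.
fn take<'a>(bytes: &'a [u8], pos: &mut usize, len: usize) -> Result<&'a [u8], String> {
    let end = (*pos)
        .checked_add(len)
        .ok_or_else(|| "length overflow".to_owned())?;
    let chunk = bytes
        .get(*pos..end)
        .ok_or_else(|| "truncated input".to_owned())?;
    *pos = end;
    Ok(chunk)
}
```

Either side of the thread's discussion could sit on top of such bytes: the options container could be encoded directly, or an extension's own serialize method could return them.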

return not_impl_err!("Extension file sink protobuf serialization")
}
};
Ok(Self {
file_type: Some(file_type),