Support COPY TO Externally Defined File Formats, add FileType trait #11060

Merged
merged 23 commits into from
Jun 28, 2024

@devinjdangelo devinjdangelo commented Jun 22, 2024

Which issue does this PR close?

Closes #8345
Revives #8667

Rationale for this change

Previously, COPY TO queries could only be planned/executed for a preset enum of FileTypes. CREATE EXTERNAL TABLE supports arbitrary file formats via the FileFormat and TableProvider traits. This PR removes the FileType enum in favor of a FileType trait and updates COPY TO plans to work with any externally implemented dyn FileFormat.

This PR also enables future work to move the csv, parquet, json, etc. formats into dedicated crates (datafusion-csv, ...).

What changes are included in this PR?

  • Deletes the FileType enum and replaces it with a trait
  • Creates a wrapper struct for FileFormat that implements FileType to enable conversions between the two
  • Extends LogicalExtensionCodec to allow serde of dyn FileType

Are these changes tested?

Existing tests verify behavior for currently supported formats remains the same.

Are there any user-facing changes?

Yes, there are API changes relevant for developers using DataFusion.

@github-actions github-actions bot added sql SQL Planner logical-expr Logical plan and expressions core Core DataFusion crate labels Jun 22, 2024
@alamb
Contributor

alamb commented Jun 22, 2024

This looks awesome -- thank you @devinjdangelo -- I plan to review this carefully, but probably not until tomorrow

@devinjdangelo devinjdangelo left a comment

cc @alamb @thinkharderdev I finally had some time to figure out how to make #8667 work.

There are a number of tasks still to be done, preferably as follow-on PRs given the size and complexity of the changes already in this PR:

  • end-to-end example of implementing a custom FileFormat and using COPY with it
  • Finishing LogicalExtensionCodec impls for csv, json, parquet, arrow, and avro FileFormat
  • Moving FileFormat impls and supporting code out of core and into dedicated crates
  • (maybe) Moving ListingTable to a dedicated crate

/// These file types have special built in behavior for configuration.
/// Use TableOptions::Extensions for configuring other file types.
#[derive(Debug, Clone)]
pub enum ConfigFileType {
Contributor Author

This is the last vestige of the old FileType enum. It can be completely ignored if using a custom format.

Contributor

🤔 perhaps we should add some Trait to unify the handling of options for built in formats and custom formats 🤔

Contributor Author

We could potentially remove the TableOptions code and instead have each FileFormatFactory handle configuration. This is actually mostly the case in this PR already, but TableOptions is a common helper.

@@ -40,107 +37,10 @@ pub trait GetExt {
fn get_ext(&self) -> String;
}

/// Readable file type
Contributor Author

This is the primary change in the PR. Most other changes are just to recover the original behavior for these file types, but with the new trait.

/// The former trait is a superset of the latter trait, which includes execution time
/// relevant methods. [FileType] is only used in logical planning and only implements
/// the subset of methods required during logical planning.
pub struct DefaultFileFormat {
Contributor Author

This design pattern is copied from how the DefaultTableSource allows a TableProvider to be used during logical planning as a TableSource.

In other words, FileType is to FileFormat as TableSource is to TableProvider.
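The wrapper pattern described here can be sketched with simplified stand-in traits. This is a minimal sketch and not the DataFusion definitions: the method sets are reduced to a single `get_ext`, the wrapper uses the name `DefaultFileType` that the struct was later renamed to, and `TsvFactory` and `file_type_to_factory` are hypothetical names invented for illustration.

```rust
use std::any::Any;
use std::fmt;
use std::sync::Arc;

// Simplified stand-ins for illustration only (assumption: the real traits
// have more methods and live in separate crates).

/// Planning-time view of a file format; knows nothing about execution.
pub trait FileType: fmt::Display + Send + Sync {
    fn as_any(&self) -> &dyn Any;
    fn get_ext(&self) -> String;
}

/// Execution-side factory; only the "core" crate would define this.
pub trait FileFormatFactory: Send + Sync {
    fn get_ext(&self) -> String;
}

/// Wrapper that carries a factory through logical planning as a `FileType`.
pub struct DefaultFileType {
    factory: Arc<dyn FileFormatFactory>,
}

impl DefaultFileType {
    pub fn new(factory: Arc<dyn FileFormatFactory>) -> Self {
        Self { factory }
    }
}

impl fmt::Display for DefaultFileType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.factory.get_ext())
    }
}

impl FileType for DefaultFileType {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn get_ext(&self) -> String {
        self.factory.get_ext()
    }
}

/// Hypothetical helper: recover the factory at physical planning time by
/// downcasting the planning-time trait object back to the wrapper.
pub fn file_type_to_factory(
    file_type: &Arc<dyn FileType>,
) -> Option<Arc<dyn FileFormatFactory>> {
    file_type
        .as_any()
        .downcast_ref::<DefaultFileType>()
        .map(|wrapper| Arc::clone(&wrapper.factory))
}

/// Hypothetical user-defined factory.
pub struct TsvFactory;

impl FileFormatFactory for TsvFactory {
    fn get_ext(&self) -> String {
        "tsv".to_string()
    }
}
```

The logical plan only ever sees `Arc<dyn FileType>`; code that also links the execution crate can downcast to get the factory back, while planning-only users can supply their own `FileType` and never touch the factory trait.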

Contributor

I found the naming confusing -- I would have expected that DefaultFileFormat implemented FileFormatFactory or something

Maybe we could call this FileFormatSource ?

Contributor Author

I renamed this to DefaultFileType.

#[derive(Debug)]
pub struct CsvLogicalExtensionCodec;

// TODO! This is a placeholder for now and needs to be implemented for real.
Contributor Author

Note this comment: I have extended LogicalExtensionCodec and created placeholder impls, but have not yet implemented the logic to serde all built-in FileFormats.

Contributor

I will file follow on tickets

use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
use datafusion_sql::planner::ContextProvider;

struct MockCsvType {}
Contributor Author

This struct could be pulled out of this test as an example of LogicalPlanning a COPY query with a custom FileType

/// File format options.
pub format_options: FormatOptions,
/// File type trait
pub file_type: Arc<dyn FileType>,
Contributor Author

If using DataFusion core for execution plans, this FileType can be converted to a FileFormatFactory which will produce a FileFormat at runtime.

Contributor

What is the reason it can't be FileFormatFactory directly 🤔

Contributor Author

This would introduce a dependency between datafusion-expr and datafusion-core. The FileType trait exists to avoid this dependency, so that users who only use datafusion for logical planning do not have to implement all of the physical-execution-related FileFormat methods.

@alamb alamb added the api change Changes the API exposed to users of the crate label Jun 23, 2024
@alamb alamb changed the title Support COPY TO Externally Defined File Formats Support COPY TO Externally Defined File Formats, add FileType trait Jun 23, 2024
@alamb alamb left a comment

Thank you @devinjdangelo -- I had a good look at this PR, and I think it looks really nice

I did not quite get through the entire PR yet, but I wanted to provide some early feedback

I think it very much adheres to DataFusion's design goals of permitting extension at all points and makes it easier to support new file formats

One thing I didn't fully grok at first was why there was a need for FileFormatFactory -- like why not simply use FileFormat everywhere.

Then I realized that the reason was that the FileFormats have state, specifically default options, and that the format itself then carries the specific options

pub struct JsonFormatFactory {
    /// default options
    options: Option<JsonOptions>,  
}

Perhaps we can document this rationale somewhere so others are not confused as well.

Suggestion: Add an example

I think something that would be awesome to add (as a follow-on PR perhaps) is an example of creating a custom file format and reading from and writing to it.

#11079

datafusion/core/src/datasource/file_format/mod.rs
@@ -58,6 +74,15 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// Returns the extension for this FileFormat, e.g. "file.csv" -> csv
fn get_ext(&self) -> String;
Contributor

Minor: Is there any reason this has to return an owned String? Could it return &str ?

Contributor Author

I think &str would be too restrictive, but we could avoid requiring a String allocation by returning a Cow instead.

Contributor

let's keep it String for now then
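For reference, the Cow alternative raised in this thread might look like the sketch below. It is not what the PR adopted (the method stays `-> String`), and `GetExtCow`, `CsvLike`, and `Versioned` are hypothetical names used only to show the trade-off: a fixed extension can be returned without allocating, while an extension computed at runtime can still be returned as an owned value, which a plain `&'static str` could not express.

```rust
use std::borrow::Cow;

/// Hypothetical variant of the extension getter that returns a `Cow`.
pub trait GetExtCow {
    fn get_ext(&self) -> Cow<'static, str>;
}

/// A format with a fixed extension: borrowed, so no allocation is needed.
pub struct CsvLike;

impl GetExtCow for CsvLike {
    fn get_ext(&self) -> Cow<'static, str> {
        Cow::Borrowed("csv")
    }
}

/// A format whose extension is computed at runtime: must be owned.
pub struct Versioned {
    pub version: u32,
}

impl GetExtCow for Versioned {
    fn get_ext(&self) -> Cow<'static, str> {
        Cow::Owned(format!("v{}", self.version))
    }
}
```

Callers that need a `String` can still call `.into_owned()` on the result, so the change would mostly affect implementors rather than users.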

fn get_ext_with_compression(
&self,
_file_compression_type: &FileCompressionType,
) -> Result<String>;
Contributor

Same question about owned String

}
}

impl FileType for DefaultFileFormat {
Contributor

Rather than this wrapper, I wonder if we could implement FileType for dyn FileFormatFactory

impl FileType for &dyn FileFormatFactory {
    ...
}

And avoid having to have this struct 🤔

Contributor Author

I tried briefly, but ran into various errors. I am not sure if it is possible with the traits defined in different crates to make this work without a wrapper. I think that is why DefaultTableSource exists as well.

@@ -237,6 +248,33 @@ impl SessionState {
function_factory: None,
};

#[cfg(feature = "parquet")]
Contributor

👍

datafusion/core/src/execution/session_state.rs
@@ -786,32 +779,9 @@ impl DefaultPhysicalPlanner {
table_partition_cols,
overwrite: false,
};
let mut table_options = session_state.default_table_options();
Contributor

It is great that this code has been removed from the default planner

@alamb alamb left a comment

Thank you so much @devinjdangelo -- I made it through this PR and I think it looks really nice

I do think it is worth considering how to simplify the traits (possibly unifying the configuration), and it looks like there are a few TODOs left, but otherwise this PR looks really nice to me

cc @ozankabak @berkaysynnada @metesynnada

match value {
FileType::ARROW => FormatOptions::ARROW,
FileType::AVRO => FormatOptions::AVRO,
#[cfg(feature = "parquet")]
Contributor

it is certainly nice to avoid many of these #[cfg(feature = "parquet")] that were sprinkled throughout the code

/// # Parameters
///
/// * `format`: The file format to use (e.g., CSV, Parquet).
pub fn set_file_format(&mut self, format: ConfigFileType) {
Contributor

the parameter is called "format" but the struct is ConfigFileType

Maybe we should call this set_config_type? Or maybe it could just pass in a `Arc~?

Contributor Author

I changed the method name for now.

It might make sense for TableOptions to be broken up and its functionality moved into each implementation of FileFormatFactory. I have avoided any reference to FileFormat in datafusion-common, since that would add an indirect dependency on datafusion-core to datafusion-sql and other crates.

use bytes::Bytes;
use futures::StreamExt;

#[test]
fn get_ext_with_compression() {
Contributor

Should we port these tests instead of just removing them?

Contributor Author

Yes, I moved this functionality to a trait method on each FileFormatFactory. We could add a test for each format which is similar to this test.
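A per-format test of the kind suggested here could be shaped roughly as follows. This is a sketch under stated assumptions: `Compression`, `CsvLike`, and `ParquetLike` are simplified, hypothetical stand-ins rather than DataFusion's `FileCompressionType` or its built-in formats, and the exact extension strings and error behavior are illustrative.

```rust
/// Hypothetical, simplified compression enum (a stand-in, not the real type).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Compression {
    Uncompressed,
    Gzip,
    Zstd,
}

impl Compression {
    /// Suffix appended after the format's own extension.
    fn suffix(self) -> &'static str {
        match self {
            Compression::Uncompressed => "",
            Compression::Gzip => ".gz",
            Compression::Zstd => ".zst",
        }
    }
}

/// Stand-in trait carrying only the two extension methods under discussion.
pub trait FileFormat {
    fn get_ext(&self) -> String;
    fn get_ext_with_compression(&self, c: &Compression) -> Result<String, String>;
}

/// A text format that accepts any file-level compression.
pub struct CsvLike;

impl FileFormat for CsvLike {
    fn get_ext(&self) -> String {
        "csv".to_string()
    }
    fn get_ext_with_compression(&self, c: &Compression) -> Result<String, String> {
        Ok(format!("{}{}", self.get_ext(), c.suffix()))
    }
}

/// A format assumed to compress internally, so external compression is rejected.
pub struct ParquetLike;

impl FileFormat for ParquetLike {
    fn get_ext(&self) -> String {
        "parquet".to_string()
    }
    fn get_ext_with_compression(&self, c: &Compression) -> Result<String, String> {
        match c {
            Compression::Uncompressed => Ok(self.get_ext()),
            other => Err(format!("{:?} is not supported for this format", other)),
        }
    }
}
```

Because each format owns its own method, each test only needs to cover the combinations that format actually supports.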

#[derive(Debug)]
pub struct JsonLogicalExtensionCodec;

// TODO! This is a placeholder for now and needs to be implemented for real.
Contributor

do we have to complete the TODO?

Contributor Author

Users depending on the ability to serialize COPY plans (e.g. ballista) will need this TODO to be completed before upgrading to any version of datafusion including this change.

I think it would be OK to cut a follow-up ticket for this.

@devinjdangelo devinjdangelo left a comment

Thank you @alamb for the thorough review as always! I have made a few small changes.

@berkaysynnada
Contributor

Thank you, @devinjdangelo, for proposing this extensibility feature. I would like to clarify a few points to ensure I understand correctly:

  • We cannot merge FileType and FileFormatFactory due to dependency issues.
  • We cannot merge FileFormatFactory and FileFormat because of stateful format factories.

If these points are correct, I am considering whether we could move these states (extension options, like CsvOptions) into FileType somehow to remove one of these layers. This is just a thought experiment; I don't want to block the PR, which I believe improves extensibility.

@devinjdangelo
Contributor Author

devinjdangelo commented Jun 24, 2024

Thank you for taking a look @berkaysynnada

We cannot merge FileType and FileFormatFactory due to dependency issues.

That is correct. Logical planning cannot depend on FileFormatFactory or FileFormat to avoid a dependency on datafusion's execution engine during logical planning.

We cannot merge FileFormatFactory and FileFormat because of stateful format factories.

Yes, SessionState stores a HashMap<String, Arc<dyn FileFormatFactory>>. Each FileFormatFactory could be shared among multiple concurrent queries and must be able to produce many differently configured Arc<dyn FileFormat>s. If SessionState stored only an Arc<dyn FileFormat>, configuring it would require a &mut self method guarded by a Mutex<>, and even then it could lead to unexpected behavior if multiple queries modified the format during execution.
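The argument can be illustrated with a minimal sketch in which one shared, immutable factory produces separately configured formats per query. The traits below are simplified stand-ins, not the DataFusion signatures (the `create` method here takes only an options map), and `DelimitedFactory`, `DelimitedFormat`, and `registry` are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Stand-in for an execution-time format that carries its parsed options.
pub trait FileFormat: Send + Sync {
    fn delimiter(&self) -> char;
}

/// Stand-in factory: shared across queries and never mutated.
pub trait FileFormatFactory: Send + Sync {
    fn create(
        &self,
        options: &HashMap<String, String>,
    ) -> Result<Arc<dyn FileFormat>, String>;
}

struct DelimitedFormat {
    delimiter: char,
}

impl FileFormat for DelimitedFormat {
    fn delimiter(&self) -> char {
        self.delimiter
    }
}

/// Hypothetical factory holding only default options.
pub struct DelimitedFactory {
    pub default_delimiter: char,
}

impl FileFormatFactory for DelimitedFactory {
    fn create(
        &self,
        options: &HashMap<String, String>,
    ) -> Result<Arc<dyn FileFormat>, String> {
        // Query-specific options are parsed once, here, and stored in the format.
        let delimiter = match options.get("delimiter") {
            Some(value) => {
                let mut chars = value.chars();
                match (chars.next(), chars.next()) {
                    (Some(c), None) => c,
                    _ => return Err(format!("invalid delimiter: {:?}", value)),
                }
            }
            None => self.default_delimiter,
        };
        let format: Arc<dyn FileFormat> = Arc::new(DelimitedFormat { delimiter });
        Ok(format)
    }
}

/// Session-level registry keyed by file extension.
pub fn registry() -> HashMap<String, Arc<dyn FileFormatFactory>> {
    let mut factories: HashMap<String, Arc<dyn FileFormatFactory>> = HashMap::new();
    factories.insert(
        "csv".to_string(),
        Arc::new(DelimitedFactory { default_delimiter: ',' }),
    );
    factories
}
```

Two queries can call `create` on the same `Arc<dyn FileFormatFactory>` with different options and receive independent formats, so no `Mutex` or `&mut self` is needed on the shared state.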

@alamb
Contributor

alamb commented Jun 24, 2024

Yes, SessionState stores a HashMap<String, Arc<dyn FileFormatFactory>>. Each FileFormatFactory could be shared among multiple concurrent queries and must be able to produce many differently configured Arc<dyn FileFormat>s.

I wonder if it would be possible to store the file format state outside the FileFormat and pass in the format options as part of each query somehow 🤔

@alamb alamb left a comment

Let's see if we can find some way to avoid the extra level of stateful factories. If not, then I think this PR could still be merged

Before we merge I'll file follow-on tickets for supporting protobuf encoding

@devinjdangelo
Contributor Author

I wonder if it would be possible to store the file format state outside the FileFormat and pass in the format options as part of each query somehow 🤔

I think this is possible, but it would also make implementing FileFormat a bit inconvenient. We would have to pass in and parse format_options on every trait method. E.g. something like:

pub trait FileFormat {
    async fn create_physical_plan(
        &self,
        state: &SessionState,
        conf: FileScanConfig,
        filters: Option<&Arc<dyn PhysicalExpr>>,
        format_options: &HashMap<String, String>,
    ) -> Result<Arc<dyn ExecutionPlan>>;
    ...
}

And each method would have to parse the format_options. FileFormatFactory avoids this by allowing FileFormat to be created during physical planning when query specific options are known and can be parsed once/stored in the FileFormat struct.

@alamb alamb left a comment

I think we should proceed with this PR, even though some of the traits are less than ideal (due to having to handle state)

Also before merging we should file the follow on tickets you suggest in #11060 (review)

Let's leave it open for another day or two to gather additional comments

Thanks again for pushing this part of the code forward

@alamb
Contributor

alamb commented Jun 27, 2024

I merged up to resolve a conflict

@alamb alamb left a comment

Filed #11150 to track moving the extension codecs

I plan to merge this PR when CI is complete

@alamb alamb merged commit 5501e8e into apache:main Jun 28, 2024
23 checks passed
@alamb
Contributor

alamb commented Jun 28, 2024

Forward! Thank you @devinjdangelo and @Weijun-H

comphead pushed a commit to comphead/arrow-datafusion that referenced this pull request Jul 2, 2024
…pache#11060)

* wip create and register ext file types with session

* Add contains function, and support in datafusion substrait consumer (apache#10879)

* adding new function contains

* adding substrait test

* adding doc

* adding doc

* Update docs/source/user-guide/sql/scalar_functions.md

Co-authored-by: Alex Huang <[email protected]>

* adding entry

---------

Co-authored-by: Alex Huang <[email protected]>

* logical planning updated

* compiling

* removing filetype enum

* compiling

* working on tests

* fix some tests

* test fixes

* cli fix

* cli fmt

* Update datafusion/core/src/datasource/file_format/mod.rs

Co-authored-by: Andrew Lamb <[email protected]>

* Update datafusion/core/src/execution/session_state.rs

Co-authored-by: Andrew Lamb <[email protected]>

* review comments

* review comments

* review comments

* typo fix

* fmt

* fix err log style

* fmt

---------

Co-authored-by: Lordworms <[email protected]>
Co-authored-by: Alex Huang <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
findepi pushed a commit to findepi/datafusion that referenced this pull request Jul 16, 2024
This pull request was closed.
Labels
api change Changes the API exposed to users of the crate core Core DataFusion crate logical-expr Logical plan and expressions sql SQL Planner
Development

Successfully merging this pull request may close these issues.

enable users to add support for LIstingTable / object_store table formats of different types
4 participants