Implement serde for CSV and Parquet FileSinkExec #8646

Merged 8 commits on Dec 29, 2023
11 changes: 9 additions & 2 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -437,7 +437,7 @@ impl BatchSerializer for CsvSerializer {
 }

 /// Implements [`DataSink`] for writing to a CSV file.
-struct CsvSink {
+pub struct CsvSink {
     /// Config options for writing data
     config: FileSinkConfig,
 }
@@ -461,9 +461,16 @@ impl DisplayAs for CsvSink {
 }

 impl CsvSink {
-    fn new(config: FileSinkConfig) -> Self {
+    /// Create from config.
+    pub fn new(config: FileSinkConfig) -> Self {
         Self { config }
     }
+
+    /// Retrieve the inner [`FileSinkConfig`].
+    pub fn config(&self) -> &FileSinkConfig {
+        &self.config
+    }
+
     async fn multipartput_all(
         &self,
         data: SendableRecordBatchStream,
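
The visibility changes above matter because serialization code lives outside the module that defines the sink: encoding needs read access to the config, and decoding needs a public constructor. The following is a minimal sketch of that round trip, not the code from this PR. `FileSinkConfig` here is a two-field stand-in, and `CsvSinkMessage`, `to_message`, and `from_message` are hypothetical names chosen for illustration.

```rust
// Hypothetical stand-ins: the real `FileSinkConfig` and `CsvSink` live in
// DataFusion and carry more state than shown here.
mod core_sink {
    /// Stand-in for DataFusion's `FileSinkConfig` (fields are illustrative).
    #[derive(Debug, Clone, PartialEq)]
    pub struct FileSinkConfig {
        pub output_path: String,
        pub overwrite: bool,
    }

    /// Same shape as the sink in the diff: public type, private field.
    #[derive(Debug)]
    pub struct CsvSink {
        config: FileSinkConfig,
    }

    impl CsvSink {
        /// Create from config.
        pub fn new(config: FileSinkConfig) -> Self {
            Self { config }
        }

        /// Retrieve the inner `FileSinkConfig`.
        pub fn config(&self) -> &FileSinkConfig {
            &self.config
        }
    }
}

mod proto_sketch {
    use super::core_sink::{CsvSink, FileSinkConfig};

    /// Stand-in for a generated message that wraps the config.
    /// Simplification: it reuses the Rust config type directly.
    #[derive(Debug, Clone, PartialEq)]
    pub struct CsvSinkMessage {
        pub config: Option<FileSinkConfig>,
    }

    /// Encoding only needs read access, which `config()` provides.
    pub fn to_message(sink: &CsvSink) -> CsvSinkMessage {
        CsvSinkMessage {
            config: Some(sink.config().clone()),
        }
    }

    /// Decoding needs the public constructor; a missing config is an error.
    pub fn from_message(msg: CsvSinkMessage) -> Result<CsvSink, String> {
        let config = msg.config.ok_or_else(|| "missing config".to_string())?;
        Ok(CsvSink::new(config))
    }
}
```

With a private `new` or no accessor, `proto_sketch` would not compile, since the `config` field itself stays private to `core_sink`.
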
9 changes: 7 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -621,7 +621,7 @@ async fn fetch_statistics(
 }

 /// Implements [`DataSink`] for writing to a parquet file.
-struct ParquetSink {
+pub struct ParquetSink {
     /// Config options for writing data
     config: FileSinkConfig,
 }
@@ -645,10 +645,15 @@ impl DisplayAs for ParquetSink {
 }

 impl ParquetSink {
-    fn new(config: FileSinkConfig) -> Self {
+    /// Create from config.
+    pub fn new(config: FileSinkConfig) -> Self {
         Self { config }
     }

+    /// Retrieve the inner [`FileSinkConfig`].
+    pub fn config(&self) -> &FileSinkConfig {
+        &self.config
+    }
     /// Converts table schema to writer schema, which may differ in the case
     /// of hive style partitioning where some columns are removed from the
     /// underlying files.
40 changes: 33 additions & 7 deletions datafusion/proto/proto/datafusion.proto
@@ -1187,6 +1187,8 @@ message PhysicalPlanNode {
     SymmetricHashJoinExecNode symmetric_hash_join = 25;
     InterleaveExecNode interleave = 26;
     PlaceholderRowExecNode placeholder_row = 27;
+    CsvSinkExecNode csv_sink = 28;
+    ParquetSinkExecNode parquet_sink = 29;
   }
 }
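
The two new `oneof` entries give each sink node its own field number, which is what identifies the variant on the wire. A minimal sketch of that mapping follows, covering only the field numbers visible in this hunk. `PlanVariant` and `variant_for_field` are hypothetical names; generated protobuf code normally performs this dispatch itself.

```rust
/// Variants of the `PhysicalPlanNode` oneof that appear in this hunk.
/// Hypothetical enum for illustration, not the generated type.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PlanVariant {
    SymmetricHashJoin,
    Interleave,
    PlaceholderRow,
    CsvSink,
    ParquetSink,
}

/// Map a oneof field number to its variant; numbers outside this hunk
/// are not modelled and return `None`.
pub fn variant_for_field(field_number: u32) -> Option<PlanVariant> {
    match field_number {
        25 => Some(PlanVariant::SymmetricHashJoin),
        26 => Some(PlanVariant::Interleave),
        27 => Some(PlanVariant::PlaceholderRow),
        28 => Some(PlanVariant::CsvSink),
        29 => Some(PlanVariant::ParquetSink),
        _ => None,
    }
}
```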

@@ -1220,20 +1222,22 @@ message ParquetWriterOptions {
 }

 message CsvWriterOptions {
[Review comment from the PR author (Member): We have not released a version of DataFusion that contains CsvWriterOptions yet, so it is safe to change the field numbers here.]
+  // Compression type
+  CompressionTypeVariant compression = 1;
   // Optional column delimiter. Defaults to `b','`
-  string delimiter = 1;
+  string delimiter = 2;
   // Whether to write column names as file headers. Defaults to `true`
-  bool has_header = 2;
+  bool has_header = 3;
   // Optional date format for date arrays
-  string date_format = 3;
+  string date_format = 4;
   // Optional datetime format for datetime arrays
-  string datetime_format = 4;
+  string datetime_format = 5;
   // Optional timestamp format for timestamp arrays
-  string timestamp_format = 5;
+  string timestamp_format = 6;
   // Optional time format for time arrays
-  string time_format = 6;
+  string time_format = 7;
   // Optional value to represent null
-  string null_value = 7;
+  string null_value = 8;
 }

 message WriterProperties {
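
The review comment on `CsvWriterOptions` is about wire compatibility: a protobuf field is identified on the wire by its number, not its name, so renumbering changes the bytes that get written. A minimal sketch of the key computation, assuming only the standard protobuf encoding rule `(field_number << 3) | wire_type`. `WireType`, `field_key`, and `encode_short_string` are hypothetical helpers, not part of DataFusion or any protobuf library.

```rust
/// The two protobuf wire types relevant to `CsvWriterOptions`.
#[derive(Clone, Copy)]
pub enum WireType {
    /// Used for enums, bools, and integers.
    Varint = 0,
    /// Used for strings, bytes, and nested messages.
    LengthDelimited = 2,
}

/// A field's key on the wire is `(field_number << 3) | wire_type`.
pub fn field_key(field_number: u32, wire_type: WireType) -> u32 {
    (field_number << 3) | wire_type as u32
}

/// Encode a string field whose key and length each fit in a single byte.
/// This covers small field numbers and short values only.
pub fn encode_short_string(field_number: u32, value: &str) -> Vec<u8> {
    assert!(field_number < 16 && value.len() < 128);
    let key = field_key(field_number, WireType::LengthDelimited) as u8;
    let mut out = vec![key, value.len() as u8];
    out.extend_from_slice(value.as_bytes());
    out
}
```

Under the old numbering, `delimiter = ","` starts with key byte `0x0A` (field 1, length-delimited). Under the new numbering it starts with `0x12` (field 2), while field 1 now holds the `compression` enum and would use key `0x08`. A reader built against one numbering would misinterpret bytes written with the other, which is why the change is only safe before a release.
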
@@ -1270,6 +1274,28 @@ message JsonSinkExecNode {
   PhysicalSortExprNodeCollection sort_order = 4;
 }

+message CsvSink {
+  FileSinkConfig config = 1;
+}
+
+message CsvSinkExecNode {
+  PhysicalPlanNode input = 1;
+  CsvSink sink = 2;
+  Schema sink_schema = 3;
+  PhysicalSortExprNodeCollection sort_order = 4;
+}
+
+message ParquetSink {
+  FileSinkConfig config = 1;
+}
+
+message ParquetSinkExecNode {
+  PhysicalPlanNode input = 1;
+  ParquetSink sink = 2;
+  Schema sink_schema = 3;
+  PhysicalSortExprNodeCollection sort_order = 4;
+}
+
 message PhysicalExtensionNode {
   bytes node = 1;
   repeated PhysicalPlanNode inputs = 2;