Skip to content

Commit

Permalink
Implement serde for CSV and Parquet FileSinkExec (#8646)
Browse files Browse the repository at this point in the history
* Add serde for Csv and Parquet sink

* Add tests

* parquet test passes

* save progress

* add compression type to csv serde

* remove hard-coded compression from CSV serde
  • Loading branch information
andygrove committed Dec 29, 2023
1 parent b85a397 commit 7fc663c
Show file tree
Hide file tree
Showing 10 changed files with 922 additions and 57 deletions.
11 changes: 9 additions & 2 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ impl BatchSerializer for CsvSerializer {
}

/// Implements [`DataSink`] for writing to a CSV file.
struct CsvSink {
pub struct CsvSink {
/// Config options for writing data
config: FileSinkConfig,
}
Expand All @@ -461,9 +461,16 @@ impl DisplayAs for CsvSink {
}

impl CsvSink {
fn new(config: FileSinkConfig) -> Self {
/// Create from config.
pub fn new(config: FileSinkConfig) -> Self {
Self { config }
}

/// Retrieve the inner [`FileSinkConfig`].
pub fn config(&self) -> &FileSinkConfig {
&self.config
}

async fn multipartput_all(
&self,
data: SendableRecordBatchStream,
Expand Down
9 changes: 7 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ async fn fetch_statistics(
}

/// Implements [`DataSink`] for writing to a parquet file.
struct ParquetSink {
pub struct ParquetSink {
/// Config options for writing data
config: FileSinkConfig,
}
Expand All @@ -645,10 +645,15 @@ impl DisplayAs for ParquetSink {
}

impl ParquetSink {
fn new(config: FileSinkConfig) -> Self {
/// Create from config.
pub fn new(config: FileSinkConfig) -> Self {
Self { config }
}

/// Retrieve the inner [`FileSinkConfig`].
pub fn config(&self) -> &FileSinkConfig {
&self.config
}
/// Converts table schema to writer schema, which may differ in the case
/// of hive style partitioning where some columns are removed from the
/// underlying files.
Expand Down
40 changes: 33 additions & 7 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,8 @@ message PhysicalPlanNode {
SymmetricHashJoinExecNode symmetric_hash_join = 25;
InterleaveExecNode interleave = 26;
PlaceholderRowExecNode placeholder_row = 27;
CsvSinkExecNode csv_sink = 28;
ParquetSinkExecNode parquet_sink = 29;
}
}

Expand Down Expand Up @@ -1220,20 +1222,22 @@ message ParquetWriterOptions {
}

message CsvWriterOptions {
// Compression type
CompressionTypeVariant compression = 1;
// Optional column delimiter. Defaults to `b','`
string delimiter = 1;
string delimiter = 2;
// Whether to write column names as file headers. Defaults to `true`
bool has_header = 2;
bool has_header = 3;
// Optional date format for date arrays
string date_format = 3;
string date_format = 4;
// Optional datetime format for datetime arrays
string datetime_format = 4;
string datetime_format = 5;
// Optional timestamp format for timestamp arrays
string timestamp_format = 5;
string timestamp_format = 6;
// Optional time format for time arrays
string time_format = 6;
string time_format = 7;
// Optional value to represent null
string null_value = 7;
string null_value = 8;
}

message WriterProperties {
Expand Down Expand Up @@ -1270,6 +1274,28 @@ message JsonSinkExecNode {
PhysicalSortExprNodeCollection sort_order = 4;
}

message CsvSink {
FileSinkConfig config = 1;
}

message CsvSinkExecNode {
PhysicalPlanNode input = 1;
CsvSink sink = 2;
Schema sink_schema = 3;
PhysicalSortExprNodeCollection sort_order = 4;
}

message ParquetSink {
FileSinkConfig config = 1;
}

message ParquetSinkExecNode {
PhysicalPlanNode input = 1;
ParquetSink sink = 2;
Schema sink_schema = 3;
PhysicalSortExprNodeCollection sort_order = 4;
}

message PhysicalExtensionNode {
bytes node = 1;
repeated PhysicalPlanNode inputs = 2;
Expand Down
Loading

0 comments on commit 7fc663c

Please sign in to comment.