Support Copy To Partitioned Files #9240
```diff
@@ -73,6 +73,9 @@ pub struct DataFrameWriteOptions {
     /// Allows compression of CSV and JSON.
     /// Not supported for parquet.
     compression: CompressionTypeVariant,
+    /// Sets which columns should be used for hive-style partitioned writes by name.
+    /// Can be set to empty vec![] for non-partitioned writes.
+    partition_by: Vec<String>,
 }

 impl DataFrameWriteOptions {
```

Review comment (on `partition_by`): I think we need a test for this new feature. I took a look around and I didn't see any good existing tests, sadly. This is what I found. I'll make a short PR to move those tests into the dataframe tests to make them more discoverable.
```diff
@@ -82,6 +85,7 @@ impl DataFrameWriteOptions {
             overwrite: false,
             single_file_output: false,
             compression: CompressionTypeVariant::UNCOMPRESSED,
+            partition_by: vec![],
         }
     }
     /// Set the overwrite option to true or false
```
```diff
@@ -101,6 +105,12 @@ impl DataFrameWriteOptions {
         self.compression = compression;
         self
     }
+
+    /// Sets the partition_by columns for output partitioning
+    pub fn with_partition_by(mut self, partition_by: Vec<String>) -> Self {
+        self.partition_by = partition_by;
+        self
+    }
 }

 impl Default for DataFrameWriteOptions {
```
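As a quick illustration of the builder method added above, here is a minimal sketch of constructing write options with partition columns. The import path, the helper function, and the column names are assumptions for illustration, not code from this PR:

```rust
use datafusion::dataframe::DataFrameWriteOptions;

// Hypothetical helper: request hive-style partitioning on two made-up columns.
fn partitioned_write_opts() -> DataFrameWriteOptions {
    DataFrameWriteOptions::new()
        .with_partition_by(vec!["year".to_string(), "month".to_string()])
}
```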
```diff
@@ -1176,6 +1186,7 @@ impl DataFrame {
             self.plan,
             path.into(),
             FileType::CSV,
+            options.partition_by,
             copy_options,
         )?
         .build()?;
```
```diff
@@ -1219,6 +1230,7 @@ impl DataFrame {
             self.plan,
             path.into(),
             FileType::JSON,
+            options.partition_by,
             copy_options,
         )?
         .build()?;
```
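These two hunks thread the new option from `DataFrame::write_csv` and `DataFrame::write_json` into the COPY logical plan. A hedged end-to-end sketch follows; the SQL, the output path, and the exact `write_csv` signature (in particular the trailing `Option` for writer options) are assumptions based on the diff rather than code taken from this PR:

```rust
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .sql("SELECT * FROM (VALUES (1, 'Foo'), (2, 'Bar')) AS t(col1, col2)")
        .await?;

    // Hive-style partitioned write: expect one directory per distinct col2
    // value, e.g. /tmp/partitioned_out/col2=Foo/.
    df.write_csv(
        "/tmp/partitioned_out/",
        DataFrameWriteOptions::new().with_partition_by(vec!["col2".to_string()]),
        None, // default CSV writer options
    )
    .await?;
    Ok(())
}
```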
```diff
@@ -319,14 +319,22 @@ fn compute_partition_keys_by_row<'a>(
 ) -> Result<Vec<Vec<&'a str>>> {
     let mut all_partition_values = vec![];

-    for (col, dtype) in partition_by.iter() {
+    // For the purposes of writing partitioned data, we can rely on schema inference
+    // to determine the type of the partition cols in order to provide a more ergonomic
+    // UI which does not require specifying DataTypes manually. So, we ignore the
+    // DataType within the partition_by array and infer the correct type from the
+    // batch schema instead.
+    let schema = rb.schema();
+    for (col, _) in partition_by.iter() {
         let mut partition_values = vec![];

+        let dtype = schema.field_with_name(col)?.data_type();
         let col_array =
             rb.column_by_name(col)
                 .ok_or(DataFusionError::Execution(format!(
-                    "PartitionBy Column {} does not exist in source data!",
-                    col
-                )))?;
+                    "PartitionBy Column {} does not exist in source data! Got schema {schema}.",
+                    col,
+                )))?;

         match dtype {
             DataType::Utf8 => {
```

Review comment (on the schema-inference comment): 👍

Review comment (on the error message): it can be shortened with an inline `{col}`
```diff
@@ -568,6 +568,7 @@ impl DefaultPhysicalPlanner {
                 output_url,
                 file_format,
                 copy_options,
+                partition_by,
             }) => {
                 let input_exec = self.create_initial_plan(input, session_state).await?;
                 let parsed_url = ListingTableUrl::parse(output_url)?;
```

```diff
@@ -585,13 +586,20 @@ impl DefaultPhysicalPlanner {
                     CopyOptions::WriterOptions(writer_options) => *writer_options.clone()
                 };

+                // Note: the DataType passed here is ignored for the purposes of writing and inferred instead
+                // from the schema of the RecordBatch being written. This allows COPY statements to specify only
+                // the column name rather than column name + explicit data type.
+                let table_partition_cols = partition_by.iter()
+                    .map(|s| (s.to_string(), arrow_schema::DataType::Null))
+                    .collect::<Vec<_>>();
+
                 // Set file sink related options
                 let config = FileSinkConfig {
                     object_store_url,
                     table_paths: vec![parsed_url],
                     file_groups: vec![],
                     output_schema: Arc::new(schema),
-                    table_partition_cols: vec![],
+                    table_partition_cols,
                     overwrite: false,
                     file_type_writer_options
                 };
```

Review comment: The read path needs an explicit DataType defined for the partition cols so it knows what to cast to, but I realized that the write path can just infer the correct DataType from the RecordBatch schema. This allows COPY to only specify partition columns by name and not have to worry about specifying the correct data type.

Reply: I agree this is a better UX -- thank you.
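To make that comment thread concrete, here is a small sketch of the write/read asymmetry, assuming nothing beyond arrow-schema types; the column name and the Utf8 read type are illustrative:

```rust
use arrow_schema::DataType;

fn main() {
    let partition_by = vec!["col2".to_string()];

    // Write path (this PR): only names are known, so pair each with a
    // DataType::Null placeholder; the real type comes from the RecordBatch.
    let write_cols: Vec<(String, DataType)> = partition_by
        .iter()
        .map(|s| (s.to_string(), DataType::Null))
        .collect();

    // Read path (e.g. PARTITIONED BY (col2) on an external table): a concrete
    // type must be supplied so partition values can be cast when scanned.
    let read_cols: Vec<(String, DataType)> = vec![("col2".to_string(), DataType::Utf8)];

    println!("write: {write_cols:?}\nread:  {read_cols:?}");
}
```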
```diff
@@ -918,6 +918,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     input: Arc::new(input),
                     output_url: copy.output_url.clone(),
                     file_format: FileType::from_str(&copy.file_type)?,
+                    partition_by: vec![],
                     copy_options,
                 },
             ))
```

```diff
@@ -1641,6 +1642,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                 output_url,
                 file_format,
                 copy_options,
+                partition_by: _,
             }) => {
                 let input = protobuf::LogicalPlanNode::try_from_logical_plan(
                     input,
```

Review comment: Note that I did not add support for partition_by in proto. We should add a follow-up ticket for this. I don't believe this PR will break downstream systems like Ballista's handling of COPY, but it will silently ignore partition_by options until prost is updated.

Reply: filed #9248
```diff
@@ -25,6 +25,44 @@ COPY source_table TO 'test_files/scratch/copy/table/' (format parquet, compression 'zstd(10)');
 ----
 2

+# Copy to directory as partitioned files
+query IT
+COPY source_table TO 'test_files/scratch/copy/partitioned_table1/' (format parquet, compression 'zstd(10)', partition_by 'col2');
+----
+2
+
+# validate multiple partitioned parquet file output
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_parquet STORED AS PARQUET
+LOCATION 'test_files/scratch/copy/partitioned_table1/' PARTITIONED BY (col2);
+
+query I?
+select * from validate_partitioned_parquet;
+----
+2 Bar
+1 Foo
+
+# Copy to directory as partitioned files
+query ITT
+COPY (values (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')) TO 'test_files/scratch/copy/partitioned_table2/'
+(format parquet, compression 'zstd(10)', partition_by 'column2, column3');
+----
+3
+
+# validate multiple partitioned parquet file output
+statement ok
+CREATE EXTERNAL TABLE validate_partitioned_parquet2 STORED AS PARQUET
+LOCATION 'test_files/scratch/copy/partitioned_table2/' PARTITIONED BY (column2, column3);
+
+query I??
+select * from validate_partitioned_parquet2;
+----
+3 c z
+1 a x
+2 b y
+
+
 query TT
 EXPLAIN COPY source_table TO 'test_files/scratch/copy/table/' (format parquet, compression 'zstd(10)');
 ----
```

Review comment (on the VALUES-based COPY): Anonymous columns get the name "columnX" based on their order in the VALUES clause. It would be nice to document this somewhere, though I did make sure it is relatively easy to discover this based on the error message if you get a column name wrong.
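A hedged illustration of that naming rule as a small DataFusion program; the printed field names are my expectation of the column1, column2, ... scheme described in the comment, not output captured from this PR:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Anonymous VALUES columns: DataFusion assigns names by position.
    let df = ctx.sql("VALUES (1, 'a', 'x'), (2, 'b', 'y')").await?;
    // Expected to print something like ["column1", "column2", "column3"],
    // which is why the test above can say partition_by 'column2, column3'.
    println!("{:?}", df.schema().field_names());
    Ok(())
}
```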
Review comment: Is it worth mentioning that this also removes all ' from the column name? 🤔

Reply: Yes, the current parsing logic will not work for columns with single quotes. I'll see if I can generalize the parsing a bit. It seems the standard convention in SQL and Postgres is to use doubled single quotes to escape within a string literal.

Reply: I was able to get COPY + partition_by to work with column names containing single quotes, but the read path doesn't seem to work, and I even managed to trigger a panic. I wonder if we should track down that bug to support ' in the name of a partition column, or if we should just declare that unsupported and reject partition by ("'column_name'").