
Support Copy To Partitioned Files #9240

Merged Feb 19, 2024 (7 commits)
Conversation

@devinjdangelo (Contributor) commented Feb 15, 2024

Which issue does this PR close?

Re #9237
Closes #8493

Rationale for this change

We currently support writing to partitioned listing tables. It would be nice to leverage the same physical plan implementation within a CopyTo statement, so users can skip registering a table if they don't need one.

What changes are included in this PR?

  • Extends CopyTo LogicalPlan to support partition_by option
  • Extends CopyTo physical planning to propagate partition_by columns to the FileSinkExec plan
  • Extends DataFrameWriteOptions to support the same partition_by option
  • Adds partition column DataType inference to the demux code so users do not have to explicitly specify the data type in the COPY statement.

With these changes we can support queries like the following:

COPY source_table TO 'test_files/scratch/copy/partitioned_table1/' 
(format parquet, compression 'zstd(10)', partition_by 'col2');

COPY (values (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')) TO 'test_files/scratch/copy/partitioned_table2/' 
(format parquet, compression 'zstd(10)', partition_by 'column2, column3');

Are these changes tested?

Yes, the examples above are validated in copy.slt tests.

Are there any user-facing changes?

Copying to partitioned files is easier now.

@github-actions bot added labels sql (SQL Planner), logical-expr (Logical plan and expressions), core (Core DataFusion crate), sqllogictest (SQL Logic Tests (.slt)) on Feb 15, 2024
@@ -1641,6 +1642,7 @@ impl AsLogicalPlan for LogicalPlanNode {
output_url,
file_format,
copy_options,
partition_by: _,
devinjdangelo (Contributor, Author):
Note that I did not add support for partition_by in proto. We should add a follow up ticket for this.

I don't believe this PR will break downstream systems like Ballista's handling of COPY, but it will silently ignore partition_by options until prost is updated.

Contributor:

filed #9248

@@ -585,13 +586,20 @@ impl DefaultPhysicalPlanner {
CopyOptions::WriterOptions(writer_options) => *writer_options.clone()
};

// Note: the DataType passed here is ignored for the purposes of writing and inferred instead
devinjdangelo (Contributor, Author):

The read path needs an explicit DataType defined for the partition cols so it knows what to cast to, but I realized that the write path can just infer the correct DataType from the RecordBatch schema.

This allows COPY to only specify partition columns by name and not have to worry about specifying the correct data type.
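The inference described here can be sketched roughly like this. This is a simplified illustration with stand-in `Field`/`DataType` types, not DataFusion's actual Arrow-based code: each partition column's DataType is looked up from the input schema by name, and an unknown column name produces an error.

```rust
// Illustrative sketch only: stand-in types for Arrow's Field/DataType.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    Utf8,
}

struct Field {
    name: String,
    data_type: DataType,
}

// Resolve each partition column's type from the input schema by name,
// so the COPY statement never needs to spell out the DataType itself.
fn infer_partition_types(
    schema: &[Field],
    partition_by: &[&str],
) -> Result<Vec<(String, DataType)>, String> {
    partition_by
        .iter()
        .map(|col| {
            schema
                .iter()
                .find(|f| f.name == *col)
                .map(|f| (f.name.clone(), f.data_type.clone()))
                .ok_or_else(|| {
                    format!("PartitionBy Column {col} does not exist in source data!")
                })
        })
        .collect()
}

fn main() {
    let schema = vec![
        Field { name: "column1".into(), data_type: DataType::Int64 },
        Field { name: "column2".into(), data_type: DataType::Utf8 },
    ];
    let inferred = infer_partition_types(&schema, &["column2"]).unwrap();
    assert_eq!(inferred, vec![("column2".to_string(), DataType::Utf8)]);
    assert!(infer_partition_types(&schema, &["missing"]).is_err());
}
```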

Contributor:

I agree this is a better UX -- thank you

# Copy to directory as partitioned files
query ITT
COPY (values (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')) TO 'test_files/scratch/copy/partitioned_table2/'
(format parquet, compression 'zstd(10)', partition_by 'column2, column3');
devinjdangelo (Contributor, Author):

Anonymous columns get the name "columnX" based on their order in the VALUES clause. It would be nice to document this somewhere, though I did make sure it is relatively easy to discover this based on the error message if you get a column name wrong.
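For reference, the default-naming convention described above can be sketched as follows (illustrative only, not DataFusion's code): anonymous VALUES columns are named "column1", "column2", ... by 1-based position.

```rust
// Generate the default names assigned to anonymous columns in a VALUES
// clause, 1-based by position (illustrative sketch).
fn default_value_column_names(n: usize) -> Vec<String> {
    (1..=n).map(|i| format!("column{i}")).collect()
}

fn main() {
    assert_eq!(
        default_value_column_names(3),
        vec!["column1", "column2", "column3"]
    );
}
```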

"PartitionBy Column {} does not exist in source data!",
col
)))?;
"PartitionBy Column {} does not exist in source data! Got schema {schema}.",
Contributor:

it can be shortened with exec_datafusion_err!

comphead (Contributor) left a comment:

Thanks @devinjdangelo
Wondering: since we write partitioned data, should we also test the underlying file/folder structure, i.e. how the data was written to disk?

devinjdangelo (Contributor, Author):

> Wondering as we write partitioned data then should also test the underlying file/folder structure, how the data was written to disk?

The copy.slt tests rely on the read path for partitioned tables to make sure the files were written out correctly.

# Copy to directory as partitioned files
query ITT
COPY (values (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')) TO 'test_files/scratch/copy/partitioned_table2/' 
(format parquet, compression 'zstd(10)', partition_by 'column2, column3');
----
3

# validate multiple partitioned parquet file output
statement ok
CREATE EXTERNAL TABLE validate_partitioned_parquet2 STORED AS PARQUET 
LOCATION 'test_files/scratch/copy/partitioned_table2/' PARTITIONED BY (column2, column3);

query I??
select * from validate_partitioned_parquet2 order by column1,column2,column3;
----
1 a x
2 b y
3 c z

The code currently doesn't do any checking/validation of existing directories or files before writing. DuckDB describes some of the options they have for controlling this behavior here https://duckdb.org/docs/data/partitioning/partitioned_writes.html

alamb (Contributor) left a comment:

I also tried it out locally

❯ COPY (values (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')) TO '/tmp/foo/' (format parquet, compression 'zstd(10)', partition_by 'column2, column3');
+-------+
| count |
+-------+
| 3     |
+-------+
1 row in set. Query took 0.039 seconds.

❯
\q
andrewlamb@Andrews-MacBook-Pro:~/Software/arrow-datafusion/datafusion-cli$ ls -ltr /tmp/foo/
total 0
drwxr-xr-x@ 3 andrewlamb  wheel    96B Feb 16 09:28 column2=a/
drwxr-xr-x@ 3 andrewlamb  wheel    96B Feb 16 09:28 column2=b/
drwxr-xr-x@ 3 andrewlamb  wheel    96B Feb 16 09:28 column2=c/

match partition_by {
Some(part_cols) => part_cols
.split(',')
.map(|s| s.trim().replace('\'', ""))
Contributor:

Is it worth mentioning that this also removes all ' from the column name 🤔

devinjdangelo (Contributor, Author):

Yes, the current parsing logic will not work for column names containing single quotes, e.g.

create table test ("'test'" varchar, "'test2'" varchar); 

I'll see if I can generalize the parsing a bit. It seems standard convention in SQL and postgres is to use double single quotes to escape within a string literal.
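A quote-aware parse along those lines might look like this. This is a hedged sketch under the doubled-single-quote convention mentioned above, not the code this PR ended up with: split on commas outside quotes, and treat '' inside a quoted segment as an escaped literal quote.

```rust
// Sketch: parse a partition_by option string such as "col1, 'a,b', 'it''s'".
// Commas inside single-quoted segments do not split; a doubled single quote
// ('') inside a quoted segment yields a literal ' in the column name.
fn parse_partition_by(input: &str) -> Vec<String> {
    let mut cols = Vec::new();
    let mut current = String::new();
    let mut in_quotes = false;
    let mut chars = input.chars().peekable();
    while let Some(c) = chars.next() {
        match c {
            '\'' if in_quotes && chars.peek() == Some(&'\'') => {
                chars.next(); // consume second quote of the escaped pair
                current.push('\'');
            }
            '\'' => in_quotes = !in_quotes,
            ',' if !in_quotes => {
                cols.push(current.trim().to_string());
                current.clear();
            }
            _ => current.push(c),
        }
    }
    let last = current.trim();
    if !last.is_empty() {
        cols.push(last.to_string());
    }
    cols
}

fn main() {
    assert_eq!(parse_partition_by("col1, col2"), vec!["col1", "col2"]);
    // a quoted name containing a comma stays a single column
    assert_eq!(parse_partition_by("'a,b', col2"), vec!["a,b", "col2"]);
    // doubled single quotes escape a literal quote in the name
    assert_eq!(parse_partition_by("'it''s'"), vec!["it's"]);
}
```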

devinjdangelo (Contributor, Author):

I was able to get COPY + partition_by to work with column names containing single quotes, but the read path doesn't seem to work, and I even managed to trigger a panic. I wonder if we should track down that bug to support ' in the name of a partition column, or just declare that unsupported and reject PARTITION BY ("'column_name'").

@@ -319,14 +319,20 @@ fn compute_partition_keys_by_row<'a>(
) -> Result<Vec<Vec<&'a str>>> {
let mut all_partition_values = vec![];

for (col, dtype) in partition_by.iter() {
// For the purposes of writing partitioned data, we can rely on schema inference
Contributor:

👍



LOCATION 'test_files/scratch/copy/partitioned_table1/' PARTITIONED BY (col2);

query I?
select * from validate_partitioned_parquet order by col1, col2;
Contributor:

Could you also add a test for reading out of one of the partitions

Something like

select * from 'test_files/scratch/copy/partitioned_table1/col2=Foo'

To demonstrate that the output was actually partitioned? I think the current test would pass even if the partition columns were ignored.

devinjdangelo (Contributor, Author):

I added additional tests to copy.slt to verify this

@@ -73,6 +73,9 @@ pub struct DataFrameWriteOptions {
/// Allows compression of CSV and JSON.
/// Not supported for parquet.
compression: CompressionTypeVariant,
/// Sets which columns should be used for hive-style partitioned writes by name.
/// Can be set to empty vec![] for non-partitioned writes.
partition_by: Vec<String>,
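For illustration, the builder-style option this field enables might be used like the following. This is a simplified std-only mock whose names mirror the PR's DataFrameWriteOptions; it is not the real struct or its full API.

```rust
// Simplified mock of DataFrameWriteOptions to illustrate the builder-style
// partition_by option (illustrative only, not DataFusion's implementation).
#[derive(Default, Debug)]
struct DataFrameWriteOptions {
    /// Columns used for hive-style partitioned writes, by name.
    /// An empty vec (the default) means a non-partitioned write.
    partition_by: Vec<String>,
}

impl DataFrameWriteOptions {
    fn new() -> Self {
        Self::default()
    }

    /// Builder-style setter for the partition columns.
    fn with_partition_by(mut self, partition_by: Vec<String>) -> Self {
        self.partition_by = partition_by;
        self
    }
}

fn main() {
    let opts = DataFrameWriteOptions::new()
        .with_partition_by(vec!["column2".to_string(), "column3".to_string()]);
    assert_eq!(opts.partition_by, ["column2", "column3"]);
}
```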
Contributor:

I think we need a test for this new feature DataFrame::write_parquet

I took a look around and I didn't see any good existing tests sadly. This is what I found.

https://github.com/apache/arrow-datafusion/blob/4d389c2590370d85bfe3af77f5243d5b40f5a222/datafusion/core/src/datasource/physical_plan/parquet/mod.rs#L2070

I'll make a short PR to move those tests into the dataframe tests to make it more discoverable

CREATE EXTERNAL TABLE validate_partitioned_escape_quote STORED AS CSV
LOCATION 'test_files/scratch/copy/escape_quote/' PARTITIONED BY ("'test2'", "'test3'");

# This triggers a panic (index out of bounds)
devinjdangelo (Contributor, Author):

Trying to query on this table partitioned by a single quote containing column name panics with index out of bounds error.

Manually inspecting the CSV suggests the previous COPY statement worked.

As mentioned in the other thread, I'm not sure it makes sense to support ' in a partition path name. It will certainly get ugly if we try.

Contributor:

Ideally I think we wouldn't panic (perhaps we can generate a not supported error instead)

Given this PR doesn't seem to make the situation worse (or better) I don't think we need to fix it now. Instead I think we should file a ticket to address it as a follow on. I will do so

LOCATION 'test_files/scratch/copy/partitioned_table1/col2=Bar';

query I
select * from validate_partitioned_parquet_bar order by col1;
Contributor:

👍


alamb (Contributor) commented Feb 19, 2024:

Thanks again @devinjdangelo -- I think all we need now is a few follow-on tickets and I'll merge this PR.

alamb (Contributor) commented Feb 19, 2024:

Filed #9269 to track panic in copy

Labels: core (Core DataFusion crate), logical-expr (Logical plan and expressions), sql (SQL Planner), sqllogictest (SQL Logic Tests (.slt))
Successfully merging this pull request may close these issues.

Support writing hive style partitioned files in COPY command
3 participants