Skip to content

Commit

Permalink
Add support for Utf8View, Boolean, Date32/64, int32/64 for writing hi…
Browse files Browse the repository at this point in the history
…ve style partitions (#12283)

* Add support for Utf8View, Boolean, Date32/64, int32/64 for writing out hive style partitioning.

* Switch to Cow instead of String to reduce instances of string allocation.

* Cargo fmt update.
  • Loading branch information
Omega359 authored Sep 11, 2024
1 parent 3ece7a7 commit 6aae2ee
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 6 deletions.
71 changes: 65 additions & 6 deletions datafusion/core/src/datasource/file_format/write/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Module containing helper methods/traits related to enabling
//! dividing input stream into multiple output files at execution time

use std::borrow::Cow;
use std::collections::HashMap;

use std::sync::Arc;
Expand All @@ -31,7 +32,11 @@ use arrow_array::builder::UInt64Builder;
use arrow_array::cast::AsArray;
use arrow_array::{downcast_dictionary_array, RecordBatch, StringArray, StructArray};
use arrow_schema::{DataType, Schema};
use datafusion_common::cast::as_string_array;
use chrono::NaiveDate;
use datafusion_common::cast::{
as_boolean_array, as_date32_array, as_date64_array, as_int32_array, as_int64_array,
as_string_array, as_string_view_array,
};
use datafusion_common::{exec_datafusion_err, DataFusionError};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::TaskContext;
Expand Down Expand Up @@ -320,9 +325,11 @@ async fn hive_style_partitions_demuxer(
fn compute_partition_keys_by_row<'a>(
rb: &'a RecordBatch,
partition_by: &'a [(String, DataType)],
) -> Result<Vec<Vec<&'a str>>> {
) -> Result<Vec<Vec<Cow<'a, str>>>> {
let mut all_partition_values = vec![];

const EPOCH_DAYS_FROM_CE: i32 = 719_163;

// For the purposes of writing partitioned data, we can rely on schema inference
// to determine the type of the partition cols in order to provide a more ergonomic
// UI which does not require specifying DataTypes manually. So, we ignore the
Expand All @@ -342,7 +349,59 @@ fn compute_partition_keys_by_row<'a>(
DataType::Utf8 => {
let array = as_string_array(col_array)?;
for i in 0..rb.num_rows() {
partition_values.push(array.value(i));
partition_values.push(Cow::from(array.value(i)));
}
}
DataType::Utf8View => {
let array = as_string_view_array(col_array)?;
for i in 0..rb.num_rows() {
partition_values.push(Cow::from(array.value(i)));
}
}
DataType::Boolean => {
let array = as_boolean_array(col_array)?;
for i in 0..rb.num_rows() {
partition_values.push(Cow::from(array.value(i).to_string()));
}
}
DataType::Date32 => {
let array = as_date32_array(col_array)?;
// ISO-8601/RFC3339 format - yyyy-mm-dd
let format = "%Y-%m-%d";
for i in 0..rb.num_rows() {
let date = NaiveDate::from_num_days_from_ce_opt(
EPOCH_DAYS_FROM_CE + array.value(i),
)
.unwrap()
.format(format)
.to_string();
partition_values.push(Cow::from(date));
}
}
DataType::Date64 => {
let array = as_date64_array(col_array)?;
// ISO-8601/RFC3339 format - yyyy-mm-dd
let format = "%Y-%m-%d";
for i in 0..rb.num_rows() {
let date = NaiveDate::from_num_days_from_ce_opt(
EPOCH_DAYS_FROM_CE + (array.value(i) / 86_400_000) as i32,
)
.unwrap()
.format(format)
.to_string();
partition_values.push(Cow::from(date));
}
}
DataType::Int32 => {
let array = as_int32_array(col_array)?;
for i in 0..rb.num_rows() {
partition_values.push(Cow::from(array.value(i).to_string()));
}
}
DataType::Int64 => {
let array = as_int64_array(col_array)?;
for i in 0..rb.num_rows() {
partition_values.push(Cow::from(array.value(i).to_string()));
}
}
DataType::Dictionary(_, _) => {
Expand All @@ -354,7 +413,7 @@ fn compute_partition_keys_by_row<'a>(

for val in array.values() {
partition_values.push(
val.ok_or(exec_datafusion_err!("Cannot partition by null value for column {}", col))?
Cow::from(val.ok_or(exec_datafusion_err!("Cannot partition by null value for column {}", col))?),
);
}
},
Expand All @@ -377,13 +436,13 @@ fn compute_partition_keys_by_row<'a>(

fn compute_take_arrays(
rb: &RecordBatch,
all_partition_values: Vec<Vec<&str>>,
all_partition_values: Vec<Vec<Cow<str>>>,
) -> HashMap<Vec<String>, UInt64Builder> {
let mut take_map = HashMap::new();
for i in 0..rb.num_rows() {
let mut part_key = vec![];
for vals in all_partition_values.iter() {
part_key.push(vals[i].to_owned());
part_key.push(vals[i].clone().into());
}
let builder = take_map.entry(part_key).or_insert(UInt64Builder::new());
builder.append_value(i as u64);
Expand Down
22 changes: 22 additions & 0 deletions datafusion/sqllogictest/test_files/copy.slt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,28 @@ select * from validate_partitioned_parquet_1_x order by column2;
----
a

# Copy to directory as partitioned files
query I
COPY (values (1::int, 2::bigint, 19968::date, arrow_cast(1725235200000, 'Date64'), false, 'x'),
(11::int, 22::bigint, 19969::date, arrow_cast(1725148800000, 'Date64'), true, 'y')
)
TO 'test_files/scratch/copy/partitioned_table5/' STORED AS parquet PARTITIONED BY (column1, column2, column3, column4, column5)
OPTIONS ('format.compression' 'zstd(10)');
----
2

# validate partitioning
statement ok
CREATE EXTERNAL TABLE validate_partitioned_parquet5 (column1 int, column2 bigint, column3 date, column4 date, column5 boolean, column6 varchar) STORED AS PARQUET
LOCATION 'test_files/scratch/copy/partitioned_table5/' PARTITIONED BY (column1, column2, column3, column4, column5);

query IIDDBT
select column1, column2, column3, column4, column5, column6 from validate_partitioned_parquet5 order by column1,column2,column3,column4,column5;
----
1 2 2024-09-02 2024-09-02 false x
11 22 2024-09-03 2024-09-01 true y


statement ok
create table test ("'test'" varchar, "'test2'" varchar, "'test3'" varchar);

Expand Down

0 comments on commit 6aae2ee

Please sign in to comment.