Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions rust/arrow/src/csv/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,16 @@ fn parse(
&DataType::Date64(_) => {
build_primitive_array::<Date64Type>(line_number, rows, i)
}
&DataType::Timestamp(TimeUnit::Microsecond, _) => {
build_primitive_array::<TimestampMicrosecondType>(
line_number,
rows,
i,
)
}
&DataType::Timestamp(TimeUnit::Nanosecond, _) => {
build_primitive_array::<TimestampNanosecondType>(line_number, rows, i)
}
&DataType::Utf8 => Ok(Arc::new(
rows.iter().map(|row| row.get(i)).collect::<StringArray>(),
) as ArrayRef),
Expand Down Expand Up @@ -531,6 +541,30 @@ impl Parser for Date64Type {
}
}

impl Parser for TimestampNanosecondType {
    /// Parses an ISO-8601-style date-time string into nanoseconds since the
    /// Unix epoch. Returns `None` when the string does not parse or the
    /// type's data type is not a timezone-less nanosecond timestamp.
    fn parse(string: &str) -> Option<i64> {
        if let DataType::Timestamp(TimeUnit::Nanosecond, None) = Self::DATA_TYPE {
            let parsed = string.parse::<chrono::NaiveDateTime>().ok()?;
            Self::Native::from_i64(parsed.timestamp_nanos())
        } else {
            None
        }
    }
}

impl Parser for TimestampMicrosecondType {
    /// Parses an ISO-8601-style date-time string into microseconds since the
    /// Unix epoch. Returns `None` when the string does not parse or the
    /// type's data type is not a timezone-less microsecond timestamp.
    fn parse(string: &str) -> Option<i64> {
        match Self::DATA_TYPE {
            DataType::Timestamp(TimeUnit::Microsecond, None) => {
                let date_time = string.parse::<chrono::NaiveDateTime>().ok()?;
                // Build microseconds from seconds + sub-second part rather
                // than `timestamp_nanos() / 1000`: the nanosecond value
                // overflows i64 for dates past ~2262 even though they fit
                // in the microsecond range. Checked arithmetic turns any
                // remaining overflow into `None` instead of a panic/wrap.
                let micros = date_time
                    .timestamp()
                    .checked_mul(1_000_000)?
                    .checked_add(i64::from(date_time.timestamp_subsec_micros()))?;
                Self::Native::from_i64(micros)
            }
            _ => None,
        }
    }
}

/// Parses `string` via `T`'s `Parser` implementation, returning the parsed
/// native value, or `None` if the string is not valid for `T`'s data type.
fn parse_item<T: Parser>(string: &str) -> Option<T::Native> {
    T::parse(string)
}
Expand Down
51 changes: 51 additions & 0 deletions rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,57 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn group_by_date_trunc() -> Result<()> {
    let tmp_dir = TempDir::new()?;
    let mut ctx = ExecutionContext::new();

    // Table schema: an unsigned counter plus a microsecond-resolution timestamp.
    let schema = Arc::new(Schema::new(vec![
        Field::new("c2", DataType::UInt64, false),
        Field::new(
            "t1",
            DataType::Timestamp(TimeUnit::Microsecond, None),
            false,
        ),
    ]));

    // Write four identical CSV partitions, each holding ten rows dated
    // 2020-12-10 through 2020-12-19.
    for partition in 0..4 {
        let file_path = tmp_dir
            .path()
            .join(format!("partition-{}.csv", partition));
        let mut file = File::create(file_path)?;
        for i in 0..10 {
            writeln!(file, "{},2020-12-{}T00:00:00.000", i, i + 10)?;
        }
    }

    ctx.register_csv(
        "test",
        tmp_dir.path().to_str().unwrap(),
        CsvReadOptions::new().schema(&schema).has_header(false),
    )?;

    let results = plan_and_collect(
        &mut ctx,
        "SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY date_trunc('week', t1)"
    ).await?;

    // All rows fall into two ISO weeks, so a single batch with two groups.
    assert_eq!(results.len(), 1);
    let batch = &results[0];
    assert_eq!(field_names(batch), vec!["week", "SUM(c2)"]);

    let mut rows = test::format_batch(&batch);
    rows.sort();
    assert_eq!(
        rows,
        vec!["2020-12-07T00:00:00,24", "2020-12-14T00:00:00,156"]
    );

    Ok(())
}

async fn run_count_distinct_integers_aggregated_scenario(
partitions: Vec<Vec<(&str, u64)>>,
) -> Result<Vec<RecordBatch>> {
Expand Down
161 changes: 161 additions & 0 deletions rust/datafusion/src/physical_plan/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use arrow::{
buffer::Buffer,
datatypes::{DataType, TimeUnit, ToByteSlice},
};
use chrono::Duration;
use chrono::{prelude::*, LocalResult};

#[inline]
Expand Down Expand Up @@ -205,6 +206,108 @@ pub fn to_timestamp(args: &[ArrayRef]) -> Result<TimestampNanosecondArray> {
Ok(TimestampNanosecondArray::from(Arc::new(data)))
}

/// date_trunc SQL function
pub fn date_trunc(args: &[ArrayRef]) -> Result<TimestampNanosecondArray> {
let granularity_array =
&args[0]
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
DataFusionError::Execution(
"Could not cast date_trunc granularity input to StringArray"
.to_string(),
)
})?;

let array = &args[1]
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.ok_or_else(|| {
DataFusionError::Execution(
"Could not cast date_trunc array input to TimestampNanosecondArray"
.to_string(),
)
})?;

let range = 0..array.len();
let result = range
.map(|i| {
if array.is_null(i) {
Ok(0_i64)
} else {
let date_time = match granularity_array.value(i) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This list seems consistent with https://www.postgresql.org/docs/9.1/functions-datetime.html#FUNCTIONS-DATETIME-TRUNC 👍 I think it is fine that we don't yeet support some of the more esoteric stuff (like milleniumm')

"second" => array
.value_as_datetime(i)
.and_then(|d| d.with_nanosecond(0)),
"minute" => array
.value_as_datetime(i)
.and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0)),
"hour" => array
.value_as_datetime(i)
.and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0))
.and_then(|d| d.with_minute(0)),
"day" => array
.value_as_datetime(i)
.and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0))
.and_then(|d| d.with_minute(0))
.and_then(|d| d.with_hour(0)),
"week" => array
.value_as_datetime(i)
.and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0))
.and_then(|d| d.with_minute(0))
.and_then(|d| d.with_hour(0))
.map(|d| {
d - Duration::seconds(60 * 60 * 24 * d.weekday() as i64)
}),
"month" => array
.value_as_datetime(i)
.and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0))
.and_then(|d| d.with_minute(0))
.and_then(|d| d.with_hour(0))
.and_then(|d| d.with_day0(0)),
"year" => array
.value_as_datetime(i)
.and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0))
.and_then(|d| d.with_minute(0))
.and_then(|d| d.with_hour(0))
.and_then(|d| d.with_day0(0))
.and_then(|d| d.with_month0(0)),
unsupported => {
return Err(DataFusionError::Execution(format!(
"Unsupported date_trunc granularity: {}",
unsupported
)))
}
};
date_time.map(|d| d.timestamp_nanos()).ok_or_else(|| {
DataFusionError::Execution(format!(
"Can't truncate date time: {:?}",
array.value_as_datetime(i)
))
})
}
})
.collect::<Result<Vec<_>>>()?;

let data = ArrayData::new(
DataType::Timestamp(TimeUnit::Nanosecond, None),
array.len(),
Some(array.null_count()),
array.data().null_buffer().cloned(),
0,
vec![Buffer::from(result.to_byte_slice())],
vec![],
);

Ok(TimestampNanosecondArray::from(Arc::new(data)))
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down Expand Up @@ -378,6 +481,64 @@ mod tests {
Ok(())
}

#[test]
fn date_trunc_test() -> Result<()> {
    // (input timestamp, expected truncation, granularity) triples.
    let cases = [
        ("2020-09-08T13:42:29.190855Z", "2020-09-08T13:42:29.000000Z", "second"),
        ("2020-09-08T13:42:29.190855Z", "2020-09-08T13:42:00.000000Z", "minute"),
        ("2020-09-08T13:42:29.190855Z", "2020-09-08T13:00:00.000000Z", "hour"),
        ("2020-09-08T13:42:29.190855Z", "2020-09-08T00:00:00.000000Z", "day"),
        ("2020-09-08T13:42:29.190855Z", "2020-09-07T00:00:00.000000Z", "week"),
        ("2020-09-08T13:42:29.190855Z", "2020-09-01T00:00:00.000000Z", "month"),
        ("2020-09-08T13:42:29.190855Z", "2020-01-01T00:00:00.000000Z", "year"),
        // Week truncation across a year boundary lands in the prior year.
        ("2021-01-01T13:42:29.190855Z", "2020-12-28T00:00:00.000000Z", "week"),
        ("2020-01-01T13:42:29.190855Z", "2019-12-30T00:00:00.000000Z", "week"),
    ];

    let mut ts_builder = StringBuilder::new(cases.len() + 1);
    let mut truncated_builder = StringBuilder::new(cases.len() + 1);
    let mut string_builder = StringBuilder::new(cases.len() + 1);

    // A null input must remain null after truncation.
    ts_builder.append_null()?;
    truncated_builder.append_null()?;
    string_builder.append_value("second")?;

    for &(ts, expected, granularity) in cases.iter() {
        ts_builder.append_value(ts)?;
        truncated_builder.append_value(expected)?;
        string_builder.append_value(granularity)?;
    }

    let string_array = Arc::new(string_builder.finish());
    let ts_array = Arc::new(to_timestamp(&[Arc::new(ts_builder.finish())]).unwrap());
    // NOTE: the previous expect message mentioned to_timestamp; this call is
    // date_trunc, so the message now names the function actually under test.
    let date_trunc_array = date_trunc(&[string_array, ts_array])
        .expect("that date_trunc truncated values without error");

    let expected_timestamps =
        to_timestamp(&[Arc::new(truncated_builder.finish())]).unwrap();

    assert_eq!(date_trunc_array, expected_timestamps);
    Ok(())
}

#[test]
fn to_timestamp_invalid_input_type() -> Result<()> {
// pass the wrong type of input array to to_timestamp and test
Expand Down
13 changes: 13 additions & 0 deletions rust/datafusion/src/physical_plan/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ pub enum BuiltinScalarFunction {
Array,
/// SQL NULLIF()
NullIf,
/// Date truncate
DateTrunc,
}

impl fmt::Display for BuiltinScalarFunction {
Expand Down Expand Up @@ -168,6 +170,7 @@ impl FromStr for BuiltinScalarFunction {
"trim" => BuiltinScalarFunction::Trim,
"upper" => BuiltinScalarFunction::Upper,
"to_timestamp" => BuiltinScalarFunction::ToTimestamp,
"date_trunc" => BuiltinScalarFunction::DateTrunc,
"array" => BuiltinScalarFunction::Array,
"nullif" => BuiltinScalarFunction::NullIf,
_ => {
Expand Down Expand Up @@ -247,6 +250,9 @@ pub fn return_type(
BuiltinScalarFunction::ToTimestamp => {
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
}
BuiltinScalarFunction::DateTrunc => {
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
}
BuiltinScalarFunction::Array => Ok(DataType::FixedSizeList(
Box::new(Field::new("item", arg_types[0].clone(), true)),
arg_types.len() as i32,
Expand Down Expand Up @@ -317,6 +323,9 @@ pub fn create_physical_expr(
BuiltinScalarFunction::ToTimestamp => {
|args| Ok(Arc::new(datetime_expressions::to_timestamp(args)?))
}
BuiltinScalarFunction::DateTrunc => {
|args| Ok(Arc::new(datetime_expressions::date_trunc(args)?))
}
BuiltinScalarFunction::Array => |args| Ok(array_expressions::array(args)?),
});
// coerce
Expand Down Expand Up @@ -355,6 +364,10 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature {
Signature::Uniform(1, vec![DataType::Utf8, DataType::LargeUtf8])
}
BuiltinScalarFunction::ToTimestamp => Signature::Uniform(1, vec![DataType::Utf8]),
BuiltinScalarFunction::DateTrunc => Signature::Exact(vec![
DataType::Utf8,
DataType::Timestamp(TimeUnit::Nanosecond, None),
]),
BuiltinScalarFunction::Array => {
Signature::Variadic(array_expressions::SUPPORTED_ARRAY_TYPES.to_vec())
}
Expand Down
4 changes: 4 additions & 0 deletions rust/datafusion/src/physical_plan/group_scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub(crate) enum GroupByScalar {
Int32(i32),
Int64(i64),
Utf8(Box<String>),
TimeMicrosecond(i64),
TimeNanosecond(i64),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

impl TryFrom<&ScalarValue> for GroupByScalar {
Expand Down Expand Up @@ -87,6 +89,8 @@ impl From<&GroupByScalar> for ScalarValue {
GroupByScalar::UInt32(v) => ScalarValue::UInt32(Some(*v)),
GroupByScalar::UInt64(v) => ScalarValue::UInt64(Some(*v)),
GroupByScalar::Utf8(v) => ScalarValue::Utf8(Some(v.to_string())),
GroupByScalar::TimeMicrosecond(v) => ScalarValue::TimeMicrosecond(Some(*v)),
GroupByScalar::TimeNanosecond(v) => ScalarValue::TimeNanosecond(Some(*v)),
}
}
}
Expand Down
Loading