diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 3fca7b23e8c..c9f97cdc6d0 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -449,6 +449,16 @@ fn parse( &DataType::Date64(_) => { build_primitive_array::(line_number, rows, i) } + &DataType::Timestamp(TimeUnit::Microsecond, _) => { + build_primitive_array::( + line_number, + rows, + i, + ) + } + &DataType::Timestamp(TimeUnit::Nanosecond, _) => { + build_primitive_array::(line_number, rows, i) + } &DataType::Utf8 => Ok(Arc::new( rows.iter().map(|row| row.get(i)).collect::(), ) as ArrayRef), @@ -531,6 +541,30 @@ impl Parser for Date64Type { } } +impl Parser for TimestampNanosecondType { + fn parse(string: &str) -> Option { + match Self::DATA_TYPE { + DataType::Timestamp(TimeUnit::Nanosecond, None) => { + let date_time = string.parse::().ok()?; + Self::Native::from_i64(date_time.timestamp_nanos()) + } + _ => None, + } + } +} + +impl Parser for TimestampMicrosecondType { + fn parse(string: &str) -> Option { + match Self::DATA_TYPE { + DataType::Timestamp(TimeUnit::Microsecond, None) => { + let date_time = string.parse::().ok()?; + Self::Native::from_i64(date_time.timestamp_nanos() / 1000) + } + _ => None, + } + } +} + fn parse_item(string: &str) -> Option { T::parse(string) } diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index dae9c90c6a2..ebfc8871d87 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -1055,6 +1055,57 @@ mod tests { Ok(()) } + #[tokio::test] + async fn group_by_date_trunc() -> Result<()> { + let tmp_dir = TempDir::new()?; + let mut ctx = ExecutionContext::new(); + let schema = Arc::new(Schema::new(vec![ + Field::new("c2", DataType::UInt64, false), + Field::new( + "t1", + DataType::Timestamp(TimeUnit::Microsecond, None), + false, + ), + ])); + + // generate a partitioned file + for partition in 0..4 { + let filename = format!("partition-{}.{}", partition, "csv"); + let file_path = tmp_dir.path().join(&filename); + let mut file = File::create(file_path)?; + + // generate some data + for i in 0..10 { + let data = format!("{},2020-12-{}T00:00:00.000\n", i, i + 10); + file.write_all(data.as_bytes())?; + } + } + + ctx.register_csv( + "test", + tmp_dir.path().to_str().unwrap(), + CsvReadOptions::new().schema(&schema).has_header(false), + )?; + + let results = plan_and_collect( + &mut ctx, + "SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY date_trunc('week', t1)" + ).await?; + assert_eq!(results.len(), 1); + + let batch = &results[0]; + + assert_eq!(field_names(batch), vec!["week", "SUM(c2)"]); + + let expected: Vec<&str> = + vec!["2020-12-07T00:00:00,24", "2020-12-14T00:00:00,156"]; + let mut rows = test::format_batch(&batch); + rows.sort(); + assert_eq!(rows, expected); + + Ok(()) + } + async fn run_count_distinct_integers_aggregated_scenario( partitions: Vec>, ) -> Result> { diff --git a/rust/datafusion/src/physical_plan/datetime_expressions.rs b/rust/datafusion/src/physical_plan/datetime_expressions.rs index a12b00cf845..247f899f50e 100644 --- a/rust/datafusion/src/physical_plan/datetime_expressions.rs +++ b/rust/datafusion/src/physical_plan/datetime_expressions.rs @@ -25,6 +25,7 @@ use arrow::{ buffer::Buffer, datatypes::{DataType, TimeUnit, ToByteSlice}, }; +use chrono::Duration; use chrono::{prelude::*, LocalResult}; #[inline] @@ -205,6 +206,108 @@ pub fn to_timestamp(args: &[ArrayRef]) -> Result { Ok(TimestampNanosecondArray::from(Arc::new(data))) } +/// date_trunc SQL function +pub fn date_trunc(args: &[ArrayRef]) -> Result { + let granularity_array = + &args[0] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Execution( + "Could not cast date_trunc granularity input to StringArray" + .to_string(), + ) + })?; + + let array = &args[1] + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Execution( + "Could not cast date_trunc array input to TimestampNanosecondArray" + .to_string(), + ) + })?; + + let range = 0..array.len(); + let result = range + .map(|i| { + if array.is_null(i) { + Ok(0_i64) + } else { + let date_time = match granularity_array.value(i) { + "second" => array + .value_as_datetime(i) + .and_then(|d| d.with_nanosecond(0)), + "minute" => array + .value_as_datetime(i) + .and_then(|d| d.with_nanosecond(0)) + .and_then(|d| d.with_second(0)), + "hour" => array + .value_as_datetime(i) + .and_then(|d| d.with_nanosecond(0)) + .and_then(|d| d.with_second(0)) + .and_then(|d| d.with_minute(0)), + "day" => array + .value_as_datetime(i) + .and_then(|d| d.with_nanosecond(0)) + .and_then(|d| d.with_second(0)) + .and_then(|d| d.with_minute(0)) + .and_then(|d| d.with_hour(0)), + "week" => array + .value_as_datetime(i) + .and_then(|d| d.with_nanosecond(0)) + .and_then(|d| d.with_second(0)) + .and_then(|d| d.with_minute(0)) + .and_then(|d| d.with_hour(0)) + .map(|d| { + d - Duration::seconds(60 * 60 * 24 * d.weekday() as i64) + }), + "month" => array + .value_as_datetime(i) + .and_then(|d| d.with_nanosecond(0)) + .and_then(|d| d.with_second(0)) + .and_then(|d| d.with_minute(0)) + .and_then(|d| d.with_hour(0)) + .and_then(|d| d.with_day0(0)), + "year" => array + .value_as_datetime(i) + .and_then(|d| d.with_nanosecond(0)) + .and_then(|d| d.with_second(0)) + .and_then(|d| d.with_minute(0)) + .and_then(|d| d.with_hour(0)) + .and_then(|d| d.with_day0(0)) + .and_then(|d| d.with_month0(0)), + unsupported => { + return Err(DataFusionError::Execution(format!( + "Unsupported date_trunc granularity: {}", + unsupported + ))) + } + }; + date_time.map(|d| d.timestamp_nanos()).ok_or_else(|| { + DataFusionError::Execution(format!( + "Can't truncate date time: {:?}", + array.value_as_datetime(i) + )) + }) + } + }) + .collect::>>()?; + + let data = ArrayData::new( + DataType::Timestamp(TimeUnit::Nanosecond, None), + array.len(), + Some(array.null_count()), + array.data().null_buffer().cloned(), + 0, + vec![Buffer::from(result.to_byte_slice())], + vec![], + ); + + Ok(TimestampNanosecondArray::from(Arc::new(data))) +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -378,6 +481,64 @@ mod tests { Ok(()) } + #[test] + fn date_trunc_test() -> Result<()> { + let mut ts_builder = StringBuilder::new(2); + let mut truncated_builder = StringBuilder::new(2); + let mut string_builder = StringBuilder::new(2); + + ts_builder.append_null()?; + truncated_builder.append_null()?; + string_builder.append_value("second")?; + + ts_builder.append_value("2020-09-08T13:42:29.190855Z")?; + truncated_builder.append_value("2020-09-08T13:42:29.000000Z")?; + string_builder.append_value("second")?; + + ts_builder.append_value("2020-09-08T13:42:29.190855Z")?; + truncated_builder.append_value("2020-09-08T13:42:00.000000Z")?; + string_builder.append_value("minute")?; + + ts_builder.append_value("2020-09-08T13:42:29.190855Z")?; + truncated_builder.append_value("2020-09-08T13:00:00.000000Z")?; + string_builder.append_value("hour")?; + + ts_builder.append_value("2020-09-08T13:42:29.190855Z")?; + truncated_builder.append_value("2020-09-08T00:00:00.000000Z")?; + string_builder.append_value("day")?; + + ts_builder.append_value("2020-09-08T13:42:29.190855Z")?; + truncated_builder.append_value("2020-09-07T00:00:00.000000Z")?; + string_builder.append_value("week")?; + + ts_builder.append_value("2020-09-08T13:42:29.190855Z")?; + truncated_builder.append_value("2020-09-01T00:00:00.000000Z")?; + string_builder.append_value("month")?; + + ts_builder.append_value("2020-09-08T13:42:29.190855Z")?; + truncated_builder.append_value("2020-01-01T00:00:00.000000Z")?; + string_builder.append_value("year")?; + + ts_builder.append_value("2021-01-01T13:42:29.190855Z")?; + truncated_builder.append_value("2020-12-28T00:00:00.000000Z")?; + string_builder.append_value("week")?; + + ts_builder.append_value("2020-01-01T13:42:29.190855Z")?; + truncated_builder.append_value("2019-12-30T00:00:00.000000Z")?; + string_builder.append_value("week")?; + + let string_array = Arc::new(string_builder.finish()); + let ts_array = Arc::new(to_timestamp(&[Arc::new(ts_builder.finish())]).unwrap()); + let date_trunc_array = date_trunc(&[string_array, ts_array]) + .expect("that to_timestamp parsed values without error"); + + let expected_timestamps = + to_timestamp(&[Arc::new(truncated_builder.finish())]).unwrap(); + + assert_eq!(date_trunc_array, expected_timestamps); + Ok(()) + } + #[test] fn to_timestamp_invalid_input_type() -> Result<()> { // pass the wrong type of input array to to_timestamp and test diff --git a/rust/datafusion/src/physical_plan/functions.rs b/rust/datafusion/src/physical_plan/functions.rs index 0023fbf21d6..316586d44b0 100644 --- a/rust/datafusion/src/physical_plan/functions.rs +++ b/rust/datafusion/src/physical_plan/functions.rs @@ -130,6 +130,8 @@ pub enum BuiltinScalarFunction { Array, /// SQL NULLIF() NullIf, + /// Date truncate + DateTrunc, } impl fmt::Display for BuiltinScalarFunction { @@ -168,6 +170,7 @@ impl FromStr for BuiltinScalarFunction { "trim" => BuiltinScalarFunction::Trim, "upper" => BuiltinScalarFunction::Upper, "to_timestamp" => BuiltinScalarFunction::ToTimestamp, + "date_trunc" => BuiltinScalarFunction::DateTrunc, "array" => BuiltinScalarFunction::Array, "nullif" => BuiltinScalarFunction::NullIf, _ => { @@ -247,6 +250,9 @@ pub fn return_type( BuiltinScalarFunction::ToTimestamp => { Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)) } + BuiltinScalarFunction::DateTrunc => { + Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)) + } BuiltinScalarFunction::Array => Ok(DataType::FixedSizeList( Box::new(Field::new("item", arg_types[0].clone(), true)), arg_types.len() as i32, @@ -317,6 +323,9 @@ pub fn create_physical_expr( BuiltinScalarFunction::ToTimestamp => { |args| Ok(Arc::new(datetime_expressions::to_timestamp(args)?)) } + BuiltinScalarFunction::DateTrunc => { + |args| Ok(Arc::new(datetime_expressions::date_trunc(args)?)) + } BuiltinScalarFunction::Array => |args| Ok(array_expressions::array(args)?), }); // coerce @@ -355,6 +364,10 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature { Signature::Uniform(1, vec![DataType::Utf8, DataType::LargeUtf8]) } BuiltinScalarFunction::ToTimestamp => Signature::Uniform(1, vec![DataType::Utf8]), + BuiltinScalarFunction::DateTrunc => Signature::Exact(vec![ + DataType::Utf8, + DataType::Timestamp(TimeUnit::Nanosecond, None), + ]), BuiltinScalarFunction::Array => { Signature::Variadic(array_expressions::SUPPORTED_ARRAY_TYPES.to_vec()) } diff --git a/rust/datafusion/src/physical_plan/group_scalar.rs b/rust/datafusion/src/physical_plan/group_scalar.rs index 8c11a6be65a..295eb462cca 100644 --- a/rust/datafusion/src/physical_plan/group_scalar.rs +++ b/rust/datafusion/src/physical_plan/group_scalar.rs @@ -35,6 +35,8 @@ pub(crate) enum GroupByScalar { Int32(i32), Int64(i64), Utf8(Box), + TimeMicrosecond(i64), + TimeNanosecond(i64), } impl TryFrom<&ScalarValue> for GroupByScalar { @@ -87,6 +89,8 @@ impl From<&GroupByScalar> for ScalarValue { GroupByScalar::UInt32(v) => ScalarValue::UInt32(Some(*v)), GroupByScalar::UInt64(v) => ScalarValue::UInt64(Some(*v)), GroupByScalar::Utf8(v) => ScalarValue::Utf8(Some(v.to_string())), + GroupByScalar::TimeMicrosecond(v) => ScalarValue::TimeMicrosecond(Some(*v)), + GroupByScalar::TimeNanosecond(v) => ScalarValue::TimeNanosecond(Some(*v)), } } } diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs index 96772467140..864bc78a154 100644 --- a/rust/datafusion/src/physical_plan/hash_aggregate.rs +++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs @@ -30,7 +30,7 @@ use crate::error::{DataFusionError, Result}; use crate::physical_plan::{Accumulator, AggregateExpr}; use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning, PhysicalExpr}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; use arrow::{ @@ -49,6 +49,7 @@ use super::{ use ahash::RandomState; use hashbrown::HashMap; +use arrow::array::{TimestampMicrosecondArray, TimestampNanosecondArray}; use async_trait::async_trait; /// Hash aggregate modes @@ -672,6 +673,12 @@ fn create_batch_from_map( GroupByScalar::Utf8(str) => { Arc::new(StringArray::from(vec![&***str])) } + GroupByScalar::TimeMicrosecond(n) => { + Arc::new(TimestampMicrosecondArray::from(vec![*n])) + } + GroupByScalar::TimeNanosecond(n) => { + Arc::new(TimestampNanosecondArray::from_vec(vec![*n], None)) + } }) .collect::>(); @@ -780,6 +787,20 @@ pub(crate) fn create_group_by_values( let array = col.as_any().downcast_ref::().unwrap(); vec[i] = GroupByScalar::Utf8(Box::new(array.value(row).into())) } + DataType::Timestamp(TimeUnit::Microsecond, None) => { + let array = col + .as_any() + .downcast_ref::() + .unwrap(); + vec[i] = GroupByScalar::TimeMicrosecond(array.value(row)) + } + DataType::Timestamp(TimeUnit::Nanosecond, None) => { + let array = col + .as_any() + .downcast_ref::() + .unwrap(); + vec[i] = GroupByScalar::TimeNanosecond(array.value(row)) + } _ => { // This is internal because we should have caught this before. return Err(DataFusionError::Internal( diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs index 9ac7447a8ab..8a3ee4b09ba 100644 --- a/rust/datafusion/src/physical_plan/hash_join.rs +++ b/rust/datafusion/src/physical_plan/hash_join.rs @@ -18,6 +18,7 @@ //! Defines the join plan for executing partitions in parallel and then joining the results //! into a set of partitions. +use arrow::array::{TimestampMicrosecondArray, TimestampNanosecondArray}; use arrow::{array::ArrayRef, compute}; use std::sync::Arc; use std::{any::Any, collections::HashSet}; @@ -28,7 +29,7 @@ use hashbrown::HashMap; use tokio::sync::Mutex; use arrow::array::{make_array, Array, MutableArrayData}; -use arrow::datatypes::DataType; +use arrow::datatypes::{DataType, TimeUnit}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; @@ -373,6 +374,20 @@ pub(crate) fn create_key( let array = col.as_any().downcast_ref::().unwrap(); vec.extend(array.value(row).to_le_bytes().iter()); } + DataType::Timestamp(TimeUnit::Microsecond, None) => { + let array = col + .as_any() + .downcast_ref::() + .unwrap(); + vec.extend(array.value(row).to_le_bytes().iter()); + } + DataType::Timestamp(TimeUnit::Nanosecond, None) => { + let array = col + .as_any() + .downcast_ref::() + .unwrap(); + vec.extend(array.value(row).to_le_bytes().iter()); + } DataType::Utf8 => { let array = col.as_any().downcast_ref::().unwrap(); let value = array.value(row); diff --git a/rust/datafusion/src/physical_plan/type_coercion.rs b/rust/datafusion/src/physical_plan/type_coercion.rs index 91eaf659fd7..090729a1b9f 100644 --- a/rust/datafusion/src/physical_plan/type_coercion.rs +++ b/rust/datafusion/src/physical_plan/type_coercion.rs @@ -31,7 +31,7 @@ use std::sync::Arc; -use arrow::datatypes::{DataType, Schema}; +use arrow::datatypes::{DataType, Schema, TimeUnit}; use super::{functions::Signature, PhysicalExpr}; use crate::error::{DataFusionError, Result}; @@ -176,6 +176,7 @@ pub fn can_coerce_from(type_into: &DataType, type_from: &DataType) -> bool { | Float32 | Float64 ), + Timestamp(TimeUnit::Nanosecond, None) => matches!(type_from, Timestamp(_, None)), Utf8 => true, _ => false, } diff --git a/rust/datafusion/src/scalar.rs b/rust/datafusion/src/scalar.rs index 5afcabd9ecc..64e5bcc22b5 100644 --- a/rust/datafusion/src/scalar.rs +++ b/rust/datafusion/src/scalar.rs @@ -20,8 +20,9 @@ use std::{convert::TryFrom, fmt, sync::Arc}; use arrow::array::{ - Int16Builder, Int32Builder, Int64Builder, Int8Builder, ListBuilder, UInt16Builder, - UInt32Builder, UInt64Builder, UInt8Builder, + Int16Builder, Int32Builder, Int64Builder, Int8Builder, ListBuilder, + TimestampMicrosecondArray, TimestampNanosecondArray, UInt16Builder, UInt32Builder, + UInt64Builder, UInt8Builder, }; use arrow::{ array::ArrayRef, @@ -37,6 +38,7 @@ use arrow::{ }; use crate::error::{DataFusionError, Result}; +use arrow::datatypes::TimeUnit; /// Represents a dynamically typed, nullable single value. /// This is the single-valued counter-part of arrow’s `Array`. @@ -72,6 +74,10 @@ pub enum ScalarValue { List(Option>, DataType), /// Date stored as a signed 32bit int Date32(Option), + /// Timestamp Microseconds + TimeMicrosecond(Option), + /// Timestamp Nanoseconds + TimeNanosecond(Option), } macro_rules! typed_cast { @@ -131,6 +137,12 @@ impl ScalarValue { ScalarValue::Int16(_) => DataType::Int16, ScalarValue::Int32(_) => DataType::Int32, ScalarValue::Int64(_) => DataType::Int64, + ScalarValue::TimeMicrosecond(_) => { + DataType::Timestamp(TimeUnit::Microsecond, None) + } + ScalarValue::TimeNanosecond(_) => { + DataType::Timestamp(TimeUnit::Nanosecond, None) + } ScalarValue::Float32(_) => DataType::Float32, ScalarValue::Float64(_) => DataType::Float64, ScalarValue::Utf8(_) => DataType::Utf8, @@ -205,6 +217,12 @@ impl ScalarValue { ScalarValue::UInt16(e) => Arc::new(UInt16Array::from(vec![*e; size])), ScalarValue::UInt32(e) => Arc::new(UInt32Array::from(vec![*e; size])), ScalarValue::UInt64(e) => Arc::new(UInt64Array::from(vec![*e; size])), + ScalarValue::TimeMicrosecond(e) => { + Arc::new(TimestampMicrosecondArray::from(vec![*e])) + } + ScalarValue::TimeNanosecond(e) => { + Arc::new(TimestampNanosecondArray::from_opt_vec(vec![*e], None)) + } ScalarValue::Utf8(e) => Arc::new(StringArray::from(vec![e.as_deref(); size])), ScalarValue::LargeUtf8(e) => { Arc::new(LargeStringArray::from(vec![e.as_deref(); size])) @@ -440,6 +458,8 @@ impl fmt::Display for ScalarValue { ScalarValue::UInt16(e) => format_option!(f, e)?, ScalarValue::UInt32(e) => format_option!(f, e)?, ScalarValue::UInt64(e) => format_option!(f, e)?, + ScalarValue::TimeMicrosecond(e) => format_option!(f, e)?, + ScalarValue::TimeNanosecond(e) => format_option!(f, e)?, ScalarValue::Utf8(e) => format_option!(f, e)?, ScalarValue::LargeUtf8(e) => format_option!(f, e)?, ScalarValue::List(e, _) => match e { @@ -473,6 +493,8 @@ impl fmt::Debug for ScalarValue { ScalarValue::UInt16(_) => write!(f, "UInt16({})", self), ScalarValue::UInt32(_) => write!(f, "UInt32({})", self), ScalarValue::UInt64(_) => write!(f, "UInt64({})", self), + ScalarValue::TimeMicrosecond(_) => write!(f, "TimeMicrosecond({})", self), + ScalarValue::TimeNanosecond(_) => write!(f, "TimeNanosecond({})", self), ScalarValue::Utf8(None) => write!(f, "Utf8({})", self), ScalarValue::Utf8(Some(_)) => write!(f, "Utf8(\"{}\")", self), ScalarValue::LargeUtf8(None) => write!(f, "LargeUtf8({})", self), diff --git a/rust/datafusion/src/test/mod.rs b/rust/datafusion/src/test/mod.rs index db0ef3b0ff2..e589834be5c 100644 --- a/rust/datafusion/src/test/mod.rs +++ b/rust/datafusion/src/test/mod.rs @@ -21,7 +21,7 @@ use crate::datasource::{MemTable, TableProvider}; use crate::error::Result; use crate::logical_plan::{LogicalPlan, LogicalPlanBuilder}; use arrow::array::{self, Int32Array}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; use arrow::record_batch::RecordBatch; use std::fs::File; use std::io::prelude::*; @@ -210,6 +210,24 @@ pub fn format_batch(batch: &RecordBatch) -> Vec { .unwrap() .value(row_index) )), + DataType::Timestamp(TimeUnit::Microsecond, _) => s.push_str(&format!( + "{:?}", + array + .as_any() + .downcast_ref::() + .unwrap() + .value_as_datetime(row_index) + .unwrap() + )), + DataType::Timestamp(TimeUnit::Nanosecond, _) => s.push_str(&format!( + "{:?}", + array + .as_any() + .downcast_ref::() + .unwrap() + .value_as_datetime(row_index) + .unwrap() + )), _ => s.push('?'), } }