diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index ba40488f4028c..dfac547d7bb35 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -494,10 +494,10 @@ fn typechecked_scalar_value_conversion( ScalarValue::Date32(Some(*v)) } (Value::TimeMicrosecondValue(v), PrimitiveScalarType::TimeMicrosecond) => { - ScalarValue::TimestampMicrosecond(Some(*v)) + ScalarValue::TimestampMicrosecond(Some(*v), None) } (Value::TimeNanosecondValue(v), PrimitiveScalarType::TimeMicrosecond) => { - ScalarValue::TimestampNanosecond(Some(*v)) + ScalarValue::TimestampNanosecond(Some(*v), None) } (Value::Utf8Value(v), PrimitiveScalarType::Utf8) => { ScalarValue::Utf8(Some(v.to_owned())) @@ -530,10 +530,10 @@ fn typechecked_scalar_value_conversion( PrimitiveScalarType::LargeUtf8 => ScalarValue::LargeUtf8(None), PrimitiveScalarType::Date32 => ScalarValue::Date32(None), PrimitiveScalarType::TimeMicrosecond => { - ScalarValue::TimestampMicrosecond(None) + ScalarValue::TimestampMicrosecond(None, None) } PrimitiveScalarType::TimeNanosecond => { - ScalarValue::TimestampNanosecond(None) + ScalarValue::TimestampNanosecond(None, None) } PrimitiveScalarType::Null => { return Err(proto_error( @@ -593,10 +593,10 @@ impl TryInto for &protobuf::scalar_value::Value ScalarValue::Date32(Some(*v)) } protobuf::scalar_value::Value::TimeMicrosecondValue(v) => { - ScalarValue::TimestampMicrosecond(Some(*v)) + ScalarValue::TimestampMicrosecond(Some(*v), None) } protobuf::scalar_value::Value::TimeNanosecondValue(v) => { - ScalarValue::TimestampNanosecond(Some(*v)) + ScalarValue::TimestampNanosecond(Some(*v), None) } protobuf::scalar_value::Value::ListValue(v) => v.try_into()?, protobuf::scalar_value::Value::NullListValue(v) => { @@ -758,10 +758,10 @@ impl TryInto for protobuf::PrimitiveScalarType protobuf::PrimitiveScalarType::LargeUtf8 => 
ScalarValue::LargeUtf8(None), protobuf::PrimitiveScalarType::Date32 => ScalarValue::Date32(None), protobuf::PrimitiveScalarType::TimeMicrosecond => { - ScalarValue::TimestampMicrosecond(None) + ScalarValue::TimestampMicrosecond(None, None) } protobuf::PrimitiveScalarType::TimeNanosecond => { - ScalarValue::TimestampNanosecond(None) + ScalarValue::TimestampNanosecond(None, None) } }) } @@ -811,10 +811,10 @@ impl TryInto for &protobuf::ScalarValue { ScalarValue::Date32(Some(*v)) } protobuf::scalar_value::Value::TimeMicrosecondValue(v) => { - ScalarValue::TimestampMicrosecond(Some(*v)) + ScalarValue::TimestampMicrosecond(Some(*v), None) } protobuf::scalar_value::Value::TimeNanosecondValue(v) => { - ScalarValue::TimestampNanosecond(Some(*v)) + ScalarValue::TimestampNanosecond(Some(*v), None) } protobuf::scalar_value::Value::ListValue(scalar_list) => { let protobuf::ScalarListValue { diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index a5e2aa0e98c60..a0f481a803258 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -216,8 +216,8 @@ mod roundtrip_tests { ScalarValue::LargeUtf8(None), ScalarValue::List(None, Box::new(DataType::Boolean)), ScalarValue::Date32(None), - ScalarValue::TimestampMicrosecond(None), - ScalarValue::TimestampNanosecond(None), + ScalarValue::TimestampMicrosecond(None, None), + ScalarValue::TimestampNanosecond(None, None), ScalarValue::Boolean(Some(true)), ScalarValue::Boolean(Some(false)), ScalarValue::Float32(Some(1.0)), @@ -256,11 +256,11 @@ mod roundtrip_tests { ScalarValue::LargeUtf8(Some(String::from("Test Large utf8"))), ScalarValue::Date32(Some(0)), ScalarValue::Date32(Some(i32::MAX)), - ScalarValue::TimestampNanosecond(Some(0)), - ScalarValue::TimestampNanosecond(Some(i64::MAX)), - ScalarValue::TimestampMicrosecond(Some(0)), - ScalarValue::TimestampMicrosecond(Some(i64::MAX)), - 
ScalarValue::TimestampMicrosecond(None), + ScalarValue::TimestampNanosecond(Some(0), None), + ScalarValue::TimestampNanosecond(Some(i64::MAX), None), + ScalarValue::TimestampMicrosecond(Some(0), None), + ScalarValue::TimestampMicrosecond(Some(i64::MAX), None), + ScalarValue::TimestampMicrosecond(None, None), ScalarValue::List( Some(Box::new(vec![ ScalarValue::Float32(Some(-213.1)), @@ -619,8 +619,8 @@ mod roundtrip_tests { ScalarValue::Utf8(None), ScalarValue::LargeUtf8(None), ScalarValue::Date32(None), - ScalarValue::TimestampMicrosecond(None), - ScalarValue::TimestampNanosecond(None), + ScalarValue::TimestampMicrosecond(None, None), + ScalarValue::TimestampNanosecond(None, None), //ScalarValue::List(None, DataType::Boolean) ]; diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 68ed7097632f1..47b5df47cd730 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -652,12 +652,12 @@ impl TryFrom<&datafusion::scalar::ScalarValue> for protobuf::ScalarValue { datafusion::scalar::ScalarValue::Date32(val) => { create_proto_scalar(val, PrimitiveScalarType::Date32, |s| Value::Date32Value(*s)) } - datafusion::scalar::ScalarValue::TimestampMicrosecond(val) => { + datafusion::scalar::ScalarValue::TimestampMicrosecond(val, _) => { create_proto_scalar(val, PrimitiveScalarType::TimeMicrosecond, |s| { Value::TimeMicrosecondValue(*s) }) } - datafusion::scalar::ScalarValue::TimestampNanosecond(val) => { + datafusion::scalar::ScalarValue::TimestampNanosecond(val, _) => { create_proto_scalar(val, PrimitiveScalarType::TimeNanosecond, |s| { Value::TimeNanosecondValue(*s) }) diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs index bcdfae7f4d8ec..fc862cd9ae376 100644 --- a/datafusion/src/logical_plan/expr.rs +++ b/datafusion/src/logical_plan/expr.rs @@ -1478,9 +1478,10 @@ macro_rules! 
make_timestamp_literal { #[doc = $DOC] impl TimestampLiteral for $TYPE { fn lit_timestamp_nano(&self) -> Expr { - Expr::Literal(ScalarValue::TimestampNanosecond(Some( - (self.clone()).into(), - ))) + Expr::Literal(ScalarValue::TimestampNanosecond( + Some((self.clone()).into()), + None, + )) } } }; @@ -2048,7 +2049,8 @@ mod tests { #[test] fn test_lit_timestamp_nano() { let expr = col("time").eq(lit_timestamp_nano(10)); // 10 is an implicit i32 - let expected = col("time").eq(lit(ScalarValue::TimestampNanosecond(Some(10)))); + let expected = + col("time").eq(lit(ScalarValue::TimestampNanosecond(Some(10), None))); assert_eq!(expr, expected); let i: i64 = 10; diff --git a/datafusion/src/optimizer/simplify_expressions.rs b/datafusion/src/optimizer/simplify_expressions.rs index 0ca9212cf6571..ff2c05c76f18c 100644 --- a/datafusion/src/optimizer/simplify_expressions.rs +++ b/datafusion/src/optimizer/simplify_expressions.rs @@ -1703,7 +1703,7 @@ mod tests { .build() .unwrap(); - let expected = "Projection: TimestampNanosecond(1599566400000000000) AS totimestamp(Utf8(\"2020-09-08T12:00:00+00:00\"))\ + let expected = "Projection: TimestampNanosecond(1599566400000000000, None) AS totimestamp(Utf8(\"2020-09-08T12:00:00+00:00\"))\ \n TableScan: test projection=None" .to_string(); let actual = get_optimized_plan_formatted(&plan, &Utc::now()); @@ -1780,7 +1780,7 @@ mod tests { // expect the same timestamp appears in both exprs let actual = get_optimized_plan_formatted(&plan, &time); let expected = format!( - "Projection: TimestampNanosecond({}) AS now(), TimestampNanosecond({}) AS t2\ + "Projection: TimestampNanosecond({}, Some(\"UTC\")) AS now(), TimestampNanosecond({}, Some(\"UTC\")) AS t2\ \n TableScan: test projection=None", time.timestamp_nanos(), time.timestamp_nanos() diff --git a/datafusion/src/physical_plan/datetime_expressions.rs b/datafusion/src/physical_plan/datetime_expressions.rs index d10312798d3ff..6af2f66a6086a 100644 --- 
a/datafusion/src/physical_plan/datetime_expressions.rs +++ b/datafusion/src/physical_plan/datetime_expressions.rs @@ -181,6 +181,7 @@ pub fn make_now( move |_arg| { Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( now_ts, + Some("UTC".to_owned()), ))) } } @@ -240,8 +241,11 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result { let f = |x: Option| x.map(|x| date_trunc_single(granularity, x)).transpose(); Ok(match array { - ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v)) => { - ColumnarValue::Scalar(ScalarValue::TimestampNanosecond((f)(*v)?)) + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => { + ColumnarValue::Scalar(ScalarValue::TimestampNanosecond( + (f)(*v)?, + tz_opt.clone(), + )) } ColumnarValue::Array(array) => { let array = array diff --git a/datafusion/src/physical_plan/expressions/binary.rs b/datafusion/src/physical_plan/expressions/binary.rs index d8bae7d1794a6..bd593fd6ecb5d 100644 --- a/datafusion/src/physical_plan/expressions/binary.rs +++ b/datafusion/src/physical_plan/expressions/binary.rs @@ -329,16 +329,16 @@ macro_rules! 
binary_array_op_scalar { DataType::Float32 => compute_op_scalar!($LEFT, $RIGHT, $OP, Float32Array), DataType::Float64 => compute_op_scalar!($LEFT, $RIGHT, $OP, Float64Array), DataType::Utf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, StringArray), - DataType::Timestamp(TimeUnit::Nanosecond, None) => { + DataType::Timestamp(TimeUnit::Nanosecond, _) => { compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampNanosecondArray) } - DataType::Timestamp(TimeUnit::Microsecond, None) => { + DataType::Timestamp(TimeUnit::Microsecond, _) => { compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray) } - DataType::Timestamp(TimeUnit::Millisecond, None) => { + DataType::Timestamp(TimeUnit::Millisecond, _) => { compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMillisecondArray) } - DataType::Timestamp(TimeUnit::Second, None) => { + DataType::Timestamp(TimeUnit::Second, _) => { compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampSecondArray) } DataType::Date32 => { @@ -374,16 +374,16 @@ macro_rules! binary_array_op { DataType::Float32 => compute_op!($LEFT, $RIGHT, $OP, Float32Array), DataType::Float64 => compute_op!($LEFT, $RIGHT, $OP, Float64Array), DataType::Utf8 => compute_utf8_op!($LEFT, $RIGHT, $OP, StringArray), - DataType::Timestamp(TimeUnit::Nanosecond, None) => { + DataType::Timestamp(TimeUnit::Nanosecond, _) => { compute_op!($LEFT, $RIGHT, $OP, TimestampNanosecondArray) } - DataType::Timestamp(TimeUnit::Microsecond, None) => { + DataType::Timestamp(TimeUnit::Microsecond, _) => { compute_op!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray) } - DataType::Timestamp(TimeUnit::Millisecond, None) => { + DataType::Timestamp(TimeUnit::Millisecond, _) => { compute_op!($LEFT, $RIGHT, $OP, TimestampMillisecondArray) } - DataType::Timestamp(TimeUnit::Second, None) => { + DataType::Timestamp(TimeUnit::Second, _) => { compute_op!($LEFT, $RIGHT, $OP, TimestampSecondArray) } DataType::Date32 => { @@ -541,12 +541,14 @@ fn common_binary_type( // re-write the error message of failed coercions 
to include the operator's information match result { - None => Err(DataFusionError::Plan( + None => { + Err(DataFusionError::Plan( format!( "'{:?} {} {:?}' can't be evaluated because there isn't a common type to coerce the types to", lhs_type, op, rhs_type ), - )), + )) + }, Some(t) => Ok(t) } } diff --git a/datafusion/src/physical_plan/expressions/coercion.rs b/datafusion/src/physical_plan/expressions/coercion.rs index 180b16548b32b..a449a8d129b42 100644 --- a/datafusion/src/physical_plan/expressions/coercion.rs +++ b/datafusion/src/physical_plan/expressions/coercion.rs @@ -100,11 +100,48 @@ pub fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option Option { use arrow::datatypes::DataType::*; + use arrow::datatypes::TimeUnit; match (lhs_type, rhs_type) { (Utf8, Date32) => Some(Date32), (Date32, Utf8) => Some(Date32), (Utf8, Date64) => Some(Date64), (Date64, Utf8) => Some(Date64), + (Timestamp(lhs_unit, lhs_tz), Timestamp(rhs_unit, rhs_tz)) => { + let tz = match (lhs_tz, rhs_tz) { + // can't cast across timezones + (Some(lhs_tz), Some(rhs_tz)) => { + if lhs_tz != rhs_tz { + return None; + } else { + Some(lhs_tz.clone()) + } + } + (Some(lhs_tz), None) => Some(lhs_tz.clone()), + (None, Some(rhs_tz)) => Some(rhs_tz.clone()), + (None, None) => None, + }; + + let unit = match (lhs_unit, rhs_unit) { + (TimeUnit::Second, TimeUnit::Millisecond) => TimeUnit::Second, + (TimeUnit::Second, TimeUnit::Microsecond) => TimeUnit::Second, + (TimeUnit::Second, TimeUnit::Nanosecond) => TimeUnit::Second, + (TimeUnit::Millisecond, TimeUnit::Second) => TimeUnit::Second, + (TimeUnit::Millisecond, TimeUnit::Microsecond) => TimeUnit::Millisecond, + (TimeUnit::Millisecond, TimeUnit::Nanosecond) => TimeUnit::Millisecond, + (TimeUnit::Microsecond, TimeUnit::Second) => TimeUnit::Second, + (TimeUnit::Microsecond, TimeUnit::Millisecond) => TimeUnit::Millisecond, + (TimeUnit::Microsecond, TimeUnit::Nanosecond) => TimeUnit::Microsecond, + (TimeUnit::Nanosecond, TimeUnit::Second) => 
TimeUnit::Second, + (TimeUnit::Nanosecond, TimeUnit::Millisecond) => TimeUnit::Millisecond, + (TimeUnit::Nanosecond, TimeUnit::Microsecond) => TimeUnit::Microsecond, + (l, r) => { + assert_eq!(l, r); + l.clone() + } + }; + + Some(Timestamp(unit, tz)) + } _ => None, } } diff --git a/datafusion/src/physical_plan/expressions/min_max.rs b/datafusion/src/physical_plan/expressions/min_max.rs index 2f61881696545..8f6cd45b193a7 100644 --- a/datafusion/src/physical_plan/expressions/min_max.rs +++ b/datafusion/src/physical_plan/expressions/min_max.rs @@ -129,6 +129,12 @@ macro_rules! typed_min_max_batch { let value = compute::$OP(array); ScalarValue::$SCALAR(value) }}; + + ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident, $TZ:expr) => {{ + let array = $VALUES.as_any().downcast_ref::<$ARRAYTYPE>().unwrap(); + let value = compute::$OP(array); + ScalarValue::$SCALAR(value, $TZ.clone()) + }}; } // TODO implement this in arrow-rs with simd @@ -189,26 +195,35 @@ macro_rules! min_max_batch { DataType::UInt32 => typed_min_max_batch!($VALUES, UInt32Array, UInt32, $OP), DataType::UInt16 => typed_min_max_batch!($VALUES, UInt16Array, UInt16, $OP), DataType::UInt8 => typed_min_max_batch!($VALUES, UInt8Array, UInt8, $OP), - DataType::Timestamp(TimeUnit::Second, _) => { - typed_min_max_batch!($VALUES, TimestampSecondArray, TimestampSecond, $OP) + DataType::Timestamp(TimeUnit::Second, tz_opt) => { + typed_min_max_batch!( + $VALUES, + TimestampSecondArray, + TimestampSecond, + $OP, + tz_opt + ) } - DataType::Timestamp(TimeUnit::Millisecond, _) => typed_min_max_batch!( + DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => typed_min_max_batch!( $VALUES, TimestampMillisecondArray, TimestampMillisecond, - $OP + $OP, + tz_opt ), - DataType::Timestamp(TimeUnit::Microsecond, _) => typed_min_max_batch!( + DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => typed_min_max_batch!( $VALUES, TimestampMicrosecondArray, TimestampMicrosecond, - $OP + $OP, + tz_opt ), - 
DataType::Timestamp(TimeUnit::Nanosecond, _) => typed_min_max_batch!( + DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => typed_min_max_batch!( $VALUES, TimestampNanosecondArray, TimestampNanosecond, - $OP + $OP, + tz_opt ), DataType::Date32 => typed_min_max_batch!($VALUES, Date32Array, Date32, $OP), DataType::Date64 => typed_min_max_batch!($VALUES, Date64Array, Date64, $OP), @@ -273,6 +288,18 @@ macro_rules! typed_min_max { (Some(a), Some(b)) => Some((*a).$OP(*b)), }) }}; + + ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident, $TZ:expr) => {{ + ScalarValue::$SCALAR( + match ($VALUE, $DELTA) { + (None, None) => None, + (Some(a), None) => Some(a.clone()), + (None, Some(b)) => Some(b.clone()), + (Some(a), Some(b)) => Some((*a).$OP(*b)), + }, + $TZ.clone(), + ) + }}; } // min/max of two scalar string values. @@ -337,26 +364,26 @@ macro_rules! min_max { (ScalarValue::LargeUtf8(lhs), ScalarValue::LargeUtf8(rhs)) => { typed_min_max_string!(lhs, rhs, LargeUtf8, $OP) } - (ScalarValue::TimestampSecond(lhs), ScalarValue::TimestampSecond(rhs)) => { - typed_min_max!(lhs, rhs, TimestampSecond, $OP) + (ScalarValue::TimestampSecond(lhs, l_tz), ScalarValue::TimestampSecond(rhs, _)) => { + typed_min_max!(lhs, rhs, TimestampSecond, $OP, l_tz) } ( - ScalarValue::TimestampMillisecond(lhs), - ScalarValue::TimestampMillisecond(rhs), + ScalarValue::TimestampMillisecond(lhs, l_tz), + ScalarValue::TimestampMillisecond(rhs, _), ) => { - typed_min_max!(lhs, rhs, TimestampMillisecond, $OP) + typed_min_max!(lhs, rhs, TimestampMillisecond, $OP, l_tz) } ( - ScalarValue::TimestampMicrosecond(lhs), - ScalarValue::TimestampMicrosecond(rhs), + ScalarValue::TimestampMicrosecond(lhs, l_tz), + ScalarValue::TimestampMicrosecond(rhs, _), ) => { - typed_min_max!(lhs, rhs, TimestampMicrosecond, $OP) + typed_min_max!(lhs, rhs, TimestampMicrosecond, $OP, l_tz) } ( - ScalarValue::TimestampNanosecond(lhs), - ScalarValue::TimestampNanosecond(rhs), + ScalarValue::TimestampNanosecond(lhs, l_tz), + 
ScalarValue::TimestampNanosecond(rhs, _), ) => { - typed_min_max!(lhs, rhs, TimestampNanosecond, $OP) + typed_min_max!(lhs, rhs, TimestampNanosecond, $OP, l_tz) } ( ScalarValue::Date32(lhs), diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs index 9c59b9662daac..df073b62c5b78 100644 --- a/datafusion/src/physical_plan/functions.rs +++ b/datafusion/src/physical_plan/functions.rs @@ -612,7 +612,10 @@ pub fn return_type( BuiltinScalarFunction::ToTimestampSeconds => { Ok(DataType::Timestamp(TimeUnit::Second, None)) } - BuiltinScalarFunction::Now => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)), + BuiltinScalarFunction::Now => Ok(DataType::Timestamp( + TimeUnit::Nanosecond, + Some("UTC".to_owned()), + )), BuiltinScalarFunction::Translate => { utf8_to_str_type(&input_expr_types[0], "translate") } diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs index fbd0c9716e406..25d1f3fdd85c3 100644 --- a/datafusion/src/physical_plan/hash_utils.rs +++ b/datafusion/src/physical_plan/hash_utils.rs @@ -369,7 +369,7 @@ pub fn create_hashes<'a>( multi_col ); } - DataType::Timestamp(TimeUnit::Nanosecond, None) => { + DataType::Timestamp(TimeUnit::Nanosecond, _) => { hash_array_primitive!( TimestampNanosecondArray, col, diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 1302369886f86..6d913ac0f27c0 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -632,6 +632,7 @@ impl DefaultPhysicalPlanner { let physical_input = self.create_initial_plan(input, ctx_state).await?; let input_schema = physical_input.as_ref().schema(); let input_dfschema = input.as_ref().schema(); + let runtime_expr = self.create_physical_expr( predicate, input_dfschema, diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs index 5eb29bbd01f9f..e8898c1557a8a 100644 --- 
a/datafusion/src/physical_plan/sort.rs +++ b/datafusion/src/physical_plan/sort.rs @@ -29,7 +29,7 @@ use crate::physical_plan::{ }; pub use arrow::compute::SortOptions; use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions}; -use arrow::datatypes::SchemaRef; +use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use arrow::{array::ArrayRef, error::ArrowError}; @@ -201,6 +201,15 @@ fn sort_batch( None, )?; + let schema = Arc::new(Schema::new( + schema + .fields() + .iter() + .zip(batch.columns().iter().map(|col| col.data_type())) + .map(|(field, ty)| Field::new(field.name(), ty.clone(), field.is_nullable())) + .collect::>(), + )); + // reorder all rows based on sorted indices RecordBatch::try_new( schema, diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs index e9eafe1c109cd..ca0ffd190a1a7 100644 --- a/datafusion/src/scalar.rs +++ b/datafusion/src/scalar.rs @@ -20,6 +20,7 @@ use crate::error::{DataFusionError, Result}; use arrow::{ array::*, + compute::kernels::cast::cast, datatypes::{ ArrowDictionaryKeyType, ArrowNativeType, DataType, Field, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, IntervalUnit, TimeUnit, @@ -77,13 +78,13 @@ pub enum ScalarValue { /// Date stored as a signed 64bit int Date64(Option), /// Timestamp Second - TimestampSecond(Option), + TimestampSecond(Option, Option), /// Timestamp Milliseconds - TimestampMillisecond(Option), + TimestampMillisecond(Option, Option), /// Timestamp Microseconds - TimestampMicrosecond(Option), + TimestampMicrosecond(Option, Option), /// Timestamp Nanoseconds - TimestampNanosecond(Option), + TimestampNanosecond(Option, Option), /// Interval with YearMonth unit IntervalYearMonth(Option), /// Interval with DayTime unit @@ -150,14 +151,14 @@ impl PartialEq for ScalarValue { (Date32(_), _) => false, (Date64(v1), Date64(v2)) => v1.eq(v2), (Date64(_), _) => false, - (TimestampSecond(v1), 
TimestampSecond(v2)) => v1.eq(v2), - (TimestampSecond(_), _) => false, - (TimestampMillisecond(v1), TimestampMillisecond(v2)) => v1.eq(v2), - (TimestampMillisecond(_), _) => false, - (TimestampMicrosecond(v1), TimestampMicrosecond(v2)) => v1.eq(v2), - (TimestampMicrosecond(_), _) => false, - (TimestampNanosecond(v1), TimestampNanosecond(v2)) => v1.eq(v2), - (TimestampNanosecond(_), _) => false, + (TimestampSecond(v1, _), TimestampSecond(v2, _)) => v1.eq(v2), + (TimestampSecond(_, _), _) => false, + (TimestampMillisecond(v1, _), TimestampMillisecond(v2, _)) => v1.eq(v2), + (TimestampMillisecond(_, _), _) => false, + (TimestampMicrosecond(v1, _), TimestampMicrosecond(v2, _)) => v1.eq(v2), + (TimestampMicrosecond(_, _), _) => false, + (TimestampNanosecond(v1, _), TimestampNanosecond(v2, _)) => v1.eq(v2), + (TimestampNanosecond(_, _), _) => false, (IntervalYearMonth(v1), IntervalYearMonth(v2)) => v1.eq(v2), (IntervalYearMonth(_), _) => false, (IntervalDayTime(v1), IntervalDayTime(v2)) => v1.eq(v2), @@ -236,14 +237,20 @@ impl PartialOrd for ScalarValue { (Date32(_), _) => None, (Date64(v1), Date64(v2)) => v1.partial_cmp(v2), (Date64(_), _) => None, - (TimestampSecond(v1), TimestampSecond(v2)) => v1.partial_cmp(v2), - (TimestampSecond(_), _) => None, - (TimestampMillisecond(v1), TimestampMillisecond(v2)) => v1.partial_cmp(v2), - (TimestampMillisecond(_), _) => None, - (TimestampMicrosecond(v1), TimestampMicrosecond(v2)) => v1.partial_cmp(v2), - (TimestampMicrosecond(_), _) => None, - (TimestampNanosecond(v1), TimestampNanosecond(v2)) => v1.partial_cmp(v2), - (TimestampNanosecond(_), _) => None, + (TimestampSecond(v1, _), TimestampSecond(v2, _)) => v1.partial_cmp(v2), + (TimestampSecond(_, _), _) => None, + (TimestampMillisecond(v1, _), TimestampMillisecond(v2, _)) => { + v1.partial_cmp(v2) + } + (TimestampMillisecond(_, _), _) => None, + (TimestampMicrosecond(v1, _), TimestampMicrosecond(v2, _)) => { + v1.partial_cmp(v2) + } + (TimestampMicrosecond(_, _), _) => None, + 
(TimestampNanosecond(v1, _), TimestampNanosecond(v2, _)) => { + v1.partial_cmp(v2) + } + (TimestampNanosecond(_, _), _) => None, (IntervalYearMonth(v1), IntervalYearMonth(v2)) => v1.partial_cmp(v2), (IntervalYearMonth(_), _) => None, (IntervalDayTime(v1), IntervalDayTime(v2)) => v1.partial_cmp(v2), @@ -300,10 +307,10 @@ impl std::hash::Hash for ScalarValue { } Date32(v) => v.hash(state), Date64(v) => v.hash(state), - TimestampSecond(v) => v.hash(state), - TimestampMillisecond(v) => v.hash(state), - TimestampMicrosecond(v) => v.hash(state), - TimestampNanosecond(v) => v.hash(state), + TimestampSecond(v, _) => v.hash(state), + TimestampMillisecond(v, _) => v.hash(state), + TimestampMicrosecond(v, _) => v.hash(state), + TimestampNanosecond(v, _) => v.hash(state), IntervalYearMonth(v) => v.hash(state), IntervalDayTime(v) => v.hash(state), Struct(v, t) => { @@ -339,6 +346,19 @@ fn get_dict_value( Ok((dict_array.values(), Some(values_index))) } +macro_rules! typed_cast_tz { + ($array:expr, $index:expr, $ARRAYTYPE:ident, $SCALAR:ident, $TZ:expr) => {{ + let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap(); + ScalarValue::$SCALAR( + match array.is_null($index) { + true => None, + false => Some(array.value($index).into()), + }, + $TZ.clone(), + ) + }}; +} + macro_rules! typed_cast { ($array:expr, $index:expr, $ARRAYTYPE:ident, $SCALAR:ident) => {{ let array = $array.as_any().downcast_ref::<$ARRAYTYPE>().unwrap(); @@ -387,25 +407,25 @@ macro_rules! 
build_timestamp_list { Some(values) => { let values = values.as_ref(); match $TIME_UNIT { - TimeUnit::Second => build_values_list!( + TimeUnit::Second => build_values_list_tz!( TimestampSecondBuilder, TimestampSecond, values, $SIZE ), - TimeUnit::Microsecond => build_values_list!( + TimeUnit::Microsecond => build_values_list_tz!( TimestampMillisecondBuilder, TimestampMillisecond, values, $SIZE ), - TimeUnit::Millisecond => build_values_list!( + TimeUnit::Millisecond => build_values_list_tz!( TimestampMicrosecondBuilder, TimestampMicrosecond, values, $SIZE ), - TimeUnit::Nanosecond => build_values_list!( + TimeUnit::Nanosecond => build_values_list_tz!( TimestampNanosecondBuilder, TimestampNanosecond, values, @@ -440,6 +460,29 @@ macro_rules! build_values_list { }}; } +macro_rules! build_values_list_tz { + ($VALUE_BUILDER_TY:ident, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{ + let mut builder = ListBuilder::new($VALUE_BUILDER_TY::new($VALUES.len())); + + for _ in 0..$SIZE { + for scalar_value in $VALUES { + match scalar_value { + ScalarValue::$SCALAR_TY(Some(v), _) => { + builder.values().append_value(v.clone()).unwrap() + } + ScalarValue::$SCALAR_TY(None, _) => { + builder.values().append_null().unwrap(); + } + _ => panic!("Incompatible ScalarValue for list"), + }; + } + builder.append(true).unwrap(); + } + + builder.finish() + }}; +} + macro_rules! build_array_from_option { ($DATA_TYPE:ident, $ARRAY_TYPE:ident, $EXPR:expr, $SIZE:expr) => {{ match $EXPR { @@ -455,7 +498,12 @@ macro_rules! 
build_array_from_option { }}; ($DATA_TYPE:ident, $ENUM:expr, $ENUM2:expr, $ARRAY_TYPE:ident, $EXPR:expr, $SIZE:expr) => {{ match $EXPR { - Some(value) => Arc::new($ARRAY_TYPE::from_value(*value, $SIZE)), + Some(value) => { + let array: ArrayRef = Arc::new($ARRAY_TYPE::from_value(*value, $SIZE)); + // Need to call cast to cast to final data type with timezone/extra param + cast(&array, &DataType::$DATA_TYPE($ENUM, $ENUM2)) + .expect("cannot do temporal cast") + } None => new_null_array(&DataType::$DATA_TYPE($ENUM, $ENUM2), $SIZE), } }}; @@ -504,17 +552,17 @@ impl ScalarValue { ScalarValue::Decimal128(_, precision, scale) => { DataType::Decimal(*precision, *scale) } - ScalarValue::TimestampSecond(_) => { - DataType::Timestamp(TimeUnit::Second, None) + ScalarValue::TimestampSecond(_, tz_opt) => { + DataType::Timestamp(TimeUnit::Second, tz_opt.clone()) } - ScalarValue::TimestampMillisecond(_) => { - DataType::Timestamp(TimeUnit::Millisecond, None) + ScalarValue::TimestampMillisecond(_, tz_opt) => { + DataType::Timestamp(TimeUnit::Millisecond, tz_opt.clone()) } - ScalarValue::TimestampMicrosecond(_) => { - DataType::Timestamp(TimeUnit::Microsecond, None) + ScalarValue::TimestampMicrosecond(_, tz_opt) => { + DataType::Timestamp(TimeUnit::Microsecond, tz_opt.clone()) } - ScalarValue::TimestampNanosecond(_) => { - DataType::Timestamp(TimeUnit::Nanosecond, None) + ScalarValue::TimestampNanosecond(_, tz_opt) => { + DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone()) } ScalarValue::Float32(_) => DataType::Float32, ScalarValue::Float64(_) => DataType::Float64, @@ -579,9 +627,10 @@ impl ScalarValue { | ScalarValue::Utf8(None) | ScalarValue::LargeUtf8(None) | ScalarValue::List(None, _) - | ScalarValue::TimestampMillisecond(None) - | ScalarValue::TimestampMicrosecond(None) - | ScalarValue::TimestampNanosecond(None) + | ScalarValue::TimestampSecond(None, _) + | ScalarValue::TimestampMillisecond(None, _) + | ScalarValue::TimestampMicrosecond(None, _) + | 
ScalarValue::TimestampNanosecond(None, _) | ScalarValue::Struct(None, _) | ScalarValue::Decimal128(None, _, _) // For decimal type, the value is null means ScalarValue::Decimal128 is null. ) @@ -662,6 +711,28 @@ impl ScalarValue { }}; } + macro_rules! build_array_primitive_tz { + ($ARRAY_TY:ident, $SCALAR_TY:ident) => {{ + { + let array = scalars + .map(|sv| { + if let ScalarValue::$SCALAR_TY(v, _) = sv { + Ok(v) + } else { + Err(DataFusionError::Internal(format!( + "Inconsistent types in ScalarValue::iter_to_array. \ + Expected {:?}, got {:?}", + data_type, sv + ))) + } + }) + .collect::>()?; + + Arc::new(array) + } + }}; + } + /// Creates an array of $ARRAY_TY by unpacking values of /// SCALAR_TY for "string-like" types. macro_rules! build_array_string { @@ -771,17 +842,17 @@ impl ScalarValue { DataType::LargeBinary => build_array_string!(LargeBinaryArray, LargeBinary), DataType::Date32 => build_array_primitive!(Date32Array, Date32), DataType::Date64 => build_array_primitive!(Date64Array, Date64), - DataType::Timestamp(TimeUnit::Second, None) => { - build_array_primitive!(TimestampSecondArray, TimestampSecond) + DataType::Timestamp(TimeUnit::Second, _) => { + build_array_primitive_tz!(TimestampSecondArray, TimestampSecond) } - DataType::Timestamp(TimeUnit::Millisecond, None) => { - build_array_primitive!(TimestampMillisecondArray, TimestampMillisecond) + DataType::Timestamp(TimeUnit::Millisecond, _) => { + build_array_primitive_tz!(TimestampMillisecondArray, TimestampMillisecond) } - DataType::Timestamp(TimeUnit::Microsecond, None) => { - build_array_primitive!(TimestampMicrosecondArray, TimestampMicrosecond) + DataType::Timestamp(TimeUnit::Microsecond, _) => { + build_array_primitive_tz!(TimestampMicrosecondArray, TimestampMicrosecond) } - DataType::Timestamp(TimeUnit::Nanosecond, None) => { - build_array_primitive!(TimestampNanosecondArray, TimestampNanosecond) + DataType::Timestamp(TimeUnit::Nanosecond, _) => { + 
build_array_primitive_tz!(TimestampNanosecondArray, TimestampNanosecond) } DataType::Interval(IntervalUnit::DayTime) => { build_array_primitive!(IntervalDayTimeArray, IntervalDayTime) @@ -1032,35 +1103,35 @@ impl ScalarValue { ScalarValue::UInt64(e) => { build_array_from_option!(UInt64, UInt64Array, e, size) } - ScalarValue::TimestampSecond(e) => build_array_from_option!( + ScalarValue::TimestampSecond(e, tz_opt) => build_array_from_option!( Timestamp, TimeUnit::Second, - None, + tz_opt.clone(), TimestampSecondArray, e, size ), - ScalarValue::TimestampMillisecond(e) => build_array_from_option!( + ScalarValue::TimestampMillisecond(e, tz_opt) => build_array_from_option!( Timestamp, TimeUnit::Millisecond, - None, + tz_opt.clone(), TimestampMillisecondArray, e, size ), - ScalarValue::TimestampMicrosecond(e) => build_array_from_option!( + ScalarValue::TimestampMicrosecond(e, tz_opt) => build_array_from_option!( Timestamp, TimeUnit::Microsecond, - None, + tz_opt.clone(), TimestampMicrosecondArray, e, size ), - ScalarValue::TimestampNanosecond(e) => build_array_from_option!( + ScalarValue::TimestampNanosecond(e, tz_opt) => build_array_from_option!( Timestamp, TimeUnit::Nanosecond, - None, + tz_opt.clone(), TimestampNanosecondArray, e, size @@ -1247,27 +1318,41 @@ impl ScalarValue { DataType::Date64 => { typed_cast!(array, index, Date64Array, Date64) } - DataType::Timestamp(TimeUnit::Second, _) => { - typed_cast!(array, index, TimestampSecondArray, TimestampSecond) + DataType::Timestamp(TimeUnit::Second, tz_opt) => { + typed_cast_tz!( + array, + index, + TimestampSecondArray, + TimestampSecond, + tz_opt + ) } - DataType::Timestamp(TimeUnit::Millisecond, _) => { - typed_cast!( + DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => { + typed_cast_tz!( array, index, TimestampMillisecondArray, - TimestampMillisecond + TimestampMillisecond, + tz_opt ) } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - typed_cast!( + DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => { 
+ typed_cast_tz!( array, index, TimestampMicrosecondArray, - TimestampMicrosecond + TimestampMicrosecond, + tz_opt ) } - DataType::Timestamp(TimeUnit::Nanosecond, _) => { - typed_cast!(array, index, TimestampNanosecondArray, TimestampNanosecond) + DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => { + typed_cast_tz!( + array, + index, + TimestampNanosecondArray, + TimestampNanosecond, + tz_opt + ) } DataType::Dictionary(index_type, _) => { let (values, values_index) = match **index_type { @@ -1403,16 +1488,16 @@ impl ScalarValue { ScalarValue::Date64(val) => { eq_array_primitive!(array, index, Date64Array, val) } - ScalarValue::TimestampSecond(val) => { + ScalarValue::TimestampSecond(val, _) => { eq_array_primitive!(array, index, TimestampSecondArray, val) } - ScalarValue::TimestampMillisecond(val) => { + ScalarValue::TimestampMillisecond(val, _) => { eq_array_primitive!(array, index, TimestampMillisecondArray, val) } - ScalarValue::TimestampMicrosecond(val) => { + ScalarValue::TimestampMicrosecond(val, _) => { eq_array_primitive!(array, index, TimestampMicrosecondArray, val) } - ScalarValue::TimestampNanosecond(val) => { + ScalarValue::TimestampNanosecond(val, _) => { eq_array_primitive!(array, index, TimestampNanosecondArray, val) } ScalarValue::IntervalYearMonth(val) => { @@ -1561,10 +1646,10 @@ impl TryFrom for i64 { match value { ScalarValue::Int64(Some(inner_value)) | ScalarValue::Date64(Some(inner_value)) - | ScalarValue::TimestampNanosecond(Some(inner_value)) - | ScalarValue::TimestampMicrosecond(Some(inner_value)) - | ScalarValue::TimestampMillisecond(Some(inner_value)) - | ScalarValue::TimestampSecond(Some(inner_value)) => Ok(inner_value), + | ScalarValue::TimestampNanosecond(Some(inner_value), _) + | ScalarValue::TimestampMicrosecond(Some(inner_value), _) + | ScalarValue::TimestampMillisecond(Some(inner_value), _) + | ScalarValue::TimestampSecond(Some(inner_value), _) => Ok(inner_value), _ => Err(DataFusionError::Internal(format!( "Cannot convert {:?} 
to {}", value, @@ -1606,17 +1691,17 @@ impl TryFrom<&DataType> for ScalarValue { DataType::LargeUtf8 => ScalarValue::LargeUtf8(None), DataType::Date32 => ScalarValue::Date32(None), DataType::Date64 => ScalarValue::Date64(None), - DataType::Timestamp(TimeUnit::Second, _) => { - ScalarValue::TimestampSecond(None) + DataType::Timestamp(TimeUnit::Second, tz_opt) => { + ScalarValue::TimestampSecond(None, tz_opt.clone()) } - DataType::Timestamp(TimeUnit::Millisecond, _) => { - ScalarValue::TimestampMillisecond(None) + DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => { + ScalarValue::TimestampMillisecond(None, tz_opt.clone()) } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - ScalarValue::TimestampMicrosecond(None) + DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => { + ScalarValue::TimestampMicrosecond(None, tz_opt.clone()) } - DataType::Timestamp(TimeUnit::Nanosecond, _) => { - ScalarValue::TimestampNanosecond(None) + DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => { + ScalarValue::TimestampNanosecond(None, tz_opt.clone()) } DataType::Dictionary(_index_type, value_type) => { value_type.as_ref().try_into()? 
@@ -1663,10 +1748,10 @@ impl fmt::Display for ScalarValue { ScalarValue::UInt16(e) => format_option!(f, e)?, ScalarValue::UInt32(e) => format_option!(f, e)?, ScalarValue::UInt64(e) => format_option!(f, e)?, - ScalarValue::TimestampSecond(e) => format_option!(f, e)?, - ScalarValue::TimestampMillisecond(e) => format_option!(f, e)?, - ScalarValue::TimestampMicrosecond(e) => format_option!(f, e)?, - ScalarValue::TimestampNanosecond(e) => format_option!(f, e)?, + ScalarValue::TimestampSecond(e, _) => format_option!(f, e)?, + ScalarValue::TimestampMillisecond(e, _) => format_option!(f, e)?, + ScalarValue::TimestampMicrosecond(e, _) => format_option!(f, e)?, + ScalarValue::TimestampNanosecond(e, _) => format_option!(f, e)?, ScalarValue::Utf8(e) => format_option!(f, e)?, ScalarValue::LargeUtf8(e) => format_option!(f, e)?, ScalarValue::Binary(e) => match e { @@ -1738,15 +1823,17 @@ impl fmt::Debug for ScalarValue { ScalarValue::UInt16(_) => write!(f, "UInt16({})", self), ScalarValue::UInt32(_) => write!(f, "UInt32({})", self), ScalarValue::UInt64(_) => write!(f, "UInt64({})", self), - ScalarValue::TimestampSecond(_) => write!(f, "TimestampSecond({})", self), - ScalarValue::TimestampMillisecond(_) => { - write!(f, "TimestampMillisecond({})", self) + ScalarValue::TimestampSecond(_, tz_opt) => { + write!(f, "TimestampSecond({}, {:?})", self, tz_opt) } - ScalarValue::TimestampMicrosecond(_) => { - write!(f, "TimestampMicrosecond({})", self) + ScalarValue::TimestampMillisecond(_, tz_opt) => { + write!(f, "TimestampMillisecond({}, {:?})", self, tz_opt) } - ScalarValue::TimestampNanosecond(_) => { - write!(f, "TimestampNanosecond({})", self) + ScalarValue::TimestampMicrosecond(_, tz_opt) => { + write!(f, "TimestampMicrosecond({}, {:?})", self, tz_opt) + } + ScalarValue::TimestampNanosecond(_, tz_opt) => { + write!(f, "TimestampNanosecond({}, {:?})", self, tz_opt) } ScalarValue::Utf8(None) => write!(f, "Utf8({})", self), ScalarValue::Utf8(Some(_)) => write!(f, "Utf8(\"{}\")", 
self), @@ -1798,25 +1885,25 @@ impl ScalarType for Float32Type { impl ScalarType for TimestampSecondType { fn scalar(r: Option) -> ScalarValue { - ScalarValue::TimestampSecond(r) + ScalarValue::TimestampSecond(r, None) } } impl ScalarType for TimestampMillisecondType { fn scalar(r: Option) -> ScalarValue { - ScalarValue::TimestampMillisecond(r) + ScalarValue::TimestampMillisecond(r, None) } } impl ScalarType for TimestampMicrosecondType { fn scalar(r: Option) -> ScalarValue { - ScalarValue::TimestampMicrosecond(r) + ScalarValue::TimestampMicrosecond(r, None) } } impl ScalarType for TimestampNanosecondType { fn scalar(r: Option) -> ScalarValue { - ScalarValue::TimestampNanosecond(r) + ScalarValue::TimestampNanosecond(r, None) } } @@ -2003,6 +2090,23 @@ mod tests { }}; } + /// Creates array directly and via ScalarValue and ensures they are the same + /// but for variants that carry a timezone field. + macro_rules! check_scalar_iter_tz { + ($SCALAR_T:ident, $ARRAYTYPE:ident, $INPUT:expr) => {{ + let scalars: Vec<_> = $INPUT + .iter() + .map(|v| ScalarValue::$SCALAR_T(*v, None)) + .collect(); + + let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap(); + + let expected: ArrayRef = Arc::new($ARRAYTYPE::from($INPUT)); + + assert_eq!(&array, &expected); + }}; + } + /// Creates array directly and via ScalarValue and ensures they /// are the same, for string arrays macro_rules! 
check_scalar_iter_string { @@ -2056,22 +2160,22 @@ mod tests { check_scalar_iter!(UInt32, UInt32Array, vec![Some(1), None, Some(3)]); check_scalar_iter!(UInt64, UInt64Array, vec![Some(1), None, Some(3)]); - check_scalar_iter!( + check_scalar_iter_tz!( TimestampSecond, TimestampSecondArray, vec![Some(1), None, Some(3)] ); - check_scalar_iter!( + check_scalar_iter_tz!( TimestampMillisecond, TimestampMillisecondArray, vec![Some(1), None, Some(3)] ); - check_scalar_iter!( + check_scalar_iter_tz!( TimestampMicrosecond, TimestampMicrosecondArray, vec![Some(1), None, Some(3)] ); - check_scalar_iter!( + check_scalar_iter_tz!( TimestampNanosecond, TimestampNanosecondArray, vec![Some(1), None, Some(3)] @@ -2152,6 +2256,10 @@ mod tests { // Since ScalarValues are used in a non trivial number of places, // making it larger means significant more memory consumption // per distinct value. + #[cfg(target_arch = "aarch64")] + assert_eq!(std::mem::size_of::<ScalarValue>(), 64); + + #[cfg(target_arch = "x86_64")] assert_eq!(std::mem::size_of::<ScalarValue>(), 48); } @@ -2199,6 +2307,17 @@ mod tests { scalars: $INPUT.iter().map(|v| ScalarValue::$SCALAR_TY(*v)).collect(), } }}; + + ($INPUT:expr, $ARRAY_TY:ident, $SCALAR_TY:ident, $TZ:expr) => {{ + let tz = $TZ; + TestCase { + array: Arc::new($INPUT.iter().collect::<$ARRAY_TY>()), + scalars: $INPUT + .iter() + .map(|v| ScalarValue::$SCALAR_TY(*v, tz.clone())) + .collect(), + } + }}; } macro_rules! 
make_str_test_case { @@ -2263,10 +2382,49 @@ mod tests { make_binary_test_case!(str_vals, LargeBinaryArray, LargeBinary), make_test_case!(i32_vals, Date32Array, Date32), make_test_case!(i64_vals, Date64Array, Date64), - make_test_case!(i64_vals, TimestampSecondArray, TimestampSecond), - make_test_case!(i64_vals, TimestampMillisecondArray, TimestampMillisecond), - make_test_case!(i64_vals, TimestampMicrosecondArray, TimestampMicrosecond), - make_test_case!(i64_vals, TimestampNanosecondArray, TimestampNanosecond), + make_test_case!(i64_vals, TimestampSecondArray, TimestampSecond, None), + make_test_case!( + i64_vals, + TimestampSecondArray, + TimestampSecond, + Some("UTC".to_owned()) + ), + make_test_case!( + i64_vals, + TimestampMillisecondArray, + TimestampMillisecond, + None + ), + make_test_case!( + i64_vals, + TimestampMillisecondArray, + TimestampMillisecond, + Some("UTC".to_owned()) + ), + make_test_case!( + i64_vals, + TimestampMicrosecondArray, + TimestampMicrosecond, + None + ), + make_test_case!( + i64_vals, + TimestampMicrosecondArray, + TimestampMicrosecond, + Some("UTC".to_owned()) + ), + make_test_case!( + i64_vals, + TimestampNanosecondArray, + TimestampNanosecond, + None + ), + make_test_case!( + i64_vals, + TimestampNanosecondArray, + TimestampNanosecond, + Some("UTC".to_owned()) + ), make_test_case!(i32_vals, IntervalYearMonthArray, IntervalYearMonth), make_test_case!(i64_vals, IntervalDayTimeArray, IntervalDayTime), make_str_dict_test_case!(str_vals, Int8Type, Utf8), @@ -2893,4 +3051,30 @@ mod tests { assert_eq!(array, &expected); } + + #[test] + fn scalar_timestamp_ns_utc_timezone() { + let scalar = ScalarValue::TimestampNanosecond( + Some(1599566400000000000), + Some("UTC".to_owned()), + ); + + assert_eq!( + scalar.get_datatype(), + DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned())) + ); + + let array = scalar.to_array(); + assert_eq!(array.len(), 1); + assert_eq!( + array.data_type(), + &DataType::Timestamp(TimeUnit::Nanosecond, 
Some("UTC".to_owned())) + ); + + let newscalar = ScalarValue::try_from_array(&array, 0).unwrap(); + assert_eq!( + newscalar.get_datatype(), + DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned())) + ); + } } diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs index b72606f137c5a..7c3210dd7599e 100644 --- a/datafusion/tests/sql.rs +++ b/datafusion/tests/sql.rs @@ -4104,37 +4104,44 @@ async fn like() -> Result<()> { } fn make_timestamp_table() -> Result> +where + A: ArrowTimestampType, +{ + make_timestamp_tz_table::(None) +} + +fn make_timestamp_tz_table(tz: Option) -> Result> where A: ArrowTimestampType, { let schema = Arc::new(Schema::new(vec![ - Field::new("ts", DataType::Timestamp(A::get_time_unit(), None), false), + Field::new( + "ts", + DataType::Timestamp(A::get_time_unit(), tz.clone()), + false, + ), Field::new("value", DataType::Int32, true), ])); - let mut builder = PrimitiveBuilder::::new(3); - - let nanotimestamps = vec![ - 1599572549190855000i64, // 2020-09-08T13:42:29.190855+00:00 - 1599568949190855000, // 2020-09-08T12:42:29.190855+00:00 - 1599565349190855000, //2020-09-08T11:42:29.190855+00:00 - ]; // 2020-09-08T11:42:29.190855+00:00 let divisor = match A::get_time_unit() { TimeUnit::Nanosecond => 1, TimeUnit::Microsecond => 1000, TimeUnit::Millisecond => 1_000_000, TimeUnit::Second => 1_000_000_000, }; - for ts in nanotimestamps { - builder.append_value( - ::Native::from_i64(ts / divisor).unwrap(), - )?; - } + + let timestamps = vec![ + 1599572549190855000i64 / divisor, // 2020-09-08T13:42:29.190855+00:00 + 1599568949190855000 / divisor, // 2020-09-08T12:42:29.190855+00:00 + 1599565349190855000 / divisor, //2020-09-08T11:42:29.190855+00:00 + ]; // 2020-09-08T11:42:29.190855+00:00 + + let array = PrimitiveArray::::from_vec(timestamps, tz); let data = RecordBatch::try_new( schema.clone(), vec![ - Arc::new(builder.finish()), + Arc::new(array), Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])), ], )?; @@ -6615,3 +6622,357 
@@ async fn csv_query_with_decimal_by_sql() -> Result<()> { assert_batches_eq!(expected, &actual); Ok(()) } + +#[tokio::test] +async fn timestamp_minmax() -> Result<()> { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_tz_table::(None)?; + let table_b = + make_timestamp_tz_table::(Some("UTC".to_owned()))?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT MIN(table_a.ts), MAX(table_b.ts) FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+-------------------------+----------------------------+", + "| MIN(table_a.ts) | MAX(table_b.ts) |", + "+-------------------------+----------------------------+", + "| 2020-09-08 11:42:29.190 | 2020-09-08 13:42:29.190855 |", + "+-------------------------+----------------------------+", + ]; + assert_batches_eq!(expected, &actual); + + Ok(()) +} + +#[tokio::test] +async fn timestamp_coercion() -> Result<()> { + { + let mut ctx = ExecutionContext::new(); + let table_a = + make_timestamp_tz_table::(Some("UTC".to_owned()))?; + let table_b = + make_timestamp_tz_table::(Some("UTC".to_owned()))?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------------------+-------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+---------------------+-------------------------+--------------------------+", + "| 2020-09-08 13:42:29 | 2020-09-08 13:42:29.190 | true |", + "| 2020-09-08 13:42:29 | 2020-09-08 12:42:29.190 | false |", + "| 2020-09-08 13:42:29 | 2020-09-08 11:42:29.190 | false |", + "| 2020-09-08 12:42:29 | 2020-09-08 13:42:29.190 | false |", + "| 2020-09-08 12:42:29 | 2020-09-08 12:42:29.190 | true |", + "| 2020-09-08 12:42:29 | 2020-09-08 
11:42:29.190 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 13:42:29.190 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 12:42:29.190 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 11:42:29.190 | true |", + "+---------------------+-------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------------------+----------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+---------------------+----------------------------+--------------------------+", + "| 2020-09-08 13:42:29 | 2020-09-08 13:42:29.190855 | true |", + "| 2020-09-08 13:42:29 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 13:42:29 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 12:42:29 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 12:42:29 | 2020-09-08 12:42:29.190855 | true |", + "| 2020-09-08 12:42:29 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 11:42:29.190855 | true |", + "+---------------------+----------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let 
actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+---------------------+----------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+---------------------+----------------------------+--------------------------+", + "| 2020-09-08 13:42:29 | 2020-09-08 13:42:29.190855 | true |", + "| 2020-09-08 13:42:29 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 13:42:29 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 12:42:29 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 12:42:29 | 2020-09-08 12:42:29.190855 | true |", + "| 2020-09-08 12:42:29 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 11:42:29 | 2020-09-08 11:42:29.190855 | true |", + "+---------------------+----------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+-------------------------+---------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+-------------------------+---------------------+--------------------------+", + "| 2020-09-08 13:42:29.190 | 2020-09-08 13:42:29 | true |", + "| 2020-09-08 13:42:29.190 | 2020-09-08 12:42:29 | false |", + "| 2020-09-08 13:42:29.190 | 2020-09-08 11:42:29 | false |", + "| 2020-09-08 12:42:29.190 | 2020-09-08 13:42:29 | false |", + "| 2020-09-08 12:42:29.190 | 2020-09-08 12:42:29 | true |", + "| 2020-09-08 12:42:29.190 | 2020-09-08 11:42:29 | false |", + "| 
2020-09-08 11:42:29.190 | 2020-09-08 13:42:29 | false |", + "| 2020-09-08 11:42:29.190 | 2020-09-08 12:42:29 | false |", + "| 2020-09-08 11:42:29.190 | 2020-09-08 11:42:29 | true |", + "+-------------------------+---------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+-------------------------+----------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+-------------------------+----------------------------+--------------------------+", + "| 2020-09-08 13:42:29.190 | 2020-09-08 13:42:29.190855 | true |", + "| 2020-09-08 13:42:29.190 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 13:42:29.190 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 12:42:29.190 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 12:42:29.190 | 2020-09-08 12:42:29.190855 | true |", + "| 2020-09-08 12:42:29.190 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190 | 2020-09-08 11:42:29.190855 | true |", + "+-------------------------+----------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, 
table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+-------------------------+----------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+-------------------------+----------------------------+--------------------------+", + "| 2020-09-08 13:42:29.190 | 2020-09-08 13:42:29.190855 | true |", + "| 2020-09-08 13:42:29.190 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 13:42:29.190 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 12:42:29.190 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 12:42:29.190 | 2020-09-08 12:42:29.190855 | true |", + "| 2020-09-08 12:42:29.190 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190 | 2020-09-08 11:42:29.190855 | true |", + "+-------------------------+----------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+----------------------------+---------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+----------------------------+---------------------+--------------------------+", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29 | true |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29 | false |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29 
| true |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29 | true |", + "+----------------------------+---------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+----------------------------+-------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+----------------------------+-------------------------+--------------------------+", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.190 | true |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29.190 | false |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29.190 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29.190 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.190 | true |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29.190 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29.190 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29.190 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.190 | true |", + "+----------------------------+-------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", 
table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+----------------------------+----------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+----------------------------+----------------------------+--------------------------+", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.190855 | true |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.190855 | true |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.190855 | true |", + "+----------------------------+----------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+----------------------------+---------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+----------------------------+---------------------+--------------------------+", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29 | true |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29 | false |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29 
| false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29 | true |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29 | true |", + "+----------------------------+---------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+----------------------------+-------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+----------------------------+-------------------------+--------------------------+", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.190 | true |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29.190 | false |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29.190 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29.190 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.190 | true |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29.190 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29.190 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29.190 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.190 | true |", + "+----------------------------+-------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + { + let mut ctx = ExecutionContext::new(); + let table_a = 
make_timestamp_table::()?; + let table_b = make_timestamp_table::()?; + ctx.register_table("table_a", table_a)?; + ctx.register_table("table_b", table_b)?; + + let sql = "SELECT table_a.ts, table_b.ts, table_a.ts = table_b.ts FROM table_a, table_b"; + let actual = execute_to_batches(&mut ctx, sql).await; + let expected = vec![ + "+----------------------------+----------------------------+--------------------------+", + "| ts | ts | table_a.ts Eq table_b.ts |", + "+----------------------------+----------------------------+--------------------------+", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 13:42:29.190855 | true |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 13:42:29.190855 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 12:42:29.190855 | true |", + "| 2020-09-08 12:42:29.190855 | 2020-09-08 11:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 13:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 12:42:29.190855 | false |", + "| 2020-09-08 11:42:29.190855 | 2020-09-08 11:42:29.190855 | true |", + "+----------------------------+----------------------------+--------------------------+", + ]; + assert_batches_eq!(expected, &actual); + } + + Ok(()) +}