Skip to content
20 changes: 10 additions & 10 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,10 @@ fn typechecked_scalar_value_conversion(
ScalarValue::Date32(Some(*v))
}
(Value::TimeMicrosecondValue(v), PrimitiveScalarType::TimeMicrosecond) => {
ScalarValue::TimestampMicrosecond(Some(*v))
ScalarValue::TimestampMicrosecond(Some(*v), None)
}
(Value::TimeNanosecondValue(v), PrimitiveScalarType::TimeMicrosecond) => {
ScalarValue::TimestampNanosecond(Some(*v))
ScalarValue::TimestampNanosecond(Some(*v), None)
}
(Value::Utf8Value(v), PrimitiveScalarType::Utf8) => {
ScalarValue::Utf8(Some(v.to_owned()))
Expand Down Expand Up @@ -530,10 +530,10 @@ fn typechecked_scalar_value_conversion(
PrimitiveScalarType::LargeUtf8 => ScalarValue::LargeUtf8(None),
PrimitiveScalarType::Date32 => ScalarValue::Date32(None),
PrimitiveScalarType::TimeMicrosecond => {
ScalarValue::TimestampMicrosecond(None)
ScalarValue::TimestampMicrosecond(None, None)
}
PrimitiveScalarType::TimeNanosecond => {
ScalarValue::TimestampNanosecond(None)
ScalarValue::TimestampNanosecond(None, None)
}
PrimitiveScalarType::Null => {
return Err(proto_error(
Expand Down Expand Up @@ -593,10 +593,10 @@ impl TryInto<datafusion::scalar::ScalarValue> for &protobuf::scalar_value::Value
ScalarValue::Date32(Some(*v))
}
protobuf::scalar_value::Value::TimeMicrosecondValue(v) => {
ScalarValue::TimestampMicrosecond(Some(*v))
ScalarValue::TimestampMicrosecond(Some(*v), None)
}
protobuf::scalar_value::Value::TimeNanosecondValue(v) => {
ScalarValue::TimestampNanosecond(Some(*v))
ScalarValue::TimestampNanosecond(Some(*v), None)
}
protobuf::scalar_value::Value::ListValue(v) => v.try_into()?,
protobuf::scalar_value::Value::NullListValue(v) => {
Expand Down Expand Up @@ -758,10 +758,10 @@ impl TryInto<datafusion::scalar::ScalarValue> for protobuf::PrimitiveScalarType
protobuf::PrimitiveScalarType::LargeUtf8 => ScalarValue::LargeUtf8(None),
protobuf::PrimitiveScalarType::Date32 => ScalarValue::Date32(None),
protobuf::PrimitiveScalarType::TimeMicrosecond => {
ScalarValue::TimestampMicrosecond(None)
ScalarValue::TimestampMicrosecond(None, None)
}
protobuf::PrimitiveScalarType::TimeNanosecond => {
ScalarValue::TimestampNanosecond(None)
ScalarValue::TimestampNanosecond(None, None)
}
})
}
Expand Down Expand Up @@ -811,10 +811,10 @@ impl TryInto<datafusion::scalar::ScalarValue> for &protobuf::ScalarValue {
ScalarValue::Date32(Some(*v))
}
protobuf::scalar_value::Value::TimeMicrosecondValue(v) => {
ScalarValue::TimestampMicrosecond(Some(*v))
ScalarValue::TimestampMicrosecond(Some(*v), None)
}
protobuf::scalar_value::Value::TimeNanosecondValue(v) => {
ScalarValue::TimestampNanosecond(Some(*v))
ScalarValue::TimestampNanosecond(Some(*v), None)
}
protobuf::scalar_value::Value::ListValue(scalar_list) => {
let protobuf::ScalarListValue {
Expand Down
18 changes: 9 additions & 9 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ mod roundtrip_tests {
ScalarValue::LargeUtf8(None),
ScalarValue::List(None, Box::new(DataType::Boolean)),
ScalarValue::Date32(None),
ScalarValue::TimestampMicrosecond(None),
ScalarValue::TimestampNanosecond(None),
ScalarValue::TimestampMicrosecond(None, None),
ScalarValue::TimestampNanosecond(None, None),
ScalarValue::Boolean(Some(true)),
ScalarValue::Boolean(Some(false)),
ScalarValue::Float32(Some(1.0)),
Expand Down Expand Up @@ -256,11 +256,11 @@ mod roundtrip_tests {
ScalarValue::LargeUtf8(Some(String::from("Test Large utf8"))),
ScalarValue::Date32(Some(0)),
ScalarValue::Date32(Some(i32::MAX)),
ScalarValue::TimestampNanosecond(Some(0)),
ScalarValue::TimestampNanosecond(Some(i64::MAX)),
ScalarValue::TimestampMicrosecond(Some(0)),
ScalarValue::TimestampMicrosecond(Some(i64::MAX)),
ScalarValue::TimestampMicrosecond(None),
ScalarValue::TimestampNanosecond(Some(0), None),
ScalarValue::TimestampNanosecond(Some(i64::MAX), None),
ScalarValue::TimestampMicrosecond(Some(0), None),
ScalarValue::TimestampMicrosecond(Some(i64::MAX), None),
ScalarValue::TimestampMicrosecond(None, None),
ScalarValue::List(
Some(Box::new(vec![
ScalarValue::Float32(Some(-213.1)),
Expand Down Expand Up @@ -619,8 +619,8 @@ mod roundtrip_tests {
ScalarValue::Utf8(None),
ScalarValue::LargeUtf8(None),
ScalarValue::Date32(None),
ScalarValue::TimestampMicrosecond(None),
ScalarValue::TimestampNanosecond(None),
ScalarValue::TimestampMicrosecond(None, None),
ScalarValue::TimestampNanosecond(None, None),
//ScalarValue::List(None, DataType::Boolean)
];

Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,12 +652,12 @@ impl TryFrom<&datafusion::scalar::ScalarValue> for protobuf::ScalarValue {
datafusion::scalar::ScalarValue::Date32(val) => {
create_proto_scalar(val, PrimitiveScalarType::Date32, |s| Value::Date32Value(*s))
}
datafusion::scalar::ScalarValue::TimestampMicrosecond(val) => {
datafusion::scalar::ScalarValue::TimestampMicrosecond(val, _) => {
create_proto_scalar(val, PrimitiveScalarType::TimeMicrosecond, |s| {
Value::TimeMicrosecondValue(*s)
})
}
datafusion::scalar::ScalarValue::TimestampNanosecond(val) => {
datafusion::scalar::ScalarValue::TimestampNanosecond(val, _) => {
create_proto_scalar(val, PrimitiveScalarType::TimeNanosecond, |s| {
Value::TimeNanosecondValue(*s)
})
Expand Down
10 changes: 6 additions & 4 deletions datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1478,9 +1478,10 @@ macro_rules! make_timestamp_literal {
#[doc = $DOC]
impl TimestampLiteral for $TYPE {
fn lit_timestamp_nano(&self) -> Expr {
Expr::Literal(ScalarValue::TimestampNanosecond(Some(
(self.clone()).into(),
)))
Expr::Literal(ScalarValue::TimestampNanosecond(
Some((self.clone()).into()),
None,
))
}
}
};
Expand Down Expand Up @@ -2048,7 +2049,8 @@ mod tests {
#[test]
fn test_lit_timestamp_nano() {
let expr = col("time").eq(lit_timestamp_nano(10)); // 10 is an implicit i32
let expected = col("time").eq(lit(ScalarValue::TimestampNanosecond(Some(10))));
let expected =
col("time").eq(lit(ScalarValue::TimestampNanosecond(Some(10), None)));
assert_eq!(expr, expected);

let i: i64 = 10;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/optimizer/simplify_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1703,7 +1703,7 @@ mod tests {
.build()
.unwrap();

let expected = "Projection: TimestampNanosecond(1599566400000000000) AS totimestamp(Utf8(\"2020-09-08T12:00:00+00:00\"))\
let expected = "Projection: TimestampNanosecond(1599566400000000000, None) AS totimestamp(Utf8(\"2020-09-08T12:00:00+00:00\"))\
\n TableScan: test projection=None"
.to_string();
let actual = get_optimized_plan_formatted(&plan, &Utc::now());
Expand Down Expand Up @@ -1780,7 +1780,7 @@ mod tests {
// expect the same timestamp appears in both exprs
let actual = get_optimized_plan_formatted(&plan, &time);
let expected = format!(
"Projection: TimestampNanosecond({}) AS now(), TimestampNanosecond({}) AS t2\
"Projection: TimestampNanosecond({}, Some(\"UTC\")) AS now(), TimestampNanosecond({}, Some(\"UTC\")) AS t2\
\n TableScan: test projection=None",
time.timestamp_nanos(),
time.timestamp_nanos()
Expand Down
8 changes: 6 additions & 2 deletions datafusion/src/physical_plan/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ pub fn make_now(
move |_arg| {
Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
now_ts,
Some("UTC".to_owned()),
)))
}
}
Expand Down Expand Up @@ -240,8 +241,11 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let f = |x: Option<i64>| x.map(|x| date_trunc_single(granularity, x)).transpose();

Ok(match array {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v)) => {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond((f)(*v)?))
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
(f)(*v)?,
tz_opt.clone(),
))
}
ColumnarValue::Array(array) => {
let array = array
Expand Down
22 changes: 12 additions & 10 deletions datafusion/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,16 +329,16 @@ macro_rules! binary_array_op_scalar {
DataType::Float32 => compute_op_scalar!($LEFT, $RIGHT, $OP, Float32Array),
DataType::Float64 => compute_op_scalar!($LEFT, $RIGHT, $OP, Float64Array),
DataType::Utf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, StringArray),
DataType::Timestamp(TimeUnit::Nanosecond, None) => {
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
}
DataType::Timestamp(TimeUnit::Microsecond, None) => {
DataType::Timestamp(TimeUnit::Microsecond, _) => {
compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray)
}
DataType::Timestamp(TimeUnit::Millisecond, None) => {
DataType::Timestamp(TimeUnit::Millisecond, _) => {
compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampMillisecondArray)
}
DataType::Timestamp(TimeUnit::Second, None) => {
DataType::Timestamp(TimeUnit::Second, _) => {
compute_op_scalar!($LEFT, $RIGHT, $OP, TimestampSecondArray)
}
DataType::Date32 => {
Expand Down Expand Up @@ -374,16 +374,16 @@ macro_rules! binary_array_op {
DataType::Float32 => compute_op!($LEFT, $RIGHT, $OP, Float32Array),
DataType::Float64 => compute_op!($LEFT, $RIGHT, $OP, Float64Array),
DataType::Utf8 => compute_utf8_op!($LEFT, $RIGHT, $OP, StringArray),
DataType::Timestamp(TimeUnit::Nanosecond, None) => {
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampNanosecondArray)
}
DataType::Timestamp(TimeUnit::Microsecond, None) => {
DataType::Timestamp(TimeUnit::Microsecond, _) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampMicrosecondArray)
}
DataType::Timestamp(TimeUnit::Millisecond, None) => {
DataType::Timestamp(TimeUnit::Millisecond, _) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampMillisecondArray)
}
DataType::Timestamp(TimeUnit::Second, None) => {
DataType::Timestamp(TimeUnit::Second, _) => {
compute_op!($LEFT, $RIGHT, $OP, TimestampSecondArray)
}
DataType::Date32 => {
Expand Down Expand Up @@ -541,12 +541,14 @@ fn common_binary_type(

// re-write the error message of failed coercions to include the operator's information
match result {
None => Err(DataFusionError::Plan(
None => {
Err(DataFusionError::Plan(
format!(
"'{:?} {} {:?}' can't be evaluated because there isn't a common type to coerce the types to",
lhs_type, op, rhs_type
),
)),
))
},
Some(t) => Ok(t)
}
}
Expand Down
37 changes: 37 additions & 0 deletions datafusion/src/physical_plan/expressions/coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,48 @@ pub fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataTyp
/// casted to for the purpose of a date computation
pub fn temporal_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
use arrow::datatypes::DataType::*;
use arrow::datatypes::TimeUnit;
match (lhs_type, rhs_type) {
(Utf8, Date32) => Some(Date32),
(Date32, Utf8) => Some(Date32),
(Utf8, Date64) => Some(Date64),
(Date64, Utf8) => Some(Date64),
(Timestamp(lhs_unit, lhs_tz), Timestamp(rhs_unit, rhs_tz)) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let tz = match (lhs_tz, rhs_tz) {
// can't cast across timezones
(Some(lhs_tz), Some(rhs_tz)) => {
if lhs_tz != rhs_tz {
return None;
} else {
Some(lhs_tz.clone())
}
}
(Some(lhs_tz), None) => Some(lhs_tz.clone()),
(None, Some(rhs_tz)) => Some(rhs_tz.clone()),
(None, None) => None,
};

let unit = match (lhs_unit, rhs_unit) {
(TimeUnit::Second, TimeUnit::Millisecond) => TimeUnit::Second,
(TimeUnit::Second, TimeUnit::Microsecond) => TimeUnit::Second,
(TimeUnit::Second, TimeUnit::Nanosecond) => TimeUnit::Second,
(TimeUnit::Millisecond, TimeUnit::Second) => TimeUnit::Second,
(TimeUnit::Millisecond, TimeUnit::Microsecond) => TimeUnit::Millisecond,
(TimeUnit::Millisecond, TimeUnit::Nanosecond) => TimeUnit::Millisecond,
(TimeUnit::Microsecond, TimeUnit::Second) => TimeUnit::Second,
(TimeUnit::Microsecond, TimeUnit::Millisecond) => TimeUnit::Millisecond,
(TimeUnit::Microsecond, TimeUnit::Nanosecond) => TimeUnit::Microsecond,
(TimeUnit::Nanosecond, TimeUnit::Second) => TimeUnit::Second,
(TimeUnit::Nanosecond, TimeUnit::Millisecond) => TimeUnit::Millisecond,
(TimeUnit::Nanosecond, TimeUnit::Microsecond) => TimeUnit::Microsecond,
(l, r) => {
assert_eq!(l, r);
l.clone()
}
Comment on lines +137 to +140
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
(l, r) => {
assert_eq!(l, r);
l.clone()
}
(l, r) if l == r => {
l.clone()
}
_ => unreachable!()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried this but we need an else block in order to make the types align, so this becomes:

(l, r) => if l ==r { l.clone() } else { unreachable!() }

which is semantically identical, but the version with assert_eq! will provide more contextual information when the condition doesn't hold

};

Some(Timestamp(unit, tz))
}
_ => None,
}
}
Expand Down
Loading