Skip to content

Commit

Permalink
fix: coerce text to timestamps with timezones (#7720)
Browse files Browse the repository at this point in the history
* fix: coerce text to timestamps with timezones

Extend the type coercion logic used with function calls to enable
text respresentaions of timestamps to be coerced to a timestamp
type with a timezone. If there is no other suitable choice the
timezone will be "+00" (UTC).

The specific use case for this change is to allow the third (offset)
parameter of the date_bin function to be specified as a constant
srting when the input array for the second parameter has a specitied
time zone.

* review comments

Create a named constant for the placeholder timezone as suggested
in the review.
  • Loading branch information
mhilton committed Oct 2, 2023
1 parent 642e5a4 commit 9225ce8
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 37 deletions.
63 changes: 48 additions & 15 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Built-in functions module contains all the built-in functions definitions.

use crate::nullif::SUPPORTED_NULLIF_TYPES;
use crate::type_coercion::functions::data_types;
use crate::type_coercion::functions::{data_types, TIMEZONE_PLACEHOLDER};
use crate::{
conditional_expressions, struct_expressions, utils, Signature, TypeSignature,
Volatility,
Expand Down Expand Up @@ -1020,13 +1020,25 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::DateTrunc => Signature::one_of(
vec![
Exact(vec![Utf8, Timestamp(Nanosecond, None)]),
Exact(vec![Utf8, Timestamp(Nanosecond, Some("+TZ".into()))]),
Exact(vec![
Utf8,
Timestamp(Nanosecond, Some(TIMEZONE_PLACEHOLDER.into())),
]),
Exact(vec![Utf8, Timestamp(Microsecond, None)]),
Exact(vec![Utf8, Timestamp(Microsecond, Some("+TZ".into()))]),
Exact(vec![
Utf8,
Timestamp(Microsecond, Some(TIMEZONE_PLACEHOLDER.into())),
]),
Exact(vec![Utf8, Timestamp(Millisecond, None)]),
Exact(vec![Utf8, Timestamp(Millisecond, Some("+TZ".into()))]),
Exact(vec![
Utf8,
Timestamp(Millisecond, Some(TIMEZONE_PLACEHOLDER.into())),
]),
Exact(vec![Utf8, Timestamp(Second, None)]),
Exact(vec![Utf8, Timestamp(Second, Some("+TZ".into()))]),
Exact(vec![
Utf8,
Timestamp(Second, Some(TIMEZONE_PLACEHOLDER.into())),
]),
],
self.volatility(),
),
Expand All @@ -1040,8 +1052,11 @@ impl BuiltinScalarFunction {
]),
Exact(vec![
Interval(MonthDayNano),
Timestamp(array_type.clone(), Some("+TZ".into())),
Timestamp(Nanosecond, Some("+TZ".into())),
Timestamp(
array_type.clone(),
Some(TIMEZONE_PLACEHOLDER.into()),
),
Timestamp(Nanosecond, Some(TIMEZONE_PLACEHOLDER.into())),
]),
Exact(vec![
Interval(DayTime),
Expand All @@ -1050,24 +1065,30 @@ impl BuiltinScalarFunction {
]),
Exact(vec![
Interval(DayTime),
Timestamp(array_type.clone(), Some("+TZ".into())),
Timestamp(Nanosecond, Some("+TZ".into())),
Timestamp(
array_type.clone(),
Some(TIMEZONE_PLACEHOLDER.into()),
),
Timestamp(Nanosecond, Some(TIMEZONE_PLACEHOLDER.into())),
]),
Exact(vec![
Interval(MonthDayNano),
Timestamp(array_type.clone(), None),
]),
Exact(vec![
Interval(MonthDayNano),
Timestamp(array_type.clone(), Some("+TZ".into())),
Timestamp(
array_type.clone(),
Some(TIMEZONE_PLACEHOLDER.into()),
),
]),
Exact(vec![
Interval(DayTime),
Timestamp(array_type.clone(), None),
]),
Exact(vec![
Interval(DayTime),
Timestamp(array_type, Some("+TZ".into())),
Timestamp(array_type, Some(TIMEZONE_PLACEHOLDER.into())),
]),
]
};
Expand All @@ -1085,13 +1106,25 @@ impl BuiltinScalarFunction {
Exact(vec![Utf8, Date32]),
Exact(vec![Utf8, Date64]),
Exact(vec![Utf8, Timestamp(Second, None)]),
Exact(vec![Utf8, Timestamp(Second, Some("+TZ".into()))]),
Exact(vec![
Utf8,
Timestamp(Second, Some(TIMEZONE_PLACEHOLDER.into())),
]),
Exact(vec![Utf8, Timestamp(Microsecond, None)]),
Exact(vec![Utf8, Timestamp(Microsecond, Some("+TZ".into()))]),
Exact(vec![
Utf8,
Timestamp(Microsecond, Some(TIMEZONE_PLACEHOLDER.into())),
]),
Exact(vec![Utf8, Timestamp(Millisecond, None)]),
Exact(vec![Utf8, Timestamp(Millisecond, Some("+TZ".into()))]),
Exact(vec![
Utf8,
Timestamp(Millisecond, Some(TIMEZONE_PLACEHOLDER.into())),
]),
Exact(vec![Utf8, Timestamp(Nanosecond, None)]),
Exact(vec![Utf8, Timestamp(Nanosecond, Some("+TZ".into()))]),
Exact(vec![
Utf8,
Timestamp(Nanosecond, Some(TIMEZONE_PLACEHOLDER.into())),
]),
],
self.volatility(),
),
Expand Down
86 changes: 64 additions & 22 deletions datafusion/expr/src/type_coercion/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ use arrow::{
};
use datafusion_common::{plan_err, DataFusionError, Result};

/// Constant that is used as a placeholder for any valid timezone.
/// This is used where a function can accept a timestamp type with any
/// valid timezone, it exists to avoid the need to enumerate all possible
/// timezones.
///
/// Type coercion always ensures that functions will be executed using
/// timestamp arrays that have a valid time zone. Functions must never
/// return results with this timezone.
pub(crate) const TIMEZONE_PLACEHOLDER: &str = "+TZ";

/// Performs type coercion for function arguments.
///
/// Returns the data types to which each argument must be coerced to
Expand Down Expand Up @@ -121,7 +131,7 @@ fn maybe_data_types(
} else {
// attempt to coerce
if let Some(valid_type) = coerced_from(valid_type, current_type) {
new_type.push(valid_type.clone())
new_type.push(valid_type)
} else {
// not possible
return None;
Expand All @@ -140,37 +150,41 @@ pub fn can_coerce_from(type_into: &DataType, type_from: &DataType) -> bool {
return true;
}
if let Some(coerced) = coerced_from(type_into, type_from) {
return coerced == type_into;
return coerced == *type_into;
}
false
}

fn coerced_from<'a>(
type_into: &'a DataType,
type_from: &'a DataType,
) -> Option<&'a DataType> {
) -> Option<DataType> {
use self::DataType::*;

match type_into {
// coerced into type_into
Int8 if matches!(type_from, Null | Int8) => Some(type_into),
Int16 if matches!(type_from, Null | Int8 | Int16 | UInt8) => Some(type_into),
Int8 if matches!(type_from, Null | Int8) => Some(type_into.clone()),
Int16 if matches!(type_from, Null | Int8 | Int16 | UInt8) => {
Some(type_into.clone())
}
Int32 if matches!(type_from, Null | Int8 | Int16 | Int32 | UInt8 | UInt16) => {
Some(type_into)
Some(type_into.clone())
}
Int64
if matches!(
type_from,
Null | Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32
) =>
{
Some(type_into)
Some(type_into.clone())
}
UInt8 if matches!(type_from, Null | UInt8) => Some(type_into.clone()),
UInt16 if matches!(type_from, Null | UInt8 | UInt16) => Some(type_into.clone()),
UInt32 if matches!(type_from, Null | UInt8 | UInt16 | UInt32) => {
Some(type_into.clone())
}
UInt8 if matches!(type_from, Null | UInt8) => Some(type_into),
UInt16 if matches!(type_from, Null | UInt8 | UInt16) => Some(type_into),
UInt32 if matches!(type_from, Null | UInt8 | UInt16 | UInt32) => Some(type_into),
UInt64 if matches!(type_from, Null | UInt8 | UInt16 | UInt32 | UInt64) => {
Some(type_into)
Some(type_into.clone())
}
Float32
if matches!(
Expand All @@ -186,7 +200,7 @@ fn coerced_from<'a>(
| Float32
) =>
{
Some(type_into)
Some(type_into.clone())
}
Float64
if matches!(
Expand All @@ -204,25 +218,39 @@ fn coerced_from<'a>(
| Decimal128(_, _)
) =>
{
Some(type_into)
Some(type_into.clone())
}
Timestamp(TimeUnit::Nanosecond, None)
if matches!(
type_from,
Null | Timestamp(_, None) | Date32 | Utf8 | LargeUtf8
) =>
{
Some(type_into)
Some(type_into.clone())
}
Interval(_) if matches!(type_from, Utf8 | LargeUtf8) => Some(type_into),
Utf8 | LargeUtf8 => Some(type_into),
Null if can_cast_types(type_from, type_into) => Some(type_into),
Interval(_) if matches!(type_from, Utf8 | LargeUtf8) => Some(type_into.clone()),
Utf8 | LargeUtf8 => Some(type_into.clone()),
Null if can_cast_types(type_from, type_into) => Some(type_into.clone()),

// Coerce to consistent timezones, if the `type_from` timezone exists.
Timestamp(TimeUnit::Nanosecond, Some(_))
if matches!(type_from, Timestamp(TimeUnit::Nanosecond, Some(_))) =>
Timestamp(unit, Some(tz)) if tz.as_ref() == TIMEZONE_PLACEHOLDER => {
match type_from {
Timestamp(_, Some(from_tz)) => {
Some(Timestamp(unit.clone(), Some(from_tz.clone())))
}
Null | Date32 | Utf8 | LargeUtf8 => {
// In the absence of any other information assume the time zone is "+00" (UTC).
Some(Timestamp(unit.clone(), Some("+00".into())))
}
_ => None,
}
}
Timestamp(_, Some(_))
if matches!(
type_from,
Null | Timestamp(_, Some(_)) | Date32 | Utf8 | LargeUtf8
) =>
{
Some(type_from)
Some(type_into.clone())
}

// cannot coerce
Expand All @@ -233,7 +261,7 @@ fn coerced_from<'a>(
#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::DataType;
use arrow::datatypes::{DataType, TimeUnit};

#[test]
fn test_maybe_data_types() {
Expand Down Expand Up @@ -265,6 +293,20 @@ mod tests {
vec![DataType::Boolean, DataType::UInt16],
Some(vec![DataType::Boolean, DataType::UInt32]),
),
// UTF8 -> Timestamp
(
vec![
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, Some("+TZ".into())),
DataType::Timestamp(TimeUnit::Nanosecond, Some("+01".into())),
],
vec![DataType::Utf8, DataType::Utf8, DataType::Utf8],
Some(vec![
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, Some("+00".into())),
DataType::Timestamp(TimeUnit::Nanosecond, Some("+01".into())),
]),
),
];

for case in cases {
Expand Down
24 changes: 24 additions & 0 deletions datafusion/sqllogictest/test_files/timestamps.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1448,6 +1448,30 @@ SELECT date_bin('1 day', TIMESTAMPTZ '2022-01-01 01:10:00+07', TIMESTAMPTZ '2020
----
2021-12-31T00:00:00Z

# postgresql: 2021-12-31 00:00:00+00
query P
SELECT date_bin('1 day', TIMESTAMPTZ '2022-01-01 01:10:00+07', '2020-01-01')
----
2021-12-31T00:00:00Z

# postgresql: 2021-12-31 00:00:00+00
query P
SELECT date_bin('1 day', TIMESTAMPTZ '2022-01-01 01:10:00+07', '2020-01-01T00:00:00Z')
----
2021-12-31T00:00:00Z

# postgresql: 2021-12-31 18:00:00+00
query P
SELECT date_bin('2 hour', TIMESTAMPTZ '2022-01-01 01:10:00+07', '2020-01-01')
----
2021-12-31T18:00:00Z

# postgresql: 2021-12-31 18:00:00+00
query P
SELECT date_bin('2 hour', TIMESTAMPTZ '2022-01-01 01:10:00+07', '2020-01-01T00:00:00Z')
----
2021-12-31T18:00:00Z

# postgresql: 1
query R
SELECT date_part('hour', TIMESTAMPTZ '2000-01-01T01:01:01') as part
Expand Down

0 comments on commit 9225ce8

Please sign in to comment.