Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: coerce text to timestamps with timezones #7720

Merged
merged 2 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 48 additions & 15 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Built-in functions module contains all the built-in functions definitions.

use crate::nullif::SUPPORTED_NULLIF_TYPES;
use crate::type_coercion::functions::data_types;
use crate::type_coercion::functions::{data_types, TIMEZONE_PLACEHOLDER};
use crate::{
conditional_expressions, struct_expressions, utils, Signature, TypeSignature,
Volatility,
Expand Down Expand Up @@ -1020,13 +1020,25 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::DateTrunc => Signature::one_of(
vec![
Exact(vec![Utf8, Timestamp(Nanosecond, None)]),
Exact(vec![Utf8, Timestamp(Nanosecond, Some("+TZ".into()))]),
Exact(vec![
Utf8,
Timestamp(Nanosecond, Some(TIMEZONE_PLACEHOLDER.into())),
]),
Exact(vec![Utf8, Timestamp(Microsecond, None)]),
Exact(vec![Utf8, Timestamp(Microsecond, Some("+TZ".into()))]),
Exact(vec![
Utf8,
Timestamp(Microsecond, Some(TIMEZONE_PLACEHOLDER.into())),
]),
Exact(vec![Utf8, Timestamp(Millisecond, None)]),
Exact(vec![Utf8, Timestamp(Millisecond, Some("+TZ".into()))]),
Exact(vec![
Utf8,
Timestamp(Millisecond, Some(TIMEZONE_PLACEHOLDER.into())),
]),
Exact(vec![Utf8, Timestamp(Second, None)]),
Exact(vec![Utf8, Timestamp(Second, Some("+TZ".into()))]),
Exact(vec![
Utf8,
Timestamp(Second, Some(TIMEZONE_PLACEHOLDER.into())),
]),
],
self.volatility(),
),
Expand All @@ -1040,8 +1052,11 @@ impl BuiltinScalarFunction {
]),
Exact(vec![
Interval(MonthDayNano),
Timestamp(array_type.clone(), Some("+TZ".into())),
Timestamp(Nanosecond, Some("+TZ".into())),
Timestamp(
array_type.clone(),
Some(TIMEZONE_PLACEHOLDER.into()),
),
Timestamp(Nanosecond, Some(TIMEZONE_PLACEHOLDER.into())),
]),
Exact(vec![
Interval(DayTime),
Expand All @@ -1050,24 +1065,30 @@ impl BuiltinScalarFunction {
]),
Exact(vec![
Interval(DayTime),
Timestamp(array_type.clone(), Some("+TZ".into())),
Timestamp(Nanosecond, Some("+TZ".into())),
Timestamp(
array_type.clone(),
Some(TIMEZONE_PLACEHOLDER.into()),
),
Timestamp(Nanosecond, Some(TIMEZONE_PLACEHOLDER.into())),
]),
Exact(vec![
Interval(MonthDayNano),
Timestamp(array_type.clone(), None),
]),
Exact(vec![
Interval(MonthDayNano),
Timestamp(array_type.clone(), Some("+TZ".into())),
Timestamp(
array_type.clone(),
Some(TIMEZONE_PLACEHOLDER.into()),
),
]),
Exact(vec![
Interval(DayTime),
Timestamp(array_type.clone(), None),
]),
Exact(vec![
Interval(DayTime),
Timestamp(array_type, Some("+TZ".into())),
Timestamp(array_type, Some(TIMEZONE_PLACEHOLDER.into())),
]),
]
};
Expand All @@ -1085,13 +1106,25 @@ impl BuiltinScalarFunction {
Exact(vec![Utf8, Date32]),
Exact(vec![Utf8, Date64]),
Exact(vec![Utf8, Timestamp(Second, None)]),
Exact(vec![Utf8, Timestamp(Second, Some("+TZ".into()))]),
Exact(vec![
Utf8,
Timestamp(Second, Some(TIMEZONE_PLACEHOLDER.into())),
]),
Exact(vec![Utf8, Timestamp(Microsecond, None)]),
Exact(vec![Utf8, Timestamp(Microsecond, Some("+TZ".into()))]),
Exact(vec![
Utf8,
Timestamp(Microsecond, Some(TIMEZONE_PLACEHOLDER.into())),
]),
Exact(vec![Utf8, Timestamp(Millisecond, None)]),
Exact(vec![Utf8, Timestamp(Millisecond, Some("+TZ".into()))]),
Exact(vec![
Utf8,
Timestamp(Millisecond, Some(TIMEZONE_PLACEHOLDER.into())),
]),
Exact(vec![Utf8, Timestamp(Nanosecond, None)]),
Exact(vec![Utf8, Timestamp(Nanosecond, Some("+TZ".into()))]),
Exact(vec![
Utf8,
Timestamp(Nanosecond, Some(TIMEZONE_PLACEHOLDER.into())),
]),
],
self.volatility(),
),
Expand Down
86 changes: 64 additions & 22 deletions datafusion/expr/src/type_coercion/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ use arrow::{
};
use datafusion_common::{plan_err, DataFusionError, Result};

/// Placeholder that stands for "any valid timezone" in function signatures.
///
/// It is used where a function can accept a timestamp type with any
/// valid timezone; it exists to avoid the need to enumerate all possible
/// timezones.
///
/// Type coercion always ensures that functions will be executed using
/// timestamp arrays that have a valid time zone: a signature entry carrying
/// this placeholder is coerced to the timezone of the incoming timestamp
/// argument, or to `"+00"` (UTC) when the argument is `Null`, `Date32`,
/// `Utf8` or `LargeUtf8`. Functions must never return results with this
/// timezone.
pub(crate) const TIMEZONE_PLACEHOLDER: &str = "+TZ";

/// Performs type coercion for function arguments.
///
/// Returns the data types to which each argument must be coerced to
Expand Down Expand Up @@ -121,7 +131,7 @@ fn maybe_data_types(
} else {
// attempt to coerce
if let Some(valid_type) = coerced_from(valid_type, current_type) {
new_type.push(valid_type.clone())
new_type.push(valid_type)
} else {
// not possible
return None;
Expand All @@ -140,37 +150,41 @@ pub fn can_coerce_from(type_into: &DataType, type_from: &DataType) -> bool {
return true;
}
if let Some(coerced) = coerced_from(type_into, type_from) {
return coerced == type_into;
return coerced == *type_into;
}
false
}

fn coerced_from<'a>(
type_into: &'a DataType,
type_from: &'a DataType,
) -> Option<&'a DataType> {
) -> Option<DataType> {
use self::DataType::*;

match type_into {
// coerced into type_into
Int8 if matches!(type_from, Null | Int8) => Some(type_into),
Int16 if matches!(type_from, Null | Int8 | Int16 | UInt8) => Some(type_into),
Int8 if matches!(type_from, Null | Int8) => Some(type_into.clone()),
Int16 if matches!(type_from, Null | Int8 | Int16 | UInt8) => {
Some(type_into.clone())
}
Int32 if matches!(type_from, Null | Int8 | Int16 | Int32 | UInt8 | UInt16) => {
Some(type_into)
Some(type_into.clone())
}
Int64
if matches!(
type_from,
Null | Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32
) =>
{
Some(type_into)
Some(type_into.clone())
}
UInt8 if matches!(type_from, Null | UInt8) => Some(type_into.clone()),
UInt16 if matches!(type_from, Null | UInt8 | UInt16) => Some(type_into.clone()),
UInt32 if matches!(type_from, Null | UInt8 | UInt16 | UInt32) => {
Some(type_into.clone())
}
UInt8 if matches!(type_from, Null | UInt8) => Some(type_into),
UInt16 if matches!(type_from, Null | UInt8 | UInt16) => Some(type_into),
UInt32 if matches!(type_from, Null | UInt8 | UInt16 | UInt32) => Some(type_into),
UInt64 if matches!(type_from, Null | UInt8 | UInt16 | UInt32 | UInt64) => {
Some(type_into)
Some(type_into.clone())
}
Float32
if matches!(
Expand All @@ -186,7 +200,7 @@ fn coerced_from<'a>(
| Float32
) =>
{
Some(type_into)
Some(type_into.clone())
}
Float64
if matches!(
Expand All @@ -204,25 +218,39 @@ fn coerced_from<'a>(
| Decimal128(_, _)
) =>
{
Some(type_into)
Some(type_into.clone())
}
Timestamp(TimeUnit::Nanosecond, None)
if matches!(
type_from,
Null | Timestamp(_, None) | Date32 | Utf8 | LargeUtf8
) =>
{
Some(type_into)
Some(type_into.clone())
}
Interval(_) if matches!(type_from, Utf8 | LargeUtf8) => Some(type_into),
Utf8 | LargeUtf8 => Some(type_into),
Null if can_cast_types(type_from, type_into) => Some(type_into),
Interval(_) if matches!(type_from, Utf8 | LargeUtf8) => Some(type_into.clone()),
Utf8 | LargeUtf8 => Some(type_into.clone()),
Null if can_cast_types(type_from, type_into) => Some(type_into.clone()),

// Coerce to consistent timezones, if the `type_from` timezone exists.
Timestamp(TimeUnit::Nanosecond, Some(_))
if matches!(type_from, Timestamp(TimeUnit::Nanosecond, Some(_))) =>
Timestamp(unit, Some(tz)) if tz.as_ref() == TIMEZONE_PLACEHOLDER => {
match type_from {
Timestamp(_, Some(from_tz)) => {
Some(Timestamp(unit.clone(), Some(from_tz.clone())))
}
Null | Date32 | Utf8 | LargeUtf8 => {
// In the absence of any other information assume the time zone is "+00" (UTC).
Some(Timestamp(unit.clone(), Some("+00".into())))
}
_ => None,
}
}
Timestamp(_, Some(_))
if matches!(
type_from,
Null | Timestamp(_, Some(_)) | Date32 | Utf8 | LargeUtf8
) =>
{
Some(type_from)
Some(type_into.clone())
}

// cannot coerce
Expand All @@ -233,7 +261,7 @@ fn coerced_from<'a>(
#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::DataType;
use arrow::datatypes::{DataType, TimeUnit};

#[test]
fn test_maybe_data_types() {
Expand Down Expand Up @@ -265,6 +293,20 @@ mod tests {
vec![DataType::Boolean, DataType::UInt16],
Some(vec![DataType::Boolean, DataType::UInt32]),
),
// UTF8 -> Timestamp
(
vec![
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, Some("+TZ".into())),
DataType::Timestamp(TimeUnit::Nanosecond, Some("+01".into())),
],
vec![DataType::Utf8, DataType::Utf8, DataType::Utf8],
Some(vec![
DataType::Timestamp(TimeUnit::Nanosecond, None),
DataType::Timestamp(TimeUnit::Nanosecond, Some("+00".into())),
DataType::Timestamp(TimeUnit::Nanosecond, Some("+01".into())),
]),
),
];

for case in cases {
Expand Down
24 changes: 24 additions & 0 deletions datafusion/sqllogictest/test_files/timestamps.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1448,6 +1448,30 @@ SELECT date_bin('1 day', TIMESTAMPTZ '2022-01-01 01:10:00+07', TIMESTAMPTZ '2020
----
2021-12-31T00:00:00Z

# postgresql: 2021-12-31 00:00:00+00
query P
SELECT date_bin('1 day', TIMESTAMPTZ '2022-01-01 01:10:00+07', '2020-01-01')
----
2021-12-31T00:00:00Z

# postgresql: 2021-12-31 00:00:00+00
query P
SELECT date_bin('1 day', TIMESTAMPTZ '2022-01-01 01:10:00+07', '2020-01-01T00:00:00Z')
----
2021-12-31T00:00:00Z

# postgresql: 2021-12-31 18:00:00+00
query P
SELECT date_bin('2 hour', TIMESTAMPTZ '2022-01-01 01:10:00+07', '2020-01-01')
----
2021-12-31T18:00:00Z

# postgresql: 2021-12-31 18:00:00+00
query P
SELECT date_bin('2 hour', TIMESTAMPTZ '2022-01-01 01:10:00+07', '2020-01-01T00:00:00Z')
----
2021-12-31T18:00:00Z

# postgresql: 1
query R
SELECT date_part('hour', TIMESTAMPTZ '2000-01-01T01:01:01') as part
Expand Down