Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.expressions.postgreSQL.PostgreCastToBoolean
import org.apache.spark.sql.catalyst.expressions.postgreSQL._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BooleanType, StringType}
import org.apache.spark.sql.types.{BooleanType, TimestampType}

object PostgreSQLDialect {
val postgreSQLDialectRules: List[Rule[LogicalPlan]] =
CastToBoolean ::
CastToBoolean :: CastToTimestamp ::
Nil

object CastToBoolean extends Rule[LogicalPlan] with Logging {
Expand All @@ -46,4 +46,19 @@ object PostgreSQLDialect {
}
}
}

object CastToTimestamp extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
val conf = SQLConf.get
if (conf.usePostgreSQLDialect) {
plan.transformExpressions {
case Cast(child, dataType, timeZoneId)
if dataType == TimestampType =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can leave cast timestamp to timestamp case to Optimizer to do optimization.

Copy link
Member

@Ngone51 Ngone51 Nov 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry. Here, I mean that we should change the if condition to: if child.dataType != TimestampType && dataType == TimestampType =>. Because Optimizer, currently, can not optimize Pg cast.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got this. I will update this.

PostgreCastToTimestamp(child, timeZoneId)
}
} else {
plan
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
}

// TimestampConverter
private[this] def castToTimestamp(from: DataType): Any => Any = from match {
protected[this] def castToTimestamp(from: DataType): Any => Any = from match {
case StringType =>
buildCast[UTF8String](_, utfs => DateTimeUtils.stringToTimestamp(utfs, zoneId).orNull)
case BooleanType =>
Expand Down Expand Up @@ -1159,7 +1159,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
}
}

private[this] def castToTimestampCode(
protected[this] def castToTimestampCode(
from: DataType,
ctx: CodegenContext): CastFunction = from match {
case StringType =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,52 @@
*/
package org.apache.spark.sql.catalyst.expressions.postgreSQL

import java.time.ZoneId

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.{CastBase, Expression, TimeZoneAwareExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, JavaCode}
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.postgreSQL.StringUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

case class PostgreCastToBoolean(child: Expression, timeZoneId: Option[String])
extends CastBase {
abstract class PostgreCastBase(toType: DataType) extends CastBase {

override protected def ansiEnabled =
throw new UnsupportedOperationException("PostgreSQL dialect doesn't support ansi mode")
def fromTypes: TypeCollection

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))
override def dataType: DataType = toType

override protected def ansiEnabled: Boolean =
throw new UnsupportedOperationException("PostgreSQL dialect doesn't support ansi mode")

override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
case StringType | IntegerType | NullType =>
override def checkInputDataTypes(): TypeCheckResult = {
if (!fromTypes.acceptsType(child.dataType)) {
TypeCheckResult.TypeCheckFailure(
s"cannot cast type ${child.dataType.simpleString} to ${toType.simpleString}")
} else {
TypeCheckResult.TypeCheckSuccess
case _ =>
TypeCheckResult.TypeCheckFailure(s"cannot cast type ${child.dataType} to boolean")
}
}

override def nullable: Boolean = child.nullable

override def sql: String = s"CAST(${child.sql} AS ${toType.sql})"

override def toString: String =
s"PostgreCastTo${toType.simpleString}($child as ${toType.simpleString})"
}

case class PostgreCastToBoolean(child: Expression, timeZoneId: Option[String])
extends PostgreCastBase(BooleanType) {

override def fromTypes: TypeCollection = TypeCollection(StringType, IntegerType, NullType)

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))

override def castToBoolean(from: DataType): Any => Any = from match {
case StringType =>
buildCast[UTF8String](_, str => {
Expand All @@ -58,7 +81,7 @@ case class PostgreCastToBoolean(child: Expression, timeZoneId: Option[String])
override def castToBooleanCode(from: DataType): CastFunction = from match {
case StringType =>
val stringUtils = inline"${StringUtils.getClass.getName.stripSuffix("$")}"
(c, evPrim, evNull) =>
(c, evPrim, _) =>
code"""
if ($stringUtils.isTrueString($c.trim().toLowerCase())) {
$evPrim = true;
Expand All @@ -68,16 +91,47 @@ case class PostgreCastToBoolean(child: Expression, timeZoneId: Option[String])
throw new IllegalArgumentException("invalid input syntax for type boolean: $c");
}
"""

case IntegerType =>
super.castToBooleanCode(from)
}
}

override def dataType: DataType = BooleanType
case class PostgreCastToTimestamp(child: Expression, timeZoneId: Option[String])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, we need to define a new rule and a new cast expr for each Pg cast pattern? I mean we cannot define all the Pg cast patterns in a single rule and a cast expr? cc: @cloud-fan @Ngone51

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can and should combine them into a single one(both rule and expression) when more types get in. Just like the original Cast does. But I'm not sure where shall we start. Maybe, this one ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, I personally think so. @cloud-fan

extends PostgreCastBase(TimestampType) {

override def nullable: Boolean = child.nullable
override def fromTypes: TypeCollection = TypeCollection(StringType, DateType, NullType)

override def toString: String = s"PostgreCastToBoolean($child as ${dataType.simpleString})"
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))

override def sql: String = s"CAST(${child.sql} AS ${dataType.sql})"
override def castToTimestamp(from: DataType): Any => Any = from match {
case StringType =>
buildCast[UTF8String](_, utfs => DateTimeUtils.stringToTimestamp(utfs, zoneId)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that postgre could correctly parse string 19700101, 1970/01/01, January 1 04:05:06 1970 PST while spark can't. So, I think that we may also need to support it in PostgreCastToTimestamp.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your suggestion. I will check this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

postgres# select cast('19700101' as timestamp);
01.01.1970 00:00:00
postgres# select cast('1970/01/01' as timestamp);
01.01.1970 00:00:00
postgres# select cast('January 1 04:05:06 1970 PST' as timestamp);
01.01.1970 04:05:06

Spark results with NULL for all of them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu kindly review latest changes and give your feedback on supporting above queries.

Do we need to support them in this PR? If yes, we need to list all formats for timestamps which postgres supports but spark don't.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally think that the support above is not a main issue of this pr, so better to separate the two work: the timestamp cast support and the timestamp format support for the pg dialect.

.getOrElse(throw new AnalysisException(s"invalid input syntax for type timestamp:$utfs")))
case DateType =>
super.castToTimestamp(from)
}

override def castToTimestampCode(
from: DataType,
ctx: CodegenContext): CastFunction = from match {
case StringType =>
val zoneIdClass = classOf[ZoneId]
val zid = JavaCode.global(
ctx.addReferenceObj("zoneId", zoneId, zoneIdClass.getName),
zoneIdClass)
val longOpt = ctx.freshVariable("longOpt", classOf[Option[Long]])
(c, evPrim, _) =>
code"""
scala.Option<Long> $longOpt =
org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp($c, $zid);
if ($longOpt.isDefined()) {
$evPrim = ((Long) $longOpt.get()).longValue();
} else {
throw new AnalysisException(s"invalid input syntax for type timestamp:$c");
}
"""
case DateType =>
super.castToTimestampCode(from, ctx)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,14 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
assert(PostgreCastToBoolean(Literal(1.toDouble), None).checkInputDataTypes().isFailure)
assert(PostgreCastToBoolean(Literal(1.toFloat), None).checkInputDataTypes().isFailure)
}

test("unsupported data types to cast to tiestamp") {
assert(PostgreCastToTimestamp(Literal(1), None).checkInputDataTypes().isFailure)
assert(PostgreCastToTimestamp(Literal(1.toByte), None).checkInputDataTypes().isFailure)
assert(PostgreCastToTimestamp(Literal(1.toDouble), None).checkInputDataTypes().isFailure)
assert(PostgreCastToTimestamp(Literal(1.toFloat), None).checkInputDataTypes().isFailure)
assert(PostgreCastToTimestamp(Literal(1.toLong), None).checkInputDataTypes().isFailure)
assert(PostgreCastToTimestamp(Literal(1.toShort), None).checkInputDataTypes().isFailure)
assert(PostgreCastToTimestamp(Literal(BigDecimal(1.0)), None).checkInputDataTypes().isFailure)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,15 @@ class PostgreSQLDialectQuerySuite extends QueryTest with SharedSparkSession {
intercept[IllegalArgumentException](sql(s"select cast('$input' as boolean)").collect())
}
}

test("cast to timestamp") {
Seq(1, 0.1, 1.toDouble, 5.toFloat, true, 3.toByte, 4.toShort) foreach { value =>
val actualResult = intercept[AnalysisException](
sql(s"SELECT CAST(${value} AS timestamp)")
).getMessage
val expectedResult = s"cannot cast type ${value.getClass} to timestamp"
assert(actualResult.contains(expectedResult))
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move these tests to SQLQueryTestSuite,e.g., input/postgreSQL/cast.sql?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have moved these test cases. cast.sql.out needs to be updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to delete this test case.

}