@@ -1212,6 +1212,73 @@ case class ToUTCTimestamp(left: Expression, right: Expression)
}
}

/**
* This modifies a timestamp to show how the display time changes going from one timezone to
* another, for the same instant in time.
*
* We intentionally do not provide an ExpressionDescription as this is not meant to be exposed to
* users; it's only used for internal conversions.
*/
private[spark] case class TimestampTimezoneCorrection(
Contributor:
do we need a whole expression for this? can't we just reuse existing expressions? It's just simple arithmetics isn't it?

Contributor:
I guess you could use ToUTCTimestamp / FromUTCTimestamp for this, but that would be more expensive since you'd be doing the conversion twice for each value.
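For context, a rough sketch (not from the PR) of what reusing the existing expressions could look like; the object and parameter names are placeholders, and whether the two zone arguments would need to be swapped depends on the correction direction discussed in the code comments further down:

```scala
import org.apache.spark.sql.catalyst.expressions._

object ReuseExistingExpressionsSketch {
  // Each row goes through two timezone conversions instead of one. `timeExpr`,
  // `fromTzExpr`, and `toTzExpr` are placeholder children, not names from the PR.
  def correction(timeExpr: Expression, fromTzExpr: Expression, toTzExpr: Expression): Expression =
    FromUTCTimestamp(ToUTCTimestamp(timeExpr, fromTzExpr), toTzExpr)
}
```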

Contributor:
What I'm saying is the analysis rule can just determine the delta, and then just do a simple add/delete.

Contributor:
Hmm, let me see if I can figure that out.

Comment:
Unfortunately the offset depends on the actual date, so a timezone conversion can not be simplified to a simple delta.

Daylight saving time starts and ends on different days in different timezones, while some timezones don't have DST changes at all. Additionally, timezone rules have changed over time and keep changing. Both the basic timezone offset and the DST rules themselves could change (and have changed) over time.
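To make this concrete, a small standalone sketch (zone and instants chosen arbitrarily, not part of the PR) showing that the UTC offset of a DST-observing zone depends on the date, so no single delta works for all rows:

```scala
import java.util.TimeZone

// The offset of a DST-observing zone differs between a winter and a summer instant.
object OffsetVariesWithDate extends App {
  val la = TimeZone.getTimeZone("America/Los_Angeles")
  val winter = 1422057600000L // 2015-01-24 00:00:00 UTC
  val summer = 1437696000000L // 2015-07-24 00:00:00 UTC
  println(la.getOffset(winter) / 3600000.0) // -8.0 (PST)
  println(la.getOffset(summer) / 3600000.0) // -7.0 (PDT)
}
```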

Contributor:
> Additionally, timezone rules have changed over time and keep changing.

Ah, yes, that makes sense... it's also why my initial tests were failing at the DST boundaries. :-/

time: Expression,
from: Expression,
to: Expression)
extends TernaryExpression with ImplicitCastInputTypes {

// modeled on ToUTCTimestamp + Conv (as an example TernaryExpression)

// convertTz() does the *opposite* conversion we want, which is why from & to appear reversed
// in all the calls to convertTz. It's used for showing how the display time changes when we go
// from one timezone to another. We want to see how the SQLTimestamp value should change to
// ensure the display does *not* change, despite going from one TZ to another.

override def children: Seq[Expression] = Seq(time, from, to)
override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType, StringType)
override def dataType: DataType = TimestampType
override def prettyName: String = "timestamp_timezone_correction"

override def nullSafeEval(time: Any, from: Any, to: Any): Any = {
DateTimeUtils.convertTz(
time.asInstanceOf[Long],
to.asInstanceOf[UTF8String].toString(),
from.asInstanceOf[UTF8String].toString())
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
if (from.foldable && to.foldable) {
val fromTz = from.eval()
val toTz = to.eval()
if (fromTz == null || toTz == null) {
ev.copy(code = s"""
|boolean ${ev.isNull} = true;
|long ${ev.value} = 0;
""".stripMargin)
} else {
val eval = time.genCode(ctx)
ev.copy(code = s"""
|${eval.code}
|boolean ${ev.isNull} = ${eval.isNull};
|long ${ev.value} = 0;
|if (!${ev.isNull}) {
| ${ev.value} = $dtu.convertTz(${eval.value}, "$toTz", "$fromTz");
|}
""".stripMargin)
}
} else {
nullSafeCodeGen(ctx, ev, (time, from, to) =>
s"""
|${ev.value} = $dtu.convertTz(
| $time,
| $to.toString(),
| $from.toString());
""".stripMargin
)
}
}
}

/**
* Parses a column to a date based on the given format.
*/
@@ -109,6 +109,11 @@ object DateTimeUtils {
computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
}

lazy val validTimezones = TimeZone.getAvailableIDs().toSet
def isValidTimezone(timezoneId: String): Boolean = {
validTimezones.contains(timezoneId)
}
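As a side note, a hypothetical sketch of how a validity check such as the TimestampTableTimeZone.checkTableTz calls seen later in this diff could build on this helper; the property key, signature, and message wording below are assumptions, not the PR's actual code:

```scala
package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// Hypothetical sketch only: the real TimestampTableTimeZone object is not shown in this diff.
object TimestampTableTimeZoneSketch {
  val TIMEZONE_PROPERTY: String = "table.timestamp.timeZone" // assumed key

  def checkTableTz(context: String, properties: Map[String, String]): Unit = {
    properties.get(TIMEZONE_PROPERTY).foreach { tz =>
      if (!DateTimeUtils.isValidTimezone(tz)) {
        throw new AnalysisException(s"Invalid timezone '$tz' $context")
      }
    }
  }
}
```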

def newDateFormat(formatString: String, timeZone: TimeZone): DateFormat = {
val sdf = new SimpleDateFormat(formatString, Locale.US)
sdf.setTimeZone(timeZone)
@@ -1015,6 +1020,10 @@ object DateTimeUtils {
guess
}

def convertTz(ts: SQLTimestamp, fromZone: String, toZone: String): SQLTimestamp = {
convertTz(ts, getTimeZone(fromZone), getTimeZone(toZone))
Contributor:
performance is going to suck here

Contributor:
I guess caching the value as done by the FromUTCTimestamp expression is the right way to go?

}
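Following up on the caching suggestion above, a hedged sketch (class and method names are illustrative, not the PR's code) of resolving the TimeZone objects once instead of per row, along the lines of what FromUTCTimestamp does with a foldable timezone argument:

```scala
import java.util.TimeZone

import org.apache.spark.sql.catalyst.util.DateTimeUtils

// Illustrative only: when the zone IDs are known up front (e.g. foldable literals),
// resolve them to TimeZone objects once and reuse them for every row, instead of doing
// a per-row string lookup through the String overload above.
class CachedTzCorrection(fromZoneId: String, toZoneId: String) {
  @transient private lazy val fromTz: TimeZone = DateTimeUtils.getTimeZone(fromZoneId)
  @transient private lazy val toTz: TimeZone = DateTimeUtils.getTimeZone(toZoneId)

  // Argument order mirrors the expression's nullSafeEval above (to, then from).
  def correct(ts: Long): Long = DateTimeUtils.convertTz(ts, toTz, fromTz)
}
```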

/**
* Convert the timestamp `ts` from one timezone to another.
*
@@ -741,4 +741,26 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
test("2015-07-24 00:00:00", null, null)
test(null, null, null)
}

test("parquet_timestamp_correction") {
def test(t: String, fromTz: String, toTz: String, expected: String): Unit = {
checkEvaluation(
TimestampTimezoneCorrection(
Literal.create(if (t != null) Timestamp.valueOf(t) else null, TimestampType),
Literal.create(fromTz, StringType),
Literal.create(toTz, StringType)),
if (expected != null) Timestamp.valueOf(expected) else null)
checkEvaluation(
TimestampTimezoneCorrection(
Literal.create(if (t != null) Timestamp.valueOf(t) else null, TimestampType),
NonFoldableLiteral.create(fromTz, StringType),
NonFoldableLiteral.create(toTz, StringType)),
if (expected != null) Timestamp.valueOf(expected) else null)
}
test("2015-07-24 00:00:00", "UTC", "PST", "2015-07-23 17:00:00")
test("2015-01-24 00:00:00", "UTC", "PST", "2015-01-23 16:00:00")
test(null, "UTC", "UTC", null)
test("2015-07-24 00:00:00", null, null, null)
test(null, null, null, null)
}
}
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser}
import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser, TimestampTableTimeZone}
import org.apache.spark.sql.execution.datasources.csv._
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
@@ -179,6 +179,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"read files of Hive data source directly.")
}
TimestampTableTimeZone.checkTableTz("", extraOptions.toMap)

sparkSession.baseRelationToDataFrame(
DataSource.apply(
13 changes: 11 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation, TimestampTableTimeZone}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType

@@ -215,6 +215,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
*/
def save(path: String): Unit = {
this.extraOptions += ("path" -> path)
TimestampTableTimeZone.checkTableTz(s"for path $path", extraOptions.toMap)
save()
}

@@ -266,6 +267,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.4.0
*/
def insertInto(tableName: String): Unit = {
extraOptions.get(TimestampTableTimeZone.TIMEZONE_PROPERTY).foreach { tz =>
Contributor:
we don't seem to be doing this type of validity check in general; otherwise we'd need to add a lot more checks here.

Contributor:
The spec requests errors when using invalid time zones. I guess this would still fail with a different error in that case, so I can remove this if you're really against adding it.

Contributor:
Hmm. I tried a couple of things, and while it may be possible to remove some of these checks and replace them with a check in DateTimeUtils.computeTimeZone, that doesn't cover all cases. For example, you could use "ALTER TABLE" with an invalid time zone and that wouldn't trigger the check.

So given the spec I'm inclined to leave the checks as is, unless @zivanfi is open to making the spec more lax in that area. (TimeZone.getTimeZone(invalidId) actually returns the UTC time zone, as unexpected as that behavior may be, so things won't necessarily break without the checks.)
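For illustration, a tiny standalone snippet (zone ID made up) demonstrating the silent fallback described above, which is what makes a typo hard to notice without an explicit check:

```scala
import java.util.TimeZone

// Unknown zone IDs do not raise an error; they silently resolve to GMT.
object InvalidZoneFallback extends App {
  val tz = TimeZone.getTimeZone("America/Not_A_Real_City") // made-up ID
  println(tz.getID)                                               // prints "GMT"
  println(TimeZone.getAvailableIDs().contains("America/Not_A_Real_City")) // false
}
```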

Comment:
Although other table properties don't have similar checks, their effect is usually easy to see. The effect of this specific table property however is not immediately apparent: for new data it is only revealed in interoperability with other components, and for existing data it should not have any visible effect if set correctly. Therefore we decided it would be best to be very strict in checks, because otherwise a typo in the table property value could only be discovered after some data has already been written with irreversible errors. This was the reasoning behind this part of specs.

throw new AnalysisException("Cannot provide a table timezone on insert; tried to insert " +
s"$tableName with ${TimestampTableTimeZone.TIMEZONE_PROPERTY}=$tz")
}
insertInto(df.sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName))
}

@@ -406,6 +411,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
} else {
CatalogTableType.MANAGED
}
val props =
extraOptions.filterKeys(key => key == TimestampTableTimeZone.TIMEZONE_PROPERTY).toMap
TimestampTableTimeZone.checkTableTz(tableIdent, props)

val tableDesc = CatalogTable(
identifier = tableIdent,
@@ -414,7 +422,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
schema = new StructType,
provider = Some(source),
partitionColumnNames = partitioningColumns.getOrElse(Nil),
bucketSpec = getBucketSpec)
bucketSpec = getBucketSpec,
properties = props)

runCommand(df.sparkSession, "saveAsTable")(CreateTable(tableDesc, mode, Some(df.logicalPlan)))
}
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils, TimestampTableTimeZone}
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
import org.apache.spark.sql.internal.HiveSerDe
@@ -230,6 +230,13 @@ case class AlterTableSetPropertiesCommand(
isView: Boolean)
extends RunnableCommand {

if (isView) {
properties.get(TimestampTableTimeZone.TIMEZONE_PROPERTY).foreach { _ =>
Contributor:
is there even a meaning to set properties for any views? we should either drop this check, or have a more general check.

Contributor:
HiveQL explicitly allows properties in view; I've never used them, though.

throw new AnalysisException("Timezone cannot be set for view")
}
}
TimestampTableTimeZone.checkTableTz(tableName, properties)

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
@@ -34,8 +34,8 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
import org.apache.spark.sql.catalyst.util.{quoteIdentifier, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils, TimestampTableTimeZone}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -86,7 +86,8 @@ case class CreateTableLikeCommand(
schema = sourceTableDesc.schema,
provider = newProvider,
partitionColumnNames = sourceTableDesc.partitionColumnNames,
bucketSpec = sourceTableDesc.bucketSpec)
bucketSpec = sourceTableDesc.bucketSpec,
properties = sourceTableDesc.properties)

catalog.createTable(newTableDesc, ifNotExists)
Seq.empty[Row]
@@ -126,6 +127,8 @@ case class CreateTableCommand(
sparkSession.sessionState.catalog.createTable(table, ignoreIfExists)
Seq.empty[Row]
}

TimestampTableTimeZone.checkTableTz(table.identifier, table.properties)
}


@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
import org.apache.spark.sql.execution.datasources.TimestampTableTimeZone
import org.apache.spark.sql.types.MetadataBuilder
import org.apache.spark.sql.util.SchemaUtils

@@ -123,6 +124,10 @@ case class CreateViewCommand(
s"It is not allowed to add database prefix `$database` for the TEMPORARY view name.")
}

properties.get(TimestampTableTimeZone.TIMEZONE_PROPERTY).foreach { _ =>
throw new AnalysisException("Timezone cannot be set for view")
}

override def run(sparkSession: SparkSession): Seq[Row] = {
// If the plan cannot be analyzed, throw an exception and don't proceed.
val qe = sparkSession.sessionState.executePlan(child)