From e86094ed0140483815efa612728766817061ad66 Mon Sep 17 00:00:00 2001 From: Hoa Date: Thu, 4 Feb 2021 19:18:18 +0800 Subject: [PATCH 1/3] [SPARK-34357] Fix sql.Time as TimestampType with day part to epoch --- .../datasources/jdbc/JdbcUtils.scala | 44 +++++++++++-------- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 24 +++------- 2 files changed, 31 insertions(+), 37 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 25aa00ac86dcf..476771e90616c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -19,12 +19,14 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.sql.{Connection, Driver, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException, SQLFeatureNotSupportedException} import java.time.{Instant, LocalDate} -import java.util.Locale +import java.util.{Locale, TimeZone} import java.util.concurrent.TimeUnit import scala.util.Try import scala.util.control.NonFatal +import sun.util.calendar.ZoneInfo + import org.apache.spark.TaskContext import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.Logging @@ -230,7 +232,7 @@ object JdbcUtils extends Logging { case java.sql.Types.SMALLINT => IntegerType case java.sql.Types.SQLXML => StringType case java.sql.Types.STRUCT => StringType - case java.sql.Types.TIME => IntegerType + case java.sql.Types.TIME => TimestampType case java.sql.Types.TIME_WITH_TIMEZONE => null case java.sql.Types.TIMESTAMP => TimestampType @@ -421,23 +423,6 @@ object JdbcUtils extends Logging { (rs: ResultSet, row: InternalRow, pos: Int) => row.setFloat(pos, rs.getFloat(pos + 1)) - - // SPARK-33888 - sql TIME type represents as physical int in millis - // Represents a time of day, with no reference to a particular calendar, - // time zone or date, with a precision of one millisecond. - // It stores the number of milliseconds after midnight, 00:00:00.000. - case IntegerType if metadata.contains("logical_time_type") => - (rs: ResultSet, row: InternalRow, pos: Int) => { - val rawTime = rs.getTime(pos + 1) - if (rawTime != null) { - val rawTimeInNano = rawTime.toLocalTime().toNanoOfDay() - val timeInMillis = Math.toIntExact(TimeUnit.NANOSECONDS.toMillis(rawTimeInNano)) - row.setInt(pos, timeInMillis) - } else { - row.update(pos, null) - } - } - case IntegerType => (rs: ResultSet, row: InternalRow, pos: Int) => row.setInt(pos, rs.getInt(pos + 1)) @@ -470,6 +455,27 @@ object JdbcUtils extends Logging { // TODO(davies): use getBytes for better performance, if the encoding is UTF-8 row.update(pos, UTF8String.fromString(rs.getString(pos + 1))) + // SPARK-34357 - sql TIME type represents as zero epoch timestamp. + // It is represented as Spark TimestampType but fixed at 1970-01-01 for day, + // time portion is time of day, with no reference to a particular calendar, + // time zone or date, with a precision till microseconds. + // It stores the number of milliseconds after midnight, 00:00:00.000000 + case TimestampType if metadata.contains("logical_time_type") => + (rs: ResultSet, row: InternalRow, pos: Int) => { + val rawTime = rs.getTime(pos + 1) + if (rawTime != null) { + val localTimeMicro = TimeUnit.NANOSECONDS.toMicros(rawTime.toLocalTime().toNanoOfDay()) + val localTimeMillis = DateTimeUtils.microsToMillis(localTimeMicro) + val timeZoneOffset = TimeZone.getDefault match { + case zoneInfo: ZoneInfo => zoneInfo.getOffsetsByWall(localTimeMillis, null) + case timeZone: TimeZone => timeZone.getOffset(localTimeMillis - timeZone.getRawOffset) + } + row.setLong(pos, localTimeMicro - DateTimeUtils.millisToMicros(timeZoneOffset)) + } else { + row.update(pos, null) + } + } + case TimestampType => (rs: ResultSet, row: InternalRow, pos: Int) => val t = rs.getTimestamp(pos + 1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 073d4c5e984fe..cc2721149cffd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -21,7 +21,6 @@ import java.math.BigDecimal import java.sql.{Date, DriverManager, SQLException, Timestamp} import java.time.{Instant, LocalDate} import java.util.{Calendar, GregorianCalendar, Properties} -import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ @@ -614,13 +613,7 @@ class JDBCSuite extends QueryTest test("H2 time types") { val rows = sql("SELECT * FROM timetypes").collect() val cal = new GregorianCalendar(java.util.Locale.ROOT) - val epochMillis = java.time.LocalTime.ofNanoOfDay( - TimeUnit.MILLISECONDS.toNanos(rows(0).getAs[Int](0))) - .atDate(java.time.LocalDate.ofEpochDay(0)) - .atZone(java.time.ZoneId.systemDefault()) - .toInstant() - .toEpochMilli() - cal.setTime(new Date(epochMillis)) + cal.setTime(rows(0).getAs[java.sql.Timestamp](0)) assert(cal.get(Calendar.HOUR_OF_DAY) === 12) assert(cal.get(Calendar.MINUTE) === 34) assert(cal.get(Calendar.SECOND) === 56) @@ -639,20 +632,15 @@ class JDBCSuite extends QueryTest assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543000) } - test("SPARK-33888: test TIME types") { + test("SPARK-34357: test TIME types") { val rows = spark.read.jdbc( urlWithUserAndPass, "TEST.TIMETYPES", new Properties()).collect() val cachedRows = spark.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties()) .cache().collect() - val expectedTimeRaw = java.sql.Time.valueOf("12:34:56") - val expectedTimeMillis = Math.toIntExact( - java.util.concurrent.TimeUnit.NANOSECONDS.toMillis( - expectedTimeRaw.toLocalTime().toNanoOfDay() - ) - ) - assert(rows(0).getAs[Int](0) === expectedTimeMillis) - assert(rows(1).getAs[Int](0) === expectedTimeMillis) - assert(cachedRows(0).getAs[Int](0) === expectedTimeMillis) + val expectedTimeAtEpoch = java.sql.Timestamp.valueOf("1970-01-01 12:34:56.0") + assert(rows(0).getAs[java.sql.Timestamp](0) === expectedTimeAtEpoch) + assert(rows(1).getAs[java.sql.Timestamp](0) === expectedTimeAtEpoch) + assert(cachedRows(0).getAs[java.sql.Timestamp](0) === expectedTimeAtEpoch) } test("test DATE types") { From 15e980feffe7f3bd03fb6922a87173f19ab08fba Mon Sep 17 00:00:00 2001 From: Hoa Date: Thu, 4 Feb 2021 19:18:40 +0800 Subject: [PATCH 2/3] [SPARK-34357] Update jdbc integration test suite --- .../org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala | 5 ++--- .../apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala | 5 ++--- .../org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala | 4 ++-- .../org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala | 2 +- 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala index 6d9c9599ed1d1..62bba797413a1 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala @@ -127,11 +127,10 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite { val types = rows(0).toSeq.map(x => x.getClass.toString) assert(types.length == 3) assert(types(0).equals("class java.sql.Date")) - assert(types(1).equals("class java.lang.Integer")) + assert(types(1).equals("class java.sql.Timestamp")) assert(types(2).equals("class java.sql.Timestamp")) assert(rows(0).getAs[Date](0).equals(Date.valueOf("1991-11-09"))) - assert( - rows(0).getAs[Integer](1) === Timestamp.valueOf("1970-01-01 13:31:24").getTime) + assert(rows(0).getAs[Timestamp](1).equals(Timestamp.valueOf("1970-01-01 13:31:24"))) assert(rows(0).getAs[Timestamp](2).equals(Timestamp.valueOf("2009-02-13 23:31:30"))) } } diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala index cd5358d8a716e..c6e5cd26958ed 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala @@ -219,14 +219,13 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite { assert(types(2).equals("class java.sql.Timestamp")) assert(types(3).equals("class java.lang.String")) assert(types(4).equals("class java.sql.Timestamp")) - assert(types(5).equals("class java.lang.Integer")) + assert(types(5).equals("class java.sql.Timestamp")) assert(row.getAs[Date](0).equals(Date.valueOf("1991-11-09"))) assert(row.getAs[Timestamp](1).equals(Timestamp.valueOf("1999-01-01 13:23:35.0"))) assert(row.getAs[Timestamp](2).equals(Timestamp.valueOf("9999-12-31 23:59:59.0"))) assert(row.getString(3).equals("1901-05-09 23:59:59.0000000 +14:00")) assert(row.getAs[Timestamp](4).equals(Timestamp.valueOf("1996-01-01 23:24:00.0"))) - assert( - row.getAs[Integer](5) === Timestamp.valueOf("1970-01-01 13:31:24.0").getTime) + assert(row.getAs[Timestamp](5).equals(Timestamp.valueOf("1970-01-01 13:31:24.0"))) } } diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala index 94d182dafef96..5d4da7d7ed794 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala @@ -119,13 +119,13 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { val types = rows(0).toSeq.map(x => x.getClass.toString) assert(types.length == 5) assert(types(0).equals("class java.sql.Date")) - assert(types(1).equals("class java.lang.Integer")) + assert(types(1).equals("class java.sql.Timestamp")) assert(types(2).equals("class java.sql.Timestamp")) assert(types(3).equals("class java.sql.Timestamp")) assert(types(4).equals("class java.sql.Date")) assert(rows(0).getAs[Date](0).equals(Date.valueOf("1991-11-09"))) assert( - rows(0).getAs[Integer](1) === Timestamp.valueOf("1970-01-01 13:31:24").getTime) + rows(0).getAs[Timestamp](1) === Timestamp.valueOf("1970-01-01 13:31:24")) assert(rows(0).getAs[Timestamp](2).equals(Timestamp.valueOf("1996-01-01 01:23:45"))) assert(rows(0).getAs[Timestamp](3).equals(Timestamp.valueOf("2009-02-13 23:31:30"))) assert(rows(0).getAs[Date](4).equals(Date.valueOf("2001-01-01"))) diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala index 5822adf5cd156..0347c98bba2c4 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala @@ -181,7 +181,7 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { val rows = dfRead.collect() val types = rows(0).toSeq.map(x => x.getClass.toString) assert(types(1).equals("class java.sql.Timestamp")) - assert(types(2).equals("class java.lang.Integer")) + assert(types(2).equals("class java.sql.Timestamp")) } test("SPARK-22291: Conversion error when transforming array types of " + From 0b4a344674cddedca40790869a739f07208c7be0 Mon Sep 17 00:00:00 2001 From: Hoa Date: Thu, 4 Feb 2021 21:23:08 +0800 Subject: [PATCH 3/3] [SPARK-34357] Use SQLConf sessionLocalTimeZone for offset calculation --- .../execution/datasources/jdbc/JdbcUtils.scala | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 476771e90616c..2ba8bed440581 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -19,14 +19,12 @@ package org.apache.spark.sql.execution.datasources.jdbc import java.sql.{Connection, Driver, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException, SQLFeatureNotSupportedException} import java.time.{Instant, LocalDate} -import java.util.{Locale, TimeZone} +import java.util.Locale import java.util.concurrent.TimeUnit import scala.util.Try import scala.util.control.NonFatal -import sun.util.calendar.ZoneInfo - import org.apache.spark.TaskContext import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.Logging @@ -456,7 +454,7 @@ object JdbcUtils extends Logging { row.update(pos, UTF8String.fromString(rs.getString(pos + 1))) // SPARK-34357 - sql TIME type represents as zero epoch timestamp. - // It is represented as Spark TimestampType but fixed at 1970-01-01 for day, + // It is mapped as Spark TimestampType but fixed at 1970-01-01 for day, // time portion is time of day, with no reference to a particular calendar, // time zone or date, with a precision till microseconds. // It stores the number of milliseconds after midnight, 00:00:00.000000 @@ -464,13 +462,11 @@ object JdbcUtils extends Logging { (rs: ResultSet, row: InternalRow, pos: Int) => { val rawTime = rs.getTime(pos + 1) if (rawTime != null) { - val localTimeMicro = TimeUnit.NANOSECONDS.toMicros(rawTime.toLocalTime().toNanoOfDay()) - val localTimeMillis = DateTimeUtils.microsToMillis(localTimeMicro) - val timeZoneOffset = TimeZone.getDefault match { - case zoneInfo: ZoneInfo => zoneInfo.getOffsetsByWall(localTimeMillis, null) - case timeZone: TimeZone => timeZone.getOffset(localTimeMillis - timeZone.getRawOffset) - } - row.setLong(pos, localTimeMicro - DateTimeUtils.millisToMicros(timeZoneOffset)) + val localTimeMicro = TimeUnit.NANOSECONDS.toMicros( + rawTime.toLocalTime().toNanoOfDay()) + val utcTimeMicro = DateTimeUtils.toUTCTime( + localTimeMicro, SQLConf.get.sessionLocalTimeZone) + row.setLong(pos, utcTimeMicro) } else { row.update(pos, null) }