docs/sql-migration-guide.md (2 additions, 0 deletions)

@@ -24,6 +24,8 @@ license: |

## Upgrading from Spark SQL 3.1 to 3.2

- Since Spark 3.2, all the supported JDBC dialects use StringType for ROWID. In Spark 3.1 or earlier, the Oracle dialect uses StringType and the other dialects use LongType.
Reviewer comment (Member):
Is "dialects" an internal word? If so, how about saying "Since Spark 3.2, `java.sql.ROWID` is mapped to `StringType` when reading data from other databases via JDBC"?

Reply (@sarutak, Member, author — Feb 10, 2021):

I use "dialects" here as a general word; it doesn't refer to specific implementations like PostgresDialect. "dialect" and "dialects" have been used in the migration guide before.


- In Spark 3.2, the PostgreSQL JDBC dialect uses StringType for MONEY, and MONEY[] is not supported because the JDBC driver for PostgreSQL can't handle those types properly. In Spark 3.1 or earlier, DoubleType and ArrayType of DoubleType were used respectively.
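For context on why the driver can't hand MONEY back as a double: pgjdbc renders MONEY values as locale-formatted currency strings (the sample value below is hypothetical, not produced by a real driver here), and such strings don't parse as plain numbers. A minimal illustration:

```java
public class MoneyDemo {
    public static void main(String[] args) {
        // A MONEY value as the PostgreSQL driver may render it:
        // currency symbol and thousands separator included.
        String money = "$1,000.00";
        boolean parsable;
        try {
            Double.parseDouble(money); // throws NumberFormatException
            parsable = true;
        } catch (NumberFormatException e) {
            parsable = false;
        }
        System.out.println(parsable); // false
    }
}
```

Since there is no reliable numeric representation to fall back on, surfacing the value as a string is the conservative choice.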

- In Spark 3.2, `spark.sql.adaptive.enabled` is enabled by default. To restore the behavior before Spark 3.2, you can set `spark.sql.adaptive.enabled` to `false`.
JdbcUtils.scala

@@ -226,7 +226,7 @@ object JdbcUtils extends Logging {
      case java.sql.Types.REAL => DoubleType
      case java.sql.Types.REF => StringType
      case java.sql.Types.REF_CURSOR => null
-     case java.sql.Types.ROWID => LongType
+     case java.sql.Types.ROWID => StringType
Reviewer comment (Member):
So basically we can't assume the row ID is <= 8 bytes? If that's true then I agree.

Reply (@sarutak, Member, author):
Not only can we not assume the length of a ROWID, it's also not required to be representable as an integer.
The JDBC RowId interface declares getBytes and toString to represent a ROWID, so I think we can safely map ROWID to StringType.
https://docs.oracle.com/javase/8/docs/api/java/sql/RowId.html
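The RowId contract mentioned above can be illustrated without a real database: `java.sql.RowId` only guarantees `getBytes` and `toString`, so any byte sequence — including ones longer than 8 bytes — is a valid ROWID. A minimal sketch (the `ToyRowId` class and the Oracle-style sample value are hypothetical):

```java
import java.sql.RowId;
import java.util.Arrays;

// Hypothetical RowId whose payload is an 18-character Oracle-style string:
// it cannot fit in a LongType (8 bytes), but toString() always yields a
// value that StringType can hold.
class ToyRowId implements RowId {
    private final byte[] bytes;

    ToyRowId(byte[] bytes) { this.bytes = bytes.clone(); }

    @Override public byte[] getBytes() { return bytes.clone(); }
    @Override public String toString() { return new String(bytes); }
    @Override public boolean equals(Object o) {
        return o instanceof RowId && Arrays.equals(bytes, ((RowId) o).getBytes());
    }
    @Override public int hashCode() { return Arrays.hashCode(bytes); }
}

public class RowIdDemo {
    public static void main(String[] args) {
        RowId id = new ToyRowId("AAAStzAAHAAABD7AAB".getBytes());
        // 18 bytes: too wide for a long, fine as a string.
        System.out.println(id.getBytes().length); // 18
        System.out.println(id);                   // AAAStzAAHAAABD7AAB
    }
}
```

This is exactly the shape the patch relies on: `rs.getRowId(pos + 1).toString` is well-defined for every conforming driver, whereas interpreting the bytes as a long is not.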

case java.sql.Types.SMALLINT => IntegerType
case java.sql.Types.SQLXML => StringType
case java.sql.Types.STRUCT => StringType
@@ -310,11 +310,15 @@ object JdbcUtils extends Logging {
      val metadata = new MetadataBuilder()
      metadata.putLong("scale", fieldScale)

-     // SPARK-33888
-     // - include TIME type metadata
-     // - always build the metadata
-     if (dataType == java.sql.Types.TIME) {
-       metadata.putBoolean("logical_time_type", true)
+     dataType match {
+       case java.sql.Types.TIME =>
+         // SPARK-33888
+         // - include TIME type metadata
+         // - always build the metadata
+         metadata.putBoolean("logical_time_type", true)
+       case java.sql.Types.ROWID =>
+         metadata.putBoolean("rowid", true)
+       case _ =>
      }

val columnType =
@@ -448,6 +452,10 @@ object JdbcUtils extends Logging {
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        row.setByte(pos, rs.getByte(pos + 1))

+   case StringType if metadata.contains("rowid") =>
+     (rs: ResultSet, row: InternalRow, pos: Int) =>
+       row.update(pos, UTF8String.fromString(rs.getRowId(pos + 1).toString))

    case StringType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        // TODO(davies): use getBytes for better performance, if the encoding is UTF-8
OracleDialect.scala

@@ -64,12 +64,6 @@ private case object OracleDialect extends JdbcDialect {
        => Some(TimestampType) // Value for Timestamp with Time Zone in Oracle
      case BINARY_FLOAT => Some(FloatType) // Value for OracleTypes.BINARY_FLOAT
      case BINARY_DOUBLE => Some(DoubleType) // Value for OracleTypes.BINARY_DOUBLE
-     // scalastyle:off line.size.limit
-     // According to the documentation for Oracle Database 19c:
-     // "Values of the ROWID pseudocolumn are strings representing the address of each row."
-     // https://docs.oracle.com/en/database/oracle/oracle-database/19/sqlrf/Data-Types.html#GUID-AEF1FE4C-2DE5-4BE7-BB53-83AD8F1E34EF
-     // scalastyle:on line.size.limit
-     case Types.ROWID => Some(StringType)
      case _ => None
    }
  }
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala (26 additions, 0 deletions)

@@ -25,6 +25,8 @@ import java.util.{Calendar, GregorianCalendar, Properties}
  import scala.collection.JavaConverters._

  import org.h2.jdbc.JdbcSQLException
+ import org.mockito.ArgumentMatchers._
+ import org.mockito.Mockito._
  import org.scalatest.{BeforeAndAfter, PrivateMethodTester}

  import org.apache.spark.SparkException
@@ -1781,4 +1783,28 @@ class JDBCSuite extends QueryTest
    assert(options.asProperties.get("url") == url)
    assert(options.asProperties.get("dbtable") == "table3")
  }

+ test("SPARK-34379: Map JDBC RowID to StringType rather than LongType") {
+   val mockRsmd = mock(classOf[java.sql.ResultSetMetaData])
Reviewer comment (Member):
The H2 database cannot generate rowid-typed data?

Reply (@sarutak, Member, author — Feb 10, 2021):

It cannot. H2 has _rowid_ as a hidden column, but it's not compatible with JDBC ROWID: _rowid_ is represented as a long, and H2 doesn't implement getRowId.
https://github.com/h2database/h2database/blob/6290b79a2418189c5faa0e0506bf6503fc7630e6/h2/src/main/org/h2/jdbc/JdbcResultSet.java#L3292

+   when(mockRsmd.getColumnCount).thenReturn(1)
+   when(mockRsmd.getColumnLabel(anyInt())).thenReturn("rowid")
+   when(mockRsmd.getColumnType(anyInt())).thenReturn(java.sql.Types.ROWID)
+   when(mockRsmd.getColumnTypeName(anyInt())).thenReturn("rowid")
+   when(mockRsmd.getPrecision(anyInt())).thenReturn(0)
+   when(mockRsmd.getScale(anyInt())).thenReturn(0)
+   when(mockRsmd.isSigned(anyInt())).thenReturn(false)
+   when(mockRsmd.isNullable(anyInt())).thenReturn(java.sql.ResultSetMetaData.columnNoNulls)
+
+   val mockRs = mock(classOf[java.sql.ResultSet])
+   when(mockRs.getMetaData).thenReturn(mockRsmd)
+
+   val mockDialect = mock(classOf[JdbcDialect])
+   when(mockDialect.getCatalystType(anyInt(), anyString(), anyInt(), any[MetadataBuilder]))
+     .thenReturn(None)
+
+   val schema = JdbcUtils.getSchema(mockRs, mockDialect)
+   val fields = schema.fields
+   assert(fields.length === 1)
+   assert(fields(0).dataType === StringType)
+ }
  }