@@ -65,11 +65,11 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType)
+    var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE DOUBLE")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", DoubleType)
+    expectedSchema = new StructType().add("ID", DoubleType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
     // Update column type from DOUBLE to STRING
     val msg1 = intercept[AnalysisException] {
@@ -81,8 +81,8 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
   override def testCreateTableWithProperty(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INT)" +
       s" TBLPROPERTIES('CCSID'='UNICODE')")
-    var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType)
+    val t = spark.table(tbl)
+    val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
   }
 }
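Note on the test churn in this suite and the ones below: `StructField` is a case class whose equality includes its `metadata` field, so once the JDBC reader stamps every column with a `scale` entry, a bare `.add("ID", IntegerType)` no longer compares equal to the schema read back through the catalog. A minimal sketch of that equality behavior, assuming only Spark's `sql.types` API on the classpath (the object name is ours, not part of the patch):

```scala
import org.apache.spark.sql.types._

object ScaleMetadataDemo {
  def main(args: Array[String]): Unit = {
    // Same constant the test suites define: JDBC scale defaults to 0.
    val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build()

    // StructField equality covers name, dataType, nullable AND metadata,
    // so these two schemas differ even though the column types match.
    val bare = new StructType().add("ID", IntegerType)
    val withScale = new StructType().add("ID", IntegerType, true, defaultMetadata)

    println(bare == withScale)         // false
    println(bare("ID").metadata)       // {}
    println(withScale("ID").metadata)  // {"scale":0}
  }
}
```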
@@ -67,11 +67,11 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType)
+    var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType)
+    expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
     val msg1 = intercept[AnalysisException] {
@@ -69,11 +69,11 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType)
+    var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType)
+    expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
     val msg1 = intercept[AnalysisException] {
@@ -110,8 +110,8 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
   override def testCreateTableWithProperty(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INT)" +
       s" TBLPROPERTIES('ENGINE'='InnoDB', 'DEFAULT CHARACTER SET'='utf8')")
-    var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType)
+    val t = spark.table(tbl)
+    val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
   }
 }
@@ -75,11 +75,11 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", DecimalType(10, 0))
+    var expectedSchema = new StructType().add("ID", DecimalType(10, 0), true, defaultMetadata)
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType)
+    expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
     val msg1 = intercept[AnalysisException] {
@@ -54,11 +54,11 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType)
+    var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType)
+    expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
     val msg = intercept[AnalysisException] {
@@ -70,8 +70,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
   override def testCreateTableWithProperty(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INT)" +
       s" TBLPROPERTIES('TABLESPACE'='pg_default')")
-    var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType)
+    val t = spark.table(tbl)
+    val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
   }
 }
@@ -32,16 +32,18 @@ private[v2] trait V2JDBCTest extends SharedSparkSession {
 
   def notSupportsTableComment: Boolean = false
 
+  val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build()
+
   def testUpdateColumnNullability(tbl: String): Unit = {
     sql(s"CREATE TABLE $catalogName.alt_table (ID STRING NOT NULL)")
     var t = spark.table(s"$catalogName.alt_table")
     // nullable is true in the expectedSchema because Spark always sets nullable to true
     // regardless of the JDBC metadata https://github.com/apache/spark/pull/18445
-    var expectedSchema = new StructType().add("ID", StringType, nullable = true)
+    var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN ID DROP NOT NULL")
     t = spark.table(s"$catalogName.alt_table")
-    expectedSchema = new StructType().add("ID", StringType, nullable = true)
+    expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
     // Update nullability of not existing column
     val msg = intercept[AnalysisException] {
@@ -53,8 +55,8 @@ private[v2] trait V2JDBCTest extends SharedSparkSession {
   def testRenameColumn(tbl: String): Unit = {
     sql(s"ALTER TABLE $tbl RENAME COLUMN ID TO RENAMED")
     val t = spark.table(s"$tbl")
-    val expectedSchema = new StructType().add("RENAMED", StringType, nullable = true)
-      .add("ID1", StringType, nullable = true).add("ID2", StringType, nullable = true)
+    val expectedSchema = new StructType().add("RENAMED", StringType, true, defaultMetadata)
+      .add("ID1", StringType, true, defaultMetadata).add("ID2", StringType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
   }
 
@@ -64,15 +66,16 @@ private[v2] trait V2JDBCTest extends SharedSparkSession {
     withTable(s"$catalogName.alt_table") {
       sql(s"CREATE TABLE $catalogName.alt_table (ID STRING)")
       var t = spark.table(s"$catalogName.alt_table")
-      var expectedSchema = new StructType().add("ID", StringType)
+      var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
       assert(t.schema === expectedSchema)
       sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C1 STRING, C2 STRING)")
       t = spark.table(s"$catalogName.alt_table")
-      expectedSchema = expectedSchema.add("C1", StringType).add("C2", StringType)
+      expectedSchema = expectedSchema.add("C1", StringType, true, defaultMetadata)
+        .add("C2", StringType, true, defaultMetadata)
       assert(t.schema === expectedSchema)
       sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C3 STRING)")
       t = spark.table(s"$catalogName.alt_table")
-      expectedSchema = expectedSchema.add("C3", StringType)
+      expectedSchema = expectedSchema.add("C3", StringType, true, defaultMetadata)
       assert(t.schema === expectedSchema)
       // Add already existing column
       val msg = intercept[AnalysisException] {
@@ -93,7 +96,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession {
       sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN C1")
       sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN c3")
       val t = spark.table(s"$catalogName.alt_table")
-      val expectedSchema = new StructType().add("C2", StringType)
+      val expectedSchema = new StructType().add("C2", StringType, true, defaultMetadata)
       assert(t.schema === expectedSchema)
       // Drop not existing column
       val msg = intercept[AnalysisException] {
@@ -305,18 +305,15 @@ object JdbcUtils extends Logging {
         rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
       }
       val metadata = new MetadataBuilder()
+      metadata.putLong("scale", fieldScale)
+
       // SPARK-33888
-      // - include scale in metadata for only DECIMAL & NUMERIC
       // - include TIME type metadata
-      dataType match {
-        // scalastyle:off
-        case java.sql.Types.NUMERIC => metadata.putLong("scale", fieldScale)
-        case java.sql.Types.DECIMAL => metadata.putLong("scale", fieldScale)
-        case java.sql.Types.TIME => metadata.putBoolean("logical_time_type", true)
-        case _ =>
-        // scalastyle:on
-      }
+      // - always build the metadata
+      if (dataType == java.sql.Types.TIME) {
+        metadata.putBoolean("logical_time_type", true)
+      }
 
       val columnType =
         dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
           getCatalystType(dataType, fieldSize, fieldScale, isSigned))

Inline review thread on this hunk:

Contributor: A simple change is to always include the scale metadata even if it's 0. I think having extra metadata doesn't hurt, and we can update some tests if needed.

Member (author): Actually, the first solution was like what you said, but I noticed we need to update some tests. If that's reasonable, I'll do it.
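Paraphrased, the post-patch behavior of this hunk is: always record the column's scale, and additionally flag TIME columns. A hedged sketch follows; the helper name and shape are ours, while the real logic lives inline in `JdbcUtils.getSchema` and feeds `dialect.getCatalystType`:

```scala
import java.sql.Types

import org.apache.spark.sql.types.{Metadata, MetadataBuilder}

object ColumnMetadataSketch {
  // Hypothetical helper mirroring the new logic: scale is always recorded,
  // even when it is 0, and TIME columns also carry logical_time_type so a
  // dialect can map them to a physical integer-millis representation.
  def columnMetadata(dataType: Int, fieldScale: Int): Metadata = {
    val metadata = new MetadataBuilder()
    metadata.putLong("scale", fieldScale)
    if (dataType == Types.TIME) {
      metadata.putBoolean("logical_time_type", true)
    }
    metadata.build()
  }

  def main(args: Array[String]): Unit = {
    println(columnMetadata(Types.INTEGER, 0)) // {"scale":0}
    println(columnMetadata(Types.TIME, 0))    // scale plus logical_time_type
  }
}
```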
@@ -34,6 +34,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
 
   val tempDir = Utils.createTempDir()
   val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
+  val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build()
   var conn: java.sql.Connection = null
 
   override def sparkConf: SparkConf = super.sparkConf
@@ -138,8 +139,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
   test("load a table") {
     val t = spark.table("h2.test.people")
     val expectedSchema = new StructType()
-      .add("NAME", StringType)
-      .add("ID", IntegerType)
+      .add("NAME", StringType, true, defaultMetadata)
+      .add("ID", IntegerType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
     Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
       val msg = intercept[AnalysisException] {
@@ -177,13 +178,13 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       sql(s"ALTER TABLE $tableName ADD COLUMNS (C1 INTEGER, C2 STRING)")
       var t = spark.table(tableName)
       var expectedSchema = new StructType()
-        .add("ID", IntegerType)
-        .add("C1", IntegerType)
-        .add("C2", StringType)
+        .add("ID", IntegerType, true, defaultMetadata)
+        .add("C1", IntegerType, true, defaultMetadata)
+        .add("C2", StringType, true, defaultMetadata)
       assert(t.schema === expectedSchema)
       sql(s"ALTER TABLE $tableName ADD COLUMNS (c3 DOUBLE)")
       t = spark.table(tableName)
-      expectedSchema = expectedSchema.add("c3", DoubleType)
+      expectedSchema = expectedSchema.add("c3", DoubleType, true, defaultMetadata)
       assert(t.schema === expectedSchema)
       // Add already existing column
       val msg = intercept[AnalysisException] {
@@ -207,8 +208,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       sql(s"ALTER TABLE $tableName RENAME COLUMN id TO C")
       val t = spark.table(tableName)
       val expectedSchema = new StructType()
-        .add("C", IntegerType)
-        .add("C0", IntegerType)
+        .add("C", IntegerType, true, defaultMetadata)
+        .add("C0", IntegerType, true, defaultMetadata)
       assert(t.schema === expectedSchema)
       // Rename to already existing column
       val msg = intercept[AnalysisException] {
@@ -232,7 +233,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       sql(s"ALTER TABLE $tableName DROP COLUMN C1")
       sql(s"ALTER TABLE $tableName DROP COLUMN c3")
       val t = spark.table(tableName)
-      val expectedSchema = new StructType().add("C2", IntegerType)
+      val expectedSchema = new StructType().add("C2", IntegerType, true, defaultMetadata)
       assert(t.schema === expectedSchema)
       // Drop not existing column
       val msg = intercept[AnalysisException] {
@@ -256,7 +257,9 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       sql(s"ALTER TABLE $tableName ALTER COLUMN id TYPE DOUBLE")
       sql(s"ALTER TABLE $tableName ALTER COLUMN deptno TYPE DOUBLE")
       val t = spark.table(tableName)
-      val expectedSchema = new StructType().add("ID", DoubleType).add("deptno", DoubleType)
+      val expectedSchema = new StructType()
+        .add("ID", DoubleType, true, defaultMetadata)
+        .add("deptno", DoubleType, true, defaultMetadata)
       assert(t.schema === expectedSchema)
       // Update not existing column
       val msg1 = intercept[AnalysisException] {
@@ -286,7 +289,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       sql(s"ALTER TABLE $tableName ALTER COLUMN deptno DROP NOT NULL")
       val t = spark.table(tableName)
       val expectedSchema = new StructType()
-        .add("ID", IntegerType, nullable = true).add("deptno", IntegerType, nullable = true)
+        .add("ID", IntegerType, true, defaultMetadata)
+        .add("deptno", IntegerType, true, defaultMetadata)
       assert(t.schema === expectedSchema)
       // Update nullability of not existing column
       val msg = intercept[AnalysisException] {
@@ -332,7 +336,9 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
     withTable(tableName) {
       sql(s"CREATE TABLE $tableName (c1 INTEGER NOT NULL, c2 INTEGER)")
       var t = spark.table(tableName)
-      var expectedSchema = new StructType().add("c1", IntegerType).add("c2", IntegerType)
+      var expectedSchema = new StructType()
+        .add("c1", IntegerType, true, defaultMetadata)
+        .add("c2", IntegerType, true, defaultMetadata)
       assert(t.schema === expectedSchema)
 
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
@@ -344,7 +350,9 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
 
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
         sql(s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3")
-        expectedSchema = new StructType().add("c1", IntegerType).add("c3", IntegerType)
+        expectedSchema = new StructType()
+          .add("c1", IntegerType, true, defaultMetadata)
+          .add("c3", IntegerType, true, defaultMetadata)
         t = spark.table(tableName)
         assert(t.schema === expectedSchema)
       }
@@ -358,7 +366,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
 
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
         sql(s"ALTER TABLE $tableName DROP COLUMN C3")
-        expectedSchema = new StructType().add("c1", IntegerType)
+        expectedSchema = new StructType().add("c1", IntegerType, true, defaultMetadata)
         t = spark.table(tableName)
         assert(t.schema === expectedSchema)
       }
@@ -372,7 +380,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
 
      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
         sql(s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE")
-        expectedSchema = new StructType().add("c1", DoubleType)
+        expectedSchema = new StructType().add("c1", DoubleType, true, defaultMetadata)
         t = spark.table(tableName)
         assert(t.schema === expectedSchema)
       }
@@ -386,7 +394,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
 
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
         sql(s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL")
-        expectedSchema = new StructType().add("c1", DoubleType, nullable = true)
+        expectedSchema = new StructType().add("c1", DoubleType, true, defaultMetadata)
         t = spark.table(tableName)
         assert(t.schema === expectedSchema)
       }
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala (18 changes: 12 additions & 6 deletions)

@@ -74,6 +74,8 @@ class JDBCSuite extends QueryTest
     }
   }
 
+  val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build()
+
   override def beforeAll(): Unit = {
     super.beforeAll()
     Utils.classForName("org.h2.Driver")
@@ -1252,8 +1254,8 @@ class JDBCSuite extends QueryTest
   }
 
   test("SPARK-16848: jdbc API throws an exception for user specified schema") {
-    val schema = StructType(Seq(
-      StructField("name", StringType, false), StructField("theid", IntegerType, false)))
+    val schema = StructType(Seq(StructField("name", StringType, false, defaultMetadata),
+      StructField("theid", IntegerType, false, defaultMetadata)))
     val parts = Array[String]("THEID < 2", "THEID >= 2")
     val e1 = intercept[AnalysisException] {
       spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties())
@@ -1273,7 +1275,9 @@ class JDBCSuite extends QueryTest
     props.put("customSchema", customSchema)
     val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, props)
     assert(df.schema.size === 2)
-    assert(df.schema === CatalystSqlParser.parseTableSchema(customSchema))
+    val expectedSchema = new StructType(CatalystSqlParser.parseTableSchema(customSchema).map(
+      f => StructField(f.name, f.dataType, f.nullable, defaultMetadata)).toArray)
+    assert(df.schema === expectedSchema)
     assert(df.count() === 3)
   }
 
@@ -1289,7 +1293,9 @@ class JDBCSuite extends QueryTest
        """.stripMargin.replaceAll("\n", " "))
      val df = sql("select * from people_view")
      assert(df.schema.length === 2)
-     assert(df.schema === CatalystSqlParser.parseTableSchema(customSchema))
+     val expectedSchema = new StructType(CatalystSqlParser.parseTableSchema(customSchema)
+       .map(f => StructField(f.name, f.dataType, f.nullable, defaultMetadata)).toArray)
+     assert(df.schema === expectedSchema)
      assert(df.count() === 3)
     }
   }
@@ -1404,8 +1410,8 @@ class JDBCSuite extends QueryTest
   }
 
   test("jdbc data source shouldn't have unnecessary metadata in its schema") {
-    val schema = StructType(Seq(
-      StructField("NAME", StringType, true), StructField("THEID", IntegerType, true)))
+    val schema = StructType(Seq(StructField("NAME", StringType, true, defaultMetadata),
+      StructField("THEID", IntegerType, true, defaultMetadata)))
 
     val df = spark.read.format("jdbc")
       .option("Url", urlWithUserAndPass)
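The two `customSchema` assertions change for the same underlying reason: `CatalystSqlParser.parseTableSchema` builds fields with empty metadata, while the JDBC relation now attaches `scale` to every column, so the parsed schema has to be re-stamped before comparing. A small sketch of that mismatch, assuming `spark-catalyst` on the classpath (the DDL string here is illustrative, not the suite's exact value):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types._

object CustomSchemaDemo {
  def main(args: Array[String]): Unit = {
    val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build()

    // Fields parsed from DDL text carry empty metadata ({}), so they can
    // no longer be compared directly against a JDBC-sourced schema.
    val parsed = CatalystSqlParser.parseTableSchema("NAME STRING, THEID INT")
    val expected = new StructType(parsed.map(f =>
      StructField(f.name, f.dataType, f.nullable, defaultMetadata)).toArray)

    println(parsed("NAME").metadata)    // {}
    println(expected("NAME").metadata)  // {"scale":0}
  }
}
```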