diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala new file mode 100644 index 000000000000..82f9f978c5da --- /dev/null +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.jdbc.v2 + +import java.sql.Connection + +import org.apache.spark.SparkConf +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog +import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite} +import org.apache.spark.sql.types._ +import org.apache.spark.tags.DockerTest + +/** + * To run this test suite for a specific version (e.g., ibmcom/db2:11.5.4.0): + * {{{ + * DB2_DOCKER_IMAGE_NAME=ibmcom/db2:11.5.4.0 + * ./build/sbt -Pdocker-integration-tests "test-only *DB2IntegrationSuite" + * }}} + */ +@DockerTest +class DB2IntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest { + override val catalogName: String = "db2" + override val db = new DatabaseOnDocker { + override val imageName = sys.env.getOrElse("DB2_DOCKER_IMAGE_NAME", "ibmcom/db2:11.5.4.0") + override val env = Map( + "DB2INST1_PASSWORD" -> "rootpass", + "LICENSE" -> "accept", + "DBNAME" -> "foo", + "ARCHIVE_LOGS" -> "false", + "AUTOCONFIG" -> "false" + ) + override val usesIpc = false + override val jdbcPort: Int = 50000 + override val privileged = true + override def getJdbcUrl(ip: String, port: Int): String = + s"jdbc:db2://$ip:$port/foo:user=db2inst1;password=rootpass;retrieveMessagesFromServerOnGetMessage=true;" //scalastyle:ignore + } + + override def sparkConf: SparkConf = super.sparkConf + .set("spark.sql.catalog.db2", classOf[JDBCTableCatalog].getName) + .set("spark.sql.catalog.db2.url", db.getJdbcUrl(dockerIp, externalPort)) + + override def dataPreparation(conn: Connection): Unit = {} + + override def testUpdateColumnType(tbl: String): Unit = { + sql(s"CREATE TABLE $tbl (ID INTEGER) USING _") + var t = spark.table(tbl) + var expectedSchema = new StructType().add("ID", IntegerType) + assert(t.schema === expectedSchema) + sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE DOUBLE") + t = spark.table(tbl) + expectedSchema = new StructType().add("ID", DoubleType) + assert(t.schema === expectedSchema) + // Update column type from DOUBLE to STRING + val msg1 = intercept[AnalysisException] { + sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE VARCHAR(10)") + }.getMessage + assert(msg1.contains("Cannot update alt_table field ID: double cannot be cast to varchar")) + } +} diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala index 400459c0ea17..1b51d43c1d13 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala @@ -23,10 +23,8 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkConf import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite} -import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.tags.DockerTest @@ -54,7 +52,8 @@ import org.apache.spark.tags.DockerTest * It has been validated with 18.4.0 Express Edition. */ @DockerTest -class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSparkSession { +class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest { + override val catalogName: String = "oracle" override val db = new DatabaseOnDocker { override val imageName = sys.env("ORACLE_DOCKER_IMAGE_NAME") override val env = Map( @@ -73,80 +72,19 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark override val connectionTimeout = timeout(7.minutes) override def dataPreparation(conn: Connection): Unit = {} - test("SPARK-33034: ALTER TABLE ... add new columns") { - withTable("oracle.alt_table") { - sql("CREATE TABLE oracle.alt_table (ID STRING) USING _") - sql("ALTER TABLE oracle.alt_table ADD COLUMNS (C1 STRING, C2 STRING)") - var t = spark.table("oracle.alt_table") - var expectedSchema = new StructType() - .add("ID", StringType) - .add("C1", StringType) - .add("C2", StringType) - assert(t.schema === expectedSchema) - sql("ALTER TABLE oracle.alt_table ADD COLUMNS (C3 STRING)") - t = spark.table("oracle.alt_table") - expectedSchema = expectedSchema.add("C3", StringType) - assert(t.schema === expectedSchema) - // Add already existing column - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE oracle.alt_table ADD COLUMNS (C3 DOUBLE)") - }.getMessage - assert(msg.contains("Cannot add column, because C3 already exists")) - } - // Add a column to not existing table - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE oracle.not_existing_table ADD COLUMNS (C4 STRING)") + override def testUpdateColumnType(tbl: String): Unit = { + sql(s"CREATE TABLE $tbl (ID INTEGER) USING _") + var t = spark.table(tbl) + var expectedSchema = new StructType().add("ID", DecimalType(10, 0)) + assert(t.schema === expectedSchema) + sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING") + t = spark.table(tbl) + expectedSchema = new StructType().add("ID", StringType) + assert(t.schema === expectedSchema) + // Update column type from STRING to INTEGER + val msg1 = intercept[AnalysisException] { + sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER") }.getMessage - assert(msg.contains("Table not found")) - } - - test("SPARK-33034: ALTER TABLE ... update column type") { - withTable("oracle.alt_table") { - sql("CREATE TABLE oracle.alt_table (ID INTEGER) USING _") - sql("ALTER TABLE oracle.alt_table ALTER COLUMN id TYPE STRING") - val t = spark.table("oracle.alt_table") - val expectedSchema = new StructType().add("ID", StringType) - assert(t.schema === expectedSchema) - // Update column type from STRING to INTEGER - val msg1 = intercept[AnalysisException] { - sql("ALTER TABLE oracle.alt_table ALTER COLUMN id TYPE INTEGER") - }.getMessage - assert(msg1.contains("Cannot update alt_table field ID: string cannot be cast to int")) - // Update not existing column - val msg2 = intercept[AnalysisException] { - sql("ALTER TABLE oracle.alt_table ALTER COLUMN bad_column TYPE DOUBLE") - }.getMessage - assert(msg2.contains("Cannot update missing field bad_column")) - // Update column to wrong type - val msg3 = intercept[ParseException] { - sql("ALTER TABLE oracle.alt_table ALTER COLUMN id TYPE bad_type") - }.getMessage - assert(msg3.contains("DataType bad_type is not supported")) - } - // Update column type in not existing table - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE oracle.not_existing_table ALTER COLUMN id TYPE DOUBLE") - }.getMessage - assert(msg.contains("Table not found")) - } - - test("SPARK-33034: ALTER TABLE ... update column nullability") { - withTable("oracle.alt_table") { - sql("CREATE TABLE oracle.alt_table (ID STRING NOT NULL) USING _") - sql("ALTER TABLE oracle.alt_table ALTER COLUMN ID DROP NOT NULL") - val t = spark.table("oracle.alt_table") - val expectedSchema = new StructType().add("ID", StringType, nullable = true) - assert(t.schema === expectedSchema) - // Update nullability of not existing column - val msg = intercept[AnalysisException] { - sql("ALTER TABLE oracle.alt_table ALTER COLUMN bad_column DROP NOT NULL") - }.getMessage - assert(msg.contains("Cannot update missing field bad_column")) - } - // Update column nullability in not existing table - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE oracle.not_existing_table ALTER COLUMN ID DROP NOT NULL") - }.getMessage - assert(msg.contains("Table not found")) + assert(msg1.contains("Cannot update alt_table field ID: string cannot be cast to int")) } } diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala new file mode 100644 index 000000000000..384bcc22f27d --- /dev/null +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.jdbc.v2 + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ +import org.apache.spark.tags.DockerTest + +@DockerTest +trait V2JDBCTest extends SharedSparkSession { + val catalogName: String + // dialect specific update column type test + def testUpdateColumnType(tbl: String): Unit + + test("SPARK-33034: ALTER TABLE ... add new columns") { + withTable(s"$catalogName.alt_table") { + sql(s"CREATE TABLE $catalogName.alt_table (ID STRING) USING _") + var t = spark.table(s"$catalogName.alt_table") + var expectedSchema = new StructType().add("ID", StringType) + assert(t.schema === expectedSchema) + sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C1 STRING, C2 STRING)") + t = spark.table(s"$catalogName.alt_table") + expectedSchema = expectedSchema.add("C1", StringType).add("C2", StringType) + assert(t.schema === expectedSchema) + sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C3 STRING)") + t = spark.table(s"$catalogName.alt_table") + expectedSchema = expectedSchema.add("C3", StringType) + assert(t.schema === expectedSchema) + // Add already existing column + val msg = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C3 DOUBLE)") + }.getMessage + assert(msg.contains("Cannot add column, because C3 already exists")) + } + // Add a column to not existing table + val msg = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.not_existing_table ADD COLUMNS (C4 STRING)") + }.getMessage + assert(msg.contains("Table not found")) + } + + test("SPARK-33034: ALTER TABLE ... update column type") { + withTable(s"$catalogName.alt_table") { + testUpdateColumnType(s"$catalogName.alt_table") + // Update not existing column + val msg2 = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN bad_column TYPE DOUBLE") + }.getMessage + assert(msg2.contains("Cannot update missing field bad_column")) + } + // Update column type in not existing table + val msg = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN id TYPE DOUBLE") + }.getMessage + assert(msg.contains("Table not found")) + } + + test("SPARK-33034: ALTER TABLE ... update column nullability") { + withTable(s"$catalogName.alt_table") { + sql(s"CREATE TABLE $catalogName.alt_table (ID STRING NOT NULL) USING _") + var t = spark.table(s"$catalogName.alt_table") + // nullable is true in the expecteSchema because Spark always sets nullable to true + // regardless of the JDBC metadata https://github.com/apache/spark/pull/18445 + var expectedSchema = new StructType().add("ID", StringType, nullable = true) + assert(t.schema === expectedSchema) + sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN ID DROP NOT NULL") + t = spark.table(s"$catalogName.alt_table") + expectedSchema = new StructType().add("ID", StringType, nullable = true) + assert(t.schema === expectedSchema) + // Update nullability of not existing column + val msg = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN bad_column DROP NOT NULL") + }.getMessage + assert(msg.contains("Cannot update missing field bad_column")) + } + // Update column nullability in not existing table + val msg = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN ID DROP NOT NULL") + }.getMessage + assert(msg.contains("Table not found")) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 8b8531b2bb3b..621d416c5545 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -591,6 +591,13 @@ class DDLParserSuite extends AnalysisTest { None)) } + test("alter table: update column type invalid type") { + val msg = intercept[ParseException] { + parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c TYPE bad_type") + }.getMessage + assert(msg.contains("DataType bad_type is not supported")) + } + test("alter table: update column type") { comparePlans( parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c TYPE bigint"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index 430ca9edab79..908e03726d88 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -58,4 +58,24 @@ private object DB2Dialect extends JdbcDialect { override def renameTable(oldTable: String, newTable: String): String = { s"RENAME TABLE $oldTable TO $newTable" } + + // scalastyle:off line.size.limit + // See https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0000888.html + // scalastyle:on line.size.limit + override def getUpdateColumnTypeQuery( + tableName: String, + columnName: String, + newDataType: String): String = + s"ALTER TABLE $tableName ALTER COLUMN $columnName SET DATA TYPE $newDataType" + + // scalastyle:off line.size.limit + // See https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0000888.html + // scalastyle:on line.size.limit + override def getUpdateColumnNullabilityQuery( + tableName: String, + columnName: String, + isNullable: Boolean): String = { + val nullable = if (isNullable) "DROP NOT NULL" else "SET NOT NULL" + s"ALTER TABLE $tableName ALTER COLUMN $columnName $nullable" + } }