From 13069440fa104050560e5750a16713d5cde44ca1 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Tue, 13 Oct 2020 17:55:07 +0530 Subject: [PATCH 1/2] Add dialect support for alter table and a docker integration suite for MsSql Server. Added unsupported feature exception for switching nullability using alter table. --- .../jdbc/v2/MsSqlServerIntegrationSuite.scala | 91 +++++++++++++++++++ .../spark/sql/jdbc/MsSqlServerDialect.scala | 38 ++++++++ 2 files changed, 129 insertions(+) create mode 100644 external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala new file mode 100644 index 0000000000000..bf65a9b30fcbe --- /dev/null +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.jdbc.v2 + +import java.sql.Connection +import java.sql.SQLFeatureNotSupportedException + +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.SparkConf +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog +import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite} +import org.apache.spark.sql.types._ +import org.apache.spark.tags.DockerTest + +/** + * To run this test suite for a specific version (e.g., 2019-GA-ubuntu-16.04): + * {{{ + * MSSQLSERVER_DOCKER_IMAGE_NAME=2019-GA-ubuntu-16.04 + * ./build/sbt -Pdocker-integration-tests "testOnly *v2*MsSqlServerIntegrationSuite" + * }}} + */ +@DockerTest +class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest { + + override val catalogName: String = "mssql" + + override val db = new DatabaseOnDocker { + override val imageName = sys.env.getOrElse("MSSQLSERVER_DOCKER_IMAGE_NAME", + "mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04") + override val env = Map( + "SA_PASSWORD" -> "Sapass123", + "ACCEPT_EULA" -> "Y" + ) + override val usesIpc = false + override val jdbcPort: Int = 1433 + + override def getJdbcUrl(ip: String, port: Int): String = + s"jdbc:sqlserver://$ip:$port;user=sa;password=Sapass123;" + } + + override def testUpdateColumnType(tbl: String): Unit = { + sql(s"CREATE TABLE $tbl (ID INTEGER) USING _") + var t = spark.table(tbl) + var expectedSchema = new StructType().add("ID", IntegerType) + assert(t.schema === expectedSchema) + sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING") + t = spark.table(tbl) + expectedSchema = new StructType().add("ID", StringType) + assert(t.schema === expectedSchema) + // Update column type from STRING to INTEGER + val msg1 = intercept[AnalysisException] { + sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER") + }.getMessage + assert(msg1.contains("Cannot update alt_table field ID: string cannot be cast to int")) + } + + override def sparkConf: SparkConf = super.sparkConf + .set("spark.sql.catalog.mssql", classOf[JDBCTableCatalog].getName) + .set("spark.sql.catalog.mssql.url", db.getJdbcUrl(dockerIp, externalPort)) + + override val connectionTimeout = timeout(7.minutes) + + override def dataPreparation(conn: Connection): Unit = {} + + override def testUpdateColumnNullability(tbl: String): Unit = { + sql("CREATE TABLE mssql.alt_table (ID STRING NOT NULL) USING _") + // Update nullability is unsupported for mssql db. + val msg = intercept[AnalysisException] { + sql("ALTER TABLE mssql.alt_table ALTER COLUMN ID DROP NOT NULL") + }.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage + + assert(msg.contains("UpdateColumnNullability is not supported")) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index 1c6e8c359aa15..dc39a10987c91 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.jdbc +import java.sql.SQLFeatureNotSupportedException import java.util.Locale import org.apache.spark.sql.internal.SQLConf @@ -64,4 +65,41 @@ private object MsSqlServerDialect extends JdbcDialect { override def renameTable(oldTable: String, newTable: String): String = { s"EXEC sp_rename $oldTable, $newTable" } + + // scalastyle:off line.size.limit + // see https://docs.microsoft.com/en-us/sql/relational-databases/tables/add-columns-to-a-table-database-engine?view=sql-server-ver15 + // scalastyle:on line.size.limit + override def getAddColumnQuery( + tableName: String, + columnName: String, + dataType: String): String = { + s"ALTER TABLE $tableName ADD ${quoteIdentifier(columnName)} $dataType" + } + + // scalastyle:off line.size.limit + // See https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-rename-transact-sql?view=sql-server-ver15 + // scalastyle:on line.size.limit + override def getRenameColumnQuery( + tableName: String, + columnName: String, + newName: String, + dbMajorVersion: Int): String = { + s"EXEC sp_rename '$tableName.${quoteIdentifier(columnName)}'," + + s" ${quoteIdentifier(newName)}, 'COLUMN'" + } + + // scalastyle:off line.size.limit + // see https://docs.microsoft.com/en-us/sql/t-sql/statements/alter-table-transact-sql?view=sql-server-ver15 + // scalastyle:on line.size.limit + // require to have column data type to change the column nullability + // ALTER TABLE tbl_name ALTER COLUMN col_name datatype [NULL | NOT NULL] + // column_definition: + // data_type [NOT NULL | NULL] + // We don't have column data type here, so we throw Exception for now + override def getUpdateColumnNullabilityQuery( + tableName: String, + columnName: String, + isNullable: Boolean): String = { + throw new SQLFeatureNotSupportedException(s"UpdateColumnNullability is not supported") + } } From 449a7ad2f695d5422a12740b65ab67d75a397551 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Tue, 3 Nov 2020 17:13:16 +0530 Subject: [PATCH 2/2] feedback --- .../jdbc/v2/MsSqlServerIntegrationSuite.scala | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala index bf65a9b30fcbe..905e32aaa918e 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala @@ -17,8 +17,7 @@ package org.apache.spark.sql.jdbc.v2 -import java.sql.Connection -import java.sql.SQLFeatureNotSupportedException +import java.sql.{Connection, SQLFeatureNotSupportedException} import org.scalatest.time.SpanSugar._ @@ -55,6 +54,14 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBC s"jdbc:sqlserver://$ip:$port;user=sa;password=Sapass123;" } + override def sparkConf: SparkConf = super.sparkConf + .set("spark.sql.catalog.mssql", classOf[JDBCTableCatalog].getName) + .set("spark.sql.catalog.mssql.url", db.getJdbcUrl(dockerIp, externalPort)) + + override val connectionTimeout = timeout(7.minutes) + + override def dataPreparation(conn: Connection): Unit = {} + override def testUpdateColumnType(tbl: String): Unit = { sql(s"CREATE TABLE $tbl (ID INTEGER) USING _") var t = spark.table(tbl) @@ -71,19 +78,11 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBC assert(msg1.contains("Cannot update alt_table field ID: string cannot be cast to int")) } - override def sparkConf: SparkConf = super.sparkConf - .set("spark.sql.catalog.mssql", classOf[JDBCTableCatalog].getName) - .set("spark.sql.catalog.mssql.url", db.getJdbcUrl(dockerIp, externalPort)) - - override val connectionTimeout = timeout(7.minutes) - - override def dataPreparation(conn: Connection): Unit = {} - override def testUpdateColumnNullability(tbl: String): Unit = { - sql("CREATE TABLE mssql.alt_table (ID STRING NOT NULL) USING _") + sql(s"CREATE TABLE $tbl (ID STRING NOT NULL) USING _") // Update nullability is unsupported for mssql db. val msg = intercept[AnalysisException] { - sql("ALTER TABLE mssql.alt_table ALTER COLUMN ID DROP NOT NULL") + sql(s"ALTER TABLE $tbl ALTER COLUMN ID DROP NOT NULL") }.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage assert(msg.contains("UpdateColumnNullability is not supported"))