diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala index ec958cd55c943..6cf0f56ee7eeb 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala @@ -50,7 +50,8 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest { override val jdbcPort: Int = 3306 override def getJdbcUrl(ip: String, port: Int): String = - s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass" + s"jdbc:mysql://$ip:$port/" + + s"mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true&useSSL=false" } override def sparkConf: SparkConf = super.sparkConf @@ -59,7 +60,11 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest { override val connectionTimeout = timeout(7.minutes) - override def dataPreparation(conn: Connection): Unit = {} + private var mySQLVersion = -1 + + override def dataPreparation(conn: Connection): Unit = { + mySQLVersion = conn.getMetaData.getDatabaseMajorVersion + } override def testUpdateColumnType(tbl: String): Unit = { sql(s"CREATE TABLE $tbl (ID INTEGER) USING _") @@ -77,11 +82,26 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest { assert(msg1.contains("Cannot update alt_table field ID: string cannot be cast to int")) } + override def testRenameColumn(tbl: String): Unit = { + assert(mySQLVersion > 0) + if (mySQLVersion < 8) { + // Rename is unsupported for MySQL versions < 8.0.
+ val exception = intercept[AnalysisException] { + sql(s"ALTER TABLE $tbl RENAME COLUMN ID TO RENAMED") + } + assert(exception.getCause != null, s"Wrong exception thrown: $exception") + val msg = exception.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage + assert(msg.contains("Rename column is only supported for MySQL version 8.0 and above.")) + } else { + super.testRenameColumn(tbl) + } + } + override def testUpdateColumnNullability(tbl: String): Unit = { - sql("CREATE TABLE mysql.alt_table (ID STRING NOT NULL) USING _") + sql(s"CREATE TABLE $tbl (ID STRING NOT NULL) USING _") // Update nullability is unsupported for mysql db. val msg = intercept[AnalysisException] { - sql("ALTER TABLE mysql.alt_table ALTER COLUMN ID DROP NOT NULL") + sql(s"ALTER TABLE $tbl ALTER COLUMN ID DROP NOT NULL") }.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage assert(msg.contains("UpdateColumnNullability is not supported")) diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index 8419db7784e88..92af29d9c9467 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.types._ import org.apache.spark.tags.DockerTest @DockerTest -trait V2JDBCTest extends SharedSparkSession { +private[v2] trait V2JDBCTest extends SharedSparkSession { val catalogName: String // dialect specific update column type test def testUpdateColumnType(tbl: String): Unit @@ -46,6 +46,14 @@ trait V2JDBCTest extends SharedSparkSession { assert(msg.contains("Cannot update missing field bad_column")) } + def testRenameColumn(tbl: String): Unit = { + sql(s"ALTER TABLE $tbl RENAME COLUMN ID TO RENAMED") + val t = spark.table(s"$tbl") + val 
expectedSchema = new StructType().add("RENAMED", StringType, nullable = true) + .add("ID1", StringType, nullable = true).add("ID2", StringType, nullable = true) + assert(t.schema === expectedSchema) + } + test("SPARK-33034: ALTER TABLE ... add new columns") { withTable(s"$catalogName.alt_table") { sql(s"CREATE TABLE $catalogName.alt_table (ID STRING) USING _") @@ -110,6 +118,24 @@ trait V2JDBCTest extends SharedSparkSession { assert(msg.contains("Table not found")) } + test("SPARK-33034: ALTER TABLE ... rename column") { + withTable(s"$catalogName.alt_table") { + sql(s"CREATE TABLE $catalogName.alt_table (ID STRING NOT NULL," + + s" ID1 STRING NOT NULL, ID2 STRING NOT NULL) USING _") + testRenameColumn(s"$catalogName.alt_table") + // Rename to already existing column + val msg = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.alt_table RENAME COLUMN ID1 TO ID2") + }.getMessage + assert(msg.contains("Cannot rename column, because ID2 already exists")) + } + // Rename a column in a not existing table + val msg = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.not_existing_table RENAME COLUMN ID TO C") + }.getMessage + assert(msg.contains("Table not found")) + } + test("SPARK-33034: ALTER TABLE ... 
update column nullability") { withTable(s"$catalogName.alt_table") { testUpdateColumnNullability(s"$catalogName.alt_table") @@ -121,3 +147,3 @@ trait V2JDBCTest extends SharedSparkSession { assert(msg.contains("Table not found")) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 24e380e3be3e1..9aaa55980436e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -895,11 +895,12 @@ object JdbcUtils extends Logging { changes: Seq[TableChange], options: JDBCOptions): Unit = { val dialect = JdbcDialects.get(options.url) + val metaData = conn.getMetaData if (changes.length == 1) { - executeStatement(conn, options, dialect.alterTable(tableName, changes)(0)) + executeStatement(conn, options, dialect.alterTable(tableName, changes, + metaData.getDatabaseMajorVersion)(0)) } else { - val metadata = conn.getMetaData - if (!metadata.supportsTransactions) { + if (!metaData.supportsTransactions) { throw new SQLFeatureNotSupportedException("The target JDBC server does not support " + "transaction and can only support ALTER TABLE with a single action.") } else { @@ -907,7 +908,7 @@ object JdbcUtils extends Logging { val statement = conn.createStatement try { statement.setQueryTimeout(options.queryTimeout) - for (sql <- dialect.alterTable(tableName, changes)) { + for (sql <- dialect.alterTable(tableName, changes, metaData.getDatabaseMajorVersion)) { statement.executeUpdate(sql) } conn.commit() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index e0703195051dc..0a857b99966fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -205,7 +205,10 @@ abstract class JdbcDialect extends Serializable { * @param changes Changes to apply to the table. * @return The SQL statements to use for altering the table. */ - def alterTable(tableName: String, changes: Seq[TableChange]): Array[String] = { + def alterTable( + tableName: String, + changes: Seq[TableChange], + dbMajorVersion: Int): Array[String] = { val updateClause = ArrayBuilder.make[String] for (change <- changes) { change match { @@ -215,7 +218,7 @@ abstract class JdbcDialect extends Serializable { updateClause += getAddColumnQuery(tableName, name(0), dataType) case rename: RenameColumn if rename.fieldNames.length == 1 => val name = rename.fieldNames - updateClause += getRenameColumnQuery(tableName, name(0), rename.newName) + updateClause += getRenameColumnQuery(tableName, name(0), rename.newName, dbMajorVersion) case delete: DeleteColumn if delete.fieldNames.length == 1 => val name = delete.fieldNames updateClause += getDeleteColumnQuery(tableName, name(0)) @@ -237,7 +240,11 @@ abstract class JdbcDialect extends Serializable { def getAddColumnQuery(tableName: String, columnName: String, dataType: String): String = s"ALTER TABLE $tableName ADD COLUMN ${quoteIdentifier(columnName)} $dataType" - def getRenameColumnQuery(tableName: String, columnName: String, newName: String): String = + def getRenameColumnQuery( + tableName: String, + columnName: String, + newName: String, + dbMajorVersion: Int): String = s"ALTER TABLE $tableName RENAME COLUMN ${quoteIdentifier(columnName)} TO" + s" ${quoteIdentifier(newName)}" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index a516e9e76ef31..942cdc9619b56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -57,6 +57,25 @@ 
private case object MySQLDialect extends JdbcDialect { s"ALTER TABLE $tableName MODIFY COLUMN ${quoteIdentifier(columnName)} $newDataType" } + // See Old Syntax: https://dev.mysql.com/doc/refman/5.6/en/alter-table.html + // According to https://dev.mysql.com/worklog/task/?id=10761 old syntax works for + // both versions of MySQL i.e. 5.x and 8.0 + // The old syntax requires us to have type definition. Since we do not have type + // information, we throw the exception for old version. + override def getRenameColumnQuery( + tableName: String, + columnName: String, + newName: String, + dbMajorVersion: Int): String = { + if (dbMajorVersion >= 8) { + s"ALTER TABLE $tableName RENAME COLUMN ${quoteIdentifier(columnName)} TO" + + s" ${quoteIdentifier(newName)}" + } else { + throw new SQLFeatureNotSupportedException( + s"Rename column is only supported for MySQL version 8.0 and above.") + } + } + // See https://dev.mysql.com/doc/refman/8.0/en/alter-table.html // require to have column data type to change the column nullability // ALTER TABLE tbl_name MODIFY [COLUMN] col_name column_definition