diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala
index d3230155b892..d8dee61d70ea 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.jdbc.v2
 
-import java.sql.Connection
+import java.sql.{Connection, SQLFeatureNotSupportedException}
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connector.catalog.NamespaceChange
 import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.tags.DockerTest
@@ -55,11 +57,47 @@ class MySQLNamespaceSuite extends DockerJDBCIntegrationSuite with V2JDBCNamespac
 
   override def dataPreparation(conn: Connection): Unit = {}
 
-  override def builtinNamespaces: Array[Array[String]] = Array()
+  override def builtinNamespaces: Array[Array[String]] =
+    Array(Array("information_schema"), Array("mysql"), Array("performance_schema"), Array("sys"))
+
+  override def listNamespaces(namespace: Array[String]): Array[Array[String]] = {
+    Array(builtinNamespaces.head, namespace) ++ builtinNamespaces.tail
+  }
 
   override val supportsSchemaComment: Boolean = false
 
-  // Cannot get namespaces with conn.getMetaData.getSchemas
-  // TODO testListNamespaces()
-  // TODO testDropNamespaces()
+  override val supportsDropSchemaRestrict: Boolean = false
+
+  testListNamespaces()
+  testDropNamespaces()
+
+  test("Create or remove comment of namespace unsupported") {
+    val e1 = intercept[AnalysisException] {
+      catalog.createNamespace(Array("foo"), Map("comment" -> "test comment").asJava)
+    }
+    assert(e1.getMessage.contains("Failed create name space: foo"))
+    assert(e1.getCause.isInstanceOf[SQLFeatureNotSupportedException])
+    assert(e1.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage
+      .contains("Create namespace comment is not supported"))
+    assert(catalog.namespaceExists(Array("foo")) === false)
+    catalog.createNamespace(Array("foo"), Map.empty[String, String].asJava)
+    assert(catalog.namespaceExists(Array("foo")) === true)
+    val e2 = intercept[AnalysisException] {
+      catalog.alterNamespace(Array("foo"), NamespaceChange
+        .setProperty("comment", "comment for foo"))
+    }
+    assert(e2.getMessage.contains("Failed create comment on name space: foo"))
+    assert(e2.getCause.isInstanceOf[SQLFeatureNotSupportedException])
+    assert(e2.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage
+      .contains("Create namespace comment is not supported"))
+    val e3 = intercept[AnalysisException] {
+      catalog.alterNamespace(Array("foo"), NamespaceChange.removeProperty("comment"))
+    }
+    assert(e3.getMessage.contains("Failed remove comment on name space: foo"))
+    assert(e3.getCause.isInstanceOf[SQLFeatureNotSupportedException])
+    assert(e3.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage
+      .contains("Remove namespace comment is not supported"))
+    catalog.dropNamespace(Array("foo"), cascade = true)
+    assert(catalog.namespaceExists(Array("foo")) === false)
+  }
 }
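For context on the `listNamespaces` override above: MySQL's `SHOW SCHEMAS` lists `information_schema` first and the remaining databases alphabetically, so a user-created schema lands between `information_schema` and `mysql`. A minimal sketch of the ordering the suite expects (illustrative, not part of the patch):

```scala
// Illustrative sketch: the listing order the suite expects after creating "foo".
val builtinNamespaces = Array(
  Array("information_schema"), Array("mysql"), Array("performance_schema"), Array("sys"))
val expected = Array(builtinNamespaces.head, Array("foo")) ++ builtinNamespaces.tail
// => information_schema, foo, mysql, performance_schema, sys
```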
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
index 8d97ac45568e..bae0d7c36163 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
@@ -52,6 +52,8 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
 
   def supportsDropSchemaCascade: Boolean = true
 
+  def supportsDropSchemaRestrict: Boolean = true
+
   def testListNamespaces(): Unit = {
     test("listNamespaces: basic behavior") {
       val commentMap = if (supportsSchemaComment) {
@@ -78,7 +80,11 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
         assert(createCommentWarning === false)
       }
 
-      catalog.dropNamespace(Array("foo"), cascade = false)
+      if (supportsDropSchemaRestrict) {
+        catalog.dropNamespace(Array("foo"), cascade = false)
+      } else {
+        catalog.dropNamespace(Array("foo"), cascade = true)
+      }
       assert(catalog.namespaceExists(Array("foo")) === false)
       assert(catalog.listNamespaces() === builtinNamespaces)
       val msg = intercept[AnalysisException] {
@@ -99,15 +105,21 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
       }
       catalog.createNamespace(Array("foo"), commentMap.asJava)
       assert(catalog.namespaceExists(Array("foo")) === true)
-      catalog.dropNamespace(Array("foo"), cascade = false)
+      if (supportsDropSchemaRestrict) {
+        catalog.dropNamespace(Array("foo"), cascade = false)
+      } else {
+        catalog.dropNamespace(Array("foo"), cascade = true)
+      }
       assert(catalog.namespaceExists(Array("foo")) === false)
 
       // Drop non empty namespace without cascade
-      catalog.createNamespace(Array("foo"), Map("comment" -> "test comment").asJava)
+      catalog.createNamespace(Array("foo"), commentMap.asJava)
       assert(catalog.namespaceExists(Array("foo")) === true)
       catalog.createTable(ident1, schema, Array.empty, emptyProps)
-      intercept[NonEmptyNamespaceException] {
-        catalog.dropNamespace(Array("foo"), cascade = false)
+      if (supportsDropSchemaRestrict) {
+        intercept[NonEmptyNamespaceException] {
+          catalog.dropNamespace(Array("foo"), cascade = false)
+        }
       }
 
       // Drop non empty namespace with cascade
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 76eb4311e41b..d07ebb322c9a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1944,4 +1944,16 @@ object QueryExecutionErrors {
   def MultipleBucketTransformsError(): Throwable = {
     new UnsupportedOperationException("Multiple bucket transforms are not supported.")
   }
+
+  def unsupportedCreateNamespaceCommentError(): Throwable = {
+    new SQLFeatureNotSupportedException("Create namespace comment is not supported")
+  }
+
+  def unsupportedRemoveNamespaceCommentError(): Throwable = {
+    new SQLFeatureNotSupportedException("Remove namespace comment is not supported")
+  }
+
+  def unsupportedDropNamespaceRestrictError(): Throwable = {
+    new SQLFeatureNotSupportedException("Drop namespace restrict is not supported")
+  }
 }
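The three error factories above exist so that dialects can fail fast on namespace operations the underlying database cannot express; the `MySQLDialect` changes later in this diff throw all three. A sketch of the intended call-site shape, using a hypothetical dialect name (only the factory calls come from the patch):

```scala
// Sketch (mirrors the MySQLDialect overrides later in this diff): a dialect
// that cannot attach comments to a schema aborts while generating the SQL,
// before anything runs against the database.
private case object NoCommentDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:nocomment")

  override def getSchemaCommentQuery(schema: String, comment: String): String =
    throw QueryExecutionErrors.unsupportedCreateNamespaceCommentError()

  override def removeSchemaCommentQuery(schema: String): String =
    throw QueryExecutionErrors.unsupportedRemoveNamespaceCommentError()
}
```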
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 1a4e4aaf16da..ed167c07756e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -971,53 +971,57 @@ object JdbcUtils extends Logging with SQLConfHelper {
   }
 
   /**
-   * Creates a namespace.
+   * Creates a schema.
    */
-  def createNamespace(
+  def createSchema(
       conn: Connection,
       options: JDBCOptions,
-      namespace: String,
+      schema: String,
       comment: String): Unit = {
+    val statement = conn.createStatement
+    try {
+      statement.setQueryTimeout(options.queryTimeout)
+      val dialect = JdbcDialects.get(options.url)
+      dialect.createSchema(statement, schema, comment)
+    } finally {
+      statement.close()
+    }
+  }
+
+  def schemaExists(conn: Connection, options: JDBCOptions, schema: String): Boolean = {
+    val dialect = JdbcDialects.get(options.url)
+    dialect.schemasExists(conn, options, schema)
+  }
+
+  def listSchemas(conn: Connection, options: JDBCOptions): Array[Array[String]] = {
     val dialect = JdbcDialects.get(options.url)
-    executeStatement(conn, options, s"CREATE SCHEMA ${dialect.quoteIdentifier(namespace)}")
-    if (!comment.isEmpty) createNamespaceComment(conn, options, namespace, comment)
+    dialect.listSchemas(conn, options)
   }
 
-  def createNamespaceComment(
+  def alterSchemaComment(
      conn: Connection,
      options: JDBCOptions,
-      namespace: String,
+      schema: String,
      comment: String): Unit = {
    val dialect = JdbcDialects.get(options.url)
-    try {
-      executeStatement(
-        conn, options, dialect.getSchemaCommentQuery(namespace, comment))
-    } catch {
-      case e: Exception =>
-        logWarning("Cannot create JDBC catalog comment. The catalog comment will be ignored.")
-    }
+    executeStatement(conn, options, dialect.getSchemaCommentQuery(schema, comment))
   }
 
-  def removeNamespaceComment(
+  def removeSchemaComment(
      conn: Connection,
      options: JDBCOptions,
-      namespace: String): Unit = {
+      schema: String): Unit = {
    val dialect = JdbcDialects.get(options.url)
-    try {
-      executeStatement(conn, options, dialect.removeSchemaCommentQuery(namespace))
-    } catch {
-      case e: Exception =>
-        logWarning("Cannot drop JDBC catalog comment.")
-    }
+    executeStatement(conn, options, dialect.removeSchemaCommentQuery(schema))
   }
 
   /**
-   * Drops a namespace from the JDBC database.
+   * Drops a schema from the JDBC database.
    */
-  def dropNamespace(
-      conn: Connection, options: JDBCOptions, namespace: String, cascade: Boolean): Unit = {
+  def dropSchema(
+      conn: Connection, options: JDBCOptions, schema: String, cascade: Boolean): Unit = {
     val dialect = JdbcDialects.get(options.url)
-    executeStatement(conn, options, dialect.dropSchema(namespace, cascade))
+    executeStatement(conn, options, dialect.dropSchema(schema, cascade))
   }
 
   /**
@@ -1148,11 +1152,17 @@ object JdbcUtils extends Logging with SQLConfHelper {
     }
   }
 
-  def executeQuery(conn: Connection, options: JDBCOptions, sql: String): ResultSet = {
+  def executeQuery(conn: Connection, options: JDBCOptions, sql: String)(
+      f: ResultSet => Unit): Unit = {
     val statement = conn.createStatement
     try {
       statement.setQueryTimeout(options.queryTimeout)
-      statement.executeQuery(sql)
+      val rs = statement.executeQuery(sql)
+      try {
+        f(rs)
+      } finally {
+        rs.close()
+      }
     } finally {
       statement.close()
     }
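The `executeQuery` rewrite is the behavioral core of this file: the old version returned an open `ResultSet` after its `Statement` had already been closed in the `finally` block, and closing a statement also closes its result sets. The new shape loans the `ResultSet` to a callback and releases both resources afterwards. A usage sketch, assuming a `conn: Connection` and `options: JDBCOptions` in scope:

```scala
// Usage sketch: the caller consumes the ResultSet inside the callback;
// JdbcUtils closes the ResultSet and the Statement once the callback returns.
JdbcUtils.executeQuery(conn, options, "SHOW SCHEMAS") { rs =>
  while (rs.next()) {
    println(rs.getString(1))
  }
}
```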
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index d06a28d952b3..03200d5a6f37 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -21,7 +21,6 @@ import java.util
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuilder
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange}
@@ -173,23 +172,14 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
 
   override def namespaceExists(namespace: Array[String]): Boolean = namespace match {
     case Array(db) =>
       JdbcUtils.withConnection(options) { conn =>
-        val rs = conn.getMetaData.getSchemas(null, db)
-        while (rs.next()) {
-          if (rs.getString(1) == db) return true;
-        }
-        false
+        JdbcUtils.schemaExists(conn, options, db)
       }
     case _ => false
   }
 
   override def listNamespaces(): Array[Array[String]] = {
     JdbcUtils.withConnection(options) { conn =>
-      val schemaBuilder = ArrayBuilder.make[Array[String]]
-      val rs = conn.getMetaData.getSchemas()
-      while (rs.next()) {
-        schemaBuilder += Array(rs.getString(1))
-      }
-      schemaBuilder.result
+      JdbcUtils.listSchemas(conn, options)
     }
   }
@@ -236,7 +226,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
     }
     JdbcUtils.withConnection(options) { conn =>
       JdbcUtils.classifyException(s"Failed create name space: $db", dialect) {
-        JdbcUtils.createNamespace(conn, options, db, comment)
+        JdbcUtils.createSchema(conn, options, db, comment)
       }
     }
   }
@@ -254,7 +244,9 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
       case set: NamespaceChange.SetProperty =>
         if (set.property() == SupportsNamespaces.PROP_COMMENT) {
           JdbcUtils.withConnection(options) { conn =>
-            JdbcUtils.createNamespaceComment(conn, options, db, set.value)
+            JdbcUtils.classifyException(s"Failed create comment on name space: $db", dialect) {
+              JdbcUtils.alterSchemaComment(conn, options, db, set.value)
+            }
           }
         } else {
           throw QueryCompilationErrors.cannotSetJDBCNamespaceWithPropertyError(set.property)
@@ -263,7 +255,9 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
      case unset: NamespaceChange.RemoveProperty =>
        if (unset.property() == SupportsNamespaces.PROP_COMMENT) {
          JdbcUtils.withConnection(options) { conn =>
-            JdbcUtils.removeNamespaceComment(conn, options, db)
+            JdbcUtils.classifyException(s"Failed remove comment on name space: $db", dialect) {
+              JdbcUtils.removeSchemaComment(conn, options, db)
+            }
          }
        } else {
          throw QueryCompilationErrors.cannotUnsetJDBCNamespaceWithPropertyError(unset.property)
@@ -284,7 +278,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
       case Array(db) if namespaceExists(namespace) =>
         JdbcUtils.withConnection(options) { conn =>
           JdbcUtils.classifyException(s"Failed drop name space: $db", dialect) {
-            JdbcUtils.dropNamespace(conn, options, db, cascade)
+            JdbcUtils.dropSchema(conn, options, db, cascade)
             true
           }
         }
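With `classifyException` now wrapping the comment paths as well, a dialect-level `SQLFeatureNotSupportedException` reaches the caller as the cause of an `AnalysisException`, which is exactly what the new `MySQLNamespaceSuite` test asserts. A sketch of the observable behavior, in a test context like the suite above:

```scala
// Sketch of the wrapped failure mode checked by MySQLNamespaceSuite above.
val e = intercept[AnalysisException] {
  catalog.alterNamespace(Array("foo"), NamespaceChange.setProperty("comment", "c"))
}
assert(e.getMessage.contains("Failed create comment on name space: foo"))
assert(e.getCause.isInstanceOf[SQLFeatureNotSupportedException])
```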
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 7dd987e0a44b..983fa496bc23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.{Connection, Date, Timestamp}
+import java.sql.{Connection, Date, Statement, Timestamp}
 import java.time.{Instant, LocalDate}
 import java.util
 
@@ -229,6 +229,45 @@ abstract class JdbcDialect extends Serializable with Logging{
     }
   }
 
+  /**
+   * Create schema with an optional comment. Empty string means no comment.
+   */
+  def createSchema(statement: Statement, schema: String, comment: String): Unit = {
+    val schemaCommentQuery = if (comment.nonEmpty) {
+      // We generate the comment query here so that it can fail earlier without creating the schema.
+      getSchemaCommentQuery(schema, comment)
+    } else {
+      comment
+    }
+    statement.executeUpdate(s"CREATE SCHEMA ${quoteIdentifier(schema)}")
+    if (comment.nonEmpty) {
+      statement.executeUpdate(schemaCommentQuery)
+    }
+  }
+
+  /**
+   * Check if the schema exists.
+   */
+  def schemasExists(conn: Connection, options: JDBCOptions, schema: String): Boolean = {
+    val rs = conn.getMetaData.getSchemas(null, schema)
+    while (rs.next()) {
+      if (rs.getString(1) == schema) return true
+    }
+    false
+  }
+
+  /**
+   * Lists all the schemas in this database.
+   */
+  def listSchemas(conn: Connection, options: JDBCOptions): Array[Array[String]] = {
+    val schemaBuilder = ArrayBuilder.make[Array[String]]
+    val rs = conn.getMetaData.getSchemas()
+    while (rs.next()) {
+      schemaBuilder += Array(rs.getString(1))
+    }
+    schemaBuilder.result
+  }
+
   /**
    * Return Some[true] iff `TRUNCATE TABLE` causes cascading default.
    * Some[true] : TRUNCATE TABLE causes cascading.
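The two defaults above go through `DatabaseMetaData.getSchemas`, which not every driver supports usefully; the old MySQL suite carried the comment "Cannot get namespaces with conn.getMetaData.getSchemas" for exactly this reason. A dialect can override the new hooks instead, as `MySQLDialect` does next. A sketch with a hypothetical dialect name (the `schemasExists` body mirrors the MySQL override below):

```scala
// Hypothetical dialect (names are illustrative, not part of the patch):
// override the new hooks when DatabaseMetaData.getSchemas is unreliable.
private case object ExampleDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:example")

  override def schemasExists(conn: Connection, options: JDBCOptions, schema: String): Boolean =
    listSchemas(conn, options).exists(_.head == schema)
}
```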
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 9fcb7a27d17a..3cca81048e81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -21,6 +21,8 @@ import java.sql.{Connection, SQLException, Types}
 import java.util
 import java.util.Locale
 
+import scala.collection.mutable.ArrayBuilder
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
@@ -76,6 +78,25 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
     s"`$colName`"
   }
 
+  override def schemasExists(conn: Connection, options: JDBCOptions, schema: String): Boolean = {
+    listSchemas(conn, options).exists(_.head == schema)
+  }
+
+  override def listSchemas(conn: Connection, options: JDBCOptions): Array[Array[String]] = {
+    val schemaBuilder = ArrayBuilder.make[Array[String]]
+    try {
+      JdbcUtils.executeQuery(conn, options, "SHOW SCHEMAS") { rs =>
+        while (rs.next()) {
+          schemaBuilder += Array(rs.getString("Database"))
+        }
+      }
+    } catch {
+      case _: Exception =>
+        logWarning("Cannot show schemas.")
+    }
+    schemaBuilder.result
+  }
+
   override def getTableExistsQuery(table: String): String = {
     s"SELECT 1 FROM $table LIMIT 1"
   }
@@ -134,6 +155,14 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
     case _ => JdbcUtils.getCommonJDBCType(dt)
   }
 
+  override def getSchemaCommentQuery(schema: String, comment: String): String = {
+    throw QueryExecutionErrors.unsupportedCreateNamespaceCommentError()
+  }
+
+  override def removeSchemaCommentQuery(schema: String): String = {
+    throw QueryExecutionErrors.unsupportedRemoveNamespaceCommentError()
+  }
+
   // CREATE INDEX syntax
   // https://dev.mysql.com/doc/refman/8.0/en/create-index.html
   override def createIndex(
@@ -175,26 +204,27 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
     val sql = s"SHOW INDEXES FROM $tableName"
     var indexMap: Map[String, TableIndex] = Map()
     try {
-      val rs = JdbcUtils.executeQuery(conn, options, sql)
-      while (rs.next()) {
-        val indexName = rs.getString("key_name")
-        val colName = rs.getString("column_name")
-        val indexType = rs.getString("index_type")
-        val indexComment = rs.getString("Index_comment")
-        if (indexMap.contains(indexName)) {
-          val index = indexMap.get(indexName).get
-          val newIndex = new TableIndex(indexName, indexType,
-            index.columns() :+ FieldReference(colName),
-            index.columnProperties, index.properties)
-          indexMap += (indexName -> newIndex)
-        } else {
-          // The only property we are building here is `COMMENT` because it's the only one
-          // we can get from `SHOW INDEXES`.
-          val properties = new util.Properties();
-          if (indexComment.nonEmpty) properties.put("COMMENT", indexComment)
-          val index = new TableIndex(indexName, indexType, Array(FieldReference(colName)),
-            new util.HashMap[NamedReference, util.Properties](), properties)
-          indexMap += (indexName -> index)
+      JdbcUtils.executeQuery(conn, options, sql) { rs =>
+        while (rs.next()) {
+          val indexName = rs.getString("key_name")
+          val colName = rs.getString("column_name")
+          val indexType = rs.getString("index_type")
+          val indexComment = rs.getString("Index_comment")
+          if (indexMap.contains(indexName)) {
+            val index = indexMap.get(indexName).get
+            val newIndex = new TableIndex(indexName, indexType,
+              index.columns() :+ FieldReference(colName),
+              index.columnProperties, index.properties)
+            indexMap += (indexName -> newIndex)
+          } else {
+            // The only property we are building here is `COMMENT` because it's the only one
+            // we can get from `SHOW INDEXES`.
+            val properties = new util.Properties();
+            if (indexComment.nonEmpty) properties.put("COMMENT", indexComment)
+            val index = new TableIndex(indexName, indexType, Array(FieldReference(colName)),
+              new util.HashMap[NamedReference, util.Properties](), properties)
+            indexMap += (indexName -> index)
+          }
         }
       }
     } catch {
@@ -219,4 +249,12 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
       case _ => super.classifyException(message, e)
     }
   }
+
+  override def dropSchema(schema: String, cascade: Boolean): String = {
+    if (cascade) {
+      s"DROP SCHEMA ${quoteIdentifier(schema)}"
+    } else {
+      throw QueryExecutionErrors.unsupportedDropNamespaceRestrictError()
+    }
+  }
 }
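A closing note on the `dropSchema` override: MySQL's `DROP DATABASE`/`DROP SCHEMA` statement has no `RESTRICT` form and always removes the schema's tables, so restrict semantics cannot be expressed in SQL. The dialect therefore rejects `cascade = false`, and the suite sets `supportsDropSchemaRestrict = false` to skip the restrict path. The resulting behavior, illustratively (`MySQLDialect` is private to its package, so this is a sketch rather than callable code):

```scala
// Illustrative behavior of the override above.
MySQLDialect.dropSchema("foo", cascade = true)
// => "DROP SCHEMA `foo`"
MySQLDialect.dropSchema("foo", cascade = false)
// => throws SQLFeatureNotSupportedException("Drop namespace restrict is not supported")
```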