diff --git a/external/docker-integration-tests/pom.xml b/external/docker-integration-tests/pom.xml index bb39e5fde6d08..e3070f462c1ff 100644 --- a/external/docker-integration-tests/pom.xml +++ b/external/docker-integration-tests/pom.xml @@ -162,5 +162,10 @@ mssql-jdbc test + + mysql + mysql-connector-java + test + diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2NamespaceSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2NamespaceSuite.scala new file mode 100644 index 0000000000000..f0e98fc2722b0 --- /dev/null +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2NamespaceSuite.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.jdbc.v2 + +import java.sql.Connection + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite} +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.tags.DockerTest + +/** + * To run this test suite for a specific version (e.g., ibmcom/db2:11.5.6.0a): + * {{{ + * ENABLE_DOCKER_INTEGRATION_TESTS=1 DB2_DOCKER_IMAGE_NAME=ibmcom/db2:11.5.6.0a + * ./build/sbt -Pdocker-integration-tests "testOnly *v2.DB2NamespaceSuite" + * }}} + */ +@DockerTest +class DB2NamespaceSuite extends DockerJDBCIntegrationSuite with V2JDBCNamespaceTest { + override val db = new DatabaseOnDocker { + override val imageName = sys.env.getOrElse("DB2_DOCKER_IMAGE_NAME", "ibmcom/db2:11.5.6.0a") + override val env = Map( + "DB2INST1_PASSWORD" -> "rootpass", + "LICENSE" -> "accept", + "DBNAME" -> "db2foo", + "ARCHIVE_LOGS" -> "false", + "AUTOCONFIG" -> "false" + ) + override val usesIpc = false + override val jdbcPort: Int = 50000 + override val privileged = true + override def getJdbcUrl(ip: String, port: Int): String = + s"jdbc:db2://$ip:$port/db2foo:user=db2inst1;password=rootpass;retrieveMessagesFromServerOnGetMessage=true;" //scalastyle:ignore + } + + val map = new CaseInsensitiveStringMap( + Map("url" -> db.getJdbcUrl(dockerIp, externalPort), + "driver" -> "com.ibm.db2.jcc.DB2Driver").asJava) + + catalog.initialize("db2", map) + + override def dataPreparation(conn: Connection): Unit = {} + + override def builtinNamespaces: Array[Array[String]] = + Array(Array("NULLID"), Array("SQLJ"), Array("SYSCAT"), Array("SYSFUN"), + Array("SYSIBM"), Array("SYSIBMADM"), Array("SYSIBMINTERNAL"), Array("SYSIBMTS"), + Array("SYSPROC"), Array("SYSPUBLIC"), Array("SYSSTAT"), Array("SYSTOOLS")) + + override def listNamespaces(namespace: Array[String]): Array[Array[String]] = { + builtinNamespaces ++ Array(namespace) + } + + override val supportsDropSchemaCascade: Boolean = false + + 
testListNamespaces() + testDropNamespaces() +} diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerNamespaceSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerNamespaceSuite.scala new file mode 100644 index 0000000000000..aa8dac266380a --- /dev/null +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerNamespaceSuite.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.jdbc.v2 + +import java.sql.Connection + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite} +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.tags.DockerTest + +/** + * To run this test suite for a specific version (e.g., 2019-CU13-ubuntu-20.04): + * {{{ + * ENABLE_DOCKER_INTEGRATION_TESTS=1 + * MSSQLSERVER_DOCKER_IMAGE_NAME=mcr.microsoft.com/mssql/server:2019-CU13-ubuntu-20.04 + * ./build/sbt -Pdocker-integration-tests "testOnly *v2.MsSqlServerNamespaceSuite" + * }}} + */ +@DockerTest +class MsSqlServerNamespaceSuite extends DockerJDBCIntegrationSuite with V2JDBCNamespaceTest { + override val db = new DatabaseOnDocker { + override val imageName = sys.env.getOrElse("MSSQLSERVER_DOCKER_IMAGE_NAME", + "mcr.microsoft.com/mssql/server:2019-CU13-ubuntu-20.04") + override val env = Map( + "SA_PASSWORD" -> "Sapass123", + "ACCEPT_EULA" -> "Y" + ) + override val usesIpc = false + override val jdbcPort: Int = 1433 + + override def getJdbcUrl(ip: String, port: Int): String = + s"jdbc:sqlserver://$ip:$port;user=sa;password=Sapass123;" + } + + val map = new CaseInsensitiveStringMap( + Map("url" -> db.getJdbcUrl(dockerIp, externalPort), + "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver").asJava) + + catalog.initialize("mssql", map) + + override def dataPreparation(conn: Connection): Unit = {} + + override def builtinNamespaces: Array[Array[String]] = + Array(Array("db_accessadmin"), Array("db_backupoperator"), Array("db_datareader"), + Array("db_datawriter"), Array("db_ddladmin"), Array("db_denydatareader"), + Array("db_denydatawriter"), Array("db_owner"), Array("db_securityadmin"), Array("dbo"), + Array("guest"), Array("INFORMATION_SCHEMA"), Array("sys")) + + override def listNamespaces(namespace: Array[String]): Array[Array[String]] = { + builtinNamespaces ++ Array(namespace) + } + + override val supportsSchemaComment: 
Boolean = false + + override val supportsDropSchemaCascade: Boolean = false + + testListNamespaces() + testDropNamespaces() +} diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala index bc4bf54324ee5..97f521a378eb7 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala @@ -29,14 +29,11 @@ import org.apache.spark.sql.types._ import org.apache.spark.tags.DockerTest /** - * * To run this test suite for a specific version (e.g., mysql:5.7.36): * {{{ * ENABLE_DOCKER_INTEGRATION_TESTS=1 MYSQL_DOCKER_IMAGE_NAME=mysql:5.7.36 * ./build/sbt -Pdocker-integration-tests "testOnly *v2*MySQLIntegrationSuite" - * * }}} - * */ @DockerTest class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest { diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala new file mode 100644 index 0000000000000..0a0ebd1049d4a --- /dev/null +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLNamespaceSuite.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.jdbc.v2 + +import java.sql.Connection +import java.sql.SQLFeatureNotSupportedException + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.connector.catalog.NamespaceChange +import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite} +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.tags.DockerTest + +/** + * To run this test suite for a specific version (e.g., mysql:5.7.36): + * {{{ + * ENABLE_DOCKER_INTEGRATION_TESTS=1 MYSQL_DOCKER_IMAGE_NAME=mysql:5.7.36 + * ./build/sbt -Pdocker-integration-tests "testOnly *v2*MySQLNamespaceSuite" + * }}} + */ +@DockerTest +class MySQLNamespaceSuite extends DockerJDBCIntegrationSuite with V2JDBCNamespaceTest { + override val db = new DatabaseOnDocker { + override val imageName = sys.env.getOrElse("MYSQL_DOCKER_IMAGE_NAME", "mysql:5.7.36") + override val env = Map( + "MYSQL_ROOT_PASSWORD" -> "rootpass" + ) + override val usesIpc = false + override val jdbcPort: Int = 3306 + + override def getJdbcUrl(ip: String, port: Int): String = + s"jdbc:mysql://$ip:$port/" + + s"mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true&useSSL=false" + } + + val map = new CaseInsensitiveStringMap( + Map("url" -> db.getJdbcUrl(dockerIp, externalPort), + "driver" -> "com.mysql.jdbc.Driver").asJava) + + catalog.initialize("mysql", map) + + override def dataPreparation(conn: Connection): Unit = {} + + override def builtinNamespaces: Array[Array[String]] = + 
Array(Array("information_schema"), Array("mysql"), Array("performance_schema"), Array("sys")) + + override def listNamespaces(namespace: Array[String]): Array[Array[String]] = { + Array(builtinNamespaces.head, namespace) ++ builtinNamespaces.tail + } + + override val supportsSchemaComment: Boolean = false + + override val supportsDropSchemaRestrict: Boolean = false + + testListNamespaces() + testDropNamespaces() + + test("Create or remove comment of namespace unsupported") { + val e1 = intercept[AnalysisException] { + catalog.createNamespace(Array("foo"), Map("comment" -> "test comment").asJava) + } + assert(e1.getMessage.contains("Failed create name space: foo")) + assert(e1.getCause.isInstanceOf[SQLFeatureNotSupportedException]) + assert(e1.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage + .contains("Create namespace comment is not supported")) + assert(catalog.namespaceExists(Array("foo")) === false) + catalog.createNamespace(Array("foo"), Map.empty[String, String].asJava) + assert(catalog.namespaceExists(Array("foo")) === true) + val e2 = intercept[AnalysisException] { + catalog.alterNamespace(Array("foo"), NamespaceChange + .setProperty("comment", "comment for foo")) + } + assert(e2.getMessage.contains("Failed create comment on name space: foo")) + assert(e2.getCause.isInstanceOf[SQLFeatureNotSupportedException]) + assert(e2.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage + .contains("Create namespace comment is not supported")) + val e3 = intercept[AnalysisException] { + catalog.alterNamespace(Array("foo"), NamespaceChange.removeProperty("comment")) + } + assert(e3.getMessage.contains("Failed remove comment on name space: foo")) + assert(e3.getCause.isInstanceOf[SQLFeatureNotSupportedException]) + assert(e3.getCause.asInstanceOf[SQLFeatureNotSupportedException].getMessage + .contains("Remove namespace comment is not supported")) + catalog.dropNamespace(Array("foo"), cascade = true) + 
assert(catalog.namespaceExists(Array("foo")) === false) + } +} diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleNamespaceSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleNamespaceSuite.scala new file mode 100644 index 0000000000000..31f26d2990666 --- /dev/null +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleNamespaceSuite.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.jdbc.v2 + +import java.sql.Connection + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite} +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.tags.DockerTest + +/** + * The following are the steps to test this: + * + * 1. 
Choose to use a prebuilt image or build Oracle database in a container + * - The documentation on how to build Oracle RDBMS in a container is at + * https://github.com/oracle/docker-images/blob/master/OracleDatabase/SingleInstance/README.md + * - Official Oracle container images can be found at https://container-registry.oracle.com + * - A trustable and streamlined Oracle XE database image can be found on Docker Hub at + * https://hub.docker.com/r/gvenzl/oracle-xe see also https://github.com/gvenzl/oci-oracle-xe + * 2. Run: export ORACLE_DOCKER_IMAGE_NAME=image_you_want_to_use_for_testing + * - Example: export ORACLE_DOCKER_IMAGE_NAME=gvenzl/oracle-xe:latest + * 3. Run: export ENABLE_DOCKER_INTEGRATION_TESTS=1 + * 4. Start docker: sudo service docker start + * - Optionally, docker pull $ORACLE_DOCKER_IMAGE_NAME + * 5. Run Spark integration tests for Oracle with: ./build/sbt -Pdocker-integration-tests + * "testOnly org.apache.spark.sql.jdbc.v2.OracleNamespaceSuite" + * + * A sequence of commands to build the Oracle XE database container image: + * $ git clone https://github.com/oracle/docker-images.git + * $ cd docker-images/OracleDatabase/SingleInstance/dockerfiles + * $ ./buildContainerImage.sh -v 18.4.0 -x + * $ export ORACLE_DOCKER_IMAGE_NAME=oracle/database:18.4.0-xe + * + * This procedure has been validated with Oracle 18.4.0 Express Edition. 
+ */ +@DockerTest +class OracleNamespaceSuite extends DockerJDBCIntegrationSuite with V2JDBCNamespaceTest { + override val db = new DatabaseOnDocker { + lazy override val imageName = + sys.env.getOrElse("ORACLE_DOCKER_IMAGE_NAME", "gvenzl/oracle-xe:18.4.0") + val oracle_password = "Th1s1sThe0racle#Pass" + override val env = Map( + "ORACLE_PWD" -> oracle_password, // oracle images uses this + "ORACLE_PASSWORD" -> oracle_password // gvenzl/oracle-xe uses this + ) + override val usesIpc = false + override val jdbcPort: Int = 1521 + override def getJdbcUrl(ip: String, port: Int): String = + s"jdbc:oracle:thin:system/$oracle_password@//$ip:$port/xe" + } + + val map = new CaseInsensitiveStringMap( + Map("url" -> db.getJdbcUrl(dockerIp, externalPort), + "driver" -> "oracle.jdbc.OracleDriver").asJava) + + catalog.initialize("system", map) + + override def dataPreparation(conn: Connection): Unit = {} + + override def builtinNamespaces: Array[Array[String]] = + Array(Array("ANONYMOUS"), Array("APEX_030200"), Array("APEX_PUBLIC_USER"), Array("APPQOSSYS"), + Array("BI"), Array("DIP"), Array("FLOWS_FILES"), Array("HR"), Array("OE"), Array("PM"), + Array("SCOTT"), Array("SH"), Array("SPATIAL_CSW_ADMIN_USR"), Array("SPATIAL_WFS_ADMIN_USR"), + Array("XS$NULL")) + + // Cannot create schema dynamically + // TODO testListNamespaces() + // TODO testDropNamespaces() +} diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresNamespaceSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresNamespaceSuite.scala index a7744d18433f1..33190103d6a9a 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresNamespaceSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresNamespaceSuite.scala @@ -53,7 +53,9 @@ class PostgresNamespaceSuite extends DockerJDBCIntegrationSuite with V2JDBCNames override def 
dataPreparation(conn: Connection): Unit = {} - override def builtinNamespaces: Array[Array[String]] = { + override def builtinNamespaces: Array[Array[String]] = Array(Array("information_schema"), Array("pg_catalog"), Array("public")) - } + + testListNamespaces() + testDropNamespaces() } diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala index 4f56f1f4ea1e7..bae0d7c361635 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala @@ -44,52 +44,90 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte def builtinNamespaces: Array[Array[String]] - test("listNamespaces: basic behavior") { - catalog.createNamespace(Array("foo"), Map("comment" -> "test comment").asJava) - assert(catalog.listNamespaces() === Array(Array("foo")) ++ builtinNamespaces) - assert(catalog.listNamespaces(Array("foo")) === Array()) - assert(catalog.namespaceExists(Array("foo")) === true) - - val logAppender = new LogAppender("catalog comment") - withLogAppender(logAppender) { - catalog.alterNamespace(Array("foo"), NamespaceChange - .setProperty("comment", "comment for foo")) - catalog.alterNamespace(Array("foo"), NamespaceChange.removeProperty("comment")) - } - val createCommentWarning = logAppender.loggingEvents - .filter(_.getLevel == Level.WARN) - .map(_.getMessage.getFormattedMessage) - .exists(_.contains("catalog comment")) - assert(createCommentWarning === false) - - catalog.dropNamespace(Array("foo"), cascade = false) - assert(catalog.namespaceExists(Array("foo")) === false) - assert(catalog.listNamespaces() === builtinNamespaces) - val msg = intercept[AnalysisException] { - catalog.listNamespaces(Array("foo")) - }.getMessage - 
assert(msg.contains("Namespace 'foo' not found")) + def listNamespaces(namespace: Array[String]): Array[Array[String]] = { + Array(namespace) ++ builtinNamespaces } - test("Drop namespace") { - val ident1 = Identifier.of(Array("foo"), "tab") - // Drop empty namespace without cascade - catalog.createNamespace(Array("foo"), Map("comment" -> "test comment").asJava) - assert(catalog.namespaceExists(Array("foo")) === true) - catalog.dropNamespace(Array("foo"), cascade = false) - assert(catalog.namespaceExists(Array("foo")) === false) - - // Drop non empty namespace without cascade - catalog.createNamespace(Array("foo"), Map("comment" -> "test comment").asJava) - assert(catalog.namespaceExists(Array("foo")) === true) - catalog.createTable(ident1, schema, Array.empty, emptyProps) - intercept[NonEmptyNamespaceException] { - catalog.dropNamespace(Array("foo"), cascade = false) + def supportsSchemaComment: Boolean = true + + def supportsDropSchemaCascade: Boolean = true + + def supportsDropSchemaRestrict: Boolean = true + + def testListNamespaces(): Unit = { + test("listNamespaces: basic behavior") { + val commentMap = if (supportsSchemaComment) { + Map("comment" -> "test comment") + } else { + Map.empty[String, String] + } + catalog.createNamespace(Array("foo"), commentMap.asJava) + assert(catalog.listNamespaces() === listNamespaces(Array("foo"))) + assert(catalog.listNamespaces(Array("foo")) === Array()) + assert(catalog.namespaceExists(Array("foo")) === true) + + if (supportsSchemaComment) { + val logAppender = new LogAppender("catalog comment") + withLogAppender(logAppender) { + catalog.alterNamespace(Array("foo"), NamespaceChange + .setProperty("comment", "comment for foo")) + catalog.alterNamespace(Array("foo"), NamespaceChange.removeProperty("comment")) + } + val createCommentWarning = logAppender.loggingEvents + .filter(_.getLevel == Level.WARN) + .map(_.getMessage.getFormattedMessage) + .exists(_.contains("catalog comment")) + assert(createCommentWarning === false) 
+ } + + if (supportsDropSchemaRestrict) { + catalog.dropNamespace(Array("foo"), cascade = false) + } else { + catalog.dropNamespace(Array("foo"), cascade = true) + } + assert(catalog.namespaceExists(Array("foo")) === false) + assert(catalog.listNamespaces() === builtinNamespaces) + val msg = intercept[AnalysisException] { + catalog.listNamespaces(Array("foo")) + }.getMessage + assert(msg.contains("Namespace 'foo' not found")) } + } + + def testDropNamespaces(): Unit = { + test("Drop namespace") { + val ident1 = Identifier.of(Array("foo"), "tab") + // Drop empty namespace without cascade + val commentMap = if (supportsSchemaComment) { + Map("comment" -> "test comment") + } else { + Map.empty[String, String] + } + catalog.createNamespace(Array("foo"), commentMap.asJava) + assert(catalog.namespaceExists(Array("foo")) === true) + if (supportsDropSchemaRestrict) { + catalog.dropNamespace(Array("foo"), cascade = false) + } else { + catalog.dropNamespace(Array("foo"), cascade = true) + } + assert(catalog.namespaceExists(Array("foo")) === false) - // Drop non empty namespace with cascade - assert(catalog.namespaceExists(Array("foo")) === true) - catalog.dropNamespace(Array("foo"), cascade = true) - assert(catalog.namespaceExists(Array("foo")) === false) + // Drop non empty namespace without cascade + catalog.createNamespace(Array("foo"), commentMap.asJava) + assert(catalog.namespaceExists(Array("foo")) === true) + catalog.createTable(ident1, schema, Array.empty, emptyProps) + if (supportsDropSchemaRestrict) { + intercept[NonEmptyNamespaceException] { + catalog.dropNamespace(Array("foo"), cascade = false) + } + } + + // Drop non empty namespace with cascade + if (supportsDropSchemaCascade) { + assert(catalog.namespaceExists(Array("foo")) === true) + catalog.dropNamespace(Array("foo"), cascade = true) + assert(catalog.namespaceExists(Array("foo")) === false) + } + } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 384016216f668..96bfd2e1f1adf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1952,4 +1952,16 @@ object QueryExecutionErrors { new IllegalArgumentException( s"The input string '$input' does not match the given number format: '$format'") } + + def unsupportedCreateNamespaceCommentError(): Throwable = { + new SQLFeatureNotSupportedException("Create namespace comment is not supported") + } + + def unsupportedRemoveNamespaceCommentError(): Throwable = { + new SQLFeatureNotSupportedException("Remove namespace comment is not supported") + } + + def unsupportedDropNamespaceRestrictError(): Throwable = { + new SQLFeatureNotSupportedException("Drop namespace restrict is not supported") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index cc40d19693b4d..549cb3c7a8657 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -979,8 +979,23 @@ object JdbcUtils extends Logging with SQLConfHelper { namespace: String, comment: String): Unit = { val dialect = JdbcDialects.get(options.url) + val schemaCommentQuery = if (comment.isEmpty) { + comment + } else { + dialect.getSchemaCommentQuery(namespace, comment) + } executeStatement(conn, options, s"CREATE SCHEMA ${dialect.quoteIdentifier(namespace)}") - if (!comment.isEmpty) createNamespaceComment(conn, options, namespace, comment) + if (comment.nonEmpty) executeStatement(conn, options, schemaCommentQuery) + } + + def namespaceExists(conn: Connection, options: JDBCOptions, namespace: String): Boolean = { + val dialect = 
JdbcDialects.get(options.url) + dialect.namespacesExists(conn, options, namespace) + } + + def listNamespaces(conn: Connection, options: JDBCOptions): Array[Array[String]] = { + val dialect = JdbcDialects.get(options.url) + dialect.listNamespaces(conn, options) } def createNamespaceComment( @@ -989,13 +1004,7 @@ object JdbcUtils extends Logging with SQLConfHelper { namespace: String, comment: String): Unit = { val dialect = JdbcDialects.get(options.url) - try { - executeStatement( - conn, options, dialect.getSchemaCommentQuery(namespace, comment)) - } catch { - case e: Exception => - logWarning("Cannot create JDBC catalog comment. The catalog comment will be ignored.") - } + executeStatement(conn, options, dialect.getSchemaCommentQuery(namespace, comment)) } def removeNamespaceComment( @@ -1003,12 +1012,7 @@ object JdbcUtils extends Logging with SQLConfHelper { options: JDBCOptions, namespace: String): Unit = { val dialect = JdbcDialects.get(options.url) - try { - executeStatement(conn, options, dialect.removeSchemaCommentQuery(namespace)) - } catch { - case e: Exception => - logWarning("Cannot drop JDBC catalog comment.") - } + executeStatement(conn, options, dialect.removeSchemaCommentQuery(namespace)) } /** @@ -1017,12 +1021,7 @@ object JdbcUtils extends Logging with SQLConfHelper { def dropNamespace( conn: Connection, options: JDBCOptions, namespace: String, cascade: Boolean): Unit = { val dialect = JdbcDialects.get(options.url) - val dropCmd = if (cascade) { - s"DROP SCHEMA ${dialect.quoteIdentifier(namespace)} CASCADE" - } else { - s"DROP SCHEMA ${dialect.quoteIdentifier(namespace)}" - } - executeStatement(conn, options, dropCmd) + executeStatement(conn, options, dialect.dropSchema(namespace, cascade)) } /** @@ -1153,11 +1152,17 @@ object JdbcUtils extends Logging with SQLConfHelper { } } - def executeQuery(conn: Connection, options: JDBCOptions, sql: String): ResultSet = { + def executeQuery(conn: Connection, options: JDBCOptions, sql: String)( + f: 
ResultSet => Unit): Unit = { val statement = conn.createStatement try { statement.setQueryTimeout(options.queryTimeout) - statement.executeQuery(sql) + val rs = statement.executeQuery(sql) + try { + f(rs) + } finally { + rs.close() + } } finally { statement.close() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index d06a28d952b38..462a09a786d34 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -21,7 +21,6 @@ import java.util import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.collection.mutable.ArrayBuilder import org.apache.spark.internal.Logging import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange} @@ -173,23 +172,14 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging override def namespaceExists(namespace: Array[String]): Boolean = namespace match { case Array(db) => JdbcUtils.withConnection(options) { conn => - val rs = conn.getMetaData.getSchemas(null, db) - while (rs.next()) { - if (rs.getString(1) == db) return true; - } - false + JdbcUtils.namespaceExists(conn, options, db) } case _ => false } override def listNamespaces(): Array[Array[String]] = { JdbcUtils.withConnection(options) { conn => - val schemaBuilder = ArrayBuilder.make[Array[String]] - val rs = conn.getMetaData.getSchemas() - while (rs.next()) { - schemaBuilder += Array(rs.getString(1)) - } - schemaBuilder.result + JdbcUtils.listNamespaces(conn, options) } } @@ -254,7 +244,9 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging case set: NamespaceChange.SetProperty => if (set.property() == 
SupportsNamespaces.PROP_COMMENT) { JdbcUtils.withConnection(options) { conn => - JdbcUtils.createNamespaceComment(conn, options, db, set.value) + JdbcUtils.classifyException(s"Failed create comment on name space: $db", dialect) { + JdbcUtils.createNamespaceComment(conn, options, db, set.value) + } } } else { throw QueryCompilationErrors.cannotSetJDBCNamespaceWithPropertyError(set.property) @@ -263,7 +255,9 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging case unset: NamespaceChange.RemoveProperty => if (unset.property() == SupportsNamespaces.PROP_COMMENT) { JdbcUtils.withConnection(options) { conn => - JdbcUtils.removeNamespaceComment(conn, options, db) + JdbcUtils.classifyException(s"Failed remove comment on name space: $db", dialect) { + JdbcUtils.removeNamespaceComment(conn, options, db) + } } } else { throw QueryCompilationErrors.cannotUnsetJDBCNamespaceWithPropertyError(unset.property) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index 307aa511cc152..baa772f4546a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.jdbc -import java.sql.Types +import java.sql.{SQLException, Types} import java.util.Locale +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc} import org.apache.spark.sql.types._ @@ -101,4 +103,28 @@ private object DB2Dialect extends JdbcDialect { val nullable = if (isNullable) "DROP NOT NULL" else "SET NOT NULL" s"ALTER TABLE $tableName ALTER COLUMN ${quoteIdentifier(columnName)} $nullable" } + + override def removeSchemaCommentQuery(schema: String): String = { + s"COMMENT ON SCHEMA 
${quoteIdentifier(schema)} IS ''" + } + + override def classifyException(message: String, e: Throwable): AnalysisException = { + e match { + case sqlException: SQLException => + sqlException.getSQLState match { + // https://www.ibm.com/docs/en/db2/11.5?topic=messages-sqlstate + case "42893" => throw NonEmptyNamespaceException(message, cause = Some(e)) + case _ => super.classifyException(message, e) + } + case _ => super.classifyException(message, e) + } + } + + override def dropSchema(schema: String, cascade: Boolean): String = { + if (cascade) { + s"DROP SCHEMA ${quoteIdentifier(schema)} CASCADE" + } else { + s"DROP SCHEMA ${quoteIdentifier(schema)} RESTRICT" + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 7b8b362e64c6d..32bdf689e98e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -229,6 +229,29 @@ abstract class JdbcDialect extends Serializable with Logging{ } } + /** + * Checks whether the given namespace exists. + */ + def namespacesExists(conn: Connection, options: JDBCOptions, namespace: String): Boolean = { + val rs = conn.getMetaData.getSchemas(null, namespace) + while (rs.next()) { + if (rs.getString(1) == namespace) return true; + } + false + } + + /** + * Lists all the schemas in this database. + */ + def listNamespaces(conn: Connection, options: JDBCOptions): Array[Array[String]] = { + val schemaBuilder = ArrayBuilder.make[Array[String]] + val rs = conn.getMetaData.getSchemas() + while (rs.next()) { + schemaBuilder += Array(rs.getString(1)) + } + schemaBuilder.result + } + /** * Return Some[true] iff `TRUNCATE TABLE` causes cascading default. * Some[true] : TRUNCATE TABLE causes cascading. 
@@ -327,6 +350,14 @@ abstract class JdbcDialect extends Serializable with Logging{ s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS NULL" } + def dropSchema(schema: String, cascade: Boolean): String = { + if (cascade) { + s"DROP SCHEMA ${quoteIdentifier(schema)} CASCADE" + } else { + s"DROP SCHEMA ${quoteIdentifier(schema)}" + } + } + /** * Build a create index SQL statement. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index 442c5599b3ab3..3d8a48a66ea8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -17,8 +17,11 @@ package org.apache.spark.sql.jdbc +import java.sql.SQLException import java.util.Locale +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf @@ -147,4 +150,15 @@ private object MsSqlServerDialect extends JdbcDialect { override def getLimitClause(limit: Integer): String = { "" } + + override def classifyException(message: String, e: Throwable): AnalysisException = { + e match { + case sqlException: SQLException => + sqlException.getErrorCode match { + case 3729 => throw NonEmptyNamespaceException(message, cause = Some(e)) + case _ => super.classifyException(message, e) + } + case _ => super.classifyException(message, e) + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index 9fcb7a27d17af..2bfb37982476c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -21,6 +21,8 @@ import java.sql.{Connection, SQLException, Types} import java.util import java.util.Locale +import scala.collection.mutable.ArrayBuilder + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException} @@ -76,6 +78,26 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { s"`$colName`" } + override def namespacesExists( + conn: Connection, options: JDBCOptions, namespace: String): Boolean = { + listNamespaces(conn, options).exists(_.head == namespace) + } + + override def listNamespaces(conn: Connection, options: JDBCOptions): Array[Array[String]] = { + val schemaBuilder = ArrayBuilder.make[Array[String]] + try { + JdbcUtils.executeQuery(conn, options, "SHOW SCHEMAS") { rs => + while (rs.next()) { + schemaBuilder += Array(rs.getString("Database")) + } + } + } catch { + case _: Exception => + logWarning("Cannot show schemas.") + } + schemaBuilder.result + } + override def getTableExistsQuery(table: String): String = { s"SELECT 1 FROM $table LIMIT 1" } @@ -134,6 +156,14 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { case _ => JdbcUtils.getCommonJDBCType(dt) } + override def getSchemaCommentQuery(schema: String, comment: String): String = { + throw QueryExecutionErrors.unsupportedCreateNamespaceCommentError() + } + + override def removeSchemaCommentQuery(schema: String): String = { + throw QueryExecutionErrors.unsupportedRemoveNamespaceCommentError() + } + // CREATE INDEX syntax // https://dev.mysql.com/doc/refman/8.0/en/create-index.html override def createIndex( @@ -175,26 +205,27 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { val sql = s"SHOW INDEXES FROM $tableName" var indexMap: Map[String, TableIndex] = Map() try { - val rs = JdbcUtils.executeQuery(conn, options, 
sql) - while (rs.next()) { - val indexName = rs.getString("key_name") - val colName = rs.getString("column_name") - val indexType = rs.getString("index_type") - val indexComment = rs.getString("Index_comment") - if (indexMap.contains(indexName)) { - val index = indexMap.get(indexName).get - val newIndex = new TableIndex(indexName, indexType, - index.columns() :+ FieldReference(colName), - index.columnProperties, index.properties) - indexMap += (indexName -> newIndex) - } else { - // The only property we are building here is `COMMENT` because it's the only one - // we can get from `SHOW INDEXES`. - val properties = new util.Properties(); - if (indexComment.nonEmpty) properties.put("COMMENT", indexComment) - val index = new TableIndex(indexName, indexType, Array(FieldReference(colName)), - new util.HashMap[NamedReference, util.Properties](), properties) - indexMap += (indexName -> index) + JdbcUtils.executeQuery(conn, options, sql) { rs => + while (rs.next()) { + val indexName = rs.getString("key_name") + val colName = rs.getString("column_name") + val indexType = rs.getString("index_type") + val indexComment = rs.getString("Index_comment") + if (indexMap.contains(indexName)) { + val index = indexMap.get(indexName).get + val newIndex = new TableIndex(indexName, indexType, + index.columns() :+ FieldReference(colName), + index.columnProperties, index.properties) + indexMap += (indexName -> newIndex) + } else { + // The only property we are building here is `COMMENT` because it's the only one + // we can get from `SHOW INDEXES`. 
+ val properties = new util.Properties(); + if (indexComment.nonEmpty) properties.put("COMMENT", indexComment) + val index = new TableIndex(indexName, indexType, Array(FieldReference(colName)), + new util.HashMap[NamedReference, util.Properties](), properties) + indexMap += (indexName -> index) + } } } } catch { @@ -219,4 +250,12 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { case _ => super.classifyException(message, e) } } + + override def dropSchema(schema: String, cascade: Boolean): String = { + if (cascade) { + s"DROP SCHEMA ${quoteIdentifier(schema)}" + } else { + throw QueryExecutionErrors.unsupportedDropNamespaceRestrictError() + } + } }