diff --git a/sql/core/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/sql/core/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000000000..eb074c6ae3fca
--- /dev/null
+++ b/sql/core/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+mock-maker-inline
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index c047603934d26..496c454509bcf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -19,20 +19,22 @@ package org.apache.spark.sql.errors
 
 import java.io.{File, IOException}
 import java.net.{URI, URL}
-import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, ResultSetMetaData}
+import java.sql.{Connection, Driver, DriverManager, PreparedStatement, ResultSet, ResultSetMetaData}
 import java.util.{Locale, Properties, ServiceConfigurationError}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
-import org.mockito.Mockito.{mock, when}
+import org.mockito.Mockito.{mock, spy, when}
 
 import org.apache.spark._
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.util.BadRecordException
 import org.apache.spark.sql.execution.datasources.jdbc.{DriverRegistry, JDBCOptions}
+import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
 import org.apache.spark.sql.execution.datasources.orc.OrcTest
 import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
+import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager
 import org.apache.spark.sql.execution.streaming.state.RenameReturnsFalseFileSystem
 import org.apache.spark.sql.functions.{lit, lower, struct, sum, udf}
@@ -658,6 +660,78 @@
         "sourcePath" -> s"$srcPath"
       ))
   }
+
+  test("UNSUPPORTED_FEATURE.JDBC_TRANSACTION: the target JDBC server does not support " +
+    "transactions and can only support ALTER TABLE with a single action") {
+    withTempDir { tempDir =>
+      val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
+      Utils.classForName("org.h2.Driver")
+      // Seed the H2 database with the table that the ALTER TABLE statement will target.
+      var conn: java.sql.Connection = null
+      try {
+        conn = DriverManager.getConnection(url, new Properties())
+        conn.prepareStatement("""CREATE SCHEMA "test"""").executeUpdate()
+        conn.prepareStatement(
+          """CREATE TABLE "test"."people" (name TEXT(32) NOT NULL, id INTEGER NOT NULL)""")
+          .executeUpdate()
+        conn.commit()
+      } finally {
+        if (null != conn) {
+          conn.close()
+        }
+      }
+
+      // An H2 dialect whose connections report that transactions are unsupported, so a
+      // multi-action ALTER TABLE must fail with UNSUPPORTED_FEATURE.JDBC_TRANSACTION.
+      val testH2DialectUnsupportedJdbcTransaction = new JdbcDialect {
+        override def canHandle(url: String): Boolean = url.startsWith("jdbc:h2")
+
+        override def createConnectionFactory(options: JDBCOptions): Int => Connection = {
+          val driverClass: String = options.driverClass
+
+          (_: Int) => {
+            DriverRegistry.register(driverClass)
+            val driver: Driver = DriverRegistry.get(driverClass)
+            val connection = ConnectionProvider.create(
+              driver, options.parameters, options.connectionProviderName)
+            // spy() on JDBC driver classes needs the inline mock maker enabled by the
+            // mockito-extensions/org.mockito.plugins.MockMaker resource added above.
+            val spyConnection = spy(connection)
+            val spyMetaData = spy(connection.getMetaData)
+            when(spyConnection.getMetaData).thenReturn(spyMetaData)
+            when(spyMetaData.supportsTransactions()).thenReturn(false)
+
+            spyConnection
+          }
+        }
+      }
+
+      withSQLConf(
+        "spark.sql.catalog.h2" -> classOf[JDBCTableCatalog].getName,
+        "spark.sql.catalog.h2.url" -> url,
+        "spark.sql.catalog.h2.driver" -> "org.h2.Driver") {
+
+        val existedH2Dialect = JdbcDialects.get(url)
+        JdbcDialects.unregisterDialect(existedH2Dialect)
+        JdbcDialects.registerDialect(testH2DialectUnsupportedJdbcTransaction)
+
+        try {
+          val e = intercept[AnalysisException] {
+            sql("alter TABLE h2.test.people SET TBLPROPERTIES (xx='xx', yy='yy')")
+          }
+
+          checkError(
+            exception = e.getCause.asInstanceOf[SparkSQLFeatureNotSupportedException],
+            errorClass = "UNSUPPORTED_FEATURE.JDBC_TRANSACTION",
+            parameters = Map.empty)
+        } finally {
+          // Restore the dialect registry even on assertion failure; the original code
+          // dropped the real H2 dialect and leaked the spy dialect into later tests.
+          JdbcDialects.unregisterDialect(testH2DialectUnsupportedJdbcTransaction)
+          JdbcDialects.registerDialect(existedH2Dialect)
+        }
+      }
+    }
+  }
 }
 
 class FakeFileSystemSetPermission extends LocalFileSystem {