diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index b8bed8569ace0..7050439cad6c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -34,10 +34,11 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoDir, InsertIntoTable, LogicalPlan, OverwriteByExpression, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -240,6 +241,12 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     })
   }
 
+  private def fallbackToV1Relation(
+      t: DataSourceV1Table,
+      existingOutput: Seq[AttributeReference]): LogicalRelation = {
+    LogicalRelation(t.v1Relation, existingOutput, catalogTable = None, isStreaming = false)
+  }
+
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
         if DDLUtils.isDatasourceTable(tableMeta) =>
@@ -253,6 +260,16 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
 
     case UnresolvedCatalogRelation(tableMeta) =>
       DDLUtils.readHiveTable(tableMeta)
+
+    case AppendData(DataSourceV2Relation(t: DataSourceV1Table, output, _), query, false) =>
+      InsertIntoDataSourceCommand(fallbackToV1Relation(t, output), query, overwrite = false)
+
+    case OverwriteByExpression(
+        DataSourceV2Relation(t: DataSourceV1Table, output, _), Literal(true, _), query, false) =>
+      InsertIntoDataSourceCommand(fallbackToV1Relation(t, output), query, overwrite = true)
+
+    case DataSourceV2Relation(t: DataSourceV1Table, output, _) =>
+      fallbackToV1Relation(t, output)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 86a27b5afc250..29176259390b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -183,7 +183,7 @@ object JdbcUtils extends Logging {
     }
   }
 
-  private def getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = {
+  def getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = {
     dialect.getJDBCType(dt).orElse(getCommonJDBCType(dt)).getOrElse(
       throw new IllegalArgumentException(s"Can't get JDBC type for ${dt.catalogString}"))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 52e2896536355..4953c1959e474 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 import org.apache.spark.sql.sources
+import org.apache.spark.sql.sources.DataSourceV1TableCatalog
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -170,6 +171,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
         case staging: StagingTableCatalog =>
           AtomicCreateTableAsSelectExec(
             staging, ident, parts, planLater(query), props, writeOptions, ifNotExists) :: Nil
+        case v1Catalog: DataSourceV1TableCatalog =>
+          CreateV1TableAsSelectExec(
+            v1Catalog, ident, parts, query, props, writeOptions, ifNotExists) :: Nil
         case _ =>
           CreateTableAsSelectExec(
             catalog, ident, parts, planLater(query), props, writeOptions, ifNotExists) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 86b64cb8835ad..95cb168a7787a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -26,14 +26,15 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.catalog.v2.{Identifier, StagingTableCatalog, TableCatalog}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.sources.{AlwaysTrue, DataSourceV1Table, DataSourceV1TableCatalog, Filter, InsertableRelation}
 import org.apache.spark.sql.sources.v2.{StagedTable, SupportsWrite}
 import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -50,6 +51,48 @@ case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan)
   override def output: Seq[Attribute] = Nil
 }
 
+/**
+ * Physical plan node for v2 create table as select that falls back to a v1 relation. The table is
+ * created through the `DataSourceV1TableCatalog`, the query result is inserted via the returned
+ * table's v1 `InsertableRelation`, and the table is dropped again if the insert fails.
+ */
+case class CreateV1TableAsSelectExec(
+    catalog: DataSourceV1TableCatalog,
+    ident: Identifier,
+    partitioning: Seq[Transform],
+    query: LogicalPlan,
+    properties: Map[String, String],
+    writeOptions: CaseInsensitiveStringMap,
+    ifNotExists: Boolean) extends LeafExecNode {
+
+  override def output: Seq[Attribute] = Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    if (catalog.tableExists(ident)) {
+      if (ifNotExists) {
+        return sparkContext.emptyRDD[InternalRow]
+      }
+
+      throw new TableAlreadyExistsException(ident)
+    }
+
+    Utils.tryWithSafeFinallyAndFailureCallbacks({
+      catalog.createTable(
+          ident, query.schema, partitioning.toArray, properties.asJava) match {
+        case table: DataSourceV1Table =>
+          table.v1Relation.asInstanceOf[InsertableRelation].insert(
+            Dataset.ofRows(sqlContext.sparkSession, query), overwrite = false)
+          sparkContext.emptyRDD[InternalRow]
+
+        case _ =>
+          throw new SparkException(s"DataSourceV1TableCatalog must create DataSourceV1Table.")
+      }
+    })(catchBlock = {
+      catalog.dropTable(ident)
+    })
+  }
+}
+
 /**
  * Physical plan node for v2 create table as select when the catalog does not support staging
  * the table creation.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
new file mode 100644
index 0000000000000..864c32a8a3eac
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2.jdbc
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalog.v2.Identifier
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRelation}
+import org.apache.spark.sql.sources.{BaseRelation, DataSourceV1Table}
+import org.apache.spark.sql.types.StructType
+
+case class JDBCTable(
+    ident: Identifier,
+    schema: StructType,
+    jdbcOptions: JDBCOptions) extends DataSourceV1Table {
+  assert(ident.namespace().length == 1)
+
+  override def name(): String = ident.toString
+
+  override def v1Relation: BaseRelation = {
+    JDBCRelation(
+      schema,
+      // TODO: support column partitioning after we support table properties in JDBC table.
+      Array(JDBCPartition(null, 0)),
+      jdbcOptions)(SparkSession.active)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
new file mode 100644
index 0000000000000..898e0789cf7d6
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2.jdbc
+
+import java.sql.{Connection, SQLException}
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalog.v2.{Identifier, TableChange}
+import org.apache.spark.sql.catalog.v2.expressions.Transform
+import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCRDD, JdbcUtils}
+import org.apache.spark.sql.jdbc.JdbcDialects
+import org.apache.spark.sql.sources.DataSourceV1TableCatalog
+import org.apache.spark.sql.sources.v2.Table
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class JDBCTableCatalog extends DataSourceV1TableCatalog with Logging {
+
+  private var _name: String = _
+  private var options: JDBCOptions = _
+
+  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
+    _name = name
+    val map = options.asCaseSensitiveMap().asScala.toMap
+    // `JDBCOptions` requires the table option to be present. That is needed by JDBC V1, but JDBC
+    // V2 only knows the table name when a table is loaded. Here we put in a table option with a
+    // fake value so that it passes the `JDBCOptions` check.
+    this.options = new JDBCOptions(map + (JDBCOptions.JDBC_TABLE_NAME -> "__invalid"))
+  }
+
+  override def name(): String = {
+    _name
+  }
+
+  private def withConnection[T](f: Connection => T): T = {
+    val conn = JdbcUtils.createConnectionFactory(options)()
+    try {
+      f(conn)
+    } finally {
+      conn.close()
+    }
+  }
+
+  private def checkNamespace(namespace: Array[String]): Unit = {
+    // In JDBC, tables must be in a database.
+    // TODO: support a default database.
+    if (namespace.length != 1) {
+      throw new NoSuchNamespaceException(namespace)
+    }
+  }
+
+  private def createOptionsWithTableName(ident: Identifier): JDBCOptions = {
+    // TODO: if the table name contains special chars, we should quote it w.r.t. the JDBC dialect.
+    val tblName = (ident.namespace() :+ ident.name()).mkString(".")
+    new JDBCOptions(options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> tblName))
+  }
+
+  override def listTables(namespace: Array[String]): Array[Identifier] = {
+    // TODO: implement this when the SHOW TABLES command supports DS V2.
+    throw new UnsupportedOperationException("list table")
+  }
+
+  override def loadTable(ident: Identifier): Table = {
+    checkNamespace(ident.namespace())
+    val optionsWithTableName = createOptionsWithTableName(ident)
+    try {
+      val schema = JDBCRDD.resolveTable(optionsWithTableName)
+      JDBCTable(ident, schema, optionsWithTableName)
+    } catch {
+      case _: SQLException => throw new NoSuchTableException(ident)
+    }
+  }
+
+  override def createTable(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+    if (!partitions.isEmpty) {
+      throw new UnsupportedOperationException("Cannot create JDBC table with partitions")
+    }
+    // TODO: we can support this, but we need to add an API to `JdbcDialect` to generate the SQL
+    // statement that specifies table options. Without table properties, many options cannot be
+    // supported, e.g. the custom schema option, the partition column option, etc.
+    if (!properties.isEmpty) {
+      logWarning("Cannot create JDBC table with properties; the following properties will be " +
+        "ignored: " + properties.asScala.map { case (k, v) => s"$k=$v" }.mkString("[", ", ", "]"))
+    }
+
+    val sb = new StringBuilder()
+    val dialect = JdbcDialects.get(options.url)
+    schema.fields.foreach { field =>
+      val name = dialect.quoteIdentifier(field.name)
+      val typ = JdbcUtils.getJdbcType(field.dataType, dialect).databaseTypeDefinition
+      val nullable = if (field.nullable) "" else "NOT NULL"
+      sb.append(s", $name $typ $nullable")
+    }
+    // TODO: support the `JDBC_CREATE_TABLE_COLUMN_TYPES` option after we support table properties.
+    val schemaStr = if (sb.length < 2) "" else sb.substring(2)
+    val sql = s"CREATE TABLE $ident ($schemaStr)"
+    withConnection { conn =>
+      val statement = conn.createStatement
+      statement.setQueryTimeout(options.queryTimeout)
+      statement.executeUpdate(sql)
+    }
+
+    JDBCTable(ident, schema, createOptionsWithTableName(ident))
+  }
+
+  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
+    // TODO: support this by adding more APIs to `JdbcDialect` which can generate the SQL statement
+    // to alter a table.
+    throw new UnsupportedOperationException("alter table")
+  }
+
+  override def dropTable(ident: Identifier): Boolean = {
+    try {
+      withConnection { conn =>
+        val statement = conn.createStatement
+        statement.setQueryTimeout(options.queryTimeout)
+        statement.executeUpdate(s"DROP TABLE $ident")
+        true
+      }
+    } catch {
+      case _: SQLException => false
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 6ad054c9f6403..cb279bd42d140 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.sql.sources
 
+import java.util
+
 import org.apache.spark.annotation._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalog.v2.TableCatalog
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
+import org.apache.spark.sql.sources.v2.{Table, TableCapability}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -313,3 +316,27 @@ trait InsertableRelation {
 trait CatalystScan {
   def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
 }
+
+/**
+ * A special `TableCatalog` that returns `DataSourceV1Table` instances.
+ *
+ * @since 3.0.0
+ */
+@Experimental
+@Unstable
+trait DataSourceV1TableCatalog extends TableCatalog
+
+/**
+ * A special Data Source V2 `Table` which does not need to implement the read/write capabilities.
+ * Spark falls back to the underlying v1 relation to serve read/write requests.
+ *
+ * @since 3.0.0
+ */
+@Experimental
+@Unstable
+trait DataSourceV1Table extends Table {
+
+  override def capabilities(): util.Set[TableCapability] = util.Collections.emptySet()
+
+  def v1Relation: BaseRelation
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
new file mode 100644
index 0000000000000..f1ab5697396f1
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.{Connection, DriverManager}
+import java.util.{Locale, Properties}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
+
+class JDBCV2Suite extends QueryTest with SharedSparkSession {
+  import testImplicits._
+
+  val tempDir = Utils.createTempDir()
+  val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
+  var conn: java.sql.Connection = null
+
+  override def sparkConf: SparkConf = super.sparkConf
+    .set("spark.sql.catalog.jdbc", classOf[JDBCTableCatalog].getName)
+    .set("spark.sql.catalog.jdbc.url", url)
+    .set("spark.sql.catalog.jdbc.driver", "org.h2.Driver")
+
+  private def withConnection[T](f: Connection => T): T = {
+    val conn = DriverManager.getConnection(url, new Properties())
+    try {
+      f(conn)
+    } finally {
+      conn.close()
+    }
+  }
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    Utils.classForName("org.h2.Driver")
+    withConnection { conn =>
+      conn.prepareStatement("CREATE SCHEMA test").executeUpdate()
+      conn.prepareStatement(
+        "CREATE TABLE test.people (name TEXT(32) NOT NULL, id INTEGER NOT NULL)").executeUpdate()
+      conn.prepareStatement("INSERT INTO test.people VALUES ('fred', 1)").executeUpdate()
+      conn.prepareStatement("INSERT INTO test.people VALUES ('mary', 2)").executeUpdate()
+    }
+  }
+
+  override def afterAll(): Unit = {
+    Utils.deleteRecursively(tempDir)
+    super.afterAll()
+  }
+
+  private def listTables(): Seq[String] = {
+    withConnection { conn =>
+      val resultSet = conn.getMetaData.getTables(null, "TEST", "%", Array("TABLE"))
+      val tables = ArrayBuffer.empty[String]
+      while (resultSet.next()) {
+        val tblName = resultSet.getString(3)
+        tables.append(tblName)
+      }
+      tables.map(_.toLowerCase(Locale.ROOT))
+    }
+  }
+
+  test("simple scan") {
+    checkAnswer(sql("SELECT name, id FROM jdbc.test.people"), Seq(Row("fred", 1), Row("mary", 2)))
+  }
+
+  test("create/drop table") {
+    // TODO: currently CREATE TABLE without USING will be treated as Hive style CREATE TABLE, which
+    // is unexpected.
+    sql("CREATE TABLE jdbc.test.abc(i INT, j STRING) USING x")
+    assert(listTables().contains("abc"))
+    sql("DROP TABLE jdbc.test.abc")
+    assert(!listTables().contains("abc"))
+  }
+
+  test("create table as select") {
+    withTable("jdbc.test.abc") {
+      sql("CREATE TABLE jdbc.test.abc USING x AS SELECT * FROM jdbc.test.people")
+      checkAnswer(sql("SELECT name, id FROM jdbc.test.abc"), Seq(Row("fred", 1), Row("mary", 2)))
+    }
+  }
+
+  test("insert") {
+    withTable("jdbc.test.abc") {
+      sql("CREATE TABLE jdbc.test.abc USING x AS SELECT * FROM jdbc.test.people")
+      sql("INSERT INTO jdbc.test.abc SELECT 'lucy', 3")
+      checkAnswer(
+        sql("SELECT name, id FROM jdbc.test.abc"),
+        Seq(Row("fred", 1), Row("mary", 2), Row("lucy", 3)))
+      sql("INSERT OVERWRITE jdbc.test.abc SELECT 'bob', 4")
+      checkAnswer(sql("SELECT name, id FROM jdbc.test.abc"), Row("bob", 4))
+    }
+  }
+}
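A minimal usage sketch of the new catalog outside the test suite, assuming a Spark build that includes this patch; the catalog name `h2`, the H2 database path, and the table names below are placeholders, and the `USING` clause is still required for CREATE TABLE (see the TODO in JDBCV2Suite):

import org.apache.spark.sql.SparkSession

// Register the JDBCTableCatalog added by this patch under an arbitrary catalog name ("h2").
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.h2",
    "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
  .config("spark.sql.catalog.h2.url", "jdbc:h2:/tmp/h2db;user=testUser;password=testPass")
  .config("spark.sql.catalog.h2.driver", "org.h2.Driver")
  .getOrCreate()

// Reads resolve to a DataSourceV2Relation over a JDBCTable, which FindDataSourceTable
// rewrites to a LogicalRelation over the v1 JDBCRelation.
spark.sql("SELECT name, id FROM h2.test.people").show()

// CTAS is planned as CreateV1TableAsSelectExec; INSERT falls back to InsertIntoDataSourceCommand.
spark.sql("CREATE TABLE h2.test.people_copy USING x AS SELECT * FROM h2.test.people")
spark.sql("INSERT INTO h2.test.people_copy SELECT 'lucy', 3")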
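The new `DataSourceV1Table` trait only asks for a name, a schema, and the v1 relation to fall back to, since `capabilities()` defaults to the empty set. A hypothetical sketch of exposing an existing v1 `BaseRelation` through it (the `WrappedV1Table` name is illustrative, not part of the patch); for writes to go through the fallback, the wrapped relation must also implement `InsertableRelation`, as `CreateV1TableAsSelectExec` and `InsertIntoDataSourceCommand` assume:

import org.apache.spark.sql.catalog.v2.Identifier
import org.apache.spark.sql.sources.{BaseRelation, DataSourceV1Table}
import org.apache.spark.sql.types.StructType

// Illustrative wrapper: serves an existing v1 BaseRelation through the v2 Table interface.
// Since capabilities() stays empty, Spark routes reads and writes through the v1 fallback rules.
case class WrappedV1Table(ident: Identifier, relation: BaseRelation) extends DataSourceV1Table {
  override def name(): String = ident.toString
  override def schema(): StructType = relation.schema
  override def v1Relation: BaseRelation = relation
}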