diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 592f7d668d08..71adc51b8744 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -117,4 +117,6 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
   }
 
   override def supportsIndex: Boolean = true
+
+  override def indexOptions: String = "KEY_BLOCK_SIZE=10"
 }
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
index 1a1a592d00bc..7fba6671ffe7 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
@@ -80,4 +80,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTes
   }
 
   override def supportsTableSample: Boolean = true
+
+  override def supportsIndex: Boolean = true
+
+  override def indexOptions: String = "FILLFACTOR=70"
 }
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index d29205155015..2f50c876c2bd 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -191,6 +191,8 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
 
   def supportsIndex: Boolean = false
 
+  def indexOptions: String = ""
+
   test("SPARK-36895: Test INDEX Using SQL") {
     if (supportsIndex) {
       withTable(s"$catalogName.new_table") {
@@ -208,11 +210,11 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
           sql(s"CREATE index i1 ON $catalogName.new_table USING $indexType (col1)")
         }.getMessage
         assert(m.contains(s"Index Type $indexType is not supported." +
+ - s" The supported Index Types are: BTREE and HASH")) + s" The supported Index Types are:")) sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)") sql(s"CREATE index i2 ON $catalogName.new_table (col2, col3, col5)" + - s" OPTIONS (KEY_BLOCK_SIZE=10)") + s" OPTIONS ($indexOptions)") assert(jdbcTable.indexExists("i1") == true) assert(jdbcTable.indexExists("i2") == true) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 526f9d07aac6..48150777d187 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -23,6 +23,8 @@ import java.util import java.util.Locale import java.util.concurrent.TimeUnit +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer import scala.util.Try import scala.util.control.NonFatal @@ -38,7 +40,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp} import org.apache.spark.sql.connector.catalog.TableChange -import org.apache.spark.sql.connector.catalog.index.TableIndex +import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex} import org.apache.spark.sql.connector.expressions.NamedReference import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider @@ -1073,6 +1075,73 @@ object JdbcUtils extends Logging with SQLConfHelper { } } + /** + * Check if index exists in a table + */ + def checkIfIndexExists( + conn: Connection, + sql: String, + options: JDBCOptions): Boolean = { + val statement = conn.createStatement + try { + statement.setQueryTimeout(options.queryTimeout) + val rs = statement.executeQuery(sql) + rs.next + } catch { + case _: Exception => + logWarning("Cannot retrieved index info.") + false + } finally { + statement.close() + } + } + + /** + * Process index properties and return tuple of indexType and list of the other index properties. + */ + def processIndexProperties( + properties: util.Map[String, String], + catalogName: String): (String, Array[String]) = { + var indexType = "" + val indexPropertyList: ArrayBuffer[String] = ArrayBuffer[String]() + val supportedIndexTypeList = getSupportedIndexTypeList(catalogName) + + if (!properties.isEmpty) { + properties.asScala.foreach { case (k, v) => + if (k.equals(SupportsIndex.PROP_TYPE)) { + if (containsIndexTypeIgnoreCase(supportedIndexTypeList, v)) { + indexType = s"USING $v" + } else { + throw new UnsupportedOperationException(s"Index Type $v is not supported." 
+ + s" The supported Index Types are: ${supportedIndexTypeList.mkString(" AND ")}") + } + } else { + indexPropertyList.append(s"$k = $v") + } + } + } + (indexType, indexPropertyList.toArray) + } + + def containsIndexTypeIgnoreCase(supportedIndexTypeList: Array[String], value: String): Boolean = { + if (supportedIndexTypeList.isEmpty) { + throw new UnsupportedOperationException( + "Cannot specify 'USING index_type' in 'CREATE INDEX'") + } + for (indexType <- supportedIndexTypeList) { + if (value.equalsIgnoreCase(indexType)) return true + } + false + } + + def getSupportedIndexTypeList(catalogName: String): Array[String] = { + catalogName match { + case "mysql" => Array("BTREE", "HASH") + case "postgresql" => Array("BTREE", "HASH", "BRIN") + case _ => Array.empty + } + } + def executeQuery(conn: Connection, options: JDBCOptions, sql: String): ResultSet = { val statement = conn.createStatement try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index 28e15b461c84..fb98996e6bf8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -21,12 +21,10 @@ import java.sql.{Connection, SQLException, Types} import java.util import java.util.Locale -import scala.collection.JavaConverters._ - import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException} -import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex} +import org.apache.spark.sql.connector.catalog.index.TableIndex import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} @@ -120,25 +118,12 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { columnsProperties: util.Map[NamedReference, util.Map[String, String]], properties: util.Map[String, String]): String = { val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head)) - var indexProperties: String = "" - var indexType = "" - if (!properties.isEmpty) { - properties.asScala.foreach { case (k, v) => - if (k.equals(SupportsIndex.PROP_TYPE)) { - if (v.equalsIgnoreCase("BTREE") || v.equalsIgnoreCase("HASH")) { - indexType = s"USING $v" - } else { - throw new UnsupportedOperationException(s"Index Type $v is not supported." 
+ - " The supported Index Types are: BTREE and HASH") - } - } else { - indexProperties = indexProperties + " " + s"$k $v" - } - } - } + val (indexType, indexPropertyList) = JdbcUtils.processIndexProperties(properties, "mysql") + // columnsProperties doesn't apply to MySQL so it is ignored s"CREATE INDEX ${quoteIdentifier(indexName)} $indexType ON" + - s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")}) $indexProperties" + s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")})" + + s" ${indexPropertyList.mkString(" ")}" } // SHOW INDEX syntax @@ -148,21 +133,8 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { indexName: String, tableName: String, options: JDBCOptions): Boolean = { - val sql = s"SHOW INDEXES FROM ${quoteIdentifier(tableName)}" - try { - val rs = JdbcUtils.executeQuery(conn, options, sql) - while (rs.next()) { - val retrievedIndexName = rs.getString("key_name") - if (conf.resolver(retrievedIndexName, indexName)) { - return true - } - } - false - } catch { - case _: Exception => - logWarning("Cannot retrieved index info.") - false - } + val sql = s"SHOW INDEXES FROM ${quoteIdentifier(tableName)} WHERE key_name = '$indexName'" + JdbcUtils.checkIfIndexExists(conn, sql, options) } override def dropIndex(indexName: String, tableName: String): String = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 317ae19ed914..356cb4ddbd00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -17,15 +17,20 @@ package org.apache.spark.sql.jdbc -import java.sql.{Connection, Types} +import java.sql.{Connection, SQLException, Types} +import java.util import java.util.Locale +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.SQLConfHelper +import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException} +import org.apache.spark.sql.connector.expressions.NamedReference import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo import org.apache.spark.sql.types._ -private object PostgresDialect extends JdbcDialect { +private object PostgresDialect extends JdbcDialect with SQLConfHelper { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:postgresql") @@ -164,4 +169,56 @@ private object PostgresDialect extends JdbcDialect { s"TABLESAMPLE BERNOULLI" + s" (${(sample.upperBound - sample.lowerBound) * 100}) REPEATABLE (${sample.seed})" } + + // CREATE INDEX syntax + // https://www.postgresql.org/docs/14/sql-createindex.html + override def createIndex( + indexName: String, + tableName: String, + columns: Array[NamedReference], + columnsProperties: util.Map[NamedReference, util.Map[String, String]], + properties: util.Map[String, String]): String = { + val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head)) + var indexProperties = "" + val (indexType, indexPropertyList) = JdbcUtils.processIndexProperties(properties, "postgresql") + + if (indexPropertyList.nonEmpty) { + indexProperties = "WITH (" + indexPropertyList.mkString(", ") + ")" + } + + s"CREATE INDEX ${quoteIdentifier(indexName)} ON ${quoteIdentifier(tableName)}" + + s" $indexType (${columnList.mkString(", ")}) $indexProperties" + } + + // SHOW INDEX syntax + // 
+  // https://www.postgresql.org/docs/14/view-pg-indexes.html
+  override def indexExists(
+      conn: Connection,
+      indexName: String,
+      tableName: String,
+      options: JDBCOptions): Boolean = {
+    val sql = s"SELECT * FROM pg_indexes WHERE tablename = '$tableName' AND" +
+      s" indexname = '$indexName'"
+    JdbcUtils.checkIfIndexExists(conn, sql, options)
+  }
+
+  // DROP INDEX syntax
+  // https://www.postgresql.org/docs/14/sql-dropindex.html
+  override def dropIndex(indexName: String, tableName: String): String = {
+    s"DROP INDEX ${quoteIdentifier(indexName)}"
+  }
+
+  override def classifyException(message: String, e: Throwable): AnalysisException = {
+    e match {
+      case sqlException: SQLException =>
+        sqlException.getSQLState match {
+          // https://www.postgresql.org/docs/14/errcodes-appendix.html
+          case "42P07" => throw new IndexAlreadyExistsException(message, cause = Some(e))
+          case "42704" => throw new NoSuchIndexException(message, cause = Some(e))
+          case _ => super.classifyException(message, e)
+        }
+      case unsupported: UnsupportedOperationException => throw unsupported
+      case _ => super.classifyException(message, e)
+    }
+  }
 }
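
A short end-to-end sketch of the new Postgres path, mirroring the flow that V2JDBCTest exercises (not part of the patch). The catalog name "postgresql", the table new_table, and the column names below are illustrative assumptions; the SQL shown in the comments is roughly what createIndex builds from the code above.

    // Assumes a test mixing in SharedSparkSession, with a JDBC v2 catalog
    // registered as "postgresql" the way PostgresIntegrationSuite configures it.
    sql("CREATE TABLE postgresql.new_table (col1 INT, col2 INT, col3 INT)")

    // Spark delegates to PostgresDialect.createIndex; processIndexProperties
    // accepts BTREE because getSupportedIndexTypeList("postgresql") lists it,
    // and the dialect emits roughly:
    //   CREATE INDEX "i1" ON "new_table" USING BTREE ("col1")
    sql("CREATE INDEX i1 ON postgresql.new_table USING BTREE (col1)")

    // With no USING clause, indexType stays empty and the OPTIONS entries are
    // folded into a WITH clause through indexPropertyList, roughly:
    //   CREATE INDEX "i2" ON "new_table" ("col2", "col3") WITH (FILLFACTOR = 70)
    sql("CREATE INDEX i2 ON postgresql.new_table (col2, col3) OPTIONS (FILLFACTOR=70)")

    // An unsupported type now fails fast in processIndexProperties with:
    //   Index Type DUMMY is not supported. The supported Index Types are:
    //   BTREE AND HASH AND BRIN
    // sql("CREATE INDEX i3 ON postgresql.new_table USING DUMMY (col1)")

Existence checks then go through JdbcUtils.checkIfIndexExists, which issues SELECT * FROM pg_indexes WHERE tablename = 'new_table' AND indexname = 'i1'; a duplicate CREATE INDEX surfaces as IndexAlreadyExistsException through the 42P07 mapping in classifyException.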