From c01a5f1daf1f07464070fdd8597a6fde3bbc592a Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Fri, 8 Oct 2021 22:23:31 -0700
Subject: [PATCH 1/3] [SPARK-36914][SQL] Implement dropIndex and listIndexes in JDBC (MySQL dialect)

---
 .../sql/jdbc/v2/MySQLIntegrationSuite.scala   | 33 +++----
 .../apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 87 ++++++++++++++++++-
 .../catalog/index/SupportsIndex.java          |  3 +-
 .../connector/catalog/index/TableIndex.java   | 17 ++--
 .../analysis/NoSuchItemException.scala        |  4 +-
 .../datasources/jdbc/JdbcUtils.scala          | 24 +++++
 .../datasources/v2/jdbc/JDBCTable.scala       | 13 ++-
 .../apache/spark/sql/jdbc/JdbcDialects.scala  | 25 +++++-
 .../apache/spark/sql/jdbc/MySQLDialect.scala  | 51 ++++++++++-
 9 files changed, 212 insertions(+), 45 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 3cb878774f2e9..67e81087f8fb0 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -24,8 +24,6 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
-import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
@@ -121,31 +119,22 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
     assert(t.schema === expectedSchema)
   }
 
-  override def testIndex(tbl: String): Unit = {
-    val loaded = Catalogs.load("mysql", conf)
-    val jdbcTable = loaded.asInstanceOf[TableCatalog]
-      .loadTable(Identifier.of(Array.empty[String], "new_table"))
-      .asInstanceOf[SupportsIndex]
-    assert(jdbcTable.indexExists("i1") == false)
-    assert(jdbcTable.indexExists("i2") == false)
+  override def supportsIndex: Boolean = true
 
+  override def testIndexProperties(jdbcTable: SupportsIndex): Unit = {
     val properties = new util.Properties();
     properties.put("KEY_BLOCK_SIZE", "10")
     properties.put("COMMENT", "'this is a comment'")
-    jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+    // MySQL doesn't allow setting properties on individual columns, so use an empty
+    // Array for the column properties
+    jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
       Array.empty[util.Map[NamedReference, util.Properties]], properties)
-    jdbcTable.createIndex("i2", "",
-      Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
-      Array.empty[util.Map[NamedReference, util.Properties]], new util.Properties)
-
-    assert(jdbcTable.indexExists("i1") == true)
-    assert(jdbcTable.indexExists("i2") == true)
-
-    val m = intercept[IndexAlreadyExistsException] {
-      jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
-        Array.empty[util.Map[NamedReference, util.Properties]], properties)
-    }.getMessage
-    assert(m.contains("Failed to create index: i1 in new_table"))
+    val index = jdbcTable.listIndexes()
+    // The index property size is actually 1. Even though the index is created
+    // with properties "KEY_BLOCK_SIZE", "10" and "COMMENT", "'this is a comment'", when
+    // retrieving the index using `SHOW INDEXES`, MySQL only returns `COMMENT`.
+    assert(index(0).properties.size == 1)
+    assert(index(0).properties.get("COMMENT").equals("this is a comment"))
   }
 }
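The generated SQL for the `i1` index above can be sketched without a live server. The snippet below is illustrative only: it mirrors the property-concatenation logic that MySQLDialect.createIndex uses later in this patch, and the table/column names come from the test (the clause order follows the Properties iteration order):

    import java.util.Properties
    import scala.collection.JavaConverters._

    object CreateIndexSqlSketch extends App {
      val props = new Properties()
      props.put("KEY_BLOCK_SIZE", "10")
      props.put("COMMENT", "'this is a comment'")

      // Mirror of the dialect's property handling: each (k, v) pair is
      // appended to the statement as "k v".
      val indexProperties = props.asScala.map { case (k, v) => s"$k $v" }.mkString(" ")

      // Roughly: CREATE INDEX `i1` USING BTREE ON `new_table` (`col1`)
      //          KEY_BLOCK_SIZE 10 COMMENT 'this is a comment'
      println(s"CREATE INDEX `i1` USING BTREE ON `new_table` (`col1`) $indexProperties")
    }

MySQL keeps KEY_BLOCK_SIZE out of SHOW INDEXES output, which is why the test only sees the COMMENT property when it reads the index back.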
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index da57ed767602a..0e6551d670c67 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -17,9 +17,15 @@
 
 package org.apache.spark.sql.jdbc.v2
 
+import java.util
+
 import org.apache.log4j.Level
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
+import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.index.SupportsIndex
+import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -181,12 +187,85 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     }
   }
 
-  def testIndex(tbl: String): Unit = {}
+  def supportsIndex: Boolean = false
+  def testIndexProperties(jdbcTable: SupportsIndex): Unit = {}
 
   test("SPARK-36913: Test INDEX") {
-    withTable(s"$catalogName.new_table") {
-      sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)")
-      testIndex(s"$catalogName.new_table")
+    if (supportsIndex) {
+      withTable(s"$catalogName.new_table") {
+        sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT," +
+          s" col4 INT, col5 INT)")
+        val loaded = Catalogs.load(catalogName, conf)
+        val jdbcTable = loaded.asInstanceOf[TableCatalog]
+          .loadTable(Identifier.of(Array.empty[String], "new_table"))
+          .asInstanceOf[SupportsIndex]
+        assert(jdbcTable.indexExists("i1") == false)
+        assert(jdbcTable.indexExists("i2") == false)
+
+        val properties = new util.Properties();
+        jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
+          Array.empty[util.Map[NamedReference, util.Properties]], properties)
+
+        jdbcTable.createIndex("i2", "",
+          Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
+          Array.empty[util.Map[NamedReference, util.Properties]], properties)
+
+        assert(jdbcTable.indexExists("i1") == true)
+        assert(jdbcTable.indexExists("i2") == true)
+
+        var m = intercept[IndexAlreadyExistsException] {
+          jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+            Array.empty[util.Map[NamedReference, util.Properties]], properties)
+        }.getMessage
+        assert(m.contains("Failed to create index: i1 in new_table"))
+
+        var index = jdbcTable.listIndexes()
+        assert(index.length == 2)
+
+        assert(index(0).indexName.equals("i1"))
+        assert(index(0).indexType.equals("BTREE"))
+        var cols = index(0).columns
+        assert(cols.length == 1)
+        assert(cols(0).describe().equals("col1"))
+        assert(index(0).properties.size == 0)
+
+        assert(index(1).indexName.equals("i2"))
+        assert(index(1).indexType.equals("BTREE"))
+        cols = index(1).columns
+        assert(cols.length == 3)
+        assert(cols(0).describe().equals("col2"))
+        assert(cols(1).describe().equals("col3"))
+        assert(cols(2).describe().equals("col5"))
+        assert(index(1).properties.size == 0)
+
+        jdbcTable.dropIndex("i1")
+        assert(jdbcTable.indexExists("i1") == false)
+        assert(jdbcTable.indexExists("i2") == true)
+
+        index = jdbcTable.listIndexes()
+        assert(index.length == 1)
+
+        assert(index(0).indexName.equals("i2"))
+        assert(index(0).indexType.equals("BTREE"))
+        cols = index(0).columns
+        assert(cols.length == 3)
+        assert(cols(0).describe().equals("col2"))
+        assert(cols(1).describe().equals("col3"))
+        assert(cols(2).describe().equals("col5"))
+
+        jdbcTable.dropIndex("i2")
+        assert(jdbcTable.indexExists("i1") == false)
+        assert(jdbcTable.indexExists("i2") == false)
+        index = jdbcTable.listIndexes()
+        assert(index.length == 0)
+
+        m = intercept[NoSuchIndexException] {
+          jdbcTable.dropIndex("i2")
+        }.getMessage
+        assert(m.contains("Failed to drop index: i2"))
+
+        testIndexProperties(jdbcTable)
+      }
     }
   }
 }
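For readers who want the happy path without the assertions, the flow the test exercises condenses to the following sketch (the "mysql" catalog name and the `conf` value are assumptions standing in for the suite's docker setup):

    import java.util

    import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
    import org.apache.spark.sql.connector.catalog.index.SupportsIndex
    import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}

    // `conf` is the active SQLConf, as in the suite.
    val jdbcTable = Catalogs.load("mysql", conf).asInstanceOf[TableCatalog]
      .loadTable(Identifier.of(Array.empty[String], "new_table"))
      .asInstanceOf[SupportsIndex]

    jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
      Array.empty[util.Map[NamedReference, util.Properties]], new util.Properties())
    jdbcTable.listIndexes().foreach(i => println(i.indexName()))  // i1
    jdbcTable.dropIndex("i1")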
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
index 24961e460cc26..02ae6811a2cc8 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
@@ -55,10 +55,9 @@ void createIndex(String indexName,
    * Drops the index with the given name.
    *
    * @param indexName the name of the index to be dropped.
-   * @return true if the index is dropped
    * @throws NoSuchIndexException If the index does not exist (optional)
    */
-  boolean dropIndex(String indexName) throws NoSuchIndexException;
+  void dropIndex(String indexName) throws NoSuchIndexException;
 
   /**
    * Checks whether an index exists in this table.
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java
index 99fce806a11b9..33e96180e12b0 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java
@@ -53,27 +53,30 @@ public TableIndex(
   /**
    * @return the Index name.
    */
-  String indexName() { return indexName; }
+  public String indexName() { return indexName; }
 
   /**
    * @return the indexType of this Index.
    */
-  String indexType() { return indexType; }
+  public String indexType() { return indexType; }
 
   /**
    * @return the column(s) this Index is on. Could be multi columns (a multi-column index).
    */
-  NamedReference[] columns() { return columns; }
+  public NamedReference[] columns() { return columns; }
+
+  /**
+   * Sets the columns using the passed-in columns parameter.
+   */
+  public void columns_(NamedReference[] columns) { this.columns = columns; }
 
   /**
    * @return the map of column and column property map.
    */
-  Map<NamedReference, Properties> columnProperties() { return columnProperties; }
+  public Map<NamedReference, Properties> columnProperties() { return columnProperties; }
 
   /**
    * Returns the index properties.
    */
-  Properties properties() {
-    return properties;
-  }
+  public Properties properties() { return properties; }
 }
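With the accessors now public, callers outside the package can render the results of listIndexes(); a small sketch (describeIndexes is a hypothetical helper, not part of the patch):

    import org.apache.spark.sql.connector.catalog.index.TableIndex

    def describeIndexes(indexes: Array[TableIndex]): String =
      indexes.map { idx =>
        // NamedReference.describe() yields the column name, e.g. "col1".
        val cols = idx.columns().map(_.describe()).mkString(", ")
        s"${idx.indexName()} ${idx.indexType()} ($cols) ${idx.properties()}"
      }.mkString("\n")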
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
index 500121cb66791..805f3080c8472 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -108,5 +108,5 @@ case class NoSuchPartitionsException(override val message: String)
 case class NoSuchTempFunctionException(func: String)
   extends AnalysisException(s"Temporary function '$func' not found")
 
-class NoSuchIndexException(indexName: String)
-  extends AnalysisException(s"Index '$indexName' not found")
+class NoSuchIndexException(message: String, cause: Option[Throwable] = None)
+  extends AnalysisException(message, cause = cause)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 168d16a3dd55a..d482245fb31fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
 import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
@@ -1040,6 +1041,29 @@ object JdbcUtils extends Logging with SQLConfHelper {
     dialect.indexExists(conn, indexName, tableName, options)
   }
 
+  /**
+   * Drop an index.
+   */
+  def dropIndex(
+      conn: Connection,
+      indexName: String,
+      tableName: String,
+      options: JDBCOptions): Unit = {
+    val dialect = JdbcDialects.get(options.url)
+    executeStatement(conn, options, dialect.dropIndex(indexName, tableName))
+  }
+
+  /**
+   * List all the indexes in a table.
+   */
+  def listIndexes(
+      conn: Connection,
+      tableName: String,
+      options: JDBCOptions): Array[TableIndex] = {
+    val dialect = JdbcDialects.get(options.url)
+    dialect.listIndexes(conn, tableName, options)
+  }
+
   private def executeStatement(conn: Connection, options: JDBCOptions, sql: String): Unit = {
     val statement = conn.createStatement
     try {
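These helpers keep the layering consistent with createIndex/indexExists above: the DS v2 table opens the connection, JdbcUtils routes to the dialect, and the dialect supplies the SQL or JDBC calls. Calling them directly looks roughly like this (a sketch that assumes an open `conn: java.sql.Connection` and an `options: JDBCOptions` for the target table):

    import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils

    // Delegates to the dialect's dropIndex SQL and executes it on conn.
    JdbcUtils.dropIndex(conn, "i1", "new_table", options)
    // Delegates to the dialect's listIndexes implementation.
    val indexes = JdbcUtils.listIndexes(conn, "new_table", options)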
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index 1db938e449d8f..23ff503308460 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -73,11 +73,18 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
     }
   }
 
-  override def dropIndex(indexName: String): Boolean = {
-    throw new UnsupportedOperationException("dropIndex is not supported yet")
+  override def dropIndex(indexName: String): Unit = {
+    JdbcUtils.withConnection(jdbcOptions) { conn =>
+      JdbcUtils.classifyException(s"Failed to drop index: $indexName",
+        JdbcDialects.get(jdbcOptions.url)) {
+        JdbcUtils.dropIndex(conn, indexName, name, jdbcOptions)
+      }
+    }
   }
 
   override def listIndexes(): Array[TableIndex] = {
-    throw new UnsupportedOperationException("listIndexes is not supported yet")
+    JdbcUtils.withConnection(jdbcOptions) { conn =>
+      JdbcUtils.listIndexes(conn, name, jdbcOptions)
+    }
   }
 }
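Continuing the sketch from after the V2JDBCTest diff: when the underlying driver rejects the statement, classifyException turns the SQLException into the typed error, so a caller sees the message JDBCTable supplies here:

    import org.apache.spark.sql.catalyst.analysis.NoSuchIndexException

    try jdbcTable.dropIndex("no_such_index") catch {
      case e: NoSuchIndexException =>
        println(e.getMessage) // contains "Failed to drop index: no_such_index"
    }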
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index d1c4f8d30a634..eb3986ce79b36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.TableChange._
+import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -290,7 +291,7 @@ abstract class JdbcDialect extends Serializable with Logging{
   }
 
   /**
-   * Creates an index.
+   * Builds a create index SQL statement.
    *
    * @param indexName the name of the index to be created
    * @param indexType the type of the index to be created
@@ -298,6 +299,7 @@ abstract class JdbcDialect extends Serializable with Logging{
    * @param columns the columns on which index to be created
    * @param columnsProperties the properties of the columns on which index to be created
    * @param properties the properties of the index to be created
+   * @return the SQL statement to use for creating the index.
    */
   def createIndex(
       indexName: String,
@@ -326,6 +328,27 @@ abstract class JdbcDialect extends Serializable with Logging{
     throw new UnsupportedOperationException("indexExists is not supported")
   }
 
+  /**
+   * Builds a drop index SQL statement.
+   *
+   * @param indexName the name of the index to be dropped.
+   * @param tableName the table name on which index to be dropped.
+   * @return the SQL statement to use for dropping the index.
+   */
+  def dropIndex(indexName: String, tableName: String): String = {
+    throw new UnsupportedOperationException("dropIndex is not supported")
+  }
+
+  /**
+   * Lists all the indexes in this table.
+   */
+  def listIndexes(
+      conn: Connection,
+      tableName: String,
+      options: JDBCOptions): Array[TableIndex] = {
+    throw new UnsupportedOperationException("listIndexes is not supported")
+  }
+
   /**
    * Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
    * @param message The error message to be placed to the returned exception.
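The two new JdbcDialect hooks default to UnsupportedOperationException, so each database opts in by overriding them. A sketch of what a hypothetical third-party dialect might provide (the Postgres-style DROP INDEX is shown purely as an illustration; no such dialect is part of this patch):

    import org.apache.spark.sql.jdbc.JdbcDialect

    private case object ExampleDialect extends JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:example")

      // Some databases, e.g. PostgreSQL, drop an index without naming its table.
      override def dropIndex(indexName: String, tableName: String): String =
        s"DROP INDEX ${quoteIdentifier(indexName)}"
    }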
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 5c16ef6a947ba..b462a58b0bf19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -25,8 +25,9 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
-import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
+import org.apache.spark.sql.connector.catalog.index.TableIndex
+import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.types.{BooleanType, DataType, FloatType, LongType, MetadataBuilder}
@@ -129,8 +130,9 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
     }
 
     // columnsProperties doesn't apply to MySQL so it is ignored
-    s"CREATE $indexType INDEX ${quoteIdentifier(indexName)} ON" +
-      s" ${quoteIdentifier(tableName)}" + s" (${columnList.mkString(", ")}) $indexProperties"
+    val iType = if (indexType.isEmpty) "" else s"USING $indexType"
+    s"CREATE INDEX ${quoteIdentifier(indexName)} $iType ON" +
+      s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")}) $indexProperties"
   }
 
   // SHOW INDEX syntax
@@ -157,6 +159,45 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
     }
   }
 
+  override def dropIndex(indexName: String, tableName: String): String = {
+    s"DROP INDEX ${quoteIdentifier(indexName)} ON $tableName"
+  }
+
+  // SHOW INDEX syntax
+  // https://dev.mysql.com/doc/refman/8.0/en/show-index.html
+  override def listIndexes(
+      conn: Connection,
+      tableName: String,
+      options: JDBCOptions): Array[TableIndex] = {
+    val sql = s"SHOW INDEXES FROM $tableName"
+    var indexMap: Map[String, TableIndex] = Map()
+    try {
+      val rs = JdbcUtils.executeQuery(conn, options, sql)
+      while (rs.next()) {
+        val indexName = rs.getString("key_name")
+        val colName = rs.getString("column_name")
+        val indexType = rs.getString("index_type")
+        val indexComment = rs.getString("Index_comment")
+        if (indexMap.contains(indexName)) {
+          val index = indexMap.get(indexName).get
+          index.columns_(index.columns() :+ FieldReference(colName))
+        } else {
+          // The only property we are building here is `COMMENT` because it's the only one
+          // we can get from `SHOW INDEXES`.
+          val properties = new util.Properties();
+          if (indexComment.nonEmpty) properties.put("COMMENT", indexComment)
+          val index = new TableIndex(indexName, indexType, Array(FieldReference(colName)),
+            new util.HashMap[NamedReference, util.Properties](), properties)
+          indexMap += (indexName -> index)
+        }
+      }
+    } catch {
+      case _: Exception =>
+        logWarning("Cannot retrieve index info.")
+    }
+    indexMap.values.toArray
+  }
+
   override def classifyException(message: String, e: Throwable): AnalysisException = {
     if (e.isInstanceOf[SQLException]) {
       // Error codes are from
@@ -165,6 +206,8 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
       // ER_DUP_KEYNAME
       case 1061 =>
         throw new IndexAlreadyExistsException(message, cause = Some(e))
+      case 1091 =>
+        throw new NoSuchIndexException(message, cause = Some(e))
       case _ =>
     }
   }
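A note on the listIndexes implementation above: SHOW INDEXES emits one row per indexed column, so a multi-column index arrives as several rows sharing a key_name, which the loop folds into a single TableIndex. The folding can be modeled in isolation (error code 1091 in classifyException is, for reference, MySQL's ER_CANT_DROP_FIELD_OR_KEY):

    // Rows as (key_name, column_name) pairs, the shape SHOW INDEXES returns
    // for the three-column index i2 in the tests.
    val rows = Seq(("i2", "col2"), ("i2", "col3"), ("i2", "col5"))
    val grouped = rows.foldLeft(Map.empty[String, Vector[String]]) {
      case (acc, (name, col)) =>
        acc.updated(name, acc.getOrElse(name, Vector.empty) :+ col)
    }
    // grouped == Map("i2" -> Vector("col2", "col3", "col5"))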
From 9c320782d0ae1d5f576d1e3fce24b4d067a1b547 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Mon, 11 Oct 2021 09:38:03 -0700
Subject: [PATCH 2/3] address comments

---
 .../catalog/index/SupportsIndex.java          |  4 +-
 .../connector/catalog/index/TableIndex.java   |  5 ---
 .../apache/spark/sql/jdbc/MySQLDialect.scala  | 41 ++++++++++++-------
 3 files changed, 28 insertions(+), 22 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
index 02ae6811a2cc8..4181cf5f25118 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
@@ -42,7 +42,7 @@ public interface SupportsIndex extends Table {
    * @param columns the columns on which index to be created
    * @param columnsProperties the properties of the columns on which index to be created
    * @param properties the properties of the index to be created
-   * @throws IndexAlreadyExistsException If the index already exists (optional)
+   * @throws IndexAlreadyExistsException If the index already exists.
    */
   void createIndex(String indexName,
       String indexType,
@@ -55,7 +55,7 @@ void createIndex(String indexName,
    * Drops the index with the given name.
    *
    * @param indexName the name of the index to be dropped.
-   * @throws NoSuchIndexException If the index does not exist (optional)
+   * @throws NoSuchIndexException If the index does not exist.
    */
   void dropIndex(String indexName) throws NoSuchIndexException;
 
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java
index 33e96180e12b0..977ed8d6c7528 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/TableIndex.java
@@ -65,11 +65,6 @@ public TableIndex(
    */
   public NamedReference[] columns() { return columns; }
 
-  /**
-   * Sets the columns using the passed-in columns parameter.
-   */
-  public void columns_(NamedReference[] columns) { this.columns = columns; }
-
   /**
    * @return the map of column and column property map.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index b462a58b0bf19..7e85b3bbb84e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -128,9 +128,17 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
         indexProperties = indexProperties + " " + s"$k $v"
       }
     }
-
+    val iType = if (indexType.isEmpty) {
+      ""
+    } else {
+      if (!indexType.equalsIgnoreCase("BTREE") &&
+        !indexType.equalsIgnoreCase("HASH")) {
+        throw new UnsupportedOperationException(s"Index Type $indexType is not supported." +
+          " The supported Index Types are: BTREE and HASH")
+      }
+      s"USING $indexType"
+    }
     // columnsProperties doesn't apply to MySQL so it is ignored
-    val iType = if (indexType.isEmpty) "" else s"USING $indexType"
     s"CREATE INDEX ${quoteIdentifier(indexName)} $iType ON" +
       s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")}) $indexProperties"
   }
@@ -180,7 +188,10 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
         val indexComment = rs.getString("Index_comment")
         if (indexMap.contains(indexName)) {
           val index = indexMap.get(indexName).get
-          index.columns_(index.columns() :+ FieldReference(colName))
+          val newIndex = new TableIndex(indexName, indexType,
+            index.columns() :+ FieldReference(colName),
+            index.columnProperties, index.properties)
+          indexMap += (indexName -> newIndex)
         } else {
           // The only property we are building here is `COMMENT` because it's the only one
           // we can get from `SHOW INDEXES`.
@@ -199,18 +210,18 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
   }
 
   override def classifyException(message: String, e: Throwable): AnalysisException = {
-    if (e.isInstanceOf[SQLException]) {
-      // Error codes are from
-      // https://mariadb.com/kb/en/mariadb-error-codes/#shared-mariadbmysql-error-codes
-      e.asInstanceOf[SQLException].getErrorCode match {
-        // ER_DUP_KEYNAME
-        case 1061 =>
-          throw new IndexAlreadyExistsException(message, cause = Some(e))
-        case 1091 =>
-          throw new NoSuchIndexException(message, cause = Some(e))
-        case _ =>
-      }
+    e match {
+      case sqlException: SQLException =>
+        sqlException.getErrorCode match {
+          // ER_DUP_KEYNAME
+          case 1061 =>
+            throw new IndexAlreadyExistsException(message, cause = Some(e))
+          case 1091 =>
+            throw new NoSuchIndexException(message, cause = Some(e))
+          case _ => super.classifyException(message, e)
+        }
+      case unsupported: UnsupportedOperationException => throw unsupported
+      case _ => super.classifyException(message, e)
     }
-    super.classifyException(message, e)
   }
 }
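The TableIndex change in this revision removes the columns_ mutator in favor of rebuilding the value, the usual copy-on-update pattern; a minimal model of the same idea:

    // Instead of mutating the existing entry, construct a new one and replace
    // it in the map, as the revised listIndexes now does with TableIndex.
    case class IndexModel(name: String, columns: Vector[String])

    val before = IndexModel("i2", Vector("col2"))
    val after = before.copy(columns = before.columns :+ "col3")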
From 6a7b700dd5426248884c8f5765b4a2d00cf411e6 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Mon, 11 Oct 2021 23:52:33 -0700
Subject: [PATCH 3/3] add a test for invalid index type

---
 .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 0e6551d670c67..f3e3b34356c8b 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -203,6 +203,14 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
         assert(jdbcTable.indexExists("i2") == false)
 
         val properties = new util.Properties();
+        val indexType = "DUMMY"
+        var m = intercept[UnsupportedOperationException] {
+          jdbcTable.createIndex("i1", indexType, Array(FieldReference("col1")),
+            Array.empty[util.Map[NamedReference, util.Properties]], properties)
+        }.getMessage
+        assert(m.contains(s"Index Type $indexType is not supported." +
+          s" The supported Index Types are: BTREE and HASH"))
+
         jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")),
           Array.empty[util.Map[NamedReference, util.Properties]], properties)
 
@@ -213,7 +221,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
         assert(jdbcTable.indexExists("i1") == true)
         assert(jdbcTable.indexExists("i2") == true)
 
-        var m = intercept[IndexAlreadyExistsException] {
+        m = intercept[IndexAlreadyExistsException] {
          jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
             Array.empty[util.Map[NamedReference, util.Properties]], properties)
         }.getMessage
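The validation this test exercises can be modeled as a small pure function (a sketch of the patch-2 logic, not the dialect code itself):

    def usingClause(indexType: String): String =
      if (indexType.isEmpty) ""
      else if (indexType.equalsIgnoreCase("BTREE") || indexType.equalsIgnoreCase("HASH")) {
        s"USING $indexType"
      } else {
        throw new UnsupportedOperationException(s"Index Type $indexType is not supported." +
          " The supported Index Types are: BTREE and HASH")
      }

    assert(usingClause("") == "")
    assert(usingClause("hash") == "USING hash")
    // usingClause("DUMMY") throws UnsupportedOperationException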