diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala index 9cfe23f86cc65..64ee645ba0f14 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala @@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} -import org.apache.spark.sql.execution.datasources.HadoopFsRelation import org.apache.spark.sql.internal.SQLConf trait HoodieCatalystPlansUtils { @@ -79,6 +78,25 @@ trait HoodieCatalystPlansUtils { */ def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan, LogicalPlan, Expression)] + /** + * Decomposes [[MatchCreateIndex]] into its arguments, accommodating API differences across Spark versions. + */ + def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])] + + /** + * Decomposes [[MatchDropIndex]] into its arguments, accommodating API differences across Spark versions. + */ + def unapplyDropIndex(plan: LogicalPlan): Option[(LogicalPlan, String, Boolean)] + + /** + * Decomposes [[MatchShowIndexes]] into its arguments, accommodating API differences across Spark versions. + */ + def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])] + + /** + * Decomposes [[MatchRefreshIndex]] into its arguments, accommodating API differences across Spark versions. + */ + def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, String)] /** * Spark requires file formats to append the partition path fields to the end of the schema. diff --git a/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4 b/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4 index 8a3106f7a56fe..a98ad9791400b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4 +++ b/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4 @@ -48,13 +48,6 @@ statement : compactionStatement #compactionCommand | CALL multipartIdentifier callArgumentList? #call - | CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE? - tableIdentifier (USING indexType=identifier)? - LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN - (OPTIONS indexOptions=propertyList)? #createIndex - | DROP INDEX (IF EXISTS)? identifier ON TABLE? tableIdentifier #dropIndex - | SHOW INDEXES (FROM | IN) TABLE? tableIdentifier #showIndexes - | REFRESH INDEX identifier ON TABLE? tableIdentifier #refreshIndex | .*? #passThrough ; @@ -110,14 +103,6 @@ | MINUS? BIGDECIMAL_LITERAL #bigDecimalLiteral ; - multipartIdentifierPropertyList - : multipartIdentifierProperty (COMMA multipartIdentifierProperty)* - ; - - multipartIdentifierProperty - : multipartIdentifier (OPTIONS options=propertyList)? - ; - multipartIdentifier : parts+=identifier ('.'
parts+=identifier)* ; @@ -135,51 +120,13 @@ nonReserved : CALL | COMPACTION - | CREATE - | DROP - | EXISTS - | FROM - | IN - | INDEX - | INDEXES - | IF | LIMIT - | NOT | ON - | OPTIONS - | REFRESH | RUN | SCHEDULE | SHOW - | TABLE - | USING ; - propertyList - : LEFT_PAREN property (COMMA property)* RIGHT_PAREN - ; - - property - : key=propertyKey (EQ? value=propertyValue)? - ; - - propertyKey - : identifier (DOT identifier)* - | STRING - ; - - propertyValue - : INTEGER_VALUE - | DECIMAL_VALUE - | booleanValue - | STRING - ; - - LEFT_PAREN: '('; - RIGHT_PAREN: ')'; - COMMA: ','; - DOT: '.'; - ALL: 'ALL'; AT: 'AT'; CALL: 'CALL'; @@ -195,21 +142,6 @@ FALSE: 'FALSE'; INTERVAL: 'INTERVAL'; TO: 'TO'; - CREATE: 'CREATE'; - INDEX: 'INDEX'; - INDEXES: 'INDEXES'; - IF: 'IF'; - NOT: 'NOT'; - EXISTS: 'EXISTS'; - TABLE: 'TABLE'; - USING: 'USING'; - OPTIONS: 'OPTIONS'; - DROP: 'DROP'; - FROM: 'FROM'; - IN: 'IN'; - REFRESH: 'REFRESH'; - - EQ: '=' | '=='; PLUS: '+'; MINUS: '-'; diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index eb7f00ef26b2d..051c38d956775 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isMetaField, removeMetaFields} -import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.{MatchCreateTableLike, MatchInsertIntoStatement, MatchMergeIntoTable, ResolvesToHudiTable, sparkAdapter} +import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.{MatchCreateIndex, MatchCreateTableLike, MatchDropIndex, MatchInsertIntoStatement, MatchMergeIntoTable, MatchRefreshIndex, MatchShowIndexes, ResolvesToHudiTable, sparkAdapter} import org.apache.spark.sql.hudi.command._ import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedures, Procedure, ProcedureArgs} import org.apache.spark.sql.{AnalysisException, SparkSession} @@ -354,6 +354,26 @@ object HoodieAnalysis extends SparkAdapterSupport { sparkAdapter.getCatalystPlanUtils.unapplyCreateTableLikeCommand(plan) } + private[sql] object MatchCreateIndex { + def unapply(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])] = + sparkAdapter.getCatalystPlanUtils.unapplyCreateIndex(plan) + } + + private[sql] object MatchDropIndex { + def unapply(plan: LogicalPlan): Option[(LogicalPlan, String, Boolean)] = + sparkAdapter.getCatalystPlanUtils.unapplyDropIndex(plan) + } + + private[sql] object MatchShowIndexes { + def unapply(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])] = + sparkAdapter.getCatalystPlanUtils.unapplyShowIndexes(plan) + } + + private[sql] object MatchRefreshIndex { + def unapply(plan: LogicalPlan): Option[(LogicalPlan, String)] = + sparkAdapter.getCatalystPlanUtils.unapplyRefreshIndex(plan) + } + private[sql] def failAnalysis(msg: String): Nothing = { throw new AnalysisException(msg) } @@ -442,21 +462,20 @@ case class ResolveImplementations() extends Rule[LogicalPlan] { } // Convert to CreateIndexCommand - case ci @ CreateIndex(plan @ ResolvesToHudiTable(table), 
indexName, indexType, ignoreIfExists, columns, options, output) => - // TODO need to resolve columns - CreateIndexCommand(table, indexName, indexType, ignoreIfExists, columns, options, output) + case ci @ MatchCreateIndex(plan @ ResolvesToHudiTable(table), indexName, indexType, ignoreIfExists, columns, options) if ci.resolved => + CreateIndexCommand(table, indexName, indexType, ignoreIfExists, columns, options) // Convert to DropIndexCommand - case di @ DropIndex(plan @ ResolvesToHudiTable(table), indexName, ignoreIfNotExists, output) if di.resolved => - DropIndexCommand(table, indexName, ignoreIfNotExists, output) + case di @ MatchDropIndex(plan @ ResolvesToHudiTable(table), indexName, ignoreIfNotExists) if di.resolved => + DropIndexCommand(table, indexName, ignoreIfNotExists) // Convert to ShowIndexesCommand - case si @ ShowIndexes(plan @ ResolvesToHudiTable(table), output) if si.resolved => + case si @ MatchShowIndexes(plan @ ResolvesToHudiTable(table), output) if si.resolved => ShowIndexesCommand(table, output) // Covert to RefreshCommand - case ri @ RefreshIndex(plan @ ResolvesToHudiTable(table), indexName, output) if ri.resolved => - RefreshIndexCommand(table, indexName, output) + case ri @ MatchRefreshIndex(plan @ ResolvesToHudiTable(table), indexName) if ri.resolved => + RefreshIndexCommand(table, indexName) case _ => plan } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala index 8ac0831a22f5a..da7f99fa41f83 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala @@ -19,10 +19,9 @@ package org.apache.spark.sql.hudi.command -import com.fasterxml.jackson.annotation.{JsonAutoDetect, PropertyAccessor} -import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.util.JsonUtils import org.apache.hudi.secondary.index.SecondaryIndexManager import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.catalog.CatalogTable @@ -32,23 +31,21 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation import org.apache.spark.sql.{Row, SparkSession} import java.util - import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter} case class CreateIndexCommand(table: CatalogTable, indexName: String, indexType: String, ignoreIfExists: Boolean, - columns: Seq[(Attribute, Map[String, String])], - options: Map[String, String], - override val output: Seq[Attribute]) extends IndexBaseCommand { + columns: Seq[(Seq[String], Map[String, String])], + options: Map[String, String]) extends IndexBaseCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val tableId = table.identifier val metaClient = createHoodieTableMetaClient(tableId, sparkSession) val columnsMap: java.util.LinkedHashMap[String, java.util.Map[String, String]] = new util.LinkedHashMap[String, java.util.Map[String, String]]() - columns.map(c => columnsMap.put(c._1.name, c._2.asJava)) + columns.map(c => columnsMap.put(c._1.mkString("."), c._2.asJava)) SecondaryIndexManager.getInstance().create( metaClient, indexName, indexType, ignoreIfExists, columnsMap, options.asJava) @@ 
-65,8 +62,7 @@ case class CreateIndexCommand(table: CatalogTable, case class DropIndexCommand(table: CatalogTable, indexName: String, - ignoreIfNotExists: Boolean, - override val output: Seq[Attribute]) extends IndexBaseCommand { + ignoreIfNotExists: Boolean) extends IndexBaseCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val tableId = table.identifier @@ -90,7 +86,7 @@ case class ShowIndexesCommand(table: CatalogTable, val metaClient = createHoodieTableMetaClient(table.identifier, sparkSession) val secondaryIndexes = SecondaryIndexManager.getInstance().show(metaClient) - val mapper = getObjectMapper + val mapper = JsonUtils.getObjectMapper toScalaOption(secondaryIndexes).map(x => x.asScala.map(i => { val colOptions = @@ -100,18 +96,10 @@ case class ShowIndexesCommand(table: CatalogTable, i.getIndexType.name().toLowerCase, colOptions, options) }).toSeq).getOrElse(Seq.empty[Row]) } - - protected def getObjectMapper: ObjectMapper = { - val mapper = new ObjectMapper - mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) - mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY) - mapper - } } case class RefreshIndexCommand(table: CatalogTable, - indexName: String, - override val output: Seq[Attribute]) extends IndexBaseCommand { + indexName: String) extends IndexBaseCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val metaClient = createHoodieTableMetaClient(table.identifier, sparkSession) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala index 4005ef97e4561..25cf2641a3cfb 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala @@ -25,7 +25,7 @@ import org.apache.hudi.spark.sql.parser.HoodieSqlCommonParser._ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation} +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} import org.apache.spark.sql.catalyst.parser.{ParserInterface, ParserUtils} import org.apache.spark.sql.catalyst.plans.logical._ @@ -149,144 +149,4 @@ class HoodieSqlCommonAstBuilder(session: SparkSession, delegate: ParserInterface private def typedVisit[T](ctx: ParseTree): T = { ctx.accept(this).asInstanceOf[T] } - - /** - * Create an index, returning a [[CreateIndex]] logical plan. - * For example: - * {{{ - * CREATE INDEX index_name ON [TABLE] table_name [USING index_type] (column_index_property_list) - * [OPTIONS indexPropertyList] - * column_index_property_list: column_name [OPTIONS(indexPropertyList)] [ , . . . ] - * indexPropertyList: index_property_name [= index_property_value] [ , . . . 
] - * }}} - */ - override def visitCreateIndex(ctx: CreateIndexContext): LogicalPlan = withOrigin(ctx) { - val (indexName, indexType) = if (ctx.identifier.size() == 1) { - (ctx.identifier(0).getText, "") - } else { - (ctx.identifier(0).getText, ctx.identifier(1).getText) - } - - val columns = ctx.columns.multipartIdentifierProperty.asScala - .map(_.multipartIdentifier).map(typedVisit[Seq[String]]).toSeq - val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala - .map(x => (Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq - val options = Option(ctx.indexOptions).map(visitPropertyKeyValues).getOrElse(Map.empty) - - CreateIndex( - visitTableIdentifier(ctx.tableIdentifier()), - indexName, - indexType, - ctx.EXISTS != null, - columns.map(UnresolvedAttribute(_)).zip(columnsProperties), - options) - } - - /** - * Drop an index, returning a [[DropIndex]] logical plan. - * For example: - * {{{ - * DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name - * }}} - */ - override def visitDropIndex(ctx: DropIndexContext): LogicalPlan = withOrigin(ctx) { - val indexName = ctx.identifier.getText - DropIndex( - visitTableIdentifier(ctx.tableIdentifier()), - indexName, - ctx.EXISTS != null) - } - - /** - * Show indexes, returning a [[ShowIndexes]] logical plan. - * For example: - * {{{ - * SHOW INDEXES (FROM | IN) [TABLE] table_name - * }}} - */ - override def visitShowIndexes(ctx: ShowIndexesContext): LogicalPlan = withOrigin(ctx) { - ShowIndexes(visitTableIdentifier(ctx.tableIdentifier())) - } - - /** - * Refresh index, returning a [[RefreshIndex]] logical plan - * For example: - * {{{ - * REFRESH INDEX index_name ON [TABLE] table_name - * }}} - */ - override def visitRefreshIndex(ctx: RefreshIndexContext): LogicalPlan = withOrigin(ctx) { - RefreshIndex(visitTableIdentifier(ctx.tableIdentifier()), ctx.identifier.getText) - } - - /** - * Convert a property list into a key-value map. - * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]]. - */ - override def visitPropertyList( - ctx: PropertyListContext): Map[String, String] = withOrigin(ctx) { - val properties = ctx.property.asScala.map { property => - val key = visitPropertyKey(property.key) - val value = visitPropertyValue(property.value) - key -> value - } - // Check for duplicate property names. - checkDuplicateKeys(properties.toSeq, ctx) - properties.toMap - } - - /** - * Parse a key-value map from a [[PropertyListContext]], assuming all values are specified. - */ - def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = { - val props = visitPropertyList(ctx) - val badKeys = props.collect { case (key, null) => key } - if (badKeys.nonEmpty) { - operationNotAllowed( - s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) - } - props - } - - /** - * Parse a list of keys from a [[PropertyListContext]], assuming no values are specified. - */ - def visitPropertyKeys(ctx: PropertyListContext): Seq[String] = { - val props = visitPropertyList(ctx) - val badKeys = props.filter { case (_, v) => v != null }.keys - if (badKeys.nonEmpty) { - operationNotAllowed( - s"Values should not be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) - } - props.keys.toSeq - } - - /** - * A property key can either be String or a collection of dot separated elements. This - * function extracts the property key based on whether its a string literal or a property - * identifier. 
- */ - override def visitPropertyKey(key: PropertyKeyContext): String = { - if (key.STRING != null) { - string(key.STRING) - } else { - key.getText - } - } - - /** - * A property value can be String, Integer, Boolean or Decimal. This function extracts - * the property value based on whether its a string, integer, boolean or decimal literal. - */ - override def visitPropertyValue(value: PropertyValueContext): String = { - if (value == null) { - null - } else if (value.STRING != null) { - string(value.STRING) - } else if (value.booleanValue != null) { - value.getText.toLowerCase(Locale.ROOT) - } else { - value.getText - } - } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala index cb04c9d8d8b13..43b4063260faf 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command.index +import org.apache.hudi.HoodieSparkUtils import org.apache.spark.sql.catalyst.analysis.Analyzer import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.parser.ParserInterface @@ -28,59 +29,61 @@ import org.apache.spark.sql.hudi.command.{CreateIndexCommand, DropIndexCommand, class TestIndexSyntax extends HoodieSparkSqlTestBase { test("Test Create/Drop/Show/Refresh Index") { - withTempDir { tmp => - Seq("cow", "mor").foreach { tableType => - val databaseName = "default" - val tableName = generateTableName - val basePath = s"${tmp.getCanonicalPath}/$tableName" - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long - |) using hudi - | options ( - | primaryKey ='id', - | type = '$tableType', - | preCombineField = 'ts' - | ) - | partitioned by(ts) - | location '$basePath' + if (HoodieSparkUtils.gteqSpark3_2) { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val databaseName = "default" + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | options ( + | primaryKey ='id', + | type = '$tableType', + | preCombineField = 'ts' + | ) + | partitioned by(ts) + | location '$basePath' """.stripMargin) - spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") - spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") - spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") - val sqlParser: ParserInterface = spark.sessionState.sqlParser - val analyzer: Analyzer = spark.sessionState.analyzer + val sqlParser: ParserInterface = spark.sessionState.sqlParser + val analyzer: Analyzer = spark.sessionState.analyzer - var logicalPlan = sqlParser.parsePlan(s"show indexes from default.$tableName") - var resolvedLogicalPlan = analyzer.execute(logicalPlan) - assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[ShowIndexesCommand].table, databaseName, tableName) + var logicalPlan = sqlParser.parsePlan(s"show indexes from default.$tableName") + var 
resolvedLogicalPlan = analyzer.execute(logicalPlan) + assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[ShowIndexesCommand].table, databaseName, tableName) - logicalPlan = sqlParser.parsePlan(s"create index idx_name on $tableName using lucene (name) options(block_size=1024)") - resolvedLogicalPlan = analyzer.execute(logicalPlan) - assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].table, databaseName, tableName) - assertResult("idx_name")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName) - assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType) - assertResult(false)(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].ignoreIfExists) - assertResult(Map("block_size" -> "1024"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].options) + logicalPlan = sqlParser.parsePlan(s"create index idx_name on $tableName using lucene (name) options(block_size=1024)") + resolvedLogicalPlan = analyzer.execute(logicalPlan) + assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].table, databaseName, tableName) + assertResult("idx_name")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName) + assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType) + assertResult(false)(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].ignoreIfExists) + assertResult(Map("block_size" -> "1024"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].options) - logicalPlan = sqlParser.parsePlan(s"create index if not exists idx_price on $tableName using lucene (price options(order='desc')) options(block_size=512)") - resolvedLogicalPlan = analyzer.execute(logicalPlan) - assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].table, databaseName, tableName) - assertResult("idx_price")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName) - assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType) - assertResult(Map("order" -> "desc"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].columns.head._2) - assertResult(Map("block_size" -> "512"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].options) + logicalPlan = sqlParser.parsePlan(s"create index if not exists idx_price on $tableName using lucene (price options(order='desc')) options(block_size=512)") + resolvedLogicalPlan = analyzer.execute(logicalPlan) + assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].table, databaseName, tableName) + assertResult("idx_price")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName) + assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType) + assertResult(Map("order" -> "desc"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].columns.head._2) + assertResult(Map("block_size" -> "512"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].options) - logicalPlan = sqlParser.parsePlan(s"drop index if exists idx_name on $tableName") - resolvedLogicalPlan = analyzer.execute(logicalPlan) - assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].table, databaseName, tableName) - assertResult("idx_name")(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].indexName) - assertResult(true)(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].ignoreIfNotExists) + logicalPlan = sqlParser.parsePlan(s"drop index if exists idx_name on $tableName") + resolvedLogicalPlan = analyzer.execute(logicalPlan) + 
assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].table, databaseName, tableName) + assertResult("idx_name")(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].indexName) + assertResult(true)(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].ignoreIfNotExists) + } } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala index eae89099a621c..816fecc38f518 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala @@ -19,73 +19,77 @@ package org.apache.spark.sql.hudi.command.index +import org.apache.hudi.HoodieSparkUtils import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase class TestSecondaryIndex extends HoodieSparkSqlTestBase { + test("Test Create/Show/Drop Secondary Index") { - withTempDir { tmp => - Seq("cow", "mor").foreach { tableType => - val tableName = generateTableName - val basePath = s"${tmp.getCanonicalPath}/$tableName" - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long - |) using hudi - | options ( - | primaryKey ='id', - | type = '$tableType', - | preCombineField = 'ts' - | ) - | partitioned by(ts) - | location '$basePath' + if (HoodieSparkUtils.gteqSpark3_2) { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | options ( + | primaryKey ='id', + | type = '$tableType', + | preCombineField = 'ts' + | ) + | partitioned by(ts) + | location '$basePath' """.stripMargin) - spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") - spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") - spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") - checkAnswer(s"show indexes from default.$tableName")() + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") + checkAnswer(s"show indexes from default.$tableName")() - checkAnswer(s"create index idx_name on $tableName using lucene (name) options(block_size=1024)")() - checkAnswer(s"create index idx_price on $tableName using lucene (price options(order='desc')) options(block_size=512)")() + checkAnswer(s"create index idx_name on $tableName using lucene (name) options(block_size=1024)")() + checkAnswer(s"create index idx_price on $tableName using lucene (price options(order='desc')) options(block_size=512)")() - // Create an index with multiple columns - checkException(s"create index idx_id_ts on $tableName using lucene (id, ts)")("Lucene index only support single column") + // Create an index with multiple columns + checkException(s"create index idx_id_ts on $tableName using lucene (id, ts)")("Lucene index only support single column") - // Create an index with the occupied name - checkException(s"create index idx_price on $tableName using lucene (price)")( - "Secondary index already exists: idx_price" - ) + // Create an index with the occupied name + checkException(s"create index idx_price on $tableName 
using lucene (price)")( + "Secondary index already exists: idx_price" + ) - // Create indexes repeatedly on columns(index name is different, but the index type and involved column is same) - checkException(s"create index idx_price_1 on $tableName using lucene (price)")( - "Secondary index already exists: idx_price_1" - ) + // Create indexes repeatedly on columns(index name is different, but the index type and involved column is same) + checkException(s"create index idx_price_1 on $tableName using lucene (price)")( + "Secondary index already exists: idx_price_1" + ) - spark.sql(s"show indexes from $tableName").show() - checkAnswer(s"show indexes from $tableName")( - Seq("idx_name", "name", "lucene", "", "{\"block_size\":\"1024\"}"), - Seq("idx_price", "price", "lucene", "{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}") - ) + spark.sql(s"show indexes from $tableName").show() + checkAnswer(s"show indexes from $tableName")( + Seq("idx_name", "name", "lucene", "", "{\"block_size\":\"1024\"}"), + Seq("idx_price", "price", "lucene", "{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}") + ) - checkAnswer(s"drop index idx_name on $tableName")() - checkException(s"drop index idx_name on $tableName")("Secondary index not exists: idx_name") + checkAnswer(s"drop index idx_name on $tableName")() + checkException(s"drop index idx_name on $tableName")("Secondary index not exists: idx_name") - spark.sql(s"show indexes from $tableName").show() - checkAnswer(s"show indexes from $tableName")( - Seq("idx_price", "price", "lucene", "{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}") - ) + spark.sql(s"show indexes from $tableName").show() + checkAnswer(s"show indexes from $tableName")( + Seq("idx_price", "price", "lucene", "{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}") + ) - checkAnswer(s"drop index idx_price on $tableName")() - checkAnswer(s"show indexes from $tableName")() + checkAnswer(s"drop index idx_price on $tableName")() + checkAnswer(s"show indexes from $tableName")() - checkException(s"drop index idx_price on $tableName")("Secondary index not exists: idx_price") + checkException(s"drop index idx_price on $tableName")("Secondary index not exists: idx_price") - checkException(s"create index idx_price_1 on $tableName using lucene (field_not_exist)")( - "Field not exists: field_not_exist" - ) + checkExceptionContain(s"create index idx_price_1 on $tableName using lucene (field_not_exist)")( + "Missing field field_not_exist" + ) + } } } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala index 6fb1719cedeb6..069a460807351 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala @@ -107,4 +107,17 @@ object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils { case _ => plan } } + + /** + * Commands of managing indexes are not supported for Spark2. 
+ */ + override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])] = { + None + } + + override def unapplyDropIndex(plan: LogicalPlan): Option[(LogicalPlan, String, Boolean)] = None + + override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])] = None + + override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, String)] = None } diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala index e9757c821d988..6cd5da79b861a 100644 --- a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala @@ -21,7 +21,7 @@ package org.apache.spark.sql import org.apache.hudi.SparkHoodieTableFileIndex import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.ResolvedTable -import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema} import org.apache.spark.sql.catalyst.planning.ScanOperation import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, Project} import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} @@ -71,4 +71,15 @@ object HoodieSpark30CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { Some((c.tableName, true, false, c.cmd)) } } + + /** + * Index management commands are not supported for Spark 3.0. + */ + override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])] = None + + override def unapplyDropIndex(plan: LogicalPlan): Option[(LogicalPlan, String, Boolean)] = None + + override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])] = None + + override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, String)] = None } diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala index df94529ce12b4..8a56d0fba25d9 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala @@ -21,7 +21,7 @@ package org.apache.spark.sql import org.apache.hudi.SparkHoodieTableFileIndex import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.ResolvedTable -import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema} import org.apache.spark.sql.catalyst.planning.ScanOperation import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, Project} import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} @@ -71,4 +71,15 @@ object HoodieSpark31CatalystPlanUtils extends
HoodieSpark3CatalystPlanUtils { Some((c.tableName, true, false, c.cmd)) } } + + /** + * Index management commands are not supported for Spark 3.1. + */ + override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])] = None + + override def unapplyDropIndex(plan: LogicalPlan): Option[(LogicalPlan, String, Boolean)] = None + + override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])] = None + + override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, String)] = None } diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/antlr4/imports/SqlBase.g4 b/hudi-spark-datasource/hudi-spark3.2.x/src/main/antlr4/imports/SqlBase.g4 index d4e1e48351ccc..d7f87b4e5aa59 100644 --- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/antlr4/imports/SqlBase.g4 +++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/antlr4/imports/SqlBase.g4 @@ -755,6 +755,34 @@ functionIdentifier : (db=errorCapturingIdentifier '.')? function=errorCapturingIdentifier ; +multipartIdentifierPropertyList + : multipartIdentifierProperty (COMMA multipartIdentifierProperty)* + ; + +multipartIdentifierProperty + : multipartIdentifier (OPTIONS options=propertyList)? + ; + +propertyList + : LEFT_PAREN property (COMMA property)* RIGHT_PAREN + ; + +property + : key=propertyKey (EQ? value=propertyValue)? + ; + +propertyKey + : identifier (DOT identifier)* + | STRING + ; + +propertyValue + : INTEGER_VALUE + | DECIMAL_VALUE + | booleanValue + | STRING + ; + namedExpression : expression (AS? (name=errorCapturingIdentifier | identifierList))? ; @@ -1797,6 +1825,10 @@ TIMESTAMP: 'TIMESTAMP'; //============================ // End of the keywords list //============================ +LEFT_PAREN: '('; +RIGHT_PAREN: ')'; +COMMA: ','; +DOT: '.'; EQ : '=' | '=='; NSEQ: '<=>'; diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4 b/hudi-spark-datasource/hudi-spark3.2.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4 index 585a7f1c2fb00..ddbecfefc760d 100644 --- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4 +++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4 @@ -29,5 +29,12 @@ statement | createTableHeader ('(' colTypeList ')')? tableProvider? createTableClauses (AS? query)? #createTable + | CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE? + tableIdentifier (USING indexType=identifier)? + LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN + (OPTIONS indexOptions=propertyList)? #createIndex + | DROP INDEX (IF EXISTS)? identifier ON TABLE? tableIdentifier #dropIndex + | SHOW INDEXES (FROM | IN) TABLE? tableIdentifier #showIndexes + | REFRESH INDEX identifier ON TABLE? tableIdentifier #refreshIndex | .*?
#passThrough ; diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala index d4624625d7537..1bb4638fcdbcb 100644 --- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.AnalysisErrorAt import org.apache.spark.sql.catalyst.analysis.ResolvedTable import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema} import org.apache.spark.sql.catalyst.planning.ScanOperation -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, Project} +import org.apache.spark.sql.catalyst.plans.logical.{CreateIndex, DropIndex, LogicalPlan, MergeIntoTable, Project, RefreshIndex, ShowIndexes} import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.execution.command.RepairTableCommand import org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat @@ -91,4 +91,40 @@ object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { override def failAnalysisForMIT(a: Attribute, cols: String): Unit = { a.failAnalysis(s"cannot resolve ${a.sql} in MERGE command given columns [$cols]") } + + override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])] = { + plan match { + case ci @ CreateIndex(table, indexName, indexType, ignoreIfExists, columns, properties) => + Some((table, indexName, indexType, ignoreIfExists, columns.map(col => (col._1.name, col._2)), properties)) + case _ => + None + } + } + + override def unapplyDropIndex(plan: LogicalPlan): Option[(LogicalPlan, String, Boolean)] = { + plan match { + case ci @ DropIndex(table, indexName, ignoreIfNotExists) => + Some((table, indexName, ignoreIfNotExists)) + case _ => + None + } + } + + override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])] = { + plan match { + case ci @ ShowIndexes(table, output) => + Some((table, output)) + case _ => + None + } + } + + override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, String)] = { + plan match { + case ci @ RefreshIndex(table, indexName) => + Some((table, indexName)) + case _ => + None + } + } } diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_2ExtendedSqlAstBuilder.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_2ExtendedSqlAstBuilder.scala index 196a77cb13af5..f750ddaf9c193 100644 --- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_2ExtendedSqlAstBuilder.scala +++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_2ExtendedSqlAstBuilder.scala @@ -3317,6 +3317,145 @@ class HoodieSpark3_2ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa position = Option(ctx.colPosition).map(pos => UnresolvedFieldPosition(typedVisit[ColumnPosition](pos)))) } + + /** + * Create an index, returning a [[CreateIndex]] logical plan. 
+ * For example: + * {{{ + * CREATE INDEX index_name ON [TABLE] table_name [USING index_type] (column_index_property_list) + * [OPTIONS indexPropertyList] + * column_index_property_list: column_name [OPTIONS(indexPropertyList)] [ , . . . ] + * indexPropertyList: index_property_name [= index_property_value] [ , . . . ] + * }}} + */ + override def visitCreateIndex(ctx: CreateIndexContext): LogicalPlan = withOrigin(ctx) { + val (indexName, indexType) = if (ctx.identifier.size() == 1) { + (ctx.identifier(0).getText, "") + } else { + (ctx.identifier(0).getText, ctx.identifier(1).getText) + } + + val columns = ctx.columns.multipartIdentifierProperty.asScala + .map(_.multipartIdentifier).map(typedVisit[Seq[String]]).toSeq + val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala + .map(x => (Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq + val options = Option(ctx.indexOptions).map(visitPropertyKeyValues).getOrElse(Map.empty) + + CreateIndex( + UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())), + indexName, + indexType, + ctx.EXISTS != null, + columns.map(UnresolvedFieldName).zip(columnsProperties), + options) + } + + /** + * Drop an index, returning a [[DropIndex]] logical plan. + * For example: + * {{{ + * DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name + * }}} + */ + override def visitDropIndex(ctx: DropIndexContext): LogicalPlan = withOrigin(ctx) { + val indexName = ctx.identifier.getText + DropIndex( + UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())), + indexName, + ctx.EXISTS != null) + } + + /** + * Show indexes, returning a [[ShowIndexes]] logical plan. + * For example: + * {{{ + * SHOW INDEXES (FROM | IN) [TABLE] table_name + * }}} + */ + override def visitShowIndexes(ctx: ShowIndexesContext): LogicalPlan = withOrigin(ctx) { + ShowIndexes(UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier()))) + } + + /** + * Refresh index, returning a [[RefreshIndex]] logical plan + * For example: + * {{{ + * REFRESH INDEX index_name ON [TABLE] table_name + * }}} + */ + override def visitRefreshIndex(ctx: RefreshIndexContext): LogicalPlan = withOrigin(ctx) { + RefreshIndex(UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())), ctx.identifier.getText) + } + + /** + * Convert a property list into a key-value map. + * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]]. + */ + override def visitPropertyList(ctx: PropertyListContext): Map[String, String] = withOrigin(ctx) { + val properties = ctx.property.asScala.map { property => + val key = visitPropertyKey(property.key) + val value = visitPropertyValue(property.value) + key -> value + } + // Check for duplicate property names. + checkDuplicateKeys(properties.toSeq, ctx) + properties.toMap + } + + /** + * Parse a key-value map from a [[PropertyListContext]], assuming all values are specified. + */ + def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = { + val props = visitPropertyList(ctx) + val badKeys = props.collect { case (key, null) => key } + if (badKeys.nonEmpty) { + operationNotAllowed( + s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) + } + props + } + + /** + * Parse a list of keys from a [[PropertyListContext]], assuming no values are specified. 
+ */ + def visitPropertyKeys(ctx: PropertyListContext): Seq[String] = { + val props = visitPropertyList(ctx) + val badKeys = props.filter { case (_, v) => v != null }.keys + if (badKeys.nonEmpty) { + operationNotAllowed( + s"Values should not be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) + } + props.keys.toSeq + } + + /** + * A property key can either be String or a collection of dot separated elements. This + * function extracts the property key based on whether it's a string literal or a property + * identifier. + */ + override def visitPropertyKey(key: PropertyKeyContext): String = { + if (key.STRING != null) { + string(key.STRING) + } else { + key.getText + } + } + + /** + * A property value can be String, Integer, Boolean or Decimal. This function extracts + * the property value based on whether it's a string, integer, boolean or decimal literal. + */ + override def visitPropertyValue(value: PropertyValueContext): String = { + if (value == null) { + null + } else if (value.STRING != null) { + string(value.STRING) + } else if (value.booleanValue != null) { + value.getText.toLowerCase(Locale.ROOT) + } else { + value.getText + } + } } /** diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_2ExtendedSqlParser.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_2ExtendedSqlParser.scala index 1f8d02340d909..a8a684dde7598 100644 --- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_2ExtendedSqlParser.scala +++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_2ExtendedSqlParser.scala @@ -121,7 +121,11 @@ class HoodieSpark3_2ExtendedSqlParser(session: SparkSession, delegate: ParserInt normalized.contains("system_time as of") || normalized.contains("timestamp as of") || normalized.contains("system_version as of") || - normalized.contains("version as of") + normalized.contains("version as of") || + normalized.contains("create index") || + normalized.contains("drop index") || + normalized.contains("show indexes") || + normalized.contains("refresh index") } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala similarity index 70% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala rename to hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala index 1cc8c99728448..2524a838d5f87 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala @@ -19,20 +19,19 @@ package org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.catalyst.analysis.FieldName import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.types.StringType /** * The logical plan of the CREATE INDEX command.
*/ -case class CreateIndex( - table: LogicalPlan, - indexName: String, - indexType: String, - ignoreIfExists: Boolean, - columns: Seq[(Attribute, Map[String, String])], - options: Map[String, String], - override val output: Seq[Attribute] = CreateIndex.getOutputAttrs) extends Command { +case class CreateIndex(table: LogicalPlan, + indexName: String, + indexType: String, + ignoreIfExists: Boolean, + columns: Seq[(FieldName, Map[String, String])], + properties: Map[String, String]) extends Command { override def children: Seq[LogicalPlan] = Seq(table) @@ -43,18 +42,12 @@ case class CreateIndex( } } -object CreateIndex { - def getOutputAttrs: Seq[Attribute] = Seq.empty -} - /** * The logical plan of the DROP INDEX command. */ -case class DropIndex( - table: LogicalPlan, - indexName: String, - ignoreIfNotExists: Boolean, - override val output: Seq[Attribute] = DropIndex.getOutputAttrs) extends Command { +case class DropIndex(table: LogicalPlan, + indexName: String, + ignoreIfNotExists: Boolean) extends Command { override def children: Seq[LogicalPlan] = Seq(table) @@ -63,16 +56,11 @@ case class DropIndex( } } -object DropIndex { - def getOutputAttrs: Seq[Attribute] = Seq.empty -} - /** * The logical plan of the SHOW INDEXES command. */ -case class ShowIndexes( - table: LogicalPlan, - override val output: Seq[Attribute] = ShowIndexes.getOutputAttrs) extends Command { +case class ShowIndexes(table: LogicalPlan, + override val output: Seq[Attribute] = ShowIndexes.getOutputAttrs) extends Command { override def children: Seq[LogicalPlan] = Seq(table) @@ -94,10 +82,8 @@ object ShowIndexes { /** * The logical plan of the REFRESH INDEX command. */ -case class RefreshIndex( - table: LogicalPlan, - indexName: String, - override val output: Seq[Attribute] = RefreshIndex.getOutputAttrs) extends Command { +case class RefreshIndex(table: LogicalPlan, + indexName: String) extends Command { override def children: Seq[LogicalPlan] = Seq(table) @@ -105,7 +91,3 @@ case class RefreshIndex( copy(table = newChild.head) } } - -object RefreshIndex { - def getOutputAttrs: Seq[Attribute] = Seq.empty -} diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala index d64bc94301a12..1c08717455aae 100644 --- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala @@ -21,12 +21,13 @@ import org.apache.hadoop.fs.Path import org.apache.hudi.{DataSourceReadOptions, DefaultSource, SparkAdapterSupport} import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.MatchResolvedTable import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolveExpressionByPlanChildren -import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, EliminateSubqueryAliases, NamedRelation, UnresolvedAttribute, UnresolvedPartitionSpec} +import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NamedRelation, ResolvedFieldName, UnresolvedAttribute, UnresolvedFieldName, UnresolvedPartitionSpec} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logcal.{HoodieQuery, HoodieTableChanges, 
HoodieTableChangesOptionsParser} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.Origin import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper import org.apache.spark.sql.connector.catalog.{Table, V1Table} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation @@ -201,6 +202,12 @@ case class HoodieSpark32PlusResolveReferences(spark: SparkSession) extends Rule[ matchedActions = newMatchedActions, notMatchedActions = newNotMatchedActions) } + + case cmd: CreateIndex if cmd.table.resolved && cmd.columns.exists(_._1.isInstanceOf[UnresolvedFieldName]) => + cmd.copy(columns = cmd.columns.map { + case (u: UnresolvedFieldName, prop) => resolveFieldNames(cmd.table, u.name, u) -> prop + case other => other + }) } def resolveAssignments( @@ -244,6 +251,35 @@ case class HoodieSpark32PlusResolveReferences(spark: SparkSession) extends Rule[ } } + + /** + * Returns the resolved field name if the field can be resolved; otherwise throws an + * AnalysisException identifying the missing field. Use [[resolveFieldNamesOpt]] for the + * non-throwing variant. + */ + private def resolveFieldNames(table: LogicalPlan, + fieldName: Seq[String], + context: Expression): ResolvedFieldName = { + resolveFieldNamesOpt(table, fieldName, context) + .getOrElse(throw missingFieldError(fieldName, table, context.origin)) + } + + private def resolveFieldNamesOpt(table: LogicalPlan, + fieldName: Seq[String], + context: Expression): Option[ResolvedFieldName] = { + table.schema.findNestedField( + fieldName, includeCollections = true, conf.resolver, context.origin + ).map { + case (path, field) => ResolvedFieldName(path, field) + } + } + + private def missingFieldError(fieldName: Seq[String], table: LogicalPlan, context: Origin): Throwable = { + new AnalysisException( + s"Missing field ${fieldName.mkString(".")} with schema:\n" + + table.schema.treeString, + context.line, + context.startPosition) + } + private[sql] object MatchMergeIntoTable { def unapply(plan: LogicalPlan): Option[(LogicalPlan, LogicalPlan, Expression)] = sparkAdapter.getCatalystPlanUtils.unapplyMergeIntoTable(plan) diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/antlr4/imports/SqlBase.g4 b/hudi-spark-datasource/hudi-spark3.3.x/src/main/antlr4/imports/SqlBase.g4 index d4e1e48351ccc..d7f87b4e5aa59 100644 --- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/antlr4/imports/SqlBase.g4 +++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/antlr4/imports/SqlBase.g4 @@ -755,6 +755,34 @@ functionIdentifier : (db=errorCapturingIdentifier '.')? function=errorCapturingIdentifier ; +multipartIdentifierPropertyList + : multipartIdentifierProperty (COMMA multipartIdentifierProperty)* + ; + +multipartIdentifierProperty + : multipartIdentifier (OPTIONS options=propertyList)? + ; + +propertyList + : LEFT_PAREN property (COMMA property)* RIGHT_PAREN + ; + +property + : key=propertyKey (EQ? value=propertyValue)? + ; + +propertyKey + : identifier (DOT identifier)* + | STRING + ; + +propertyValue + : INTEGER_VALUE + | DECIMAL_VALUE + | booleanValue + | STRING + ; + namedExpression : expression (AS? (name=errorCapturingIdentifier | identifierList))?
; @@ -1797,6 +1825,10 @@ TIMESTAMP: 'TIMESTAMP'; //============================ // End of the keywords list //============================ +LEFT_PAREN: '('; +RIGHT_PAREN: ')'; +COMMA: ','; +DOT: '.'; EQ : '=' | '=='; NSEQ: '<=>'; diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4 b/hudi-spark-datasource/hudi-spark3.3.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4 index 585a7f1c2fb00..ddbecfefc760d 100644 --- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4 +++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4 @@ -29,5 +29,12 @@ statement | createTableHeader ('(' colTypeList ')')? tableProvider? createTableClauses (AS? query)? #createTable + | CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE? + tableIdentifier (USING indexType=identifier)? + LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN + (OPTIONS indexOptions=propertyList)? #createIndex + | DROP INDEX (IF EXISTS)? identifier ON TABLE? tableIdentifier #dropIndex + | SHOW INDEXES (FROM | IN) TABLE? tableIdentifier #showIndexes + | REFRESH INDEX identifier ON TABLE? tableIdentifier #refreshIndex | .*? #passThrough ; diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala index 16f2517d128ed..85bd4a2c5e59d 100644 --- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.AnalysisErrorAt import org.apache.spark.sql.catalyst.analysis.ResolvedTable import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema} import org.apache.spark.sql.catalyst.planning.ScanOperation -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, Project} +import org.apache.spark.sql.catalyst.plans.logical.{CreateIndex, DropIndex, LogicalPlan, MergeIntoTable, Project, RefreshIndex, ShowIndexes} import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.execution.command.RepairTableCommand import org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat @@ -76,4 +76,40 @@ object HoodieSpark33CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { override def failAnalysisForMIT(a: Attribute, cols: String): Unit = { a.failAnalysis(s"cannot resolve ${a.sql} in MERGE command given columns [$cols]") } + + override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])] = { + plan match { + case ci @ CreateIndex(table, indexName, indexType, ignoreIfExists, columns, properties) => + Some((table, indexName, indexType, ignoreIfExists, columns.map(col => (col._1.name, col._2)), properties)) + case _ => + None + } + } + + override def unapplyDropIndex(plan: LogicalPlan): Option[(LogicalPlan, String, Boolean)] = { + plan match { + case ci @ DropIndex(table, indexName, ignoreIfNotExists) => + Some((table, indexName, ignoreIfNotExists)) + case _ => + None + } + } + + override def unapplyShowIndexes(plan: LogicalPlan): 
Option[(LogicalPlan, Seq[Attribute])] = { + plan match { + case ci @ ShowIndexes(table, output) => + Some((table, output)) + case _ => + None + } + } + + override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, String)] = { + plan match { + case ci @ RefreshIndex(table, indexName) => + Some((table, indexName)) + case _ => + None + } + } } diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala index 694a7133e4bfd..4e5e32e76fea3 100644 --- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala +++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala @@ -3327,6 +3327,145 @@ class HoodieSpark3_3ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa position = Option(ctx.colPosition).map(pos => UnresolvedFieldPosition(typedVisit[ColumnPosition](pos)))) } + + /** + * Create an index, returning a [[CreateIndex]] logical plan. + * For example: + * {{{ + * CREATE INDEX index_name ON [TABLE] table_name [USING index_type] (column_index_property_list) + * [OPTIONS indexPropertyList] + * column_index_property_list: column_name [OPTIONS(indexPropertyList)] [ , . . . ] + * indexPropertyList: index_property_name [= index_property_value] [ , . . . ] + * }}} + */ + override def visitCreateIndex(ctx: CreateIndexContext): LogicalPlan = withOrigin(ctx) { + val (indexName, indexType) = if (ctx.identifier.size() == 1) { + (ctx.identifier(0).getText, "") + } else { + (ctx.identifier(0).getText, ctx.identifier(1).getText) + } + + val columns = ctx.columns.multipartIdentifierProperty.asScala + .map(_.multipartIdentifier).map(typedVisit[Seq[String]]).toSeq + val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala + .map(x => (Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq + val options = Option(ctx.indexOptions).map(visitPropertyKeyValues).getOrElse(Map.empty) + + CreateIndex( + UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())), + indexName, + indexType, + ctx.EXISTS != null, + columns.map(UnresolvedFieldName).zip(columnsProperties), + options) + } + + /** + * Drop an index, returning a [[DropIndex]] logical plan. + * For example: + * {{{ + * DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name + * }}} + */ + override def visitDropIndex(ctx: DropIndexContext): LogicalPlan = withOrigin(ctx) { + val indexName = ctx.identifier.getText + DropIndex( + UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())), + indexName, + ctx.EXISTS != null) + } + + /** + * Show indexes, returning a [[ShowIndexes]] logical plan. + * For example: + * {{{ + * SHOW INDEXES (FROM | IN) [TABLE] table_name + * }}} + */ + override def visitShowIndexes(ctx: ShowIndexesContext): LogicalPlan = withOrigin(ctx) { + ShowIndexes(UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier()))) + } + + /** + * Refresh an index, returning a [[RefreshIndex]] logical plan. + * For example: + * {{{ + * REFRESH INDEX index_name ON [TABLE] table_name + * }}} + */ + override def visitRefreshIndex(ctx: RefreshIndexContext): LogicalPlan = withOrigin(ctx) { + RefreshIndex(UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())), ctx.identifier.getText) + } + + /** + * Convert a property list into a key-value map.
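+ * For example, the list (k1 = 'v1', k2, k3 = 5) parses to Map("k1" -> "v1", "k2" -> null, "k3" -> "5"); a key written without a value maps to null.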
+ * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]]. + */ + override def visitPropertyList(ctx: PropertyListContext): Map[String, String] = withOrigin(ctx) { + val properties = ctx.property.asScala.map { property => + val key = visitPropertyKey(property.key) + val value = visitPropertyValue(property.value) + key -> value + } + // Check for duplicate property names. + checkDuplicateKeys(properties.toSeq, ctx) + properties.toMap + } + + /** + * Parse a key-value map from a [[PropertyListContext]], assuming all values are specified. + */ + def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = { + val props = visitPropertyList(ctx) + val badKeys = props.collect { case (key, null) => key } + if (badKeys.nonEmpty) { + operationNotAllowed( + s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) + } + props + } + + /** + * Parse a list of keys from a [[PropertyListContext]], assuming no values are specified. + */ + def visitPropertyKeys(ctx: PropertyListContext): Seq[String] = { + val props = visitPropertyList(ctx) + val badKeys = props.filter { case (_, v) => v != null }.keys + if (badKeys.nonEmpty) { + operationNotAllowed( + s"Values should not be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) + } + props.keys.toSeq + } + + /** + * A property key can be either a String or a collection of dot-separated elements. This + * function extracts the property key based on whether it's a string literal or a property + * identifier. + */ + override def visitPropertyKey(key: PropertyKeyContext): String = { + if (key.STRING != null) { + string(key.STRING) + } else { + key.getText + } + } + + /** + * A property value can be a String, Integer, Boolean or Decimal. This function extracts + * the property value based on whether it's a string, integer, boolean or decimal literal.
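+ * For instance, 'v1' yields "v1", TRUE is lower-cased to "true", and an absent value yields null, which [[visitPropertyKeyValues]] subsequently rejects.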
+ */ + override def visitPropertyValue(value: PropertyValueContext): String = { + if (value == null) { + null + } else if (value.STRING != null) { + string(value.STRING) + } else if (value.booleanValue != null) { + value.getText.toLowerCase(Locale.ROOT) + } else { + value.getText + } + } } /** diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlParser.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlParser.scala index 4c59f56828f2d..24b665c8a37fe 100644 --- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlParser.scala +++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_3ExtendedSqlParser.scala @@ -123,7 +123,11 @@ class HoodieSpark3_3ExtendedSqlParser(session: SparkSession, delegate: ParserInt normalized.contains("system_time as of") || normalized.contains("timestamp as of") || normalized.contains("system_version as of") || - normalized.contains("version as of") + normalized.contains("version as of") || + normalized.contains("create index") || + normalized.contains("drop index") || + normalized.contains("show indexes") || + normalized.contains("refresh index") } } diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/antlr4/imports/SqlBase.g4 b/hudi-spark-datasource/hudi-spark3.4.x/src/main/antlr4/imports/SqlBase.g4 index d4e1e48351ccc..d7f87b4e5aa59 100644 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/antlr4/imports/SqlBase.g4 +++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/antlr4/imports/SqlBase.g4 @@ -755,6 +755,34 @@ functionIdentifier : (db=errorCapturingIdentifier '.')? function=errorCapturingIdentifier ; +multipartIdentifierPropertyList + : multipartIdentifierProperty (COMMA multipartIdentifierProperty)* + ; + +multipartIdentifierProperty + : multipartIdentifier (OPTIONS options=propertyList)? + ; + +propertyList + : LEFT_PAREN property (COMMA property)* RIGHT_PAREN + ; + +property + : key=propertyKey (EQ? value=propertyValue)? + ; + +propertyKey + : identifier (DOT identifier)* + | STRING + ; + +propertyValue + : INTEGER_VALUE + | DECIMAL_VALUE + | booleanValue + | STRING + ; + namedExpression : expression (AS? (name=errorCapturingIdentifier | identifierList))? ; @@ -1797,6 +1825,10 @@ TIMESTAMP: 'TIMESTAMP'; //============================ // End of the keywords list //============================ +LEFT_PAREN: '('; +RIGHT_PAREN: ')'; +COMMA: ','; +DOT: '.'; EQ : '=' | '=='; NSEQ: '<=>'; diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4 b/hudi-spark-datasource/hudi-spark3.4.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4 index 585a7f1c2fb00..ddbecfefc760d 100644 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4 +++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlBase.g4 @@ -29,5 +29,12 @@ statement | createTableHeader ('(' colTypeList ')')? tableProvider? createTableClauses (AS? query)? #createTable + | CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE? + tableIdentifier (USING indexType=identifier)? + LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN + (OPTIONS indexOptions=propertyList)? #createIndex + | DROP INDEX (IF EXISTS)? identifier ON TABLE? tableIdentifier #dropIndex + | SHOW INDEXES (FROM | IN) TABLE? 
tableIdentifier #showIndexes + | REFRESH INDEX identifier ON TABLE? tableIdentifier #refreshIndex | .*? #passThrough ; diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala index 947a73285f5b9..41b629aac8e84 100644 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, ResolvedTable} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema} import org.apache.spark.sql.catalyst.planning.ScanOperation -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, Project} +import org.apache.spark.sql.catalyst.plans.logical.{CreateIndex, DropIndex, LogicalPlan, MergeIntoTable, Project, RefreshIndex, ShowIndexes} import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.execution.command.RepairTableCommand import org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat @@ -79,4 +79,40 @@ object HoodieSpark34CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { "sqlExpr" -> a.sql, "cols" -> cols)) } + + override def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])] = { + plan match { + case ci@CreateIndex(table, indexName, indexType, ignoreIfExists, columns, properties) => + Some((table, indexName, indexType, ignoreIfExists, columns.map(col => (col._1.name, col._2)), properties)) + case _ => + None + } + } + + override def unapplyDropIndex(plan: LogicalPlan): Option[(LogicalPlan, String, Boolean)] = { + plan match { + case ci@DropIndex(table, indexName, ignoreIfNotExists) => + Some((table, indexName, ignoreIfNotExists)) + case _ => + None + } + } + + override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])] = { + plan match { + case ci@ShowIndexes(table, output) => + Some((table, output)) + case _ => + None + } + } + + override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, String)] = { + plan match { + case ci@RefreshIndex(table, indexName) => + Some((table, indexName)) + case _ => + None + } + } } diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlAstBuilder.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlAstBuilder.scala index fe07aeb74e8b2..670d56ea37a42 100644 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlAstBuilder.scala +++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlAstBuilder.scala @@ -3331,6 +3331,145 @@ class HoodieSpark3_4ExtendedSqlAstBuilder(conf: SQLConf, delegate: ParserInterfa UnresolvedFieldPosition(typedVisit[ColumnPosition](pos))), default = Option(null)) } + + /** + * Create an index, returning a [[CreateIndex]] logical plan. 
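+ * Column names are captured as [[UnresolvedFieldName]]s here and are resolved against the table schema later, in the CreateIndex branch of HoodieSpark32PlusResolveReferences.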
+ * For example: + * {{{ + * CREATE INDEX index_name ON [TABLE] table_name [USING index_type] (column_index_property_list) + * [OPTIONS indexPropertyList] + * column_index_property_list: column_name [OPTIONS(indexPropertyList)] [ , . . . ] + * indexPropertyList: index_property_name [= index_property_value] [ , . . . ] + * }}} + */ + override def visitCreateIndex(ctx: CreateIndexContext): LogicalPlan = withOrigin(ctx) { + val (indexName, indexType) = if (ctx.identifier.size() == 1) { + (ctx.identifier(0).getText, "") + } else { + (ctx.identifier(0).getText, ctx.identifier(1).getText) + } + + val columns = ctx.columns.multipartIdentifierProperty.asScala + .map(_.multipartIdentifier).map(typedVisit[Seq[String]]).toSeq + val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala + .map(x => (Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq + val options = Option(ctx.indexOptions).map(visitPropertyKeyValues).getOrElse(Map.empty) + + CreateIndex( + UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())), + indexName, + indexType, + ctx.EXISTS != null, + columns.map(UnresolvedFieldName).zip(columnsProperties), + options) + } + + /** + * Drop an index, returning a [[DropIndex]] logical plan. + * For example: + * {{{ + * DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name + * }}} + */ + override def visitDropIndex(ctx: DropIndexContext): LogicalPlan = withOrigin(ctx) { + val indexName = ctx.identifier.getText + DropIndex( + UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())), + indexName, + ctx.EXISTS != null) + } + + /** + * Show indexes, returning a [[ShowIndexes]] logical plan. + * For example: + * {{{ + * SHOW INDEXES (FROM | IN) [TABLE] table_name + * }}} + */ + override def visitShowIndexes(ctx: ShowIndexesContext): LogicalPlan = withOrigin(ctx) { + ShowIndexes(UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier()))) + } + + /** + * Refresh an index, returning a [[RefreshIndex]] logical plan. + * For example: + * {{{ + * REFRESH INDEX index_name ON [TABLE] table_name + * }}} + */ + override def visitRefreshIndex(ctx: RefreshIndexContext): LogicalPlan = withOrigin(ctx) { + RefreshIndex(UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier())), ctx.identifier.getText) + } + + /** + * Convert a property list into a key-value map. + * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]]. + */ + override def visitPropertyList(ctx: PropertyListContext): Map[String, String] = withOrigin(ctx) { + val properties = ctx.property.asScala.map { property => + val key = visitPropertyKey(property.key) + val value = visitPropertyValue(property.value) + key -> value + } + // Check for duplicate property names. + checkDuplicateKeys(properties.toSeq, ctx) + properties.toMap + } + + /** + * Parse a key-value map from a [[PropertyListContext]], assuming all values are specified. + */ + def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = { + val props = visitPropertyList(ctx) + val badKeys = props.collect { case (key, null) => key } + if (badKeys.nonEmpty) { + operationNotAllowed( + s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) + } + props + } + + /** + * Parse a list of keys from a [[PropertyListContext]], assuming no values are specified.
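+ * For example, (k1, k2) yields Seq("k1", "k2"), whereas (k1 = 'v1') is rejected via operationNotAllowed because a value was supplied.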
+ */ + def visitPropertyKeys(ctx: PropertyListContext): Seq[String] = { + val props = visitPropertyList(ctx) + val badKeys = props.filter { case (_, v) => v != null }.keys + if (badKeys.nonEmpty) { + operationNotAllowed( + s"Values should not be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) + } + props.keys.toSeq + } + + /** + * A property key can be either a String or a collection of dot-separated elements. This + * function extracts the property key based on whether it's a string literal or a property + * identifier. + */ + override def visitPropertyKey(key: PropertyKeyContext): String = { + if (key.STRING != null) { + string(key.STRING) + } else { + key.getText + } + } + + /** + * A property value can be a String, Integer, Boolean or Decimal. This function extracts + * the property value based on whether it's a string, integer, boolean or decimal literal. + */ + override def visitPropertyValue(value: PropertyValueContext): String = { + if (value == null) { + null + } else if (value.STRING != null) { + string(value.STRING) + } else if (value.booleanValue != null) { + value.getText.toLowerCase(Locale.ROOT) + } else { + value.getText + } + } } /** diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlParser.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlParser.scala index 8b554af6af343..dc7472ff83db5 100644 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlParser.scala +++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/parser/HoodieSpark3_4ExtendedSqlParser.scala @@ -123,7 +123,11 @@ class HoodieSpark3_4ExtendedSqlParser(session: SparkSession, delegate: ParserInt normalized.contains("system_time as of") || normalized.contains("timestamp as of") || normalized.contains("system_version as of") || - normalized.contains("version as of") + normalized.contains("version as of") || + normalized.contains("create index") || + normalized.contains("drop index") || + normalized.contains("show indexes") || + normalized.contains("refresh index") } }
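Taken together, the grammar additions, the per-version AST builders, and the plan-utils extractors route all four index statements through the extended parser on both Spark 3.3 and 3.4. A minimal smoke-test sketch of the resulting SQL surface, assuming a session built with the Hudi SQL extension and an existing Hudi table; the names hudi_tbl, ts and idx_ts below are illustrative placeholders, not part of this patch:

import org.apache.spark.sql.SparkSession

// Illustrative only: exercises the statements this patch adds. The table,
// column and index names are placeholders.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("hudi-index-sql-smoke")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate()

spark.sql("CREATE INDEX IF NOT EXISTS idx_ts ON TABLE hudi_tbl (ts)") // parses to CreateIndex
spark.sql("SHOW INDEXES IN hudi_tbl").show()                          // parses to ShowIndexes
spark.sql("REFRESH INDEX idx_ts ON hudi_tbl")                         // parses to RefreshIndex
spark.sql("DROP INDEX IF EXISTS idx_ts ON hudi_tbl")                  // parses to DropIndex

Each statement is first matched by the isHoodieCommand substring check ("create index", "drop index", "show indexes", "refresh index") and then parsed into the corresponding Spark logical plan by the extended AST builder.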