From 9af485ca56bed380f97081bffe3710a9013da026 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Tue, 17 Sep 2024 21:32:40 +0530 Subject: [PATCH 1/3] [HUDI-7563] Add support to drop index using sql --- .../hudi/client/BaseHoodieWriteClient.java | 6 +- .../HoodieBackedTableMetadataWriter.java | 5 +- .../metadata/HoodieTableMetadataWriter.java | 2 +- ...Client.java => BaseHoodieIndexClient.java} | 12 ++- .../functional/TestHoodieBackedMetadata.java | 17 ++-- ...lient.java => HoodieSparkIndexClient.java} | 34 ++++++-- .../sql/hudi/command/IndexCommands.scala | 14 +++- .../functional/TestRecordLevelIndex.scala | 2 +- .../TestSecondaryIndexPruning.scala | 64 +++++++++++++++ .../command/index/TestFunctionalIndex.scala | 81 +++++++++++++++++++ .../apache/hudi/utilities/HoodieIndexer.java | 3 +- 11 files changed, 208 insertions(+), 32 deletions(-) rename hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/{BaseHoodieFunctionalIndexClient.java => BaseHoodieIndexClient.java} (88%) rename hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/{HoodieSparkFunctionalIndexClient.java => HoodieSparkIndexClient.java} (84%) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 4f3a2fa02e272..c412696a0f460 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -980,9 +980,9 @@ public Option index(String indexInstantTime) { /** * Drops the index and removes the metadata partitions. * - * @param partitionTypes - list of {@link MetadataPartitionType} which needs to be indexed + * @param metadataPartitions - list of metadata partitions which need to be dropped */ - public void dropIndex(List partitionTypes) { + public void dropIndex(List metadataPartitions) { HoodieTable table = createTable(config); String dropInstant = createNewInstantTime(); HoodieInstant ownerInstant = new HoodieInstant(true, HoodieTimeline.INDEXING_ACTION, dropInstant); @@ -992,7 +992,7 @@ public void dropIndex(List partitionTypes) { Option metadataWriterOpt = table.getMetadataWriter(dropInstant); if (metadataWriterOpt.isPresent()) { try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) { - metadataWriter.dropMetadataPartitions(partitionTypes); + metadataWriter.dropMetadataPartitions(metadataPartitions); } catch (Exception e) { if (e instanceof HoodieException) { throw (HoodieException) e; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index e9657e7145daf..a29645be66cee 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -895,9 +895,8 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata }, fileGroupFileIds.size()); } - public void dropMetadataPartitions(List metadataPartitions) throws IOException { - for (MetadataPartitionType partitionType : metadataPartitions) { - String partitionPath = partitionType.getPartitionPath(); + public void dropMetadataPartitions(List 
<String>
metadataPartitions) throws IOException { + for (String partitionPath : metadataPartitions) { // first update table config dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionPath, false); LOG.warn("Deleting Metadata Table partition: {}", partitionPath); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java index e64407bb2c4c8..7578949a8e027 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java @@ -53,7 +53,7 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable { * @param metadataPartitions List of MDT partitions to drop * @throws IOException on failures */ - void dropMetadataPartitions(List metadataPartitions) throws IOException; + void dropMetadataPartitions(List metadataPartitions) throws IOException; /** * Update the metadata table due to a COMMIT operation. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieFunctionalIndexClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java similarity index 88% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieFunctionalIndexClient.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java index d9215c8d923ae..f9b809e32cd09 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieFunctionalIndexClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.storage.StoragePath; import org.slf4j.Logger; @@ -28,11 +29,11 @@ import java.util.Map; -public abstract class BaseHoodieFunctionalIndexClient { +public abstract class BaseHoodieIndexClient { - private static final Logger LOG = LoggerFactory.getLogger(BaseHoodieFunctionalIndexClient.class); + private static final Logger LOG = LoggerFactory.getLogger(BaseHoodieIndexClient.class); - public BaseHoodieFunctionalIndexClient() { + public BaseHoodieIndexClient() { } /** @@ -61,4 +62,9 @@ public void register(HoodieTableMetaClient metaClient, String indexName, String * Create a functional index. */ public abstract void create(HoodieTableMetaClient metaClient, String indexName, String indexType, Map> columns, Map options); + + /** + * Drop an index. By default, ignore drop if index does not exist. 
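+ * If {@code ignoreIfNotExists} is false, dropping a missing index fails with an exception.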
+ */ + public abstract void drop(HoodieTableMetaClient metaClient, String indexName, MetadataPartitionType metadataPartitionType, boolean ignoreIfNotExists); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index fd94073788026..5bd695adb0454 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -178,6 +178,7 @@ import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS; import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS; import static org.apache.hudi.metadata.MetadataPartitionType.FILES; +import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -1193,9 +1194,9 @@ private void revertTableToInflightState(HoodieWriteConfig writeConfig) throws IO deleteMetaFile( metaClient.getStorage(), mdtBasePath, mdtInitCommit2, DELTA_COMMIT_EXTENSION); metaClient.getTableConfig().setMetadataPartitionState( - metaClient, MetadataPartitionType.RECORD_INDEX.getPartitionPath(), false); + metaClient, RECORD_INDEX.getPartitionPath(), false); metaClient.getTableConfig().setMetadataPartitionsInflight( - metaClient, MetadataPartitionType.RECORD_INDEX); + metaClient, RECORD_INDEX); timeline = metaClient.getActiveTimeline().reload(); mdtTimeline = mdtMetaClient.getActiveTimeline().reload(); assertEquals(commit, timeline.lastInstant().get().getTimestamp()); @@ -1734,7 +1735,7 @@ public void testFailedBootstrap() throws Exception { MetadataPartitionType.FILES.getPartitionPath()).size(), 1); assertEquals(HoodieTableMetadataUtil.getPartitionLatestFileSlices( metadataReader.getMetadataMetaClient(), Option.empty(), - MetadataPartitionType.RECORD_INDEX.getPartitionPath()).size(), 5); + RECORD_INDEX.getPartitionPath()).size(), 5); } // remove the MDT partition from dataset to simulate failed bootstrap @@ -1776,7 +1777,7 @@ public void testFailedBootstrap() throws Exception { assertEquals(HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataReader.getMetadataMetaClient(), Option.empty(), MetadataPartitionType.FILES.getPartitionPath()).size(), 1); assertEquals(HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataReader.getMetadataMetaClient(), Option.empty(), - MetadataPartitionType.RECORD_INDEX.getPartitionPath()).size(), 3); + RECORD_INDEX.getPartitionPath()).size(), 3); } } @@ -2813,11 +2814,11 @@ public void testRollbackPendingCommitWithRecordIndex(boolean performUpsert) thro // delete the metadata table partitions to check, whether rollback of pending commit succeeds and // metadata table partitions are rebootstrapped. 
- metadataWriter.dropMetadataPartitions(Arrays.asList(MetadataPartitionType.RECORD_INDEX, FILES)); + metadataWriter.dropMetadataPartitions(Arrays.asList(RECORD_INDEX.getPartitionPath(), FILES.getPartitionPath())); assertFalse(storage.exists(new StoragePath( getMetadataTableBasePath(basePath) + StoragePath.SEPARATOR + FILES.getPartitionPath()))); assertFalse(storage.exists(new StoragePath(getMetadataTableBasePath(basePath) - + StoragePath.SEPARATOR + MetadataPartitionType.RECORD_INDEX.getPartitionPath()))); + + StoragePath.SEPARATOR + RECORD_INDEX.getPartitionPath()))); metaClient = HoodieTableMetaClient.reload(metaClient); // Insert/upsert third batch of records @@ -2842,7 +2843,7 @@ public void testRollbackPendingCommitWithRecordIndex(boolean performUpsert) thro assertTrue(storage.exists(new StoragePath( getMetadataTableBasePath(basePath) + StoragePath.SEPARATOR + FILES.getPartitionPath()))); assertTrue(storage.exists(new StoragePath(getMetadataTableBasePath(basePath) - + StoragePath.SEPARATOR + MetadataPartitionType.RECORD_INDEX.getPartitionPath()))); + + StoragePath.SEPARATOR + RECORD_INDEX.getPartitionPath()))); } /** @@ -3451,7 +3452,7 @@ public void testDeleteWithRecordIndex() throws Exception { // Records got inserted and RI is initialized metaClient = HoodieTableMetaClient.reload(metaClient); - assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX), "RI is disabled"); + assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX), "RI is disabled"); assertEquals(firstBatchOfrecords.size(), HoodieClientTestUtils.readCommit(writeConfig.getBasePath(), engineContext.getSqlContext(), metaClient.reloadActiveTimeline(), firstCommitTime).count()); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndexClient.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java similarity index 84% rename from hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndexClient.java rename to hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java index 5bafe49878996..f0fe11904480c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndexClient.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java @@ -32,7 +32,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieFunctionalIndexException; import org.apache.hudi.metadata.MetadataPartitionType; -import org.apache.hudi.table.action.index.functional.BaseHoodieFunctionalIndexClient; +import org.apache.hudi.table.action.index.functional.BaseHoodieIndexClient; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; @@ -56,24 +56,24 @@ import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX; -public class HoodieSparkFunctionalIndexClient extends BaseHoodieFunctionalIndexClient { +public class HoodieSparkIndexClient extends BaseHoodieIndexClient { - private static final Logger LOG = LoggerFactory.getLogger(HoodieSparkFunctionalIndexClient.class); + private static final Logger LOG = LoggerFactory.getLogger(HoodieSparkIndexClient.class); - private static volatile HoodieSparkFunctionalIndexClient _instance; + private static 
volatile HoodieSparkIndexClient _instance; private final SparkSession sparkSession; - private HoodieSparkFunctionalIndexClient(SparkSession sparkSession) { + private HoodieSparkIndexClient(SparkSession sparkSession) { super(); this.sparkSession = sparkSession; } - public static HoodieSparkFunctionalIndexClient getInstance(SparkSession sparkSession) { + public static HoodieSparkIndexClient getInstance(SparkSession sparkSession) { if (_instance == null) { - synchronized (HoodieSparkFunctionalIndexClient.class) { + synchronized (HoodieSparkIndexClient.class) { if (_instance == null) { - _instance = new HoodieSparkFunctionalIndexClient(sparkSession); + _instance = new HoodieSparkIndexClient(sparkSession); } } } @@ -112,6 +112,24 @@ public void create(HoodieTableMetaClient metaClient, String indexName, String in } } + @Override + public void drop(HoodieTableMetaClient metaClient, String indexName, MetadataPartitionType metadataPartitionType, boolean ignoreIfNotExists) { + if (!indexExists(metaClient, indexName)) { + if (ignoreIfNotExists) { + return; + } else { + throw new HoodieFunctionalIndexException("Index does not exist: " + indexName); + } + } + + LOG.info("Dropping index {}", indexName); + HoodieIndexDefinition indexDefinition = metaClient.getIndexMetadata().get().getIndexDefinitions().get(indexName); + try (SparkRDDWriteClient writeClient = HoodieCLIUtils.createHoodieWriteClient( + sparkSession, metaClient.getBasePath().toString(), mapAsScalaImmutableMap(buildWriteConfig(metaClient, indexDefinition)), toScalaOption(Option.empty()))) { + writeClient.dropIndex(Collections.singletonList(indexName)); + } + } + private static Option doSchedule(SparkRDDWriteClient client, HoodieTableMetaClient metaClient, String indexName) { List partitionTypes = Collections.singletonList(MetadataPartitionType.FUNCTIONAL_INDEX); checkArgument(partitionTypes.size() == 1, "Currently, only one index type can be scheduled at a time."); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala index b4b35595052ad..4371844fc3559 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala @@ -20,12 +20,12 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.HoodieConversionUtils.toScalaOption -import org.apache.hudi.HoodieSparkFunctionalIndexClient +import org.apache.hudi.HoodieSparkIndexClient import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.util.JsonUtils import org.apache.hudi.hadoop.fs.HadoopFSUtils import org.apache.hudi.index.secondary.SecondaryIndexManager - +import org.apache.hudi.metadata.MetadataPartitionType import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Attribute @@ -52,7 +52,7 @@ case class CreateIndexCommand(table: CatalogTable, columns.map(c => columnsMap.put(c._1.mkString("."), c._2.asJava)) if (options.contains("func") || indexType.equals("secondary_index")) { - HoodieSparkFunctionalIndexClient.getInstance(sparkSession).create( + HoodieSparkIndexClient.getInstance(sparkSession).create( metaClient, indexName, indexType, columnsMap, options.asJava) } else { SecondaryIndexManager.getInstance().create( @@ -76,7 +76,13 @@ case class 
DropIndexCommand(table: CatalogTable, override def run(sparkSession: SparkSession): Seq[Row] = { val tableId = table.identifier val metaClient = createHoodieTableMetaClient(tableId, sparkSession) - SecondaryIndexManager.getInstance().drop(metaClient, indexName, ignoreIfNotExists) + try { + val metadataPartitionType = MetadataPartitionType.fromPartitionPath(indexName) + HoodieSparkIndexClient.getInstance(sparkSession).drop(metaClient, indexName, metadataPartitionType, ignoreIfNotExists) + } catch { + case _: IllegalArgumentException => + SecondaryIndexManager.getInstance().drop(metaClient, indexName, ignoreIfNotExists) + } // Invalidate cached table for queries do not access related table // through {@code DefaultSource} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala index f6e9abb158c3b..d89fa01334795 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala @@ -273,7 +273,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase { saveMode = SaveMode.Overwrite) val writeConfig = getWriteConfig(hudiOpts) - metadataWriter(writeConfig).dropMetadataPartitions(Collections.singletonList(MetadataPartitionType.RECORD_INDEX)) + metadataWriter(writeConfig).dropMetadataPartitions(Collections.singletonList(MetadataPartitionType.RECORD_INDEX.getPartitionPath)) assertEquals(0, getFileGroupCountForRecordIndex(writeConfig)) metaClient.getTableConfig.getMetadataPartitionsInflight diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala index d87e6c6cd34cf..d48249a589488 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala @@ -158,6 +158,70 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { } } + @ParameterizedTest + @MethodSource(Array("testSecondaryIndexPruningParameters")) + def testCreateAndDropSecondaryIndex(testCase: SecondaryIndexTestCase): Unit = { + if (HoodieSparkUtils.gteqSpark3_3) { + val tableType = testCase.tableType + val isPartitioned = testCase.isPartitioned + var hudiOpts = commonOpts + hudiOpts = hudiOpts + ( + DataSourceWriteOptions.TABLE_TYPE.key -> tableType, + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true") + val sqlTableType = if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) "cow" else "mor" + tableName += "test_secondary_index_create_drop" + (if (isPartitioned) "_partitioned" else "") + sqlTableType + val partitionedByClause = if (isPartitioned) "partitioned by(partition_key_col)" else "" + + spark.sql( + s""" + |create table $tableName ( + | ts bigint, + | record_key_col string, + | not_record_key_col string, + | partition_key_col string + |) using hudi + | options ( + | primaryKey ='record_key_col', + | type = '$sqlTableType', + | hoodie.metadata.enable = 'true', + | hoodie.metadata.record.index.enable = 'true', + | hoodie.datasource.write.recordkey.field = 'record_key_col', + | hoodie.enable.data.skipping = 'true' + | ) + | $partitionedByClause + | 
location '$basePath' + """.stripMargin) + // by setting small file limit to 0, each insert will create a new file + // need to generate more file for non-partitioned table to test data skipping + // as the partitioned table will have only one file per partition + spark.sql("set hoodie.parquet.small.file.limit=0") + spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')") + spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')") + spark.sql(s"insert into $tableName values(3, 'row3', 'def', 'p2')") + // create secondary index + spark.sql(s"create index idx_not_record_key_col on $tableName using secondary_index(not_record_key_col)") + // validate index created successfully + metaClient = HoodieTableMetaClient.builder() + .setBasePath(basePath) + .setConf(HoodieTestUtils.getDefaultStorageConf) + .build() + assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col")) + // validate the secondary index records themselves + checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from hudi_metadata('$basePath') where type=7")( + Seq("abc", "row1"), + Seq("cde", "row2"), + Seq("def", "row3") + ) + // drop secondary index + spark.sql(s"drop index secondary_index_idx_not_record_key_col on $tableName") + // validate index dropped successfully + metaClient = HoodieTableMetaClient.reload(metaClient) + assert(!metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col")) + // query metadata table and check no records for secondary index + assert(spark.sql(s"select * from hudi_metadata('$basePath') where type=7").count() == 0) + } + } + @ParameterizedTest @MethodSource(Array("testSecondaryIndexPruningParameters")) def testSecondaryIndexPruningWithUpdates(testCase: SecondaryIndexTestCase): Unit = { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala index 3b15c4a2e428f..90d6339f19fb4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala @@ -21,6 +21,7 @@ package org.apache.spark.sql.hudi.command.index import org.apache.hudi.HoodieSparkUtils import org.apache.hudi.common.config.TypedProperties +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.testutils.HoodieTestUtils import org.apache.hudi.common.util.Option import org.apache.hudi.hive.HiveSyncConfigHolder._ @@ -248,6 +249,86 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase { } } + test("Test Drop Functional Index") { + if (HoodieSparkUtils.gteqSpark3_3) { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val databaseName = "default" + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | options ( + | primaryKey ='id', + | type = '$tableType', + | preCombineField = 'ts', + | hoodie.metadata.record.index.enable = 'true', + | hoodie.datasource.write.recordkey.field = 'id' + | ) + | partitioned by(ts) + | location '$basePath' + """.stripMargin) + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName 
values(2, 'a2', 10, 1001)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") + + var metaClient = createMetaClient(spark, basePath) + + assert(metaClient.getTableConfig.isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX)) + + val sqlParser: ParserInterface = spark.sessionState.sqlParser + val analyzer: Analyzer = spark.sessionState.analyzer + + var logicalPlan = sqlParser.parsePlan(s"show indexes from default.$tableName") + var resolvedLogicalPlan = analyzer.execute(logicalPlan) + assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[ShowIndexesCommand].table, databaseName, tableName) + + var createIndexSql = s"create index idx_datestr on $tableName using column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')" + logicalPlan = sqlParser.parsePlan(createIndexSql) + + resolvedLogicalPlan = analyzer.execute(logicalPlan) + assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].table, databaseName, tableName) + assertResult("idx_datestr")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName) + assertResult("column_stats")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType) + assertResult(false)(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].ignoreIfExists) + + spark.sql(createIndexSql) + metaClient = createMetaClient(spark, basePath) + assertTrue(metaClient.getIndexMetadata.isPresent) + var functionalIndexMetadata = metaClient.getIndexMetadata.get() + assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size()) + assertEquals("func_index_idx_datestr", functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName) + + // Verify one can create more than one functional index + createIndexSql = s"create index name_lower on $tableName using column_stats(ts) options(func='identity')" + spark.sql(createIndexSql) + metaClient = createMetaClient(spark, basePath) + functionalIndexMetadata = metaClient.getIndexMetadata.get() + assertEquals(2, functionalIndexMetadata.getIndexDefinitions.size()) + assertEquals("func_index_name_lower", functionalIndexMetadata.getIndexDefinitions.get("func_index_name_lower").getIndexName) + + // Ensure that both the indexes are tracked correctly in metadata partition config + val mdtPartitions = metaClient.getTableConfig.getMetadataPartitions + assert(mdtPartitions.contains("func_index_name_lower") && mdtPartitions.contains("func_index_idx_datestr")) + + // drop functional index + spark.sql(s"drop index func_index_idx_datestr on $tableName") + // validate table config + metaClient = HoodieTableMetaClient.reload(metaClient) + assert(!metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr")) + // assert that the lower(name) index is still present + assert(metaClient.getTableConfig.getMetadataPartitions.contains("func_index_name_lower")) + } + } + } + } + test("Test functional index update after initialization") { if (HoodieSparkUtils.gteqSpark3_3) { withTempDir(tmp => { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java index 03b6d934b5f81..66358bad4ef5c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java @@ -305,7 +305,8 @@ private int scheduleAndRunIndexing(JavaSparkContext jsc) throws Exception { } private int dropIndex(JavaSparkContext jsc) throws Exception { - List partitionTypes = 
getRequestedPartitionTypes(cfg.indexTypes, Option.empty()); + List partitionTypes = getRequestedPartitionTypes(cfg.indexTypes, Option.empty()) + .stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList()); String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient); try (SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) { client.dropIndex(partitionTypes); From 7400bfe7ffabd0a8cae5b25ba2f201830e2c9554 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Sat, 21 Sep 2024 09:34:51 +0530 Subject: [PATCH 2/3] Address comments, remove some arg, javadoc --- .../action/index/functional/BaseHoodieIndexClient.java | 7 +++++-- .../main/java/org/apache/hudi/HoodieSparkIndexClient.java | 2 +- .../org/apache/spark/sql/hudi/command/IndexCommands.scala | 5 +++-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java index f9b809e32cd09..b0282e00fb4b4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/functional/BaseHoodieIndexClient.java @@ -21,7 +21,6 @@ import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.storage.StoragePath; import org.slf4j.Logger; @@ -65,6 +64,10 @@ public void register(HoodieTableMetaClient metaClient, String indexName, String /** * Drop an index. By default, ignore drop if index does not exist. 
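+ * The index to drop is identified by its metadata partition name, e.g. func_index_idx_datestr or secondary_index_idx_not_record_key_col.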
+ * + * @param metaClient {@link HoodieTableMetaClient} instance + * @param indexName index name for the index to be dropped + * @param ignoreIfNotExists ignore drop if index does not exist */ - public abstract void drop(HoodieTableMetaClient metaClient, String indexName, MetadataPartitionType metadataPartitionType, boolean ignoreIfNotExists); + public abstract void drop(HoodieTableMetaClient metaClient, String indexName, boolean ignoreIfNotExists); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java index f0fe11904480c..6264875c9fb23 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkIndexClient.java @@ -113,7 +113,7 @@ public void create(HoodieTableMetaClient metaClient, String indexName, String in } @Override - public void drop(HoodieTableMetaClient metaClient, String indexName, MetadataPartitionType metadataPartitionType, boolean ignoreIfNotExists) { + public void drop(HoodieTableMetaClient metaClient, String indexName, boolean ignoreIfNotExists) { if (!indexExists(metaClient, indexName)) { if (ignoreIfNotExists) { return; diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala index 4371844fc3559..24bf70a8c4b1d 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala @@ -77,8 +77,9 @@ case class DropIndexCommand(table: CatalogTable, val tableId = table.identifier val metaClient = createHoodieTableMetaClient(tableId, sparkSession) try { - val metadataPartitionType = MetadataPartitionType.fromPartitionPath(indexName) - HoodieSparkIndexClient.getInstance(sparkSession).drop(metaClient, indexName, metadataPartitionType, ignoreIfNotExists) + // need to ensure that the index name is for a valid partition type + MetadataPartitionType.fromPartitionPath(indexName) + HoodieSparkIndexClient.getInstance(sparkSession).drop(metaClient, indexName, ignoreIfNotExists) } catch { case _: IllegalArgumentException => SecondaryIndexManager.getInstance().drop(metaClient, indexName, ignoreIfNotExists) From e8a0631fdadef4f8f4b054924f6784f566427c05 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Sat, 21 Sep 2024 11:47:44 +0530 Subject: [PATCH 3/3] simplify test --- .../TestSecondaryIndexPruning.scala | 23 ++-- .../command/index/TestFunctionalIndex.scala | 119 ++++++++---------- 2 files changed, 63 insertions(+), 79 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala index d48249a589488..6131ce1522124 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala @@ -19,7 +19,7 @@ package org.apache.hudi.functional -import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, 
RECORDKEY_FIELD} +import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD} import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy import org.apache.hudi.client.transaction.lock.InProcessLockProvider @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, E import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{DataFrame, Row} import org.junit.jupiter.api.Assertions.assertTrue -import org.junit.jupiter.api.Tag +import org.junit.jupiter.api.{Tag, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments.arguments import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource} @@ -158,19 +158,14 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { } } - @ParameterizedTest - @MethodSource(Array("testSecondaryIndexPruningParameters")) - def testCreateAndDropSecondaryIndex(testCase: SecondaryIndexTestCase): Unit = { + @Test + def testCreateAndDropSecondaryIndex(): Unit = { if (HoodieSparkUtils.gteqSpark3_3) { - val tableType = testCase.tableType - val isPartitioned = testCase.isPartitioned var hudiOpts = commonOpts hudiOpts = hudiOpts + ( - DataSourceWriteOptions.TABLE_TYPE.key -> tableType, + DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL, DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true") - val sqlTableType = if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) "cow" else "mor" - tableName += "test_secondary_index_create_drop" + (if (isPartitioned) "_partitioned" else "") + sqlTableType - val partitionedByClause = if (isPartitioned) "partitioned by(partition_key_col)" else "" + tableName += "test_secondary_index_create_drop_partitioned_mor" spark.sql( s""" @@ -182,13 +177,13 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { |) using hudi | options ( | primaryKey ='record_key_col', - | type = '$sqlTableType', + | type = 'mor', | hoodie.metadata.enable = 'true', | hoodie.metadata.record.index.enable = 'true', | hoodie.datasource.write.recordkey.field = 'record_key_col', | hoodie.enable.data.skipping = 'true' | ) - | $partitionedByClause + | partitioned by(partition_key_col) | location '$basePath' """.stripMargin) // by setting small file limit to 0, each insert will create a new file @@ -434,7 +429,7 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { val executor = Executors.newFixedThreadPool(2) implicit val executorContext: ExecutionContext = ExecutionContext.fromExecutor(executor) - val function = new Function1[Int, Boolean] { + val function = new (Int => Boolean) { override def apply(writerId: Int): Boolean = { try { val data = if(writerId == 1) Seq( diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala index 90d6339f19fb4..4bc841cb72850 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala @@ -252,79 +252,68 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase { test("Test Drop 
Functional Index") { if (HoodieSparkUtils.gteqSpark3_3) { withTempDir { tmp => - Seq("cow", "mor").foreach { tableType => - val databaseName = "default" - val tableName = generateTableName - val basePath = s"${tmp.getCanonicalPath}/$tableName" - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long - |) using hudi - | options ( - | primaryKey ='id', - | type = '$tableType', - | preCombineField = 'ts', - | hoodie.metadata.record.index.enable = 'true', - | hoodie.datasource.write.recordkey.field = 'id' - | ) - | partitioned by(ts) - | location '$basePath' + val databaseName = "default" + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | options ( + | primaryKey ='id', + | type = 'mor', + | preCombineField = 'ts', + | hoodie.metadata.record.index.enable = 'true', + | hoodie.datasource.write.recordkey.field = 'id' + | ) + | partitioned by(ts) + | location '$basePath' """.stripMargin) - spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") - spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") - spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") - - var metaClient = createMetaClient(spark, basePath) - - assert(metaClient.getTableConfig.isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX)) - - val sqlParser: ParserInterface = spark.sessionState.sqlParser - val analyzer: Analyzer = spark.sessionState.analyzer + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") - var logicalPlan = sqlParser.parsePlan(s"show indexes from default.$tableName") - var resolvedLogicalPlan = analyzer.execute(logicalPlan) - assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[ShowIndexesCommand].table, databaseName, tableName) + var metaClient = createMetaClient(spark, basePath) - var createIndexSql = s"create index idx_datestr on $tableName using column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')" - logicalPlan = sqlParser.parsePlan(createIndexSql) + assert(metaClient.getTableConfig.isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX)) - resolvedLogicalPlan = analyzer.execute(logicalPlan) - assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].table, databaseName, tableName) - assertResult("idx_datestr")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName) - assertResult("column_stats")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType) - assertResult(false)(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].ignoreIfExists) + val sqlParser: ParserInterface = spark.sessionState.sqlParser + val analyzer: Analyzer = spark.sessionState.analyzer - spark.sql(createIndexSql) - metaClient = createMetaClient(spark, basePath) - assertTrue(metaClient.getIndexMetadata.isPresent) - var functionalIndexMetadata = metaClient.getIndexMetadata.get() - assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size()) - assertEquals("func_index_idx_datestr", functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName) + val logicalPlan = sqlParser.parsePlan(s"show indexes from default.$tableName") + val resolvedLogicalPlan = analyzer.execute(logicalPlan) + assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[ShowIndexesCommand].table, 
databaseName, tableName) - // Verify one can create more than one functional index - createIndexSql = s"create index name_lower on $tableName using column_stats(ts) options(func='identity')" - spark.sql(createIndexSql) - metaClient = createMetaClient(spark, basePath) - functionalIndexMetadata = metaClient.getIndexMetadata.get() - assertEquals(2, functionalIndexMetadata.getIndexDefinitions.size()) - assertEquals("func_index_name_lower", functionalIndexMetadata.getIndexDefinitions.get("func_index_name_lower").getIndexName) + // create functional index + spark.sql(s"create index idx_datestr on $tableName using column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')") + metaClient = createMetaClient(spark, basePath) + assertTrue(metaClient.getIndexMetadata.isPresent) + var functionalIndexMetadata = metaClient.getIndexMetadata.get() + assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size()) + assertEquals("func_index_idx_datestr", functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName) - // Ensure that both the indexes are tracked correctly in metadata partition config - val mdtPartitions = metaClient.getTableConfig.getMetadataPartitions - assert(mdtPartitions.contains("func_index_name_lower") && mdtPartitions.contains("func_index_idx_datestr")) + // Verify one can create more than one functional index + spark.sql(s"create index name_lower on $tableName using column_stats(ts) options(func='identity')") + metaClient = createMetaClient(spark, basePath) + functionalIndexMetadata = metaClient.getIndexMetadata.get() + assertEquals(2, functionalIndexMetadata.getIndexDefinitions.size()) + assertEquals("func_index_name_lower", functionalIndexMetadata.getIndexDefinitions.get("func_index_name_lower").getIndexName) - // drop functional index - spark.sql(s"drop index func_index_idx_datestr on $tableName") - // validate table config - metaClient = HoodieTableMetaClient.reload(metaClient) - assert(!metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr")) - // assert that the lower(name) index is still present - assert(metaClient.getTableConfig.getMetadataPartitions.contains("func_index_name_lower")) - } + // Ensure that both the indexes are tracked correctly in metadata partition config + val mdtPartitions = metaClient.getTableConfig.getMetadataPartitions + assert(mdtPartitions.contains("func_index_name_lower") && mdtPartitions.contains("func_index_idx_datestr")) + + // drop functional index + spark.sql(s"drop index func_index_idx_datestr on $tableName") + // validate table config + metaClient = HoodieTableMetaClient.reload(metaClient) + assert(!metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr")) + // assert that the lower(name) index is still present + assert(metaClient.getTableConfig.getMetadataPartitions.contains("func_index_name_lower")) } } }
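
For reference, a minimal end-to-end sketch of the SQL surface this series adds, distilled from the tests above. This is illustrative only, not part of the patch: it assumes a SparkSession started with the Hudi SQL extension (org.apache.spark.sql.hudi.HoodieSparkSessionExtension) and the Hudi bundle on the classpath; the table name, base path, and columns are hypothetical.

    // Scala (e.g. in spark-shell). Table/path names are hypothetical.
    val basePath = "/tmp/hudi_indexed_tbl"
    spark.sql(
      s"""create table hudi_indexed_tbl (
         |  ts bigint,
         |  record_key_col string,
         |  not_record_key_col string,
         |  partition_key_col string
         |) using hudi
         | options (
         |  primaryKey = 'record_key_col',
         |  type = 'mor',
         |  hoodie.metadata.enable = 'true',
         |  hoodie.metadata.record.index.enable = 'true',
         |  hoodie.datasource.write.recordkey.field = 'record_key_col'
         | )
         | partitioned by (partition_key_col)
         | location '$basePath'
       """.stripMargin)
    spark.sql("insert into hudi_indexed_tbl values (1, 'row1', 'abc', 'p1')")

    // Secondary index: CREATE registers it under the metadata partition
    // secondary_index_idx_not_record_key_col; DROP INDEX takes that partition name.
    spark.sql("create index idx_not_record_key_col on hudi_indexed_tbl using secondary_index(not_record_key_col)")
    spark.sql("drop index secondary_index_idx_not_record_key_col on hudi_indexed_tbl")

    // Functional index: same flow with the func_index_ prefix.
    spark.sql("create index idx_datestr on hudi_indexed_tbl using column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')")
    spark.sql("drop index func_index_idx_datestr on hudi_indexed_tbl")

As patch 2 shows, DropIndexCommand first tries the new metadata-partition path (MetadataPartitionType.fromPartitionPath on the index name) and falls back to SecondaryIndexManager for names that do not map to a metadata partition; its ignoreIfNotExists flag makes dropping a missing index a no-op.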