@@ -980,9 +980,9 @@ public Option<HoodieIndexCommitMetadata> index(String indexInstantTime) {
/**
* Drops the index and removes the metadata partitions.
*
* @param partitionTypes - list of {@link MetadataPartitionType} which needs to be indexed
* @param metadataPartitions - list of metadata partitions which need to be dropped
*/
public void dropIndex(List<MetadataPartitionType> partitionTypes) {
public void dropIndex(List<String> metadataPartitions) {
HoodieTable table = createTable(config);
String dropInstant = createNewInstantTime();
HoodieInstant ownerInstant = new HoodieInstant(true, HoodieTimeline.INDEXING_ACTION, dropInstant);
@@ -992,7 +992,7 @@ public void dropIndex(List<MetadataPartitionType> partitionTypes) {
Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(dropInstant);
if (metadataWriterOpt.isPresent()) {
try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) {
metadataWriter.dropMetadataPartitions(partitionTypes);
metadataWriter.dropMetadataPartitions(metadataPartitions);
} catch (Exception e) {
if (e instanceof HoodieException) {
throw (HoodieException) e;
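
With this change, dropIndex takes metadata table partition paths as plain strings instead of MetadataPartitionType values. A minimal caller sketch under the new signature; the wrapper class and method name are illustrative only, and the write client is assumed to be already configured:

```java
import java.util.Arrays;

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.metadata.MetadataPartitionType;

public class DropIndexUsageSketch {
  // Drops the record index and files partitions of the metadata table by passing
  // their partition paths to the String-based API introduced in this PR.
  public static void dropRecordIndexAndFiles(SparkRDDWriteClient<?> writeClient) {
    writeClient.dropIndex(Arrays.asList(
        MetadataPartitionType.RECORD_INDEX.getPartitionPath(),
        MetadataPartitionType.FILES.getPartitionPath()));
  }
}
```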
@@ -895,9 +895,8 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
}, fileGroupFileIds.size());
}

public void dropMetadataPartitions(List<MetadataPartitionType> metadataPartitions) throws IOException {
for (MetadataPartitionType partitionType : metadataPartitions) {
String partitionPath = partitionType.getPartitionPath();
public void dropMetadataPartitions(List<String> metadataPartitions) throws IOException {
for (String partitionPath : metadataPartitions) {
// first update table config
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionPath, false);
LOG.warn("Deleting Metadata Table partition: {}", partitionPath);
@@ -53,7 +53,7 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {
* @param metadataPartitions List of MDT partitions to drop
* @throws IOException on failures
*/
void dropMetadataPartitions(List<MetadataPartitionType> metadataPartitions) throws IOException;
void dropMetadataPartitions(List<String> metadataPartitions) throws IOException;

/**
* Update the metadata table due to a COMMIT operation.
@@ -28,11 +28,11 @@

import java.util.Map;

public abstract class BaseHoodieFunctionalIndexClient {
public abstract class BaseHoodieIndexClient {

private static final Logger LOG = LoggerFactory.getLogger(BaseHoodieFunctionalIndexClient.class);
private static final Logger LOG = LoggerFactory.getLogger(BaseHoodieIndexClient.class);

public BaseHoodieFunctionalIndexClient() {
public BaseHoodieIndexClient() {
}

/**
@@ -61,4 +61,13 @@ public void register(HoodieTableMetaClient metaClient, String indexName, String
* Create a functional index.
*/
public abstract void create(HoodieTableMetaClient metaClient, String indexName, String indexType, Map<String, Map<String, String>> columns, Map<String, String> options);

/**
* Drop an index. By default, ignore drop if index does not exist.
*
* @param metaClient {@link HoodieTableMetaClient} instance
* @param indexName index name for the index to be dropped
* @param ignoreIfNotExists ignore drop if index does not exist
*/
public abstract void drop(HoodieTableMetaClient metaClient, String indexName, boolean ignoreIfNotExists);
}
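
A small usage sketch for the renamed client and its new drop contract (the Spark implementation appears later in this diff). The index name and surrounding class are placeholders:

```java
import org.apache.hudi.HoodieSparkIndexClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;

import org.apache.spark.sql.SparkSession;

public class IndexClientDropSketch {
  public static void dropSecondaryIndex(SparkSession spark, HoodieTableMetaClient metaClient) {
    // ignoreIfNotExists = true mirrors the documented default of ignoring a drop
    // when the index does not exist.
    HoodieSparkIndexClient.getInstance(spark)
        .drop(metaClient, "secondary_index_idx_not_record_key_col", true);
  }
}
```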
@@ -178,6 +178,7 @@
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -1193,9 +1194,9 @@ private void revertTableToInflightState(HoodieWriteConfig writeConfig) throws IO
deleteMetaFile(
metaClient.getStorage(), mdtBasePath, mdtInitCommit2, DELTA_COMMIT_EXTENSION);
metaClient.getTableConfig().setMetadataPartitionState(
metaClient, MetadataPartitionType.RECORD_INDEX.getPartitionPath(), false);
metaClient, RECORD_INDEX.getPartitionPath(), false);
metaClient.getTableConfig().setMetadataPartitionsInflight(
metaClient, MetadataPartitionType.RECORD_INDEX);
metaClient, RECORD_INDEX);
timeline = metaClient.getActiveTimeline().reload();
mdtTimeline = mdtMetaClient.getActiveTimeline().reload();
assertEquals(commit, timeline.lastInstant().get().getTimestamp());
@@ -1734,7 +1735,7 @@ public void testFailedBootstrap() throws Exception {
MetadataPartitionType.FILES.getPartitionPath()).size(), 1);
assertEquals(HoodieTableMetadataUtil.getPartitionLatestFileSlices(
metadataReader.getMetadataMetaClient(), Option.empty(),
MetadataPartitionType.RECORD_INDEX.getPartitionPath()).size(), 5);
RECORD_INDEX.getPartitionPath()).size(), 5);
}

// remove the MDT partition from dataset to simulate failed bootstrap
@@ -1776,7 +1777,7 @@ public void testFailedBootstrap() throws Exception {
assertEquals(HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataReader.getMetadataMetaClient(), Option.empty(),
MetadataPartitionType.FILES.getPartitionPath()).size(), 1);
assertEquals(HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataReader.getMetadataMetaClient(), Option.empty(),
MetadataPartitionType.RECORD_INDEX.getPartitionPath()).size(), 3);
RECORD_INDEX.getPartitionPath()).size(), 3);
}
}

@@ -2813,11 +2814,11 @@ public void testRollbackPendingCommitWithRecordIndex(boolean performUpsert) thro

// delete the metadata table partitions to check, whether rollback of pending commit succeeds and
// metadata table partitions are rebootstrapped.
metadataWriter.dropMetadataPartitions(Arrays.asList(MetadataPartitionType.RECORD_INDEX, FILES));
metadataWriter.dropMetadataPartitions(Arrays.asList(RECORD_INDEX.getPartitionPath(), FILES.getPartitionPath()));
assertFalse(storage.exists(new StoragePath(
getMetadataTableBasePath(basePath) + StoragePath.SEPARATOR + FILES.getPartitionPath())));
assertFalse(storage.exists(new StoragePath(getMetadataTableBasePath(basePath)
+ StoragePath.SEPARATOR + MetadataPartitionType.RECORD_INDEX.getPartitionPath())));
+ StoragePath.SEPARATOR + RECORD_INDEX.getPartitionPath())));

metaClient = HoodieTableMetaClient.reload(metaClient);
// Insert/upsert third batch of records
@@ -2842,7 +2843,7 @@ public void testRollbackPendingCommitWithRecordIndex(boolean performUpsert) thro
assertTrue(storage.exists(new StoragePath(
getMetadataTableBasePath(basePath) + StoragePath.SEPARATOR + FILES.getPartitionPath())));
assertTrue(storage.exists(new StoragePath(getMetadataTableBasePath(basePath)
+ StoragePath.SEPARATOR + MetadataPartitionType.RECORD_INDEX.getPartitionPath())));
+ StoragePath.SEPARATOR + RECORD_INDEX.getPartitionPath())));
}

/**
@@ -3451,7 +3452,7 @@ public void testDeleteWithRecordIndex() throws Exception {

// Records got inserted and RI is initialized
metaClient = HoodieTableMetaClient.reload(metaClient);
assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX), "RI is disabled");
assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX), "RI is disabled");
assertEquals(firstBatchOfrecords.size(),
HoodieClientTestUtils.readCommit(writeConfig.getBasePath(), engineContext.getSqlContext(), metaClient.reloadActiveTimeline(), firstCommitTime).count());

@@ -32,7 +32,7 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieFunctionalIndexException;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.action.index.functional.BaseHoodieFunctionalIndexClient;
import org.apache.hudi.table.action.index.functional.BaseHoodieIndexClient;

import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
@@ -56,24 +56,24 @@
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;

public class HoodieSparkFunctionalIndexClient extends BaseHoodieFunctionalIndexClient {
public class HoodieSparkIndexClient extends BaseHoodieIndexClient {

private static final Logger LOG = LoggerFactory.getLogger(HoodieSparkFunctionalIndexClient.class);
private static final Logger LOG = LoggerFactory.getLogger(HoodieSparkIndexClient.class);

private static volatile HoodieSparkFunctionalIndexClient _instance;
private static volatile HoodieSparkIndexClient _instance;

private final SparkSession sparkSession;

private HoodieSparkFunctionalIndexClient(SparkSession sparkSession) {
private HoodieSparkIndexClient(SparkSession sparkSession) {
super();
this.sparkSession = sparkSession;
}

public static HoodieSparkFunctionalIndexClient getInstance(SparkSession sparkSession) {
public static HoodieSparkIndexClient getInstance(SparkSession sparkSession) {
if (_instance == null) {
synchronized (HoodieSparkFunctionalIndexClient.class) {
synchronized (HoodieSparkIndexClient.class) {
if (_instance == null) {
_instance = new HoodieSparkFunctionalIndexClient(sparkSession);
_instance = new HoodieSparkIndexClient(sparkSession);
}
}
}
@@ -112,6 +112,24 @@ public void create(HoodieTableMetaClient metaClient, String indexName, String in
}
}

@Override
public void drop(HoodieTableMetaClient metaClient, String indexName, boolean ignoreIfNotExists) {
if (!indexExists(metaClient, indexName)) {
if (ignoreIfNotExists) {
return;
} else {
throw new HoodieFunctionalIndexException("Index does not exist: " + indexName);
}
}

LOG.info("Dropping index {}", indexName);
HoodieIndexDefinition indexDefinition = metaClient.getIndexMetadata().get().getIndexDefinitions().get(indexName);
try (SparkRDDWriteClient writeClient = HoodieCLIUtils.createHoodieWriteClient(
sparkSession, metaClient.getBasePath().toString(), mapAsScalaImmutableMap(buildWriteConfig(metaClient, indexDefinition)), toScalaOption(Option.empty()))) {
writeClient.dropIndex(Collections.singletonList(indexName));
Comment on lines +117 to +129

Contributor: For future reference, this logic does not have much that is specific to the engine itself, so it can be abstracted into the index client by plugging in the engine-specific write client.

Member (author): Don't we need the engine-specific write client to call the base API BaseHoodieWriteClient.dropIndex?

Contributor: There is BaseHoodieWriteClient to abstract the write logic. Engine-specific implementations only need to instantiate the engine-specific write client. The index client should only care about calling APIs in BaseHoodieWriteClient, which is the case here (we need to generalize HoodieCLIUtils.createHoodieWriteClient too).
}
}
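
For reference only: a rough sketch of the follow-up discussed in the thread above, where the drop flow moves into the base index client and each engine merely supplies its write client. The class name and the createWriteClient/indexExists hooks are hypothetical, not existing Hudi APIs (indexExists mirrors the helper used in HoodieSparkIndexClient, and a generalized HoodieCLIUtils.createHoodieWriteClient could back createWriteClient):

```java
import java.util.Collections;

import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.exception.HoodieFunctionalIndexException;

public abstract class EngineAgnosticIndexClientSketch {

  // Hypothetical engine-specific hook (Spark, Flink, ...) returning the engine's write client.
  protected abstract BaseHoodieWriteClient<?, ?, ?, ?> createWriteClient(HoodieTableMetaClient metaClient);

  // Hypothetical hook mirroring the index-existence check used by the Spark client.
  protected abstract boolean indexExists(HoodieTableMetaClient metaClient, String indexName);

  public void drop(HoodieTableMetaClient metaClient, String indexName, boolean ignoreIfNotExists) {
    if (!indexExists(metaClient, indexName)) {
      if (ignoreIfNotExists) {
        return;
      }
      throw new HoodieFunctionalIndexException("Index does not exist: " + indexName);
    }
    // The base write client already exposes dropIndex(List<String>), so nothing here
    // needs to know which engine produced the client.
    BaseHoodieWriteClient<?, ?, ?, ?> writeClient = createWriteClient(metaClient);
    try {
      writeClient.dropIndex(Collections.singletonList(indexName));
    } finally {
      writeClient.close();
    }
  }
}
```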

private static Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> client, HoodieTableMetaClient metaClient, String indexName) {
List<MetadataPartitionType> partitionTypes = Collections.singletonList(MetadataPartitionType.FUNCTIONAL_INDEX);
checkArgument(partitionTypes.size() == 1, "Currently, only one index type can be scheduled at a time.");
@@ -20,12 +20,12 @@
package org.apache.spark.sql.hudi.command

import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.HoodieSparkFunctionalIndexClient
import org.apache.hudi.HoodieSparkIndexClient
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.JsonUtils
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.index.secondary.SecondaryIndexManager

import org.apache.hudi.metadata.MetadataPartitionType
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -52,7 +52,7 @@ case class CreateIndexCommand(table: CatalogTable,
columns.map(c => columnsMap.put(c._1.mkString("."), c._2.asJava))

if (options.contains("func") || indexType.equals("secondary_index")) {
HoodieSparkFunctionalIndexClient.getInstance(sparkSession).create(
HoodieSparkIndexClient.getInstance(sparkSession).create(
metaClient, indexName, indexType, columnsMap, options.asJava)
} else {
SecondaryIndexManager.getInstance().create(
@@ -76,7 +76,14 @@ case class DropIndexCommand(table: CatalogTable,
override def run(sparkSession: SparkSession): Seq[Row] = {
val tableId = table.identifier
val metaClient = createHoodieTableMetaClient(tableId, sparkSession)
SecondaryIndexManager.getInstance().drop(metaClient, indexName, ignoreIfNotExists)
try {
// need to ensure that the index name is for a valid partition type
MetadataPartitionType.fromPartitionPath(indexName)
HoodieSparkIndexClient.getInstance(sparkSession).drop(metaClient, indexName, ignoreIfNotExists)
} catch {
case _: IllegalArgumentException =>
SecondaryIndexManager.getInstance().drop(metaClient, indexName, ignoreIfNotExists)
Contributor: Why drop here again?

Member (author): This is legacy code due to the incomplete RFC-52. SecondaryIndexManager was introduced in RFC-52, but it is just a wrapper and does not really manage any index underneath. Per the RFC, it is supposed to support indexes built on third-party libraries such as Lucene, but we have not added any such support so far. In my opinion, we should remove all that code; it is kept here only to make some tests pass. If you agree, I can take the cleanup as a follow-up later.

Contributor: Got it. Let's have a JIRA to track that.

}

// Invalidate cached table for queries do not access related table
// through {@code DefaultSource}
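
The thread above covers why the legacy SecondaryIndexManager branch is still present. The dispatch itself hinges on MetadataPartitionType.fromPartitionPath: names it recognizes go to HoodieSparkIndexClient, while anything else raises IllegalArgumentException and falls back, as the catch clause assumes. A rough illustration under that assumption; both index names below are examples only:

```java
import org.apache.hudi.metadata.MetadataPartitionType;

public class DropIndexDispatchSketch {
  public static void main(String[] args) {
    // Resolves (the new test's secondary index name takes this path), so
    // DropIndexCommand hands the drop to HoodieSparkIndexClient.
    MetadataPartitionType.fromPartitionPath("secondary_index_idx_not_record_key_col");
    System.out.println("recognized metadata partition -> HoodieSparkIndexClient#drop");

    try {
      // A name that is not a metadata partition path: fromPartitionPath throws
      // IllegalArgumentException, which the command treats as "use the legacy path".
      MetadataPartitionType.fromPartitionPath("my_external_lucene_index");
    } catch (IllegalArgumentException e) {
      System.out.println("unrecognized index name -> SecondaryIndexManager#drop");
    }
  }
}
```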
@@ -273,7 +273,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
saveMode = SaveMode.Overwrite)

val writeConfig = getWriteConfig(hudiOpts)
metadataWriter(writeConfig).dropMetadataPartitions(Collections.singletonList(MetadataPartitionType.RECORD_INDEX))
metadataWriter(writeConfig).dropMetadataPartitions(Collections.singletonList(MetadataPartitionType.RECORD_INDEX.getPartitionPath))
assertEquals(0, getFileGroupCountForRecordIndex(writeConfig))
metaClient.getTableConfig.getMetadataPartitionsInflight

@@ -19,7 +19,7 @@

package org.apache.hudi.functional

import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.DataSourceWriteOptions.{HIVE_STYLE_PARTITIONING, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy
import org.apache.hudi.client.transaction.lock.InProcessLockProvider
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, E
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, Row}
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.{Tag, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments.arguments
import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource}
@@ -158,6 +158,65 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
}
}

@Test
def testCreateAndDropSecondaryIndex(): Unit = {
if (HoodieSparkUtils.gteqSpark3_3) {
var hudiOpts = commonOpts
hudiOpts = hudiOpts + (
DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
tableName += "test_secondary_index_create_drop_partitioned_mor"

spark.sql(
s"""
|create table $tableName (
| ts bigint,
| record_key_col string,
| not_record_key_col string,
| partition_key_col string
|) using hudi
| options (
| primaryKey ='record_key_col',
| type = 'mor',
| hoodie.metadata.enable = 'true',
| hoodie.metadata.record.index.enable = 'true',
| hoodie.datasource.write.recordkey.field = 'record_key_col',
| hoodie.enable.data.skipping = 'true'
| )
| partitioned by(partition_key_col)
| location '$basePath'
""".stripMargin)
// by setting small file limit to 0, each insert will create a new file
// need to generate more files for non-partitioned table to test data skipping
// as the partitioned table will have only one file per partition
spark.sql("set hoodie.parquet.small.file.limit=0")
spark.sql(s"insert into $tableName values(1, 'row1', 'abc', 'p1')")
spark.sql(s"insert into $tableName values(2, 'row2', 'cde', 'p2')")
spark.sql(s"insert into $tableName values(3, 'row3', 'def', 'p2')")
// create secondary index
spark.sql(s"create index idx_not_record_key_col on $tableName using secondary_index(not_record_key_col)")
// validate index created successfully
metaClient = HoodieTableMetaClient.builder()
.setBasePath(basePath)
.setConf(HoodieTestUtils.getDefaultStorageConf)
.build()
assert(metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
// validate the secondary index records themselves
checkAnswer(s"select key, SecondaryIndexMetadata.recordKey from hudi_metadata('$basePath') where type=7")(
Seq("abc", "row1"),
Seq("cde", "row2"),
Seq("def", "row3")
)
// drop secondary index
spark.sql(s"drop index secondary_index_idx_not_record_key_col on $tableName")
// validate index dropped successfully
metaClient = HoodieTableMetaClient.reload(metaClient)
assert(!metaClient.getTableConfig.getMetadataPartitions.contains("secondary_index_idx_not_record_key_col"))
// query metadata table and check no records for secondary index
assert(spark.sql(s"select * from hudi_metadata('$basePath') where type=7").count() == 0)
}
}

@ParameterizedTest
@MethodSource(Array("testSecondaryIndexPruningParameters"))
def testSecondaryIndexPruningWithUpdates(testCase: SecondaryIndexTestCase): Unit = {
@@ -370,7 +429,7 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {

val executor = Executors.newFixedThreadPool(2)
implicit val executorContext: ExecutionContext = ExecutionContext.fromExecutor(executor)
val function = new Function1[Int, Boolean] {
val function = new (Int => Boolean) {
override def apply(writerId: Int): Boolean = {
try {
val data = if(writerId == 1) Seq(