
Conversation

@prashantwason (Member) commented on May 10, 2023:

[HUDI-6200] Enhancements to the MDT for improving performance of larger indexes.

Change Logs

Major changes

  1. Moved MDT compaction out of the pre-commit phase into the initialization phase within the write client

    • MDT compaction and clean are attempted during the WriteClient initialization stage, before any changes to the dataset are initiated.
    • This ensures correct ordering of the compactions.
    • This also creates a codepath to call compaction on the MDT without actual commits.
  2. Removed support for setting the fileGroup count on the MetadataPartitionType enum

    • Since enums are singletons, this design prevented having multiple datasets with different fileGroup counts for MDT partitions in the same JVM instance.
    • When there is a large number of fileGroups (assume 10K+) for an MDT partition, listing its fileSlices may take a considerable amount of time (10K slices * M basefile versions + 10K * N log files per slice). Hence, this is no longer done in the HoodieBackedTableMetadataWriter constructor and is delayed until actually required (prepRecords).
  3. When initializing multiple MDT partitions together, they are handled one at a time

    • Reduces memory requirements, as larger indexes may need a huge amount of memory.
    • Improves the chances of the indexes being built successfully.
    • If the FILES partition is already initialized, it is listed to find the files, thereby saving an expensive listing of the entire dataset while initializing additional partitions.
  4. Initial commit into the MDT to initialize a new partition is now a bulkCommit

    • HFiles are created directly for the first commit.
    • Since first commits write a large amount of data, a bulkCommit into a basefile is much faster than writing log files.
    • Added SparkHoodieMetadataBulkInsertPartitioner as the bulk insert partitioner. It is optimized to partition records in a single pass.
  5. The number of fileGroups for each MDT partition is determined as part of the initialization process.

    • This allows the fileGroup count to be optimized based on the number of records being added (see the sketch after this list).
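
A minimal sketch of how the fileGroup count in item 5 could be derived from the record count. This is an illustration only, not the code added by this PR; the class, parameter names and bounds are assumptions.

class FileGroupCountEstimator {
  // Estimate how many file groups a new MDT partition needs, given the number of
  // records to be written and a target number of records per file group.
  static int estimateFileGroupCount(long totalRecords,
                                    long targetRecordsPerFileGroup,
                                    int minFileGroups,
                                    int maxFileGroups) {
    // Ceiling division: file groups needed to keep each one near the target size.
    long needed = (totalRecords + targetRecordsPerFileGroup - 1) / targetRecordsPerFileGroup;
    // Clamp to configured bounds so tiny or huge partitions stay within sane limits.
    return (int) Math.max(minFileGroups, Math.min(maxFileGroups, needed));
  }
}

For example, indexing 50 million records with a target of 5 million records per file group would yield 10 file groups, subject to the configured bounds.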

Other changes:

  1. Created a helper method to check whether an MDT partition is enabled: HoodieTableConfig.isMetadataPartitionEnabled()
  2. Created a helper method to set the state (enabled/disabled) of an MDT partition: HoodieTableConfig.setMetadataPartitionState()
  3. Created a helper method to delete the MDT: HoodieTableMetadataUtil.deleteMetadataTable(). It correctly updates the hoodie.properties file.
  4. Created a helper method to set the inflight state (enabled/disabled) of an MDT partition: HoodieTableMetadataUtil.setMetadataInflightPartitions
  5. Created a helper method to delete an MDT partition: deleteMetadataTablePartition. It correctly updates the hoodie.properties file.

Impact

TBD

Risk level (write none, low, medium or high below)

TBD

Documentation Update

None

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@nsivabalan (Contributor) left a comment:

Will continue to review tomorrow; sending the feedback I have for now.

.initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
.setPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS)
.setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
.initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
Contributor: Fix the indentation.

Member Author: Fixed.

@nsivabalan (Contributor) left a comment:

A couple of pending items for me to review or follow up:

  • Revisit the async indexer code/flow
  • Where exactly do we determine the file group count for every partition?

@prashantwason force-pushed the pw_one_index_at_a_time branch from e2785f4 to 364b1fe on May 12, 2023 17:05
@nsivabalan (Contributor) left a comment:

I ran into a compilation issue while reviewing this patch. Here is the diff to fix it:

git diff
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 5313d63575..a3bd3d9536 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -598,7 +598,7 @@ public class TestCleaner extends HoodieClientTestBase {
           }
         })
     );
-    metadataWriter.update(commitMetadata, "00000000000001", false);
+    metadataWriter.update(commitMetadata, "00000000000001");
     metaClient.getActiveTimeline().saveAsComplete(
         new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"),
         Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -1053,7 +1053,7 @@ public class TestCleaner extends HoodieClientTestBase {
           }
         })
     );
-    metadataWriter.update(commitMetadata, "00000000000001", false);
+    metadataWriter.update(commitMetadata, "00000000000001");
     metaClient.getActiveTimeline().saveAsComplete(
         new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"),
         Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -1179,7 +1179,7 @@ public class TestCleaner extends HoodieClientTestBase {
         throw new RuntimeException(e);
       }
     });
-    metadataWriter.update(commitMeta, instantTime, false);
+    metadataWriter.update(commitMeta, instantTime);
     metaClient.getActiveTimeline().saveAsComplete(
         new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, instantTime),
         Option.of(commitMeta.toJsonString().getBytes(StandardCharsets.UTF_8)));

Went through the async indexer flow; it looks OK to me. I have not tested it, though; will test it out while we try to land this patch.

@nsivabalan (Contributor) left a comment:

Re-reviewed again.

@danny0405 (Contributor) left a comment:

Thanks for the contribution, I have reviewed and left some comments.

partitionsInflight.remove(partition.getPartitionPath());
}
setValue(TABLE_METADATA_PARTITIONS, partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
Contributor: Do we need to persist these options?

Member Author: It is persisted from the caller side - HoodieTableMetadataUtil.setMetadataPartitionState.

I have removed the HoodieTableMetadataUtil.setMetadataPartitionState as it was unnecessary.

case UPSERT_PREPPED:
case BULK_INSERT:
case BULK_INSERT_PREPPED:
case DELETE:
@danny0405 (Contributor) commented on May 16, 2023:

Enumerating the write operations is really hard to maintain; can we trigger the table service regardless of the operation?

Member Author: On second thought, the switch is not necessary. The above code runs within a transaction lock, so there should not be any conflicts from multiple writers optimizing the MDT together. The checks within performTableServices should be light enough, or we can optimize them. (A hedged sketch of this flow follows.)
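
A minimal sketch of the flow described above, with hypothetical stand-in types and names; it is not the PR's code. The point is that the MDT table services run under the transaction lock for every write operation, so no per-operation switch is needed.

import java.util.concurrent.locks.ReentrantLock;

class MetadataTableServiceRunner {
  // Stands in for the transaction lock mentioned above.
  private final ReentrantLock txnLock = new ReentrantLock();

  // Hypothetical interface; the real writer exposes performTableServices with its own signature.
  interface MetadataWriter {
    void performTableServices(String inFlightInstantTime); // compaction + clean on the MDT
  }

  void beforeDataTableWrite(MetadataWriter metadataWriter, String inFlightInstantTime) {
    txnLock.lock();
    try {
      // Runs for every operation type; internal checks decide whether anything is actually due.
      metadataWriter.performTableServices(inFlightInstantTime);
    } finally {
      txnLock.unlock();
    }
  }
}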

if (DEFAULT_METADATA_POPULATE_META_FIELDS != metadataMetaClient.getTableConfig().populateMetaFields()) {
LOG.info("Re-initiating metadata table properties since populate meta fields have changed");
metadataMetaClient = initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
metadataMetaClient = initializeMetaClient();
Contributor: If the MDT does not exist and metadataMetaClient.getTableConfig().populateMetaFields() is true, initializeMetaClient() could be invoked twice, which could throw an exception.

Member Author: Good catch. Moved this block to within the try-catch.

.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
.withSizeThreshold(metadataWriteConfig.getLogFileMaxSize())
.withFs(dataMetaClient.getFs())
.withRolloverLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
Contributor: Fix the indentation.

Member Author: Fixed.


// Do timeline validation before scheduling compaction/logcompaction operations.
if (!validateTimelineBeforeSchedulingCompaction(inFlightInstantTimestamp, latestDeltacommitTime)) {
return;
Contributor: We should not return directly here, because archiving is also blocked; even if no compaction plan can be scheduled, archiving should still be triggered. (A sketch of the suggested flow follows.)
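
A minimal sketch of the control flow suggested above, using the method names visible in the snippet plus assumed helper names (cleanIfNecessary, archiveIfNecessary); it is not the actual code. Only the compaction scheduling is skipped when the timeline validation fails, while cleaning and archiving still run.

abstract class MetadataTableServices {
  abstract boolean validateTimelineBeforeSchedulingCompaction(String inFlightInstantTimestamp, String latestDeltacommitTime);
  abstract void compactIfNecessary(String latestDeltacommitTime);
  abstract void cleanIfNecessary();
  abstract void archiveIfNecessary();

  void runTableServices(String inFlightInstantTimestamp, String latestDeltacommitTime) {
    // Do timeline validation before scheduling compaction/logcompaction operations,
    // but do not return early: archiving must not be blocked by it.
    if (validateTimelineBeforeSchedulingCompaction(inFlightInstantTimestamp, latestDeltacommitTime)) {
      compactIfNecessary(latestDeltacommitTime);
    }
    cleanIfNecessary();
    archiveIfNecessary();
  }
}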

Member Author: Done.

Contributor: I see your point, but in reality I feel we may not gain much. Unless compaction in the MDT kicks in, archival might not have anything to do after the last time it was able to archive something. So it is not a bad idea to skip archival when there is no compaction. The same applies to clean as well, but let's not optimize anything in this patch.

Contributor: Regarding "unless compaction in MDT kicks in, archival might not have anything to do after the last time it was able to archive something": then archiving will always be blocked by the compaction.

@danny0405 (Contributor) left a comment:

Overall looks good. There are many unnecessary indentation changes; can you set up the checkstyle of your IDEA again following the rules here: https://github.com/apache/hudi/blob/master/style/checkstyle.xml (if you installed the checkstyle plugin, you can import it manually)?

@danny0405 (Contributor): @prashantwason Did you notice that there are conflicts in the code?

@prashantwason force-pushed the pw_one_index_at_a_time branch from 226c96c to ad81a08 on June 6, 2023 09:57
@danny0405 (Contributor) left a comment:

+1, looks good, we are good to land once all the CI tests are green.

} else if (FSUtils.isDataFile(status.getPath())) {
// Regular HUDI data file (base file or log file)
filenameToSizeMap.put(status.getPath().getName(), status.getLen());
String dataFileCommitTime = FSUtils.getCommitTime(status.getPath().getName());
Contributor: In the case of a MOR table, the base instant time of a log file could be earlier than the actual delta commit time. So we might skip log files based on this logic, since in L1125 we are filtering for files whose commit time < max instant time. Maybe we should use the last modification time instead?

Contributor: This bug may exist even without this patch; we might need a follow-up fix.

try (HoodieTableMetadataWriter writer = SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
context, Option.empty(), inFlightInstantTimestamp)) {
context, Option.empty(), inFlightInstantTimestamp)) {
if (writer.isInitialized()) {
Contributor: We added the guard rail for table services in the MDT to be triggered only by regular writers in the data table, so that in single-writer mode with async table services there won't be any race conditions.
Ref: #3900
But the code has evolved and we now automatically enable the in-process lock provider for single-writer mode with async table services, so we should be good to remove the constraint. Just that we might keep triggering the scheduling of compaction in the MDT every time; maybe we can check the active timeline for when compaction was last triggered and add some optimization (a sketch follows).
Nothing is required for this patch, but as a follow-up.
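
A minimal sketch of that follow-up idea, assuming the standard Hudi timeline accessors; the class name and threshold are made up for illustration and this is not part of the PR.

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;

class MdtCompactionGate {
  private static final int MAX_DELTA_COMMITS_BEFORE_COMPACTION = 10; // hypothetical threshold

  // Count completed delta commits on the MDT timeline since the last completed
  // compaction commit; only attempt to schedule a new compaction past the threshold.
  static boolean shouldAttemptCompaction(HoodieTableMetaClient metadataMetaClient) {
    Option<HoodieInstant> lastCompaction = metadataMetaClient.getActiveTimeline()
        .getCommitTimeline().filterCompletedInstants().lastInstant();
    String lastCompactionTime = lastCompaction.isPresent() ? lastCompaction.get().getTimestamp() : "0";
    long deltaCommitsSince = metadataMetaClient.getActiveTimeline()
        .getDeltaCommitTimeline().filterCompletedInstants()
        .findInstantsAfter(lastCompactionTime, Integer.MAX_VALUE)
        .countInstants();
    return deltaCommitsSince >= MAX_DELTA_COMMITS_BEFORE_COMPACTION;
  }
}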

@nsivabalan (Contributor) left a comment:

LGTM. Once CI passes, we can land.

Comment on lines -134 to -139
if (canTriggerTableService) {
// trigger compaction before doing the delta commit. this is to ensure, if this delta commit succeeds in metadata table, but failed in data table,
// we would have compacted metadata table and so could have included uncommitted data which will never be ignored while reading from metadata
// table (since reader will filter out only from delta commits)
compactIfNecessary(writeClient, instantTime);
}
Member: Why is this removed? Don't we want compaction to run?

Contributor: You are right, we should trigger the metadata table compaction on each commit.

@codope force-pushed the pw_one_index_at_a_time branch from df3a5b5 to da92c6c on June 8, 2023 10:26
@hudi-bot (Collaborator) commented on Jun 8, 2023:

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@nsivabalan (Contributor): CI is green.


@Override
public String getFileIdPfx(int partitionId) {
return fileIDPfxs.get(partitionId);
Contributor: This may not be right when one HFile in a file group becomes too large to write to. There is an upper size threshold for each base file handle, so HoodieAvroHFileWrite#canWrite may return false, and the write handle factory would then suffix the file group id with another auto-incremented number like -1, which is no longer correct.

Contributor: Yes, this is known. We have set the file size to 1GB, so users have to set the right config for the number of file groups.

Contributor: No one can be aware that the file group number is pertinent to correctness. It is a bug, not a usability issue.

Member Author: The idea behind sharding in the MDT is that you can create more shards rather than splitting a shard into two files. Splitting would produce one large and one small file, which is not optimal. For optimal performance, all shards should be of a similar size, which we achieve by hash partitioning the keys (see the sketch below).
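
A minimal illustration of that hash-partitioning idea, not the actual MDT key-to-file-group mapping: each record key is hashed to one of a fixed number of shards, so keys spread roughly evenly and the shards stay a similar size.

class ShardMapper {
  static int keyToShard(String recordKey, int numShards) {
    // floorMod keeps the shard index non-negative even for negative hash codes.
    return Math.floorMod(recordKey.hashCode(), numShards);
  }
}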

Member Author: This PR also adds support for automatic estimation of the shard counts for each partition; that can be enhanced.

Contributor: Regarding "automatic estimation of the shard counts for each partition, that can be enhanced": this may be a solution if we can make an accurate estimation of the file group size.


Labels

priority:blocker (Production down; release blocker), release-0.14.0
