[HUDI-2488][HUDI-3175] Implement async metadata indexing #4693
Conversation
Force-pushed from 238b128 to ca12a78.
manojpec left a comment:
Comments so far.
Force-pushed from c5c563f to 06c6dd9.
Force-pushed from 06c6dd9 to 7920cb1.
prashantwason left a comment:
Good progress on this one. Getting close to being complete.
      return UtilHelpers.retry(retry, () -> {
        switch (cfg.runningMode.toLowerCase()) {
          case SCHEDULE: {
            LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
If you move all this code into scheduleIndexing(), you can simplify SCHEDULE_AND_EXECUTE and remove some code duplication.
            return result;
          }
          case SCHEDULE_AND_EXECUTE: {
            LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
This could simply be scheduleIndexing(..) followed by runIndexing(..).
      }

      private int scheduleAndRunIndexing(JavaSparkContext jsc) throws Exception {
        String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
Code duplication here; I suggested above how to remove this function.
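For illustration, a rough sketch of the suggested simplification; the scheduleIndexing/runIndexing helpers and the EXECUTE constant used below are hypothetical names, not necessarily what the PR ends up with (cfg, LOG and the mode constants are the ones already visible in the diff):

    // Sketch only: each helper is assumed to return a process exit code (0 = success).
    switch (cfg.runningMode.toLowerCase()) {
      case SCHEDULE:
        LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
        return scheduleIndexing(jsc);
      case EXECUTE:
        LOG.info("Running Mode: [" + EXECUTE + "]; Do execute");
        return runIndexing(jsc);
      case SCHEDULE_AND_EXECUTE:
        LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
        // schedule first, then execute only if scheduling succeeded
        return scheduleIndexing(jsc) == 0 ? runIndexing(jsc) : 1;
      default:
        LOG.error("Unknown running mode: " + cfg.runningMode);
        return 1;
    }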
        }
      }

      private int handleError(Option<HoodieIndexCommitMetadata> commitMetadata) {
A boolean seems a better return value for this function.
Force-pushed from 7920cb1 to e6e3e16.
Thanks @prashantwason for reviewing. I'll address your comments soon.
Force-pushed from e6e3e16 to 4a036d8.
nsivabalan left a comment:
Will sync up directly with you on some of the feedback.
      final String fileGroupFileId = String.format("%s%04d", metadataPartition.getFileIdPrefix(), i);
      // if a writer or async indexer had already initialized the filegroup then continue
      if (!fileSlices.isEmpty() && fileSlices.stream().anyMatch(fileSlice -> fileGroupFileId.equals(fileSlice.getFileGroupId().getFileId()))) {
        continue;
Can you help me understand how a partially failed filegroup instantiation is handled? Do we clean up all file groups and start from scratch, or do we continue from where we left off, i.e., if the indexer restarts next time around?
Not handled currently. First we check whether a particular partition needs to be initialized or not. If yes, we initialize it, but in case of a partially failed filegroup instantiation we will clean up all file groups and start from scratch. Will add this logic.
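A minimal sketch of that clean-up-and-restart check; this is not the code added by the PR, and it assumes the number of files under the partition path approximates the file groups written so far and that expectedFileGroupCount is available for the partition type:

    // Illustrative only: detect a partially initialized metadata partition left
    // behind by a failed attempt and wipe it, so initialization restarts cleanly.
    Path partitionPath = new Path(metadataWriteConfig.getBasePath(), metadataPartition.getPartitionPath());
    FileSystem fs = dataMetaClient.getFs();
    if (fs.exists(partitionPath)) {
      // number of files already created for this partition (approximates initialized file groups)
      int existingFiles = fs.listStatus(partitionPath).length;
      if (existingFiles > 0 && existingFiles < expectedFileGroupCount) {
        LOG.warn("Found partially initialized partition " + partitionPath + "; deleting and re-initializing");
        fs.delete(partitionPath, true);
      }
    }
    // ... then initialize all file groups for this partition as usual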
    }

    public void dropIndex(List<MetadataPartitionType> indexesToDrop) throws IOException {
      // TODO: update table config and do it in a transaction
Please file a tracking ticket if we don't have one.
If a writer is holding onto an instance of hoodieTableConfig, it may not refresh from time to time, right? So if a partition was deleted mid-way, when the writer tries to apply a commit to the metadata table, won't hoodieTableConfig.getMetadataPartitionsToUpdate() return stale values? Do we ensure such a flow succeeds even if there are partitions to update but the actual MDT partition is deleted?
     * record-index-bucket-0000, .... -> ..., record-index-bucket-0009
     */
-   private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, MetadataPartitionType metadataPartition, String instantTime,
+   public void initializeFileGroups(HoodieTableMetaClient dataMetaClient, MetadataPartitionType metadataPartition, String instantTime,
Can we check the bootstrapping code snippet? For example, we check the latest synced instant in the metadata table and check whether it is already archived in the data table. With multiple partitions, each partition could be instantiated at different points in time. Can we check all such guards/conditions and ensure it's all intact with the latest state of the metadata table?
Even though each partition could be instantiated at different physical times, the logical times (Hudi instant timestamps) will be the same. Are you asking from a rescheduling-index point of view? Anyway, I think we should run the same checks as in initializeIfNeeded before initializing file groups.
I get it. What happens if someone triggers HoodieIndexer without even enabling MDT for regular writers?
Think through what happens in case someone sets the FILES partition also to be indexed.
    private List<String> getMetadataPartitionsToUpdate() {
      // find last (pending or) completed index instant and get partitions (to be) written
      Option<HoodieInstant> lastIndexingInstant = dataMetaClient.getActiveTimeline()
Guess we have to fix this to read from table properties?
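A sketch of what reading from table properties could look like; getCompletedMetadataPartitions is the helper that appears elsewhere in this diff, while getEnabledPartitionTypes is a hypothetical fallback:

    // Illustrative: derive the partitions to update from table config instead of
    // scanning the timeline for the last indexing instant.
    private List<String> getMetadataPartitionsToUpdate() {
      Set<String> completedPartitions = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
      if (!completedPartitions.isEmpty()) {
        return new ArrayList<>(completedPartitions);
      }
      // nothing recorded yet (e.g. a table migrated from an older release):
      // fall back to the partition types enabled in the write config
      return getEnabledPartitionTypes().stream()   // hypothetical helper
          .map(MetadataPartitionType::getPartitionPath)
          .collect(Collectors.toList());
    }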
    if (enabled && metadata != null) {
      Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata();
      commit(instantTime, partitionRecordsMap, canTriggerTableService);
      List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
How does this work for a table that migrated from 0.10.0, for example? They may not have added the "files" partition to table properties, i.e., the list of fully completed metadata partitions.
Synced up offline. But can you show me where we update the files partition in the hoodie table config?
I see it in the 3-to-4 upgrade handler. But what if someone enables it a few commits after the upgrade?
    Future<?> postRequestIndexingTaskFuture = executorService.submit(new PostRequestIndexingTask(metadataWriter, finalRemainingInstantsToIndex));
    try {
      // TODO: configure timeout
      postRequestIndexingTaskFuture.get(60, TimeUnit.SECONDS);
60 seconds is too short. If there are 100+ instants to catch up, would we complete in 60 seconds?
Right now it is configured to be 5 minutes by default. I did 10 small deltastreamer commits (12 columns, 1000 records in each round) using ksql-datagen and it was fine. I understand this could be time-consuming; I'll run a scale test later and try to figure out a better default value.
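A sketch of making this timeout configurable instead of hard-coded; the config key below is a placeholder, not necessarily the property name that ends up in the PR (imports and surrounding context omitted):

    // Illustrative: read the catchup timeout from the write config.
    long catchupTimeoutSeconds = config.getProps().getLong("hoodie.index.catchup.timeout.seconds", 300L); // placeholder key, default 5 minutes
    Future<?> postRequestIndexingTaskFuture =
        executorService.submit(new PostRequestIndexingTask(metadataWriter, finalRemainingInstantsToIndex));
    try {
      postRequestIndexingTaskFuture.get(catchupTimeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      postRequestIndexingTaskFuture.cancel(true);
      throw new HoodieIndexException("Indexing catchup did not finish within " + catchupTimeoutSeconds + " seconds", e);
    }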
    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
    Set<HoodieInstant> metadataCompletedTimeline = metadataMetaClient.getActiveTimeline()
        .getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toSet());
    List<HoodieInstant> finalRemainingInstantsToIndex = remainingInstantsToIndex.map(
I see we fetch all instants (pending and complete) at L106, so I assume finalRemainingInstantsToIndex could contain inflight commits as well. So there is a chance that, by the time PostRequestIndexingTask executes, the actual writer has already applied the commit to the MDT. Have we considered this scenario?
So now we're checking against completed instants in the MDT timeline (see the getCompletedArchivedAndActiveInstantsAfter method). Only if an instant in the data timeline is yet to be indexed but not present in the MDT do we wait till it gets completed (reloading the timeline periodically until timeout).
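A simplified sketch of that wait; the poll interval and method shape are illustrative, not the exact code in the PR, and the surrounding catchup task is assumed to run under its own timeout:

    // Illustrative: block until the given data-table instant shows up as completed
    // in the metadata table timeline.
    private void waitForInstantInMetadataTable(HoodieTableMetaClient metadataMetaClient, String instantTime)
        throws InterruptedException {
      while (metadataMetaClient.reloadActiveTimeline()
          .filterCompletedInstants()
          .getInstants()
          .noneMatch(i -> i.getTimestamp().equals(instantTime))) {
        LOG.info("Waiting for instant " + instantTime + " to be committed to the metadata table");
        TimeUnit.SECONDS.sleep(5);
      }
    }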
Force-pushed from 4e8f6e0 to 680a99a.
Force-pushed from d02e0c2 to 69071c6.
    }

    private boolean scheduleIndexingAtInstant(List<MetadataPartitionType> partitionTypes, String instantTime) throws HoodieIOException {
      Option<HoodieIndexPlan> indexPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
What happens if someone tries to trigger indexing twice? I expect we would fail the second trigger, conveying that indexing is already in progress.
We check the table config for inflight/completed indexes, and this would return false if triggered twice.
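A sketch of that guard, reusing the getInflightMetadataPartitions/getCompletedMetadataPartitions helpers already visible in this diff; tableConfig and partitionTypes are assumed to come from the caller:

    // Illustrative: make a second scheduling attempt a no-op.
    Set<String> inflightOrCompleted = getInflightMetadataPartitions(tableConfig);
    inflightOrCompleted.addAll(getCompletedMetadataPartitions(tableConfig));
    Set<String> requestedPartitions = partitionTypes.stream()
        .map(MetadataPartitionType::getPartitionPath)
        .collect(Collectors.toSet());
    requestedPartitions.removeAll(inflightOrCompleted);
    if (requestedPartitions.isEmpty()) {
      LOG.warn("All requested index partitions are already built or being built; nothing to schedule");
      return false;
    }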
      System.exit(1);
    }

    final JavaSparkContext jsc = UtilHelpers.buildSparkContext("indexing-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
Can we validate that hoodie.metadata.enable is set to true? If not, let's throw an exception.
Let's also think through the case where a user wants to initialize the entire MDT via HoodieIndexer, i.e., they are not bringing up any regular writers, but first bring up HoodieIndexer, wait for everything to be built out, and then start the regular writers. From what I see, it's taken care of, but do verify it once.
validated this by:
- writing 2 commits with metadata disabled.
- schedule and build index.
- do another upsert with metadata disabled.
- schedule index
- do another upsert with metadata disabled.
- run index (it also does catchup).
Regarding "But first bring up HoodieIndexer and wait for everything to be built out": we should also be able to support a fully async mode of operation, right?
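A minimal sketch of the hoodie.metadata.enable validation suggested at the top of this thread, assuming props holds the TypedProperties built for the indexer job:

    // Illustrative: fail fast when the metadata table is not enabled for this run.
    if (!Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.ENABLE.key(), "false"))) {
      throw new HoodieException("Asynchronous indexing requires " + HoodieMetadataConfig.ENABLE.key() + " to be set to true");
    }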
Force-pushed from ee361b1 to be08ba4.
Commits:
- Fix timeline tests
- Make MetadataPartitionType#all generic
- Add instant info to plan and address review comments
- [HUDI-3175] Add index planner and executor
- Fix enum constant
- Rebase and resolve conflicts
- Fix some failing tests in CI
- Add indexer job utility
- Resolve minor rebase conflict
- checkstyle fix
Force-pushed from be08ba4 to 010de76.
vinothchandar left a comment:
I need to make another pass on the core logic of RunIndexActionExecutor. But there are enough code fixes, as well as design/config/corner-case clarifications, to address first.
     * @param partitionTypes - list of {@link MetadataPartitionType} which needs to be indexed
     * @return instant time for the requested INDEX action
     */
    public Option<String> scheduleIndexing(List<MetadataPartitionType> partitionTypes) {
Should this API also take additional args for what kind of indexes to build?
Be consistent in the use of "indexing" vs "index".
    {
      "namespace": "org.apache.hudi.avro.model",
      "type": "record",
      "name": "HoodieIndexCommitMetadata",
Better to call everything "Indexing" rather than "index".
      "fields": [
        {
          "name": "version",
          "doc": "This field replaces the field filesToBeDeletedPerPartition",
Fix the doc string.
        ],
        "default": 1
      },
      {
Do we need this?
    // index catchup for all remaining instants with a timeout
    currentIndexedInstant = indexUptoInstant;
    ExecutorService executorService = Executors.newFixedThreadPool(MAX_CONCURRENT_INDEXING);
Double-check that this is shut down in all failure scenarios.
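A sketch of one way to guarantee shutdown on every path, assuming a configured timeoutSeconds; this is not the exact code in the PR:

    // Illustrative: tear down the catchup executor on success, timeout and failure alike.
    ExecutorService executorService = Executors.newFixedThreadPool(MAX_CONCURRENT_INDEXING);
    try {
      Future<?> catchupFuture =
          executorService.submit(new PostRequestIndexingTask(metadataWriter, finalRemainingInstantsToIndex));
      catchupFuture.get(timeoutSeconds, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new HoodieIndexException("Indexing catchup failed or timed out", e);
    } finally {
      executorService.shutdownNow(); // also interrupts a still-running catchup task
    }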
    updateTableConfig(table.getMetaClient(), finalIndexPartitionInfos);
    table.getActiveTimeline().saveAsComplete(
        new HoodieInstant(true, INDEX_ACTION, indexInstant.getTimestamp()),
        TimelineMetadataUtils.serializeIndexCommitMetadata(indexCommitMetadata));
What happens if we fail before reaching L165, i.e., the timeline saving? How do we recover/reconcile?
      }
    }

    private static List<HoodieInstant> getRemainingArchivedAndActiveInstantsSince(String instant, HoodieTableMetaClient metaClient) {
Add tests for all these methods.
- Abort gracefully after deleting partition and instant.
- Handle other actions in timeline to consider before catching up.
vinothchandar left a comment:
A couple of nits.
      completedIndexes.addAll(partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList()));
      dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_INDEX_COMPLETED.key(), String.join(",", completedIndexes));
    private void updateInitializedPartitionsInTableConfig(List<MetadataPartitionType> partitionTypes) {
      Set<String> completedIndexes = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
completedPartitions
    String REQUESTED_EXTENSION = ".requested";
    String RESTORE_ACTION = "restore";
-   String INDEX_ACTION = "index";
+   String INDEX_ACTION = "indexing";
INDEXING_ACTION
    // schema for the files partition is same between the two versions
    if (config.isMetadataTableEnabled() && metadataPartitionExists(config.getBasePath(), context, MetadataPartitionType.FILES)) {
      tablePropsToAdd.put(TABLE_METADATA_PARTITIONS, MetadataPartitionType.FILES.getPartitionPath());
    }
Hi @codope, just thinking: when a user's table is already at current version 4, which means there is no need for an upgrade/downgrade, then how can we update the TABLE_METADATA_PARTITIONS property here?
@zhangyue19921010 Good question! If no upgrade is required, or say you upgraded to the current version with metadata disabled and metadata was enabled a few commits later, then this table config gets updated in the metadata initialization path, i.e., where HoodieBackedTableMetadataWriter#updateInitializedPartitionsInTableConfig is called.
vinothchandar left a comment:
I have one conceptual question on considering non-write actions during scheduling. We can fix minor things, land, and then follow up.
    }
    HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
    LOG.warn("Deleting Metadata Table partitions: " + partitionPath);
    dataMetaClient.getFs().delete(new Path(metadataWriteConfig.getBasePath(), partitionPath), true);
What happens if the delete fails midway before finishing? There is a follow-on to use DELETE_PARTITION instead? Even there, that operation could fail midway, and we need some mechanism to reconcile/retry the next time we try to build that partition.
Yes, this will be replaced by the DELETE_PARTITION path; we just got the lazy deletion of partitions landed. Indeed there are multiple points of failure, but unlike schedule/run index, delete is a bit safer in terms of partial failures. We would be in trouble if the partition gets deleted but the table config is not updated, so we update the table config first. If the table config is updated but the partition is not fully deleted, users can re-trigger the drop.
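A sketch of the config-first ordering described here, using the config key and helpers visible in this diff; the exact drop-index code in the PR may differ:

    // Illustrative: persist the table config change before deleting any files, so a
    // partially deleted partition can always be re-dropped safely.
    for (String partitionPath : requestedPartitions) {
      // 1. mark the partition as no longer complete and persist the config
      completedPartitions.remove(partitionPath);
      dataMetaClient.getTableConfig().setValue(
          HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
      HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()),
          dataMetaClient.getTableConfig().getProps());
      // 2. only then delete the metadata table partition; if this step fails midway,
      //    the config already reflects the drop and the operation can be re-triggered
      dataMetaClient.getFs().delete(new Path(metadataWriteConfig.getBasePath(), partitionPath), true);
    }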
    indexesInflightOrCompleted.addAll(getCompletedMetadataPartitions(tableConfig));
    Set<String> requestedPartitions = partitionIndexTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
    requestedPartitions.removeAll(indexesInflightOrCompleted);
    if (!requestedPartitions.isEmpty()) {
If empty, then return?
      LOG.warn(String.format("Following partitions already exist or inflight: %s. Going to index only these partitions: %s",
          indexesInflightOrCompleted, requestedPartitions));
    }
    List<MetadataPartitionType> finalPartitionsToIndex = partitionIndexTypes.stream()
Why can't we just use requestedPartitions?
We need to pass a list of MetadataPartitionType here rather than a list of partition paths. This will change when we get to secondary indexes; it should then be a list of partition paths.
    indexesInflightOrCompleted.addAll(getCompletedMetadataPartitions(tableConfig));
    Set<String> requestedPartitions = partitionIndexTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
    requestedPartitions.removeAll(indexesInflightOrCompleted);
    if (!requestedPartitions.isEmpty()) {
Return if empty?
    // ensure the metadata partitions for the requested indexes are not already available (or inflight)
    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
    Set<String> indexesInflightOrCompleted = getInflightMetadataPartitions(tableConfig);
    indexesInflightOrCompleted.addAll(getCompletedMetadataPartitions(tableConfig));
This code can move to a helper and be shared with the schedule action executor?
    Set<String> inflightPartitions = getInflightMetadataPartitions(table.getMetaClient().getTableConfig());
    Set<String> completedPartitions = getCompletedMetadataPartitions(table.getMetaClient().getTableConfig());
    // delete metadata partition
    requestedPartitions.forEach(partition -> {
Think about what happens if this fails midway
There are multiple points of failure here. To reduce the blast radius, we make changes to the table config first, because after this patch we mostly rely on table configs. Additionally, we need more CLI commands to allow users to recover easily; tracking that in HUDI-3753.
    // since only write timeline was considered while scheduling index, which gives us the indexUpto instant
    // here we consider other valid actions to pick catchupStart instant
    Set<String> validActions = CollectionUtils.createSet(CLEAN_ACTION, RESTORE_ACTION, ROLLBACK_ACTION);
    HoodieInstant catchupStartInstant = table.getMetaClient().reloadActiveTimeline()
Use Option?
    private List<HoodieInstant> getInstantsToCatchup(String indexUptoInstant) {
      // since only write timeline was considered while scheduling index, which gives us the indexUpto instant
      // here we consider other valid actions to pick catchupStart instant
      Set<String> validActions = CollectionUtils.createSet(CLEAN_ACTION, RESTORE_ACTION, ROLLBACK_ACTION);
Should we do it such that any non-write actions are picked up?
Also, why not have scheduling consider non-write actions?
Re "should we do it such that any non-write actions are picked up?": only savepoint is remaining, right? I deliberately avoided savepoint as it does not alter the filegroups in any way (except marking them so the cleaner avoids them), so I did not consider it.
Re "also why not have scheduling consider non-write actions?": yes, that's the way to go. We consider non-write actions to determine the catchup start instant. Going back to the table we discussed in https://github.com/apache/hudi/pull/4693/files#r837817961, we need both the indexUpto and catchupStart instants. I plan to write them into the index plan rather than pass them as parameters. I'm going to revamp the index plan schema so that the API exposes minimal arguments and the plan is the source of truth, as we discussed. Tracking it in HUDI-3755.
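A sketch of picking the catchup start from pending non-write actions, falling back to indexUptoInstant; the timeline filter used here is approximate and not the exact code in the PR:

    // Illustrative: the earliest pending clean/restore/rollback determines where catchup starts.
    Set<String> nonWriteActions = CollectionUtils.createSet(CLEAN_ACTION, RESTORE_ACTION, ROLLBACK_ACTION);
    Option<HoodieInstant> earliestPendingNonWrite = Option.fromJavaOptional(
        table.getMetaClient().reloadActiveTimeline()
            .filterInflightsAndRequested()
            .getInstants()
            .filter(instant -> nonWriteActions.contains(instant.getAction()))
            .findFirst());
    String catchupStartTs = earliestPendingNonWrite.isPresent()
        ? earliestPendingNonWrite.get().getTimestamp()
        : indexUptoInstant;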
    // we need take a lock here as inflight writer could also try to update the timeline
    txnManager.beginTransaction(Option.of(instant), Option.empty());
    LOG.info("Updating metadata table for instant: " + instant);
    switch (instant.getAction()) {
Isn't there a top-level method in the metadata writer to handle different instant types? We can reuse that or move this code there.
No; I will take this refactoring up in a follow-up task. It needs a little more than extracting a method.
        throw new HoodieIndexException(String.format("Thread interrupted while running indexing check for instant: %s", instant), e);
      }
    }
    // if instant completed, ensure that there was metadata commit, else update metadata for this completed instant
So this is just so that any race causing the inflight to miss this is handled?
Yes, right.
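A sketch of that check, assuming metadataCompletedTimeline is the set of completed MDT instants fetched earlier and commitMetadata is the completed instant's metadata; the update(...) call shape is approximate:

    // Illustrative: after the data-table instant completes, apply it to the metadata
    // table only if the writer has not already done so (handles the race where the
    // inflight writer finishes first).
    boolean alreadyApplied = metadataCompletedTimeline.stream()
        .anyMatch(mdtInstant -> mdtInstant.getTimestamp().equals(instant.getTimestamp()));
    if (instant.isCompleted() && !alreadyApplied) {
      metadataWriter.update(commitMetadata, instant.getTimestamp(), false);
    }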
What is the purpose of the pull request

This PR adds support for asynchronous metadata indexing. Please see the RFC for the design. The main pieces are ScheduleIndexActionExecutor, RunIndexActionExecutor, and the index API in HoodieTableMetadataWriter.

Brief change log

- A new timeline action, INDEX, whose state transition is described in the RFC.
- ScheduleIndexActionExecutor.
- RunIndexActionExecutor.
- Index API in HoodieTableMetadataWriter: a) scheduleIndex generates an index plan based on the latest completed instant, initializes file groups, and adds a requested INDEX instant; b) index executes the index plan and also takes care of writes that happened after indexing was requested; c) dropIndex drops an index by removing the given metadata partition.
- HoodieIndexer, a standalone indexer job utility.
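A hedged usage sketch of the APIs named above; engineContext and writeConfig are assumed to be an existing HoodieSparkEngineContext and a HoodieWriteConfig with the metadata table enabled, and the exact signature of index() may differ from what finally landed:

    // Illustrative: schedule an index and then build it with the write client.
    SparkRDDWriteClient<?> writeClient = new SparkRDDWriteClient<>(engineContext, writeConfig);
    try {
      // schedule: plans the index and adds a requested INDEX instant to the timeline
      Option<String> indexInstant = writeClient.scheduleIndexing(Arrays.asList(MetadataPartitionType.COLUMN_STATS));
      if (indexInstant.isPresent()) {
        // execute: builds the index and catches up writes that landed after scheduling
        Option<HoodieIndexCommitMetadata> result = writeClient.index(indexInstant.get());
      }
    } finally {
      writeClient.close();
    }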
Verify this pull request

(Please pick either of the following options)
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.