[HUDI-2285] Metadata Table synchronous design #3426
Conversation
1. Removed code which calls syncTableMetadata. 2. Unit tests are broken because not all functionality has been implemented yet.
The reader does not need to merge instants in memory. It simply opens the base and log files and validates which log blocks to read (only blocks from completed instants on the dataset timeline).
…inst file listing. Validation does not work in several cases, especially with multi-writer, so it's best to remove it.
We cannot perform compaction if there are previous inflight operations on the dataset. This is because a compacted metadata base file at time Tx should represent all the actions on the dataset up to time Tx.
1. There will be a fixed number of shards for each Metadata Table partition. 2. Shards are implemented using filenames of the format fileId00ABCD, where ABCD is the shard number. This allows easy identification of the files and their order while still keeping the names unique. 3. Shards are pre-allocated at the time of bootstrap. 4. Currently only the files partition has 1 shard, but this code is required for the record-level index, so it is implemented here.
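A minimal sketch of how such pre-allocated shard file names could be derived. The prefix and padding width here are illustrative assumptions, not the exact Hudi constants:

import java.util.ArrayList;
import java.util.List;

public class ShardFileNames {

  // Hypothetical prefix; the real fileId prefix is generated elsewhere.
  private static final String FILE_ID_PREFIX = "fileId00";

  // Build the fixed set of shard file ids for a partition, e.g. fileId000001 .. fileId0000N.
  public static List<String> preAllocateShardFileIds(int numShards) {
    List<String> fileIds = new ArrayList<>(numShards);
    for (int shard = 1; shard <= numShards; shard++) {
      // Zero-padded shard number keeps names unique and naturally ordered.
      fileIds.add(String.format("%s%04d", FILE_ID_PREFIX, shard));
    }
    return fileIds;
  }

  public static void main(String[] args) {
    // The files partition currently uses a single shard.
    System.out.println(preAllocateShardFileIds(1)); // [fileId000001]
  }
}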
…han latest compaction on metadata table. Log blocks written to the log file of the Metadata Table need to be validated: they are used only if they correspond to a completed action on the dataset.
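A minimal sketch of that validation, assuming each log block carries its instant time (as in the INSTANT_TIME header); the LogBlock type here is a hypothetical stand-in, not the Hudi class:

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class LogBlockValidation {

  // Hypothetical minimal stand-in for a log block tagged with its instant time.
  static class LogBlock {
    final String instantTime;
    LogBlock(String instantTime) { this.instantTime = instantTime; }
  }

  // Keep only blocks whose instant corresponds to a completed action on the dataset timeline.
  static List<LogBlock> validBlocks(List<LogBlock> blocks, Set<String> completedDatasetInstants) {
    List<LogBlock> valid = new ArrayList<>();
    for (LogBlock block : blocks) {
      if (completedDatasetInstants.contains(block.instantTime)) {
        valid.add(block);
      }
    }
    return valid;
  }
}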
…ing the table and re-bootstrapping. The two versions differ in schema and it is complicated to check whether the table is in sync, so it is simpler to re-bootstrap, as it is only the file listing which needs to be re-bootstrapped.
Since each operation on the metadata table writes to the same files (the file-listing partition has a single FileSlice), we can only allow single-writer access to the metadata table. For this, the Transaction Manager is used to lock the table before any updates.
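A minimal sketch of that single-writer discipline, using a plain Java lock as a stand-in for Hudi's Transaction Manager; the class and method names are illustrative only:

import java.util.concurrent.locks.ReentrantLock;

public class MetadataTableWriterGuard {

  // Stand-in for the Transaction Manager lock guarding the metadata table.
  private final ReentrantLock metadataTableLock = new ReentrantLock();

  // Runnable stands in for the actual metadata table update.
  public void updateMetadataTable(Runnable update) {
    metadataTableLock.lock();     // begin transaction on the metadata table
    try {
      update.run();               // single writer appends to the shared FileSlice
    } finally {
      metadataTableLock.unlock(); // end transaction
    }
  }
}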
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
archiveLog.archiveIfRequired(context);
autoCleanOnCommit();
syncTableMetadata();
why remove the metadata table sync?
There is no additional sync process anymore with this re-design.
protected void finishRollback(HoodieRollbackMetadata rollbackMetadata) throws HoodieIOException {
  try {
    // TODO: Potential error here - rollbacks have already completed here so if the syncTableMetadata fails,
How can we handle this case here, with a re-bootstrap?
Sorry, @leesf, can you please clarify your question?
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(),
    extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());

syncTableMetadata(metadata);
This means we fold syncing to the metadata table into the commit itself, which is more reasonable than syncing table metadata in postCommit.
For Flink, this code is still executed at the driver, right?
nbalajee left a comment
LGTM.
Before completing the dataset commit, writing the changes to the metadata table makes sense: (a) it makes the dataset commit consistent with the metadata table commit, and (b) writes are scalable for future metadata operations, like a record-level index.
Could you update the description section to reflect that (a) we are writing to the metadata table from the in-memory states available, as part of the original/dataset commit, and (b) the dataset commit will be complete only after the metadata write is completed/successful.
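A rough sketch of that ordering, with hypothetical interfaces standing in for the actual write client and timeline classes (this is not the Hudi API, just an illustration of the sequence):

public class CommitOrderingSketch {

  interface MetadataTableWriter {
    void update(Object commitMetadata, String instantTime);
  }

  interface DatasetTimeline {
    void transitionInflightToComplete(String instantTime, Object commitMetadata);
  }

  // Apply the commit to the metadata table first; only then mark the dataset commit complete.
  static void commit(String instantTime, Object commitMetadata,
                     MetadataTableWriter metadataWriter, DatasetTimeline timeline) {
    metadataWriter.update(commitMetadata, instantTime);                  // step 1: metadata table delta commit
    timeline.transitionInflightToComplete(instantTime, commitMetadata);  // step 2: dataset commit becomes visible
  }
}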
// Dataset: C1 C2 C3.inflight[failed] C4.clean C5 R6[rolls back C3]
// Metadata: C1.delta C2.delta
//
// When R6 completes, C3.xxx will be deleted. When C5 completes, C4, C5 and R6 will be committed to Metadata Table
If R6 completed, then the metadata table can be updated with C4, C5, etc.
Corner case: R6 rolled back C3, deleting all associated data/metadata files. But before R6.commit was written, the rollback failed (e.g., a failure related to writing the rollback metadata; in other words, C3.inflight was replaced by R6.inflight).
nbalajee left a comment
LGTM.
Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
String latestMetaInstantTimestamp = lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
latestMetaInstantTimestamp is the same as the last instant among the completed instants of the dataset, isn't it?
private void compactIfNecessary(SparkRDDWriteClient writeClient, String instantTime) {
  List<HoodieInstant> pendingInstants = datasetMetaClient.reloadActiveTimeline().filterInflightsAndRequested().findInstantsBefore(instantTime)
      .getInstants().collect(Collectors.toList());
  if (!pendingInstants.isEmpty()) {
If the dataset timeline has an incomplete commit (multiple parallel commits C1, C2 were started, C2 succeeded but C1 failed, leaving C1.inflight), dataset commits and delta commits on the metadata table will be successful, but compaction could fall behind until manual intervention.
With the current approach, would manual intervention be required to clean up the inflight so that compaction can make progress, or would ingestion/dataset commits fail because the maxArchivalLimit on the metadata table is reached (due to delta commits being created but not compacted)?
@nbalajee: are you talking about compaction of the data table or the metadata table? We expect that if compaction of the data table fails, there will be continuous retries; if not, liveness will not be guaranteed. But in general, we need to think through this and see if we can relax this constraint. We will take this up as a follow-up.
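A simplified sketch of the fencing check behind compactIfNecessary, with plain strings standing in for HoodieInstant and relying on the assumption that Hudi instant timestamps compare lexicographically in chronological order:

import java.util.List;

public class CompactionGuardSketch {

  // Skip metadata table compaction while the dataset has inflight/requested instants
  // older than the proposed compaction instant; otherwise the compacted base file
  // would claim to cover actions that have not completed yet.
  static boolean canScheduleCompaction(List<String> pendingDatasetInstants, String compactionInstantTime) {
    for (String pending : pendingDatasetInstants) {
      if (pending.compareTo(compactionInstantTime) < 0) {
        return false; // compaction falls behind and is retried on a later commit
      }
    }
    return true;
  }
}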
* fc9f18eb-6049-4f47-bc51-23884bef0001
* fc9f18eb-6049-4f47-bc51-23884bef0002
When an executor receives more than one file's worth of records, we end up creating f1-0_wt_cx.parquet, f1-1_wt2_cx.parquet, etc. Should the same naming convention be used here?
In other words, should you use the 36-byte fileId prefix + "-ABCD"?
If we use 0000-9999 as a hash partition, then we cannot reuse that?
@nbalajee: these shards/buckets are instantiated by the driver based on the configured partitions and bucket counts per partition. Not sure how the executor is involved here. Can you help me understand?
@vinothchandar: sorry, I don't understand your point on hash partition. Can you help me understand, please?
.withFs(datasetMetaClient.getFs())
.withRolloverLogWriteToken(FSUtils.makeWriteToken(0, 0, 0))
.withLogWriteToken(FSUtils.makeWriteToken(0, 0, 0))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
Are there any advantages to creating a log file vs the base file here?
As you might know, we fence compaction against any inflight commits in the data table, so any new writes for a new shard or bucket have to start with a log file and not go into a base file. If we don't create a log file here, a new write to a new bucket might start by creating a base file first, and because we fence compaction, we can't create a base file here.
 * @return An integer hash of the given string
 */
public static int keyToShard(String str, int numShards) {
  int h = 0;
With a 6+ character key, int would overflow. Should this be a long?
Also, should keyToShard be customizable with other hash functions?
But this is what I found in the Java SDK source code as well, for String.hashCode().
Guess it should be fine; that's why we take the absolute value below and compute the right bucket.
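For reference, a self-contained sketch of such a key-to-shard mapping; this is not the exact Hudi code, and it uses Math.floorMod, which sidesteps the Math.abs(Integer.MIN_VALUE) corner case that a plain absolute value would hit:

public class KeyToShardSketch {

  // Same rolling-hash shape as String.hashCode(); integer overflow is expected and harmless
  // as long as the final mapping into [0, numShards) handles negative values.
  public static int keyToShard(String key, int numShards) {
    int h = 0;
    for (int i = 0; i < key.length(); i++) {
      h = 31 * h + key.charAt(i);
    }
    // floorMod always returns a value in 0..numShards-1, even for negative hashes.
    return Math.floorMod(h, numShards);
  }

  public static void main(String[] args) {
    System.out.println(keyToShard("2020/01/01/file1.parquet", 4));
  }
}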
// The lock location should be different from the dataset
Properties properties = new Properties();
properties.putAll(datasetWriteConfig.getProps());
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, properties.getProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, datasetWriteConfig.getBasePath() + "/.hoodie/.locks") + "/metadata");
Make '/.hoodie/.locks' a constant?
+1
A filesystem-based lock may not work on cloud storage; not sure if we can assume this.
So, how do you suggest we go about this?
I am thinking we can do something like this: determine the type of lock acquired and automatically derive the properties accordingly.
For FileSystemBasedLock:
The user has to set "hoodie.write.lock.filesystem.path" for the data table, and we can append "/metadata" to the value set for it.
For example, if someone sets the data table config to "hudi_path/.locks", we infer the path for metadata as "hudi_path/.locks/metadata".
For metastore-based locks:
I could not think of a way to auto-derive the metadata table configs, because we have 3 configs for data table locks: database name, table name and metastore URIs. I don't think we can do much from these configs; we might have to add 3 new props (something like below) and expect users to set them as well.
"hoodie.write.lock.hivemetastore.database" -> "hoodie.metadata.write.lock.hivemetastore.database"
"hoodie.write.lock.hivemetastore.table" -> "hoodie.metadata.write.lock.hivemetastore.table"
"hoodie.write.lock.hivemetastore.uris" -> "hoodie.metadata.write.lock.hivemetastore.uris"
For zookeeper-based locks:
We can infer from "hoodie.write.lock.zookeeper.lock_key"; we can suffix "_metadata" to the value the user set for it.
I am not addressing this issue right now; once we have some consensus, I will work on the fix.
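A hedged sketch of the filesystem and zookeeper derivations proposed above; the property keys are taken from the comment, and nothing here is a final implementation:

import java.util.Properties;

public class MetadataLockPropsSketch {

  static final String FS_LOCK_PATH = "hoodie.write.lock.filesystem.path";
  static final String ZK_LOCK_KEY  = "hoodie.write.lock.zookeeper.lock_key";

  // Derive metadata table lock properties from the data table properties:
  // suffix the filesystem lock path with "/metadata" and the zookeeper lock key with "_metadata".
  static Properties deriveMetadataLockProps(Properties dataTableProps) {
    Properties metadataProps = new Properties();
    metadataProps.putAll(dataTableProps);
    String fsPath = dataTableProps.getProperty(FS_LOCK_PATH);
    if (fsPath != null) {
      metadataProps.setProperty(FS_LOCK_PATH, fsPath + "/metadata");
    }
    String zkKey = dataTableProps.getProperty(ZK_LOCK_KEY);
    if (zkKey != null) {
      metadataProps.setProperty(ZK_LOCK_KEY, zkKey + "_metadata");
    }
    return metadataProps;
  }
}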
vinothchandar left a comment
Made one pass. @nsivabalan is going to take a stab at rebasing and addressing some stuff.
    lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
try {
  preCommit(instantTime, metadata);
  table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime));
nts: first committing to metadata table
    HoodieTableMetaClient metaClient) {
  setOperationType(writeOperationType);
  this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
  this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, metaClient.getCommitActionType(), instantTime)), lastCompletedTxnAndMetadata
nts: this lock was only being taken for purposes of syncing. So removing this is fine.
  syncFromInstants(datasetMetaClient);
  metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.SYNC_STR, timer.endTimer()));
}
this.datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(datasetWriteConfig.getBasePath()).build();
nts: loading this afresh here.
Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs);
if (!wasSynced) {
nts: need to revisit again with rollback/restore issues fixed
// If the base file is present then create a reader
Option<HoodieBaseFile> basefile = slice.getBaseFile();
if (basefile.isPresent()) {
Do we send initial data to log files, without any base file? Is this why we are creating the log files with an empty delete block upfront?
yes
.withSpillableMapBasePath(spillableMapDirectory)
.withDiskMapType(commonConfig.getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
.withLogBlockTimestamps(validInstantTimestamps)
this is what fences all uncommitted data from being read out of metadata table
yes.
Closing in favor of #3590
@nbalajee @nsivabalan let's resolve the CR feedback there and land.
Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs);
if (!wasSynced) {
nts on processRollbackMetadata(...):
With multi-writer, lastSyncTs could be misleading, so just calling it out for understanding purposes.
Let's say the dataset timeline is C1.complete, C2.complete, C3.inflight, C4.complete, and the metadata timeline is C1.complete, C2.complete, C3.inflight, C4.complete. A rollback is triggered for C3.
Case 1: C3 is committed to metadata.
Case 2: C3 is not committed to metadata yet.
Let's see what happens when we processRollbackMetadata() R5 for the rollback of C3. As per the code, two cases are ignored while processing rollback metadata:
Case A: the instant to roll back > lastSyncTs. The metadata table is not yet caught up, so skip processing.
Case B: the instant to roll back was never committed to metadata and is part of the active timeline. This refers to a failed commit, so skip processing.
For any other case, we will process the rollback metadata and add/delete the appropriate files.
Case 1 (C3 is committed to metadata): as per the logic above, Case A is false and Case B is false (C3 was committed to metadata), so we process the rollback.
Case 2 (C3 is not committed to metadata): as per the logic above, Case A is false and Case B is true (C3 was never committed to metadata and is part of the active timeline), so we skip processing.
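A small sketch of those two skip conditions, with plain collections standing in for the timelines and the assumption that instant timestamps compare lexicographically:

import java.util.Set;

public class RollbackSyncSketch {

  // Case A: the metadata table has not caught up to the instant being rolled back.
  // Case B: the instant was never committed to the metadata table and is still on the
  //         active timeline, i.e. a failed commit with nothing to undo in metadata.
  static boolean shouldSkipRollbackSync(String instantToRollback,
                                        String lastSyncTs,
                                        Set<String> instantsCommittedToMetadata,
                                        Set<String> activeTimelineInstants) {
    boolean caseA = instantToRollback.compareTo(lastSyncTs) > 0;
    boolean caseB = !instantsCommittedToMetadata.contains(instantToRollback)
        && activeTimelineInstants.contains(instantToRollback);
    return caseA || caseB;
  }
}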
    .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());

initTableMetadata();
initializeShards(datasetMetaClient, MetadataPartitionType.FILES.partitionPath(), createInstantTime, 1);
Guess the intention here is to create buckets for the record-level index and any other such partitions we might have. Will change it to buckets.
final String newFileIdPrefix = newFileId.substring(0, 32);
final HashMap<HeaderMetadataType, String> blockHeader = new HashMap<>();
blockHeader.put(HeaderMetadataType.INSTANT_TIME, instantTime);
final HoodieDeleteBlock block = new HoodieDeleteBlock(new HoodieKey[0], blockHeader);
yes. you are right.
.onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), partition))
.withFileId(shardFileId).overBaseCommit(instantTime)
.withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
.withFileSize(0L)
Yes, for the first log block (even in the regular flow), we do the same:
return HoodieLogFormat.newWriterBuilder()
    .onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
    ....
    .withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
    ....
When there is no latestLogFile for a given file group, we set the size to 0.
@Override
public Option<String> getLatestCompactionTime() {
  if (metaClient != null) {
    Option<HoodieInstant> latestCompaction = metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
nts: we filter for the commit timeline because clean, clustering, delta commits, etc. do not result in a "commit" but in their respective instant types (clean, replace commit, delta commit), so on the metadata table only compaction results in a "commit" instant.
What is the purpose of the pull request
Implementation of the metadata table synchronous design.
Brief change log
Please see HUDI-2285 for changes and how they are implemented. There are different commits that handle each change.
Verify this pull request
Metadata table unit tests have been updated.
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.