> instantsToRollback, boolean skipLocking) {
@@ -1425,17 +1438,38 @@ public HoodieMetrics getMetrics() {
}
/**
- * Instantiates engine-specific instance of {@link HoodieTable} as well as performs necessary
- * bootstrapping operations (for ex, validating whether Metadata Table has to be bootstrapped)
+ * Performs necessary bootstrapping operations (e.g., validating whether the Metadata Table has to be bootstrapped).
*
- * NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS
- * NOT REQUIRING EXTERNAL SYNCHRONIZATION
+ * NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS
+ * NOT REQUIRING EXTERNAL SYNCHRONIZATION
*
* @param metaClient instance of {@link HoodieTableMetaClient}
* @param instantTime current inflight instant time
- * @return instantiated {@link HoodieTable}
*/
- protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary);
+ protected void doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
+ Option<HoodieInstant> ownerInstant = Option.empty();
+ if (instantTime.isPresent()) {
+ ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get()));
+ }
+ this.txnManager.beginTransaction(ownerInstant, Option.empty());
+ try {
+ tryUpgrade(metaClient, instantTime);
+ if (initialMetadataTableIfNecessary) {
+ initMetadataTable(instantTime);
+ }
+ } finally {
+ this.txnManager.endTransaction(ownerInstant);
+ }
+ }
+
+ /**
+ * Bootstrap the metadata table.
+ *
+ * @param instantTime current inflight instant time
+ */
+ protected void initMetadataTable(Option<String> instantTime) {
+ // by default do nothing.
+ }
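
The base client intentionally leaves initMetadataTable as a no-op so that each engine decides how to bootstrap the metadata table while still running under the transaction begun in doInitTable. A minimal sketch of what an engine-specific override could look like (the createMetadataWriter helper below is hypothetical and only illustrates the intent; it is not part of this patch):

```java
// Illustrative override in an engine-specific write client (sketch only).
@Override
protected void initMetadataTable(Option<String> instantTime) {
  if (config.isMetadataTableEnabled()) {
    // Runs under the transaction begun in doInitTable, so no extra locking is needed here.
    try (HoodieTableMetadataWriter metadataWriter = createMetadataWriter(instantTime)) {
      // Constructing the writer bootstraps the metadata table if it does not exist yet.
    } catch (Exception e) {
      throw new HoodieException("Failed to bootstrap the metadata table", e);
    }
  }
}
```
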
/**
* Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
@@ -1457,18 +1491,8 @@ protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime,
- HoodieTable table;
- Option<HoodieInstant> ownerInstant = Option.empty();
- if (instantTime.isPresent()) {
- ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get()));
- }
- this.txnManager.beginTransaction(ownerInstant, Option.empty());
- try {
- tryUpgrade(metaClient, instantTime);
- table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
- } finally {
- this.txnManager.endTransaction(ownerInstant);
- }
+ doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
+ HoodieTable table = createTable(config, hadoopConf, metaClient);
// Validate table properties
metaClient.validateTableProperties(config.getProps());
@@ -1513,7 +1537,8 @@ protected void setWriteSchemaForDeletes(HoodieTableMetaClient metaClient) {
if (lastInstant.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
activeTimeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
- if (commitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY)) {
+ String extraSchema = commitMetadata.getExtraMetadata().get(SCHEMA_KEY);
+ if (!StringUtils.isNullOrEmpty(extraSchema)) {
config.setSchema(commitMetadata.getExtraMetadata().get(SCHEMA_KEY));
} else {
throw new HoodieIOException("Latest commit does not have any schema in commit metadata");
@@ -1557,6 +1582,12 @@ private void setWriteTimer(HoodieTable table) {
}
}
+ /**
+ * Upgrades the hoodie table if need be when moving to a new Hudi version.
+ * This method is called within a lock. Try to avoid double locking from within this method.
+ * @param metaClient instance of {@link HoodieTableMetaClient} to use.
+ * @param instantTime instant time of interest if we have one.
+ */
protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
UpgradeDowngrade upgradeDowngrade =
new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);
@@ -1569,7 +1600,6 @@ protected void tryUpgrade(HoodieTableMetaClient metaClient, Option insta
if (!instantsToRollback.isEmpty()) {
Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(metaClient);
instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
-
rollbackFailedWrites(pendingRollbacks, true);
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index 2992f4abd4c9e..eaa0d8d167fbb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -43,7 +43,9 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.FileIOUtils;
@@ -397,7 +399,7 @@ private Stream<HoodieInstant> getCleanInstantsToArchive() {
}).flatMap(Collection::stream);
}
- private Stream<HoodieInstant> getCommitInstantsToArchive() {
+ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
// TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
// with logic above to avoid Stream.concat
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
@@ -430,6 +432,13 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax())
: Option.empty();
+ // The clustering commit instant cannot be archived unless we ensure that the replaced files have been cleaned;
+ // without the replaced-files metadata on the timeline, the fs view would expose duplicates to readers.
+ // Meanwhile, when inline or async clustering is enabled, we need to ensure that there is a commit in the active timeline
+ // so that we can check whether the file slice generated by pending clustering after archival is committed.
+ Option<HoodieInstant> oldestInstantToRetainForClustering =
+ ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient());
+
// Actually do the commits
Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstants()
.filter(s -> {
@@ -442,7 +451,7 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
}
}).filter(s -> {
- // Ensure commits >= oldest pending compaction commit is retained
+ // Ensure commits >= the oldest pending compaction/replace commit are retained
return oldestPendingCompactionAndReplaceInstant
.map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
.orElse(true);
@@ -459,6 +468,10 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
oldestInstantToRetainForCompaction.map(instantToRetain ->
compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
.orElse(true)
+ ).filter(s ->
+ oldestInstantToRetainForClustering.map(instantToRetain ->
+ HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
+ .orElse(true)
);
return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
} else {
@@ -466,7 +479,7 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
}
}
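
Each of the retention guards above follows the same pattern: an Option<HoodieInstant> that, when present, caps which instants may be archived by timestamp. A condensed sketch of that pattern, reusing the HoodieTimeline helpers already imported in this class:

```java
// Sketch: apply an optional "oldest instant to retain" cap to a stream of archive candidates.
private static Stream<HoodieInstant> applyRetentionGuard(Stream<HoodieInstant> candidates,
                                                         Option<HoodieInstant> oldestToRetain) {
  return candidates.filter(instant ->
      oldestToRetain
          // Only instants strictly older than the guard are eligible for archival.
          .map(retain -> HoodieTimeline.compareTimestamps(
              instant.getTimestamp(), HoodieTimeline.LESSER_THAN, retain.getTimestamp()))
          // No guard present: this rule does not constrain the candidate.
          .orElse(true));
}
```
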
- private Stream<HoodieInstant> getInstantsToArchive() {
+ private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
if (config.isMetastoreEnabled()) {
return Stream.empty();
@@ -502,24 +515,27 @@ private Stream<HoodieInstant> getInstantsToArchive() {
.setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
.setConf(metaClient.getHadoopConf())
.build();
- Option<HoodieInstant> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant();
-
- if (config.shouldArchiveBeyondSavepoint()) {
- // There are chances that there could be holes in the timeline due to archival and savepoint interplay.
- // So, the first non-savepoint commit in the data timeline is considered as beginning of the active timeline.
- Option<HoodieInstant> firstNonSavepointCommit = dataMetaClient.getActiveTimeline().getFirstNonSavepointCommit();
- if (firstNonSavepointCommit.isPresent()) {
- String firstNonSavepointCommitTime = firstNonSavepointCommit.get().getTimestamp();
- instants = instants.filter(instant ->
- compareTimestamps(instant.getTimestamp(), LESSER_THAN, firstNonSavepointCommitTime));
- }
- } else {
- // Do not archive the commits that live in data set active timeline.
- // This is required by metadata table, see HoodieTableMetadataUtil#processRollbackMetadata for details.
- if (earliestActiveDatasetCommit.isPresent()) {
- instants = instants.filter(instant ->
- compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get().getTimestamp()));
- }
+ Option<HoodieInstant> qualifiedEarliestInstant =
+ TimelineUtils.getEarliestInstantForMetadataArchival(
+ dataMetaClient.getActiveTimeline(), config.shouldArchiveBeyondSavepoint());
+
+ // Do not archive the instants after the earliest commit (COMMIT, DELTA_COMMIT, and
+ // REPLACE_COMMIT only, considering non-savepoint commit only if enabling archive
+ // beyond savepoint) and the earliest inflight instant (all actions).
+ // This is required by metadata table, see HoodieTableMetadataUtil#processRollbackMetadata
+ // for details.
+ // Note that we cannot blindly use the earliest instant of all actions, because CLEAN and
+ // ROLLBACK instants are archived separately apart from commits (check
+ // HoodieTimelineArchiver#getCleanInstantsToArchive). If we do so, a very old completed
+ // CLEAN or ROLLBACK instant can block the archive of metadata table timeline and causes
+ // the active timeline of metadata table to be extremely long, leading to performance issues
+ // for loading the timeline.
+ if (qualifiedEarliestInstant.isPresent()) {
+ instants = instants.filter(instant ->
+ compareTimestamps(
+ instant.getTimestamp(),
+ HoodieTimeline.LESSER_THAN,
+ qualifiedEarliestInstant.get().getTimestamp()));
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
new file mode 100644
index 0000000000000..28f308a32f907
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.exception.HoodieDeletePartitionException;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A utility class for helper functions when performing a delete partition operation.
+ */
+public class DeletePartitionUtils {
+
+ /**
+ * Check if there are any pending table service actions (requested + inflight) on a table affecting the partitions to
+ * be dropped.
+ *
+ * This check is to prevent a drop-partition from proceeding should a partition have a table service action in
+ * the pending stage. If this is allowed to happen, the filegroup that is an input for a table service action, might
+ * also be a candidate for being replaced. As such, when the table service action and drop-partition commits are
+ * committed, there will be two commits replacing a single filegroup.
+ *
+ * For example, a timeline might have an execution order as such:
+ * 000.replacecommit.requested (clustering filegroup_1 + filegroup_2 -> filegroup_3)
+ * 001.replacecommit.requested, 001.replacecommit.inflight, 001.replacecommit (drop_partition to replace filegroup_1)
+ * 000.replacecommit.inflight (clustering is executed now)
+ * 000.replacecommit (clustering completed)
+ * For an execution order as shown above, 000.replacecommit and 001.replacecommit will both flag filegroup_1 to be replaced.
+ * This will cause downstream duplicate key errors when a map is being constructed.
+ *
+ * @param table Table to perform validation on
+ * @param partitionsToDrop List of partitions to drop
+ */
+ public static void checkForPendingTableServiceActions(HoodieTable table, List<String> partitionsToDrop) {
+ List<String> instantsOfOffendingPendingTableServiceAction = new ArrayList<>();
+ // ensure that there are no pending inflight clustering/compaction operations involving this partition
+ SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();
+
+ // separating the iteration of pending compaction operations from clustering as they return different stream types
+ fileSystemView.getPendingCompactionOperations()
+ .filter(op -> partitionsToDrop.contains(op.getRight().getPartitionPath()))
+ .forEach(op -> instantsOfOffendingPendingTableServiceAction.add(op.getLeft()));
+
+ fileSystemView.getFileGroupsInPendingClustering()
+ .filter(fgIdInstantPair -> partitionsToDrop.contains(fgIdInstantPair.getLeft().getPartitionPath()))
+ .forEach(x -> instantsOfOffendingPendingTableServiceAction.add(x.getRight().getTimestamp()));
+
+ if (instantsOfOffendingPendingTableServiceAction.size() > 0) {
+ throw new HoodieDeletePartitionException("Failed to drop partitions. "
+ + "Please ensure that there are no pending table service actions (clustering/compaction) for the partitions to be deleted: " + partitionsToDrop + ". "
+ + "Instant(s) of offending pending table service action: "
+ + instantsOfOffendingPendingTableServiceAction.stream().distinct().collect(Collectors.toList()));
+ }
+ }
+
+}
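
A delete-partition code path would typically invoke this guard before building its replace-commit metadata; a minimal sketch of such a call site (the surrounding executor code is illustrative and not part of this patch):

```java
// Sketch of a call site in a delete-partition action (illustrative only).
List<String> partitionsToDrop = Arrays.asList("2023/01/01", "2023/01/02");
// Fail fast if any pending compaction/clustering touches the partitions being dropped;
// otherwise two commits could end up replacing the same file group.
DeletePartitionUtils.checkForPendingTableServiceActions(table, partitionsToDrop);
// ... proceed to build the replace-commit metadata for the dropped partitions ...
```
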
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 1180845a6ed8a..b3b6177f119bf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -153,7 +153,9 @@ public class HoodieClusteringConfig extends HoodieConfig {
+ "RECENT_DAYS: keep a continuous range of partitions, worked together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '"
+ PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST.key() + "."
+ "SELECTED_PARTITIONS: keep partitions that are in the specified range ['" + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '"
- + PARTITION_FILTER_END_PARTITION.key() + "'].");
+ + PARTITION_FILTER_END_PARTITION.key() + "']."
+ + "DAY_ROLLING: clustering partitions on a rolling basis by the hour to avoid clustering all partitions each time, "
+ + "which strategy sorts the partitions asc and chooses the partition of which index is divided by 24 and the remainder is equal to the current hour.");
public static final ConfigProperty PLAN_STRATEGY_MAX_BYTES_PER_OUTPUT_FILEGROUP = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "max.bytes.per.group")
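
A sketch of the DAY_ROLLING selection described above: sort the partitions in ascending order and keep those whose index modulo 24 matches the current hour. This is a simplified restatement of the documented behavior, not the strategy class itself (which is not shown in this diff):

```java
// Simplified illustration of DAY_ROLLING partition selection.
static List<String> selectDayRollingPartitions(List<String> allPartitions) {
  int currentHour = java.time.LocalDateTime.now().getHour();
  List<String> sorted = new java.util.ArrayList<>(allPartitions);
  java.util.Collections.sort(sorted); // ascending order
  List<String> selected = new java.util.ArrayList<>();
  for (int i = 0; i < sorted.size(); i++) {
    // Each partition is picked roughly once a day: index % 24 must equal the current hour.
    if (i % 24 == currentHour) {
      selected.add(sorted.get(i));
    }
  }
  return selected;
}
```
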
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 738e2d6b48d13..855d1e4936aa4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -55,7 +55,7 @@ public class HoodieCreateHandle extends
private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class);
- protected final HoodieFileWriter fileWriter;
+ protected HoodieFileWriter fileWriter;
protected final Path path;
protected long recordsWritten = 0;
protected long insertRecordsWritten = 0;
@@ -208,7 +208,10 @@ public List<WriteStatus> close() {
LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten);
try {
- fileWriter.close();
+ if (fileWriter != null) {
+ fileWriter.close();
+ fileWriter = null;
+ }
setupWriteStatus();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 9607080d1e74e..408443f0fb553 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -112,6 +112,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadataWriter.class);
+ public static final String METADATA_COMPACTION_TIME_SUFFIX = "001";
+
// Virtual keys support for metadata table. This Field is
// from the metadata payload schema.
private static final String RECORD_KEY_FIELD_NAME = HoodieMetadataPayload.KEY_FIELD_NAME;
@@ -242,6 +244,7 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
// Create the write config for the metadata table by borrowing options from the main write config.
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
+ .withEngineType(writeConfig.getEngineType())
.withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
.withConsistencyCheckEnabled(writeConfig.getConsistencyGuardConfig().isConsistencyCheckEnabled())
@@ -265,7 +268,7 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
.withAutoClean(false)
.withCleanerParallelism(parallelism)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
- .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
.retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
.build())
// we will trigger archive manually, to ensure only regular writer invokes it
@@ -1016,22 +1019,38 @@ protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String inst
// finish off any pending compactions if any from previous attempt.
writeClient.runAnyPendingCompactions();
- String latestDeltaCommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
- .get().getTimestamp();
+ String latestDeltaCommitTimeInMetadataTable = metadataMetaClient.reloadActiveTimeline()
+ .getDeltaCommitTimeline()
+ .filterCompletedInstants()
+ .lastInstant().orElseThrow(() -> new HoodieMetadataException("No completed deltacommit in metadata table"))
+ .getTimestamp();
+ // We need to check whether there are any inflight instants in the data table timeline before or equal to the latest delta commit in the metadata table.
+ // Whenever you want to change this logic, please ensure all of the scenarios below are considered:
+ // a. The latest delta commit in the MDT may be committed in the MDT but have failed in the DT, which is why findInstantsBeforeOrEquals() is employed.
+ // b. There could be DT inflights after the latest delta commit in the MDT, and that is fine, because the contract is that the latest compaction instant time in the MDT
+ // guarantees that every instant before it has already been synced to the metadata table.
+ // c. Do consider out-of-order commits. For example, c4 from the DT could complete before c3, and we cannot trigger compaction in the MDT with c4 as the base instant time
+ // until every instant before c4 is synced to the metadata table.
List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
- .findInstantsBefore(instantTime).getInstants().collect(Collectors.toList());
+ .findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants().collect(Collectors.toList());
if (!pendingInstants.isEmpty()) {
- LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
- pendingInstants.size(), latestDeltaCommitTime, Arrays.toString(pendingInstants.toArray())));
+ LOG.info(String.format(
+ "Cannot compact metadata table as there are %d inflight instants in data table before latest deltacommit in metadata table: %s. Inflight instants in data table: %s",
+ pendingInstants.size(), latestDeltaCommitTimeInMetadataTable, Arrays.toString(pendingInstants.toArray())));
return;
}
// Trigger compaction with suffixes based on the same instant time. This ensures that any future
// delta commits synced over will not have an instant time lesser than the last completed instant on the
// metadata table.
- final String compactionInstantTime = latestDeltaCommitTime + "001";
- if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
+ final String compactionInstantTime = latestDeltaCommitTimeInMetadataTable + METADATA_COMPACTION_TIME_SUFFIX;
+ // We need to avoid attempting compaction with the same instant time again.
+ // Let's say we trigger compaction after C5 in the MDT and the compaction completes with C4001, but C5 crashed before completing in the MDT.
+ // With C6 we will re-attempt compaction, at which point the latest delta commit in the MDT is C4,
+ // and so we would again try compaction with instant C4001. Hence, we skip compaction if a compaction with the same instant time already exists.
+ if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)
+ && writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
writeClient.compact(compactionInstantTime);
}
}
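
The comments above boil down to two guards: compaction of the metadata table is skipped while any data-table instant at or before the latest metadata-table delta commit is still pending, and it is skipped if a compaction with the suffixed instant time already exists. A condensed restatement of that decision using the constant introduced in this patch (sketch only, not the actual method):

```java
// Sketch: should the metadata table be compacted right now?
boolean canCompactMetadataTable(HoodieTableMetaClient dataMetaClient,
                                HoodieTableMetaClient metadataMetaClient,
                                String latestDeltaCommitTimeInMetadataTable) {
  // Guard 1: every data-table instant up to the latest MDT delta commit must be completed.
  boolean hasPendingDataInstants = dataMetaClient.reloadActiveTimeline()
      .filterInflightsAndRequested()
      .findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable)
      .countInstants() > 0;
  // Guard 2: a compaction with the same suffixed instant time must not already exist.
  String compactionInstantTime = latestDeltaCommitTimeInMetadataTable + METADATA_COMPACTION_TIME_SUFFIX;
  boolean alreadyCompacted = metadataMetaClient.getActiveTimeline()
      .filterCompletedInstants()
      .containsInstant(compactionInstantTime);
  return !hasPendingDataInstants && !alreadyCompacted;
}
```
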
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 35eb0edfbfc61..fac1db6d54526 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -470,7 +470,17 @@ public abstract Option scheduleCleaning(HoodieEngineContext c
*
* @return information on cleaned file slices
*/
- public abstract HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking);
+ @Deprecated
+ public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking) {
+ return clean(context, cleanInstantTime);
+ }
+
+ /**
+ * Executes a new clean action.
+ *
+ * @return information on cleaned file slices
+ */
+ public abstract HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime);
/**
* Schedule rollback for the instant time.
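
With this change, callers clean through the two-argument method, and the deprecated three-argument overload merely delegates to it, for example:

```java
// Equivalent calls after this change; the skipLocking flag is ignored by the deprecated overload.
HoodieCleanMetadata cleanMetadata = table.clean(context, cleanInstantTime);
HoodieCleanMetadata sameResult = table.clean(context, cleanInstantTime, true); // deprecated
```
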
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 64e69b1d2a9bd..737388645b4b0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -465,7 +465,24 @@ public Option<HoodieInstant> getEarliestCommitToRetain() {
int hoursRetained = config.getCleanerHoursRetained();
if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
&& commitTimeline.countInstants() > commitsRetained) {
- earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
+ Option<HoodieInstant> earliestPendingCommits = hoodieTable.getMetaClient()
+ .getActiveTimeline()
+ .getCommitsTimeline()
+ .filter(s -> !s.isCompleted()).firstInstant();
+ if (earliestPendingCommits.isPresent()) {
+ // Earliest commit to retain must not be later than the earliest pending commit
+ earliestCommitToRetain =
+ commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained).map(nthInstant -> {
+ if (nthInstant.compareTo(earliestPendingCommits.get()) <= 0) {
+ return Option.of(nthInstant);
+ } else {
+ return commitTimeline.findInstantsBefore(earliestPendingCommits.get().getTimestamp()).lastInstant();
+ }
+ }).orElse(Option.empty());
+ } else {
+ earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants()
+ - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
+ }
} else if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
Instant instant = Instant.now();
ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
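
The new branch caps the earliest commit to retain at the earliest pending commit, so cleaning never advances past data that an in-flight writer may still need. A simplified restatement of that cap, reusing the timeline accessors shown above (sketch only):

```java
// Sketch: cap the usual nth-instant retention boundary by the earliest pending commit, if any.
Option<HoodieInstant> capByEarliestPending(Option<HoodieInstant> nthCandidate,
                                           Option<HoodieInstant> earliestPending,
                                           HoodieTimeline commitTimeline) {
  if (!earliestPending.isPresent()) {
    return nthCandidate; // no pending commits, keep the usual retention boundary
  }
  return nthCandidate.map(candidate -> {
    if (candidate.compareTo(earliestPending.get()) <= 0) {
      return Option.of(candidate); // not later than the earliest pending commit, safe to use
    }
    // Otherwise fall back to the last completed commit before the earliest pending one.
    return commitTimeline.findInstantsBefore(earliestPending.get().getTimestamp()).lastInstant();
  }).orElse(Option.empty());
}
```
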
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 5d62ef390233f..8aafa6d28c425 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -71,11 +71,18 @@ public Option<HoodieClusteringPlan> generateClusteringPlan() {
HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
LOG.info("Scheduling clustering for " + metaClient.getBasePath());
HoodieWriteConfig config = getWriteConfig();
- List<String> partitionPaths = FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath());
- // get matched partitions if set
- partitionPaths = getMatchedPartitions(config, partitionPaths);
- // filter the partition paths if needed to reduce list status
+ String partitionSelected = config.getClusteringPartitionSelected();
+ List<String> partitionPaths;
+
+ if (StringUtils.isNullOrEmpty(partitionSelected)) {
+ // get matched partitions if set
+ partitionPaths = getRegexPatternMatchedPartitions(config, FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath()));
+ // filter the partition paths if needed to reduce list status
+ } else {
+ partitionPaths = Arrays.asList(partitionSelected.split(","));
+ }
+
partitionPaths = filterPartitionPaths(partitionPaths);
if (partitionPaths.isEmpty()) {
@@ -114,15 +121,6 @@ public Option<HoodieClusteringPlan> generateClusteringPlan() {
.build());
}
- public List<String> getMatchedPartitions(HoodieWriteConfig config, List<String> partitionPaths) {
- String partitionSelected = config.getClusteringPartitionSelected();
- if (!StringUtils.isNullOrEmpty(partitionSelected)) {
- return Arrays.asList(partitionSelected.split(","));
- } else {
- return getRegexPatternMatchedPartitions(config, partitionPaths);
- }
- }
-
public List<String> getRegexPatternMatchedPartitions(HoodieWriteConfig config, List<String> partitionPaths) {
String pattern = config.getClusteringPartitionFilterRegexPattern();
if (!StringUtils.isNullOrEmpty(pattern)) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 4add51886fe3a..e8acd54c5aa49 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -37,6 +37,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.marker.WriteMarkersFactory;
@@ -156,12 +157,13 @@ private void validateRollbackCommitSequence() {
// since with LAZY rollback we support parallel writing which can allow a new inflight while rollback is ongoing
// Remove this once we support LAZY rollback of failed writes by default as parallel writing becomes the default
// writer mode.
- if (config.getFailedWritesCleanPolicy().isEager()) {
+ if (config.getFailedWritesCleanPolicy().isEager() && !HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
final String instantTimeToRollback = instantToRollback.getTimestamp();
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline();
// Make sure only the last n commits are being rolled back
// If there is a commit in-between or after that is not rolled back, then abort
+ // this condition may not hold good for metadata table. since the order of commits applied to MDT is data table commits and the ordering could be different.
if ((instantTimeToRollback != null) && !commitTimeline.empty()
&& !commitTimeline.findInstantsAfter(instantTimeToRollback, Integer.MAX_VALUE).empty()) {
// check if remnants are from a previous LAZY rollback config, if yes, let out of order rollback continue
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index 7f408c1b8d24a..310ebba3c14c7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -27,6 +27,7 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -85,15 +86,33 @@ public HoodieSavepointMetadata execute() {
"Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);
context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime + " " + table.getConfig().getTableName());
- List<String> partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
- Map<String, List<String>> latestFilesMap = context.mapToPair(partitions, partitionPath -> {
- // Scan all partitions files with this commit time
- LOG.info("Collecting latest files in partition path " + partitionPath);
- TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
- List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
- .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
- return new ImmutablePair<>(partitionPath, latestFiles);
- }, null);
+ TableFileSystemView.BaseFileOnlyView view = table.getBaseFileOnlyView();
+
+ Map<String, List<String>> latestFilesMap;
+ // NOTE: for performance, we have to use different logic here for listing the latest files
+ // before or on the given instant:
+ // (1) using metadata-table-based file listing: instead of parallelizing the partition
+ // listing which incurs unnecessary metadata table reads, we directly read the metadata
+ // table once in a batch manner through the timeline server;
+ // (2) using direct file system listing: we parallelize the partition listing so that
+ // each partition can be listed on the file system concurrently through Spark.
+ // Note that
+ if (shouldUseBatchLookup(config)) {
+ latestFilesMap = view.getAllLatestBaseFilesBeforeOrOn(instantTime).entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> entry.getValue().map(HoodieBaseFile::getFileName).collect(Collectors.toList())));
+ } else {
+ List<String> partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
+ latestFilesMap = context.mapToPair(partitions, partitionPath -> {
+ // Scan all partitions files with this commit time
+ LOG.info("Collecting latest files in partition path " + partitionPath);
+ List<String> latestFiles = view.getLatestBaseFilesBeforeOrOn(partitionPath, instantTime)
+ .map(HoodieBaseFile::getFileName).collect(Collectors.toList());
+ return new ImmutablePair<>(partitionPath, latestFiles);
+ }, null);
+ }
+
HoodieSavepointMetadata metadata = TimelineMetadataUtils.convertSavepointMetadata(user, comment, latestFilesMap);
// Nothing to save in the savepoint
table.getActiveTimeline().createNewInstant(
@@ -107,4 +126,22 @@ public HoodieSavepointMetadata execute() {
throw new HoodieSavepointException("Failed to savepoint " + instantTime, e);
}
}
+
+ /**
+ * Whether to use batch lookup for listing the latest base files in metadata table.
+ *
+ * Note that metadata table has to be enabled, and the storage type of the file system view
+ * cannot be EMBEDDED_KV_STORE or SPILLABLE_DISK (these two types are not integrated with
+ * metadata table, see HUDI-5612).
+ *
+ * @param config Write configs.
+ * @return {@code true} if using batch lookup; {@code false} otherwise.
+ */
+ private boolean shouldUseBatchLookup(HoodieWriteConfig config) {
+ FileSystemViewStorageType storageType =
+ config.getClientSpecifiedViewStorageConfig().getStorageType();
+ return config.getMetadataConfig().enabled()
+ && !FileSystemViewStorageType.EMBEDDED_KV_STORE.equals(storageType)
+ && !FileSystemViewStorageType.SPILLABLE_DISK.equals(storageType);
+ }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java
index dfd55f2958125..625ed18227853 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java
@@ -24,7 +24,6 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;
-import com.esotericsoftware.minlog.Log;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -47,14 +46,14 @@ public static WriteMarkers get(MarkerType markerType, HoodieTable table, String
return new DirectWriteMarkers(table, instantTime);
case TIMELINE_SERVER_BASED:
if (!table.getConfig().isEmbeddedTimelineServerEnabled()) {
- Log.warn("Timeline-server-based markers are configured as the marker type "
+ LOG.warn("Timeline-server-based markers are configured as the marker type "
+ "but embedded timeline server is not enabled. Falling back to direct markers.");
return new DirectWriteMarkers(table, instantTime);
}
String basePath = table.getMetaClient().getBasePath();
if (StorageSchemes.HDFS.getScheme().equals(
FSUtils.getFs(basePath, table.getContext().getHadoopConf().newCopy()).getScheme())) {
- Log.warn("Timeline-server-based markers are not supported for HDFS: "
+ LOG.warn("Timeline-server-based markers are not supported for HDFS: "
+ "base path " + basePath + ". Falling back to direct markers.");
return new DirectWriteMarkers(table, instantTime);
}
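
For reference, a typical call site just asks the factory for the configured marker mechanism and relies on the fallbacks logged above (sketch; config.getMarkersType() is assumed to be the accessor for the configured marker type):

```java
// Sketch: the factory transparently falls back to direct markers when the timeline
// server is disabled or the base path lives on HDFS.
WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), table, instantTime);
```
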
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestDeletePartitionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestDeletePartitionUtils.java
new file mode 100644
index 0000000000000..929540b8dcaf7
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestDeletePartitionUtils.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieDeletePartitionException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TestDeletePartitionUtils {
+
+ private static final String PARTITION_IN_PENDING_SERVICE_ACTION = "partition_with_pending_table_service_action";
+ private static final String HARDCODED_INSTANT_TIME = "0";
+
+ private final HoodieTable table = Mockito.mock(HoodieTable.class);
+
+ private final SyncableFileSystemView fileSystemView = Mockito.mock(SyncableFileSystemView.class);
+
+ public static Stream<Arguments> generateTruthValues() {
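+ // Generates all 2^n boolean combinations; for n = 2 the rows are (false, false), (true, false), (false, true), (true, true).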
+ int noOfVariables = 2;
+ int noOfRows = 1 << noOfVariables;
+ Object[][] truthValues = new Object[noOfRows][noOfVariables];
+ for (int i = 0; i < noOfRows; i++) {
+ for (int j = noOfVariables - 1; j >= 0; j--) {
+ boolean out = (i / (1 << j)) % 2 != 0;
+ truthValues[i][j] = out;
+ }
+ }
+ return Stream.of(truthValues).map(Arguments::of);
+ }
+
+ @ParameterizedTest
+ @MethodSource("generateTruthValues")
+ public void testDeletePartitionUtils(
+ boolean hasPendingCompactionOperations,
+ boolean hasFileGroupsInPendingClustering) {
+ System.out.printf("hasPendingCompactionOperations: %s, hasFileGroupsInPendingClustering: %s%n",
+ hasPendingCompactionOperations, hasFileGroupsInPendingClustering);
+ Mockito.when(table.getSliceView()).thenReturn(fileSystemView);
+ Mockito.when(fileSystemView.getPendingCompactionOperations()).thenReturn(createPendingCompactionOperations(hasPendingCompactionOperations));
+ Mockito.when(fileSystemView.getFileGroupsInPendingClustering()).thenReturn(createFileGroupsInPendingClustering(hasFileGroupsInPendingClustering));
+
+ boolean shouldThrowException = hasPendingCompactionOperations || hasFileGroupsInPendingClustering;
+
+ if (shouldThrowException) {
+ assertThrows(HoodieDeletePartitionException.class,
+ () -> DeletePartitionUtils.checkForPendingTableServiceActions(table,
+ Collections.singletonList(PARTITION_IN_PENDING_SERVICE_ACTION)));
+ } else {
+ assertDoesNotThrow(() -> DeletePartitionUtils.checkForPendingTableServiceActions(table,
+ Collections.singletonList(PARTITION_IN_PENDING_SERVICE_ACTION)));
+ }
+ }
+
+ private static Stream<Pair<String, CompactionOperation>> createPendingCompactionOperations(boolean hasPendingCompactionOperations) {
+ return Stream.of(Pair.of(HARDCODED_INSTANT_TIME, getCompactionOperation(hasPendingCompactionOperations)));
+ }
+
+ private static CompactionOperation getCompactionOperation(boolean hasPendingJobInPartition) {
+ return new CompactionOperation(
+ "fileId", getPartitionName(hasPendingJobInPartition), HARDCODED_INSTANT_TIME, Option.empty(),
+ new ArrayList<>(), Option.empty(), Option.empty(), new HashMap<>());
+ }
+
+ private static Stream<Pair<HoodieFileGroupId, HoodieInstant>> createFileGroupsInPendingClustering(boolean hasFileGroupsInPendingClustering) {
+ HoodieFileGroupId hoodieFileGroupId = new HoodieFileGroupId(getPartitionName(hasFileGroupsInPendingClustering), "fileId");
+ HoodieInstant hoodieInstant = new HoodieInstant(true, "replacecommit", HARDCODED_INSTANT_TIME);
+ return Stream.of(Pair.of(hoodieFileGroupId, hoodieInstant));
+ }
+
+ private static String getPartitionName(boolean hasPendingTableServiceAction) {
+ return hasPendingTableServiceAction ? PARTITION_IN_PENDING_SERVICE_ACTION : "unaffected_partition";
+ }
+
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
index 440bc95615391..a053a9611050c 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
@@ -71,7 +71,7 @@ public void testFilterPartitionPaths() {
fakeTimeBasedPartitionsPath.add("20210719");
fakeTimeBasedPartitionsPath.add("20210721");
- List<String> list = strategyTestRegexPattern.getMatchedPartitions(hoodieWriteConfig, fakeTimeBasedPartitionsPath);
+ List<String> list = strategyTestRegexPattern.getRegexPatternMatchedPartitions(hoodieWriteConfig, fakeTimeBasedPartitionsPath);
assertEquals(2, list.size());
assertTrue(list.contains("20210721"));
assertTrue(list.contains("20210723"));
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 551b412ccbc6c..9a9037c17cc07 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -69,6 +69,7 @@
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,7 +101,7 @@ public class HoodieFlinkWriteClient extends
* FileID to write handle mapping in order to record the write handles for each file group,
* so that we can append the mini-batch data buffer incrementally.
*/
- private final Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles;
+ private final Map<String, Path> bucketToHandles;
public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance());
@@ -126,6 +127,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
}
+ @Override
+ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
+ return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context, metaClient);
+ }
+
@Override
public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
@@ -147,9 +153,10 @@ public List upsert(List> records, String instantTim
initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
- final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
- instantTime, table, records.listIterator());
- HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsert(context, writeHandle, instantTime, records);
+ HoodieWriteMetadata<List<WriteStatus>> result;
+ try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table)) {
+ result = ((HoodieFlinkTable<T>) table).upsert(context, closeableHandle.getWriteHandle(), instantTime, records);
+ }
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
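
The Flink write paths now manage the write handle with try-with-resources. The AutoCloseableWriteHandle class itself is not part of the hunks shown here; the following is a hypothetical sketch of what such a wrapper could look like as an inner class of the write client (constructor arguments mirror the replaced getOrCreateWriteHandle call, and the MiniBatchHandle.closeGracefully() call is an assumption):

```java
// Hypothetical sketch of an AutoCloseable wrapper around a Flink write handle,
// assumed to live as an inner class of HoodieFlinkWriteClient<T>.
class AutoCloseableWriteHandle implements AutoCloseable {
  private final HoodieWriteHandle<?, ?, ?, ?> writeHandle;

  AutoCloseableWriteHandle(List<HoodieRecord<T>> records, String instantTime, HoodieTable<T, ?, ?, ?> table) {
    // Reuses the same handle-creation path the old inline code used.
    this.writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(), instantTime, table, records.listIterator());
  }

  HoodieWriteHandle<?, ?, ?, ?> getWriteHandle() {
    return writeHandle;
  }

  @Override
  public void close() {
    // Release the underlying file handle even if the write fails mid-batch.
    ((MiniBatchHandle) writeHandle).closeGracefully();
  }
}
```
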
@@ -166,9 +173,10 @@ public List upsertPreppedRecords(List> preppedRecor
Map>> preppedRecordsByFileId = preppedRecords.stream().parallel()
.collect(Collectors.groupingBy(r -> r.getCurrentLocation().getFileId()));
return preppedRecordsByFileId.values().stream().parallel().map(records -> {
- final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
- instantTime, table, records.listIterator());
- HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsertPrepped(context, writeHandle, instantTime, records);
+ HoodieWriteMetadata<List<WriteStatus>> result;
+ try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table)) {
+ result = ((HoodieFlinkTable<T>) table).upsertPrepped(context, closeableHandle.getWriteHandle(), instantTime, records);
+ }
return postWrite(result, instantTime, table);
}).flatMap(Collection::stream).collect(Collectors.toList());
}
@@ -180,9 +188,10 @@ public List insert(List> records, String instantTim
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
// create the write handle if not exists
- final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
- instantTime, table, records.listIterator());
- HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).insert(context, writeHandle, instantTime, records);
+ HoodieWriteMetadata<List<WriteStatus>> result;
+ try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table)) {
+ result = ((HoodieFlinkTable<T>) table).insert(context, closeableHandle.getWriteHandle(), instantTime, records);
+ }
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
@@ -203,9 +212,10 @@ public List insertOverwrite(
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient());
// create the write handle if not exists
- final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
- instantTime, table, records.listIterator());
- HoodieWriteMetadata result = ((HoodieFlinkTable<T>) table).insertOverwrite(context, writeHandle, instantTime, records);
+ HoodieWriteMetadata<List<WriteStatus>> result;
+ try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table)) {
+ result = ((HoodieFlinkTable<T>) table).insertOverwrite(context, closeableHandle.getWriteHandle(), instantTime, records);
+ }
return postWrite(result, instantTime, table);
}
@@ -222,9 +232,10 @@ public List insertOverwriteTable(
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient());
// create the write handle if not exists
- final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
- instantTime, table, records.listIterator());
- HoodieWriteMetadata result = ((HoodieFlinkTable<T>) table).insertOverwriteTable(context, writeHandle, instantTime, records);
+ HoodieWriteMetadata<List<WriteStatus>> result;
+ try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table)) {
+ result = ((HoodieFlinkTable<T>) table).insertOverwriteTable(context, closeableHandle.getWriteHandle(), instantTime, records);
+ }
return postWrite(result, instantTime, table);
}
@@ -291,10 +302,14 @@ public void initMetadataTable() {
HoodieFlinkTable<?> table = getHoodieTable();
if (config.isMetadataTableEnabled()) {
// initialize the metadata table path
- try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) {
- // do nothing
+ // guard the metadata writer with concurrent lock
+ try {
+ this.txnManager.getLockManager().lock();
+ initMetadataWriter().close();
} catch (Exception e) {
throw new HoodieException("Failed to initialize metadata table", e);
+ } finally {
+ this.txnManager.getLockManager().unlock();
}
// clean the obsolete index stats
table.deleteMetadataIndexIfNecessary();
@@ -360,18 +375,21 @@ protected List postWrite(HoodieWriteMetadata> res
protected void postCommit(HoodieTable table,
HoodieCommitMetadata metadata,
String instantTime,
- Option