Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hudi.common.table.timeline.TimelineFactory;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.NumericUtils;

import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
Expand Down Expand Up @@ -270,7 +271,7 @@ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String m
}
instantsStream = instantsStream.filter(is -> predicate.test(maxInstant, is.requestedTime()));
}
TimelineFactory timelineFactory = metaClient.getTimelineLayout().getTimelineFactory();
TimelineFactory timelineFactory = metaClient.getTableFormat().getTimelineFactory();
HoodieTimeline filteredTimeline = timelineFactory.createDefaultTimeline(instantsStream, metaClient.getActiveTimeline());
return new HoodieTableFileSystemView(metaClient, filteredTimeline, pathInfoList);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public String migratePartitionMeta(
HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(HoodieCLI.conf);
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
List<String> partitionPaths =
FSUtils.getAllPartitionPaths(engineContext, client.getStorage(), client.getBasePath(), false);
FSUtils.getAllPartitionPaths(engineContext, client, false);
StoragePath basePath = client.getBasePath();

String[][] rows = new String[partitionPaths.size()][];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ public void commitLogCompaction(String compactionInstantTime, HoodieWriteMetadat
HoodieTable table = tableOpt.orElseGet(() -> createTable(config, context.getStorageConf()));
completeLogCompaction(writeMetadata.getCommitMetadata().get(), table, compactionInstantTime, tableWriteStats.getMetadataTableWriteStats());
}

/**
* Schedules a new log compaction instant.
*
Expand Down Expand Up @@ -575,7 +575,8 @@ private void completeClustering(HoodieReplaceCommitMetadata replaceCommitMetadat

LOG.info("Committing Clustering {} for table {}", clusteringCommitTime, table.getConfig().getBasePath());

ClusteringUtils.transitionClusteringOrReplaceInflightToComplete(false, clusteringInstant, replaceCommitMetadata, table.getActiveTimeline());
ClusteringUtils.transitionClusteringOrReplaceInflightToComplete(false, clusteringInstant, replaceCommitMetadata, table.getActiveTimeline(),
completedInstant -> table.getMetaClient().getTableFormat().commit(replaceCommitMetadata, completedInstant, table.getContext(), table.getMetaClient(), table.getViewManager()));
LOG.debug("Clustering {} finished with result {}", clusteringCommitTime, replaceCommitMetadata);
} catch (Exception e) {
throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,10 @@ protected void commit(HoodieTable table,
}
// update Metadata table
writeToMetadataTable(skipStreamingWritesToMetadataTable, table, instantTime, tableWriteStats.getMetadataTableWriteStats(), metadata);
activeTimeline.saveAsComplete(false, table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, commitActionType, instantTime), Option.of(metadata));
activeTimeline.saveAsComplete(false,
table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, commitActionType, instantTime), Option.of(metadata),
completedInstant -> table.getMetaClient().getTableFormat().commit(metadata, completedInstant, getEngineContext(), table.getMetaClient(), table.getViewManager())
);
// update cols to Index as applicable
HoodieColumnStatsIndexUtils.updateColsToIndex(table, config, metadata, commitActionType,
(Functions.Function2<HoodieTableMetaClient, List<String>, Void>) (metaClient, columnsToIndex) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ private List<HoodieInstant> getInstantsToArchive() throws IOException {
// If metadata table is enabled, do not archive instants which are more recent than the last compaction on the
// metadata table.
if (config.isMetadataTableEnabled() && table.getMetaClient().getTableConfig().isMetadataTableAvailable()) {
try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(table.getContext(), table.getStorage(), config.getMetadataConfig(), config.getBasePath())) {
try (HoodieTableMetadata tableMetadata = table.refreshAndGetTableMetadata()) {
Option<String> latestCompactionTime = tableMetadata.getLatestCompactionTime();
if (!latestCompactionTime.isPresent()) {
LOG.info("Not archiving as there is no compaction yet on the metadata table");
Expand Down Expand Up @@ -397,10 +397,10 @@ private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, Hoo
// Therefore, the concurrency of deleting completed instants is temporarily disabled,
// and instants are deleted in ascending order to prevent the occurrence of such holes.
// See HUDI-7207 and #10325.
completedInstants.stream()
.forEach(instant -> activeTimeline.deleteInstantFileIfExists(instant));
completedInstants.stream().forEach(activeTimeline::deleteInstantFileIfExists);
}

// Call Table Format archive to allow archiving in table format.
table.getMetaClient().getTableFormat().archive(() -> archivedInstants, table.getContext(), table.getMetaClient(), table.getViewManager());
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -117,6 +118,11 @@ public int archiveIfRequired(HoodieEngineContext context, boolean acquireLock) t
deleteArchivedActions(instantsToArchive, context);
// triggers compaction and cleaning only after archiving action
this.timelineWriter.compactAndClean(context);
Supplier<List<HoodieInstant>> archivedInstants = () -> instantsToArchive.stream()
.flatMap(action -> Stream.concat(action.getCompletedInstants().stream(), action.getPendingInstants().stream()))
.collect(Collectors.toList());
// Call Table Format archive to allow archiving in table format.
table.getMetaClient().getTableFormat().archive(archivedInstants, table.getContext(), table.getMetaClient(), table.getViewManager());
} else {
LOG.info("No Instants to archive");
}
Expand Down Expand Up @@ -209,8 +215,7 @@ private List<HoodieInstant> getCommitInstantsToArchive() throws IOException {
// 4. If metadata table is enabled, do not archive instants which are more recent than the last compaction on the
// metadata table.
if (config.isMetadataTableEnabled() && table.getMetaClient().getTableConfig().isMetadataTableAvailable()) {
try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(
table.getContext(), table.getStorage(), config.getMetadataConfig(), config.getBasePath())) {
try (HoodieTableMetadata tableMetadata = table.refreshAndGetTableMetadata()) {
Option<String> latestCompactionTime = tableMetadata.getLatestCompactionTime();
if (!latestCompactionTime.isPresent()) {
LOG.info("Not archiving as there is no compaction yet on the metadata table");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public HoodieGlobalBloomIndex(HoodieWriteConfig config, BaseHoodieBloomIndexHelp
List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(List<String> partitions, final HoodieEngineContext context,
final HoodieTable hoodieTable) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, metaClient.getStorage(), config.getMetadataConfig(), metaClient.getBasePath());
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, metaClient, config.getMetadataConfig());
return super.loadColumnRangesFromFiles(allPartitionPaths, context, hoodieTable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ private HoodiePairData<String, HoodieRecordGlobalLocation> fetchRecordGlobalLoca
private List<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(
final HoodieEngineContext context, final HoodieTable hoodieTable) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, metaClient.getStorage(), config.getMetadataConfig(), metaClient.getBasePath());
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(context, metaClient, config.getMetadataConfig());
// Obtain the latest data files from all the partitions.
return getLatestBaseFilesForAllPartitions(allPartitionPaths, context, hoodieTable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public HoodieTableVersion version() {

protected abstract HoodieIndex<?, ?> getIndex(HoodieWriteConfig config, HoodieEngineContext context);

private synchronized FileSystemViewManager getViewManager() {
public synchronized FileSystemViewManager getViewManager() {
if (null == viewManager) {
viewManager = FileSystemViewManager.createViewManager(getContext(), config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), unused -> getMetadataTable());
}
Expand Down Expand Up @@ -1166,14 +1166,18 @@ private void clearMetadataTablePartitionsConfig(Option<MetadataPartitionType> pa

public HoodieTableMetadata getMetadataTable() {
if (metadata == null) {
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(config.getMetadataConfig().getProps())
.build();
metadata = HoodieTableMetadata.create(context, metaClient.getStorage(), metadataConfig, config.getBasePath());
metadata = refreshAndGetTableMetadata();
}
return metadata;
}

public HoodieTableMetadata refreshAndGetTableMetadata() {
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(config.getMetadataConfig().getProps())
.build();
return metaClient.getTableFormat().getMetadataFactory().create(context, metaClient.getStorage(), metadataConfig, config.getBasePath());
}

/**
* When {@link HoodieTableConfig#POPULATE_META_FIELDS} is enabled,
* we need to track written records within WriteStatus in two cases:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ public List<String> getPartitions(Object strategy, TableServiceType type) {

// get all partitions
LOG.info("Start to fetch all partitions for " + type + ". Instant " + instantTime);
return FSUtils.getAllPartitionPaths(context, table.getMetaClient().getStorage(),
config.getMetadataConfig(), table.getMetaClient().getBasePath());
return FSUtils.getAllPartitionPaths(context, table.getMetaClient(), config.getMetadataConfig());
}

public Pair<Option<HoodieInstant>, Set<String>> getIncrementalPartitions(TableServiceType type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ protected void commit(HoodieWriteMetadata<O> result, List<HoodieWriteStat> write
// cannot serialize maps with null values
metadata.getExtraMetadata().entrySet().removeIf(entry -> entry.getValue() == null);
activeTimeline.saveAsComplete(false,
table.getMetaClient().createNewInstant(State.INFLIGHT, actionType, instantTime), Option.of(metadata));
table.getMetaClient().createNewInstant(State.INFLIGHT, actionType, instantTime), Option.of(metadata),
completedInstant -> table.getMetaClient().getTableFormat().commit(metadata, completedInstant, table.getContext(), table.getMetaClient(), table.getViewManager()));
LOG.info("Committed " + instantTime);
result.setCommitMetadata(Option.of(metadata));
// update cols to Index as applicable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public HoodieCompactionPlan generateCompactionPlan(String compactionInstant) thr
// TODO : check if maxMemory is not greater than JVM or executor memory
// TODO - rollback any compactions in flight
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
CompletionTimeQueryView completionTimeQueryView = metaClient.getTimelineLayout().getTimelineFactory().createCompletionTimeQueryView(metaClient);
CompletionTimeQueryView completionTimeQueryView = metaClient.getTableFormat().getTimelineFactory().createCompletionTimeQueryView(metaClient);
List<String> partitionPaths = getPartitions();

int allPartitionSize = partitionPaths.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetad
// NOTE: no need to lock here, since !skipTimelinePublish is always true,
// when skipLocking is false, txnManager above-mentioned should lock it.
// when skipLocking is true, the caller should have already held the lock.
table.getActiveTimeline().transitionRollbackInflightToComplete(false, inflightInstant, rollbackMetadata);
table.getActiveTimeline().transitionRollbackInflightToComplete(false, inflightInstant, rollbackMetadata,
completedInstant -> table.getMetaClient().getTableFormat().completedRollback(completedInstant, table.getContext(), table.getMetaClient(), table.getViewManager()));
LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete");
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.NativeTableFormat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

Expand Down Expand Up @@ -67,11 +69,23 @@ protected List<HoodieRollbackStat> executeRollback(HoodieRollbackPlan hoodieRoll

if (instantToRollback.isCompleted()) {
LOG.info("Unpublishing instant " + instantToRollback);
table.getMetaClient().getTableFormat().rollback(instantToRollback, table.getContext(), table.getMetaClient(), table.getViewManager());
// Revert the completed instant to inflight in native format.
resolvedInstant = activeTimeline.revertToInflight(instantToRollback);
// reload meta-client to reflect latest timeline status
table.getMetaClient().reloadActiveTimeline();
}

// If instant is inflight but marked as completed in native format, delete the completed instant from storage.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nts: to review closely

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a bug fix? how could this happen?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When committing -> we commit to the native timeline first (A), then the plugged-in table format (B)

instantToRollback.isInflight() can be true if A happened but we failed before B. This is fixing that.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll make this limited to cases where something else is plugged in

Copy link
Copy Markdown
Contributor

@danny0405 danny0405 Jul 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, let's avoid listing the timeline multiple times for the regular workflow when the external table format is not there.

if (instantToRollback.isInflight() && !table.getMetaClient().getTableFormat().getName().equals(NativeTableFormat.TABLE_FORMAT)) {
HoodieActiveTimeline activeTimelineForNativeFormat = table.getMetaClient().getActiveTimelineForNativeFormat();
Option<HoodieInstant> instantToRollbackInNativeFormat = activeTimelineForNativeFormat.filter(instant -> instant.requestedTime().equals(instantToRollback.requestedTime())).lastInstant();
if (instantToRollbackInNativeFormat.isPresent() && instantToRollbackInNativeFormat.get().isCompleted()) {
resolvedInstant = activeTimelineForNativeFormat.revertToInflight(instantToRollbackInNativeFormat.get());
table.getMetaClient().reloadActiveTimeline();
}
}

// For Requested State (like failure during index lookup), there is nothing to do rollback other than
// deleting the timeline file
if (!resolvedInstant.isRequested()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
HoodieTableMetaClient metaClient = table.getMetaClient();
boolean isTableVersionLessThanEight = metaClient.getTableConfig().getTableVersion().lesserThan(HoodieTableVersion.EIGHT);
List<String> partitionPaths =
FSUtils.getAllPartitionPaths(context, table.getStorage(), table.getMetaClient().getBasePath(), false);
FSUtils.getAllPartitionPaths(context, table.getMetaClient(), false);
int numPartitions = Math.max(Math.min(partitionPaths.size(), config.getRollbackParallelism()), 1);

context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing Rollback Plan: " + config.getTableName());
Expand Down Expand Up @@ -285,7 +285,7 @@ private List<StoragePathInfo> listAllFilesSinceCommit(String commit,
String partitionPath,
HoodieTableMetaClient metaClient) throws IOException {
LOG.info("Collecting files to be cleaned/rolledback up for path " + partitionPath + " and commit " + commit);
CompletionTimeQueryView completionTimeQueryView = metaClient.getTimelineLayout().getTimelineFactory().createCompletionTimeQueryView(metaClient);
CompletionTimeQueryView completionTimeQueryView = metaClient.getTableFormat().getTimelineFactory().createCompletionTimeQueryView(metaClient);
StoragePathFilter filter = (path) -> {
if (path.toString().contains(baseFileExtension)) {
String fileCommitTime = FSUtils.getCommitTime(path.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ public HoodieSavepointMetadata execute() {
return latestFiles;
}));
} else {
List<String> partitions = FSUtils.getAllPartitionPaths(
context, table.getStorage(), config.getMetadataConfig(), table.getMetaClient().getBasePath());
List<String> partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient(), config.getMetadataConfig());
latestFilesMap = context.mapToPair(partitions, partitionPath -> {
// Scan all partitions files with this commit time
LOG.info("Collecting latest files in partition path " + partitionPath);
Expand All @@ -143,8 +142,9 @@ public HoodieSavepointMetadata execute() {
table.getActiveTimeline().createNewInstant(
instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.SAVEPOINT_ACTION, instantTime));
table.getActiveTimeline()
.saveAsComplete(instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.SAVEPOINT_ACTION, instantTime),
Option.of(metadata));
.saveAsComplete(
true, instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.SAVEPOINT_ACTION, instantTime), Option.of(metadata),
savepointCompletedInstant -> table.getMetaClient().getTableFormat().savepoint(savepointCompletedInstant, table.getContext(), table.getMetaClient(), table.getViewManager()));
LOG.info("Savepoint " + instantTime + " created");
return metadata;
} catch (HoodieIOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ protected List<String> getPartitionPathsForTTL() {
List<String> partitionsForTTL;
if (StringUtils.isNullOrEmpty(partitionSelected)) {
// Return all partition paths.
partitionsForTTL = FSUtils.getAllPartitionPaths(
hoodieTable.getContext(), hoodieTable.getStorage(), writeConfig.getMetadataConfig(), writeConfig.getBasePath());
partitionsForTTL = FSUtils.getAllPartitionPaths(hoodieTable.getContext(), hoodieTable.getMetaClient(), writeConfig.getMetadataConfig());
} else {
partitionsForTTL = Arrays.asList(partitionSelected.split(","));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,14 @@ static void downgradeMetadataPartitions(HoodieEngineContext context,
// Get base path for metadata table.
StoragePath metadataTableBasePath =
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath());
HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder()
.setBasePath(metadataTableBasePath.toUri().toString())
.setConf(hoodieStorage.getConf())
.build();

// Fetch metadata partition paths.
List<String> metadataPartitions = FSUtils.getAllPartitionPaths(context,
hoodieStorage,
metadataTableBasePath,
false);
List<String> metadataPartitions = FSUtils.getAllPartitionPaths(context, metadataMetaClient, false);

// Delete partitions.
List<String> validPartitionPaths = deleteMetadataPartition(context, metaClient, metadataPartitions);
Expand Down
Loading
Loading