@@ -131,18 +131,15 @@ public String init(@CliOption(key = {"readonly"}, unspecifiedDefaultValue = "fal
throw new RuntimeException("Metadata directory (" + metadataPath.toString() + ") does not exist.");
}

long t1 = System.currentTimeMillis();
if (readOnly) {
//HoodieMetadata.init(HoodieCLI.conf, HoodieCLI.basePath);
} else {
HoodieTimer timer = new HoodieTimer().startTimer();
if (!readOnly) {
HoodieWriteConfig writeConfig = getWriteConfig();
initJavaSparkContext();
HoodieTableMetadataWriter.create(HoodieCLI.conf, writeConfig, jsc);
}
long t2 = System.currentTimeMillis();

String action = readOnly ? "Opened" : "Initialized";
return String.format(action + " Metadata Table in %s (duration=%.2fsec)", metadataPath, (t2 - t1) / 1000.0);
return String.format(action + " Metadata Table in %s (duration=%.2fsec)", metadataPath, (timer.endTimer()) / 1000.0);
}

@CliCommand(value = "metadata stats", help = "Print stats about the metadata")
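As a side note on the refactor above, here is a minimal self-contained sketch of the HoodieTimer pattern it adopts. The import path and the assumption that endTimer() returns elapsed milliseconds are inferred from the /1000.0 conversion in the format string; the class below is illustrative, not PR code.

import org.apache.hudi.common.util.HoodieTimer;

public class TimerSketch {
  public static void main(String[] args) throws Exception {
    HoodieTimer timer = new HoodieTimer().startTimer();
    Thread.sleep(250); // stand-in for the work being measured (e.g. initializing the metadata table)
    long elapsedMs = timer.endTimer(); // elapsed milliseconds since startTimer()
    System.out.println(String.format("Initialized Metadata Table (duration=%.2fsec)", elapsedMs / 1000.0));
  }
}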
@@ -128,8 +128,6 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
finalizeWrite(table, instantTime, stats);

try {
table.metadataWriter(jsc).update(metadata, instantTime);

activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
postCommit(table, metadata, instantTime, extraMetadata);
@@ -122,9 +122,7 @@ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig writeConfig, bo
super(jsc, index, writeConfig, timelineService);
this.metrics = new HoodieMetrics(config, config.getTableName());
this.rollbackPending = rollbackPending;

// Initialize Metadata Table
HoodieTableMetadataWriter.create(hadoopConf, writeConfig, jsc);
syncTableMetadata();
}

/**
@@ -179,7 +177,6 @@ protected void rollBackInflightBootstrap() {
table.rollbackBootstrap(jsc, HoodieActiveTimeline.createNewInstantTime());
LOG.info("Finished rolling back pending bootstrap");
}

}

/**
@@ -384,7 +381,6 @@ private JavaRDD<WriteStatus> postWrite(HoodieWriteMetadata result, String instan
@Override
protected void postCommit(HoodieTable<?> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
try {

// Delete the marker directory for the instant.
new MarkerFiles(table, instantTime).quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism());

@@ -400,6 +396,8 @@ protected void postCommit(HoodieTable<?> table, HoodieCommitMetadata metadata, S
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, hadoopConf);
archiveLog.archiveIfRequired(jsc);
autoCleanOnCommit(instantTime);

syncTableMetadata();
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
@@ -587,6 +585,11 @@ public HoodieCleanMetadata clean() {
return clean(HoodieActiveTimeline.createNewInstantTime());
}

public void syncTableMetadata() {
// Open up the metadata table again, for syncing
HoodieTableMetadataWriter.create(hadoopConf, config, jsc);
}

/**
* Provides a new commit time for a write operation (insert/update/delete).
*/
@@ -701,8 +704,6 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteSt
finalizeWrite(table, compactionCommitTime, writeStats);
LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);

table.metadataWriter(jsc).update(metadata, compactionCommitTime);

Member:
Clarification: since we are not updating the metadata table here, for compactions the metadata table will be updated the next time the HoodieWriteClient is created.

But for commit operations the metadata table may be updated right after the commit completes, in the "postCommit" function. Right?

Member Author:
Yes, correct. Even for compaction, once the compaction commits, it will trigger a sync. I think we should ensure the sync only happens for commit operations (i.e., only one allowed at the moment). I'll double-check this.

CompactHelpers.completeInflightCompaction(table, compactionCommitTime, metadata);

if (compactionTimer != null) {
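As context for the thread above, an editor's sketch (not PR code) of the single sync entry point that both the HoodieWriteClient constructor and postCommit() now call, per the hunks earlier in this file. The helper class name is illustrative; only types already visible in this diff are assumed.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.spark.api.java.JavaSparkContext;

final class MetadataSyncSketch {
  // Called from the HoodieWriteClient constructor (so a new client picks up commits
  // and compactions completed by earlier clients) and again from postCommit() (so each
  // completed commit is synced immediately). Creating the writer replays any dataset
  // instants not yet applied to the metadata table.
  static void syncTableMetadata(Configuration hadoopConf, HoodieWriteConfig config, JavaSparkContext jsc) {
    HoodieTableMetadataWriter.create(hadoopConf, config, jsc);
  }
}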
@@ -119,8 +119,8 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
enabled = true;

// Cleaning and compaction are triggered manually by this writer, as we don't expose this table outside
ValidationUtils.checkArgument(this.metadataWriteConfig.isAutoClean(), "Auto clean is required for Metadata Compaction config");
ValidationUtils.checkArgument(this.metadataWriteConfig.isInlineCompaction(), "Inline compaction is required for Metadata Compaction config");
ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(), "Cleaning is controlled internally for Metadata table.");
ValidationUtils.checkArgument(!this.metadataWriteConfig.isInlineCompaction(), "Compaction is controlled internally for metadata table.");
// Metadata Table cannot have metadata listing turned on. (infinite loop, much?)
ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(), "Auto commit is required for Metadata Table");
ValidationUtils.checkArgument(!this.metadataWriteConfig.useFileListingMetadata(), "File listing cannot be used for Metadata Table");
@@ -148,7 +148,7 @@ public class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWrite
// may have occurred on the table. Hence, calling this always ensures that the metadata is brought in sync
// with the active timeline.
HoodieTimer timer = new HoodieTimer().startTimer();
syncFromInstants(jsc, datasetMetaClient);
syncFromInstants(datasetMetaClient);
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.SYNC_STR, timer.endTimer()));
}
} else {
@@ -184,12 +184,14 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
.forTable(tableName)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withAsyncClean(writeConfig.isMetadataAsyncClean())
.withAutoClean(true)
// we will trigger cleaning manually, to control the instant times
.withAutoClean(false)
.withCleanerParallelism(parallelism)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
.archiveCommitsWith(writeConfig.getMetadataMinCommitsToKeep(), writeConfig.getMetadataMaxCommitsToKeep())
.withInlineCompaction(true)
// we will trigger compaction manually, to control the instant times
.withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build())
.withParallelism(parallelism, parallelism)
.withDeleteParallelism(parallelism)
@@ -376,7 +378,7 @@ private void bootstrapFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient
*
* @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
*/
private void syncFromInstants(JavaSparkContext jsc, HoodieTableMetaClient datasetMetaClient) {
private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it is not enabled");

try {
@@ -391,56 +393,35 @@ private void syncFromInstants(JavaSparkContext jsc, HoodieTableMetaClient datase
final HoodieActiveTimeline timeline = datasetMetaClient.getActiveTimeline();
for (HoodieInstant instant : instantsToSync) {
LOG.info("Syncing instant " + instant + " to metadata table");
ValidationUtils.checkArgument(instant.isCompleted(), "Only completed instants can be synced.");

switch (instant.getAction()) {
case HoodieTimeline.CLEAN_ACTION: {
// CLEAN is synced from the
// - inflight instant which contains the HoodieCleanerPlan, or
// - complete instant which contains the HoodieCleanMetadata
try {
HoodieInstant inflightCleanInstant = new HoodieInstant(true, instant.getAction(), instant.getTimestamp());
ValidationUtils.checkArgument(inflightCleanInstant.isInflight());
HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(datasetMetaClient, inflightCleanInstant);
update(cleanerPlan, instant.getTimestamp());
} catch (HoodieIOException e) {
HoodieInstant cleanInstant = new HoodieInstant(false, instant.getAction(), instant.getTimestamp());
ValidationUtils.checkArgument(cleanInstant.isCompleted());
HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, cleanInstant);
update(cleanMetadata, instant.getTimestamp());
}
case HoodieTimeline.CLEAN_ACTION:
HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, instant);
update(cleanMetadata, instant.getTimestamp());
break;
}
case HoodieTimeline.DELTA_COMMIT_ACTION:
case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.COMPACTION_ACTION: {
ValidationUtils.checkArgument(instant.isCompleted());
case HoodieTimeline.COMPACTION_ACTION:
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
update(commitMetadata, instant.getTimestamp());
break;
}
case HoodieTimeline.ROLLBACK_ACTION: {
ValidationUtils.checkArgument(instant.isCompleted());
case HoodieTimeline.ROLLBACK_ACTION:
HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
timeline.getInstantDetails(instant).get());
update(rollbackMetadata, instant.getTimestamp());
break;
}
case HoodieTimeline.RESTORE_ACTION: {
ValidationUtils.checkArgument(instant.isCompleted());
case HoodieTimeline.RESTORE_ACTION:
HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
timeline.getInstantDetails(instant).get());
update(restoreMetadata, instant.getTimestamp());
break;
}
case HoodieTimeline.SAVEPOINT_ACTION: {
ValidationUtils.checkArgument(instant.isCompleted());
case HoodieTimeline.SAVEPOINT_ACTION:
// Nothing to be done here
break;
}
default: {
default:
throw new HoodieException("Unknown type of action " + instant.getAction());
}
}
}
// re-init the table metadata, for any future writes.
@@ -472,8 +453,7 @@ public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
writeStats.forEach(hoodieWriteStat -> {
String pathWithPartition = hoodieWriteStat.getPath();
if (pathWithPartition == null) {
// Empty partition
return;
throw new HoodieMetadataException("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
}

int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1;
@@ -509,13 +489,6 @@ public void update(HoodieCleanerPlan cleanerPlan, String instantTime) {
return;
}

HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
long cnt = timeline.filterCompletedInstants().getInstants().filter(i -> i.getTimestamp().equals(instantTime)).count();
if (cnt == 1) {
LOG.info("Ignoring update from cleaner plan for already completed instant " + instantTime);
return;
}

List<HoodieRecord> records = new LinkedList<>();
int[] fileDeleteCount = {0};
cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition, deletedPathInfo) -> {
@@ -725,6 +698,13 @@ private synchronized void commit(JavaSparkContext jsc, JavaRDD<HoodieRecord> rec
throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
}
});
// trigger cleaning and compaction with suffixes based on the same instant time. This ensures that any future
// delta commits synced over will not have an instant time earlier than the last completed instant on the
// metadata table.
writeClient.clean(instantTime + "001");
Member:
This is a good idea. I hope it does not break some function which may depend on the yyyyMMddHHmmss format of the instantTime.

Member Author:
@prashantwason our tests use 001, 002, etc., so the code should be fine, but maybe some metrics reporting code might have an issue; let me double-check. Good call.

Member Author:
[screenshot]

Seems like the parser can handle this.

if (writeClient.scheduleCompactionAtInstant(instantTime + "002", Option.empty())) {
writeClient.compact(instantTime + "002");
}
}

// Update total size of the metadata and count of base/log files
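To make the ordering argument behind the "001"/"002" suffixes (and the reviewer's format question) concrete, a small illustrative example follows. The instant value is hypothetical; the only assumption is that timeline instant times are compared as strings (as HoodieTimeline.compareTimestamps does), so the suffixed instants sort after the delta commit they derive from and before any later timestamp.

public class InstantSuffixSketch {
  public static void main(String[] args) {
    String deltaInstant = "20200801123045";           // hypothetical delta commit on the metadata table
    String cleanInstant = deltaInstant + "001";       // instant passed to writeClient.clean(...)
    String compactionInstant = deltaInstant + "002";  // instant passed to scheduleCompactionAtInstant/compact
    // String comparison preserves the expected ordering:
    System.out.println(deltaInstant.compareTo(cleanInstant) < 0);          // true
    System.out.println(cleanInstant.compareTo(compactionInstant) < 0);     // true
    System.out.println(compactionInstant.compareTo("20200801123046") < 0); // true: a later commit still sorts after both
  }
}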
@@ -64,7 +64,6 @@
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.log4j.LogManager;
@@ -96,7 +95,6 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
private SerializableConfiguration hadoopConfiguration;
private transient FileSystemViewManager viewManager;

private HoodieTableMetadataWriter metadataWriter;
private HoodieTableMetadata metadata;

protected final SparkTaskContextSupplier sparkTaskContextSupplier = new SparkTaskContextSupplier();
@@ -642,11 +640,4 @@ public HoodieTableMetadata metadata() {
}
return metadata;
}

public HoodieTableMetadataWriter metadataWriter(JavaSparkContext jsc) {
if (metadataWriter == null) {
metadataWriter = HoodieTableMetadataWriter.create(hadoopConfiguration.get(), config, jsc);
}
return metadataWriter;
}
}
@@ -53,7 +53,6 @@
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.HoodieTableMetadata;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -201,31 +200,16 @@ private Stream<HoodieInstant> getInstantsToArchive() {
.collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
HoodieInstant.getComparableAction(i.getAction()))));

// If metadata table is enabled, do not archive instants which are more recent that the latest compaction
// of the metadata table. This is required for metadata table sync.
// If metadata table is enabled, do not archive instants which are more recent than the latest synced
// instant on the metadata table. This is required for metadata table sync.
if (config.useFileListingMetadata()) {
Option<String> latestCompaction = table.metadata().getLatestCompactionTimestamp();
if (latestCompaction.isPresent()) {
LOG.info("Limiting archiving of instants to last compaction on metadata table at " + latestCompaction.get());
Option<String> lastSyncedInstantTime = table.metadata().getSyncedInstantTime();
if (lastSyncedInstantTime.isPresent()) {
LOG.info("Limiting archiving of instants to last synced instant on metadata table at " + lastSyncedInstantTime.get());
instants = instants.filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(), HoodieTimeline.LESSER_THAN,
latestCompaction.get()));
lastSyncedInstantTime.get()));
} else {
LOG.info("Not archiving instants as there is no compaction yet of the metadata table");
instants = Stream.empty();
}
}

// For metadata tables, ensure commits >= latest compaction commit are retained. This is required for
// metadata table sync.
if (HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
Option<HoodieInstant> latestCompactionInstant =
table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant();
if (latestCompactionInstant.isPresent()) {
LOG.info("Limiting archiving of instants on metadata table to last compaction at " + latestCompactionInstant.get());
instants = instants.filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(), HoodieTimeline.LESSER_THAN,
latestCompactionInstant.get().getTimestamp()));
} else {
LOG.info("Not archiving instants on metdata table as there is no compaction yet");
LOG.info("Not archiving as there is no instants yet on the metadata table");
instants = Stream.empty();
}
}
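The guard above boils down to a single filter. The following editor's sketch (hypothetical helper, not Hudi API) restates the rule in isolation: only instants strictly older than the last instant synced to the metadata table are eligible for archiving, and nothing is archived before the first sync.

import java.util.Optional;
import java.util.stream.Stream;

final class ArchivalGuardSketch {
  static Stream<String> limitToSynced(Stream<String> candidateInstantTimes, Optional<String> lastSyncedInstantTime) {
    if (!lastSyncedInstantTime.isPresent()) {
      // Nothing synced yet: keep every instant on the active timeline.
      return Stream.empty();
    }
    // Instant times are compared as strings, mirroring
    // HoodieTimeline.compareTimestamps(..., LESSER_THAN, ...) in the hunk above.
    return candidateInstantTimes.filter(t -> t.compareTo(lastSyncedInstantTime.get()) < 0);
  }
}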
@@ -256,8 +256,6 @@ private HoodieCleanMetadata runClean(HoodieTable<?> table, HoodieInstant cleanIn
cleanStats
);

table.metadataWriter(jsc).update(cleanerPlan, cleanInstant.getTimestamp());

table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
TimelineMetadataUtils.serializeCleanMetadata(metadata));
LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete");
@@ -226,8 +226,6 @@ protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMeta
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());

table.metadataWriter(jsc).update(metadata, instantTime);

activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
LOG.info("Committed " + instantTime);
@@ -93,8 +93,6 @@ private HoodieRestoreMetadata finishRestore(Map<String, List<HoodieRollbackMetad
HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.convertRestoreMetadata(
instantTime, durationInMs, instantsRolledBack, instantToMetadata);

table.metadataWriter(jsc).update(restoreMetadata, instantTime);

table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime),
TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata));
LOG.info("Commits " + instantsRolledBack + " rollback is complete. Restored table to " + restoreInstantTime);
@@ -112,7 +112,6 @@ public HoodieRollbackMetadata execute() {
Collections.singletonList(instantToRollback),
stats);
if (!skipTimelinePublish) {
table.metadataWriter(jsc).update(rollbackMetadata, instantTime);
finishRollback(rollbackMetadata);
}
