Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,12 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
HoodieTable table = createTable(config, hadoopConf);
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds,
extraMetadata, operationType, config.getWriteSchema(), commitActionType);
HoodieInstant inflightInstant = new HoodieInstant(State.INFLIGHT, table.getMetaClient().getCommitActionType(), instantTime);
HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config);
this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, table.getMetaClient().getCommitActionType(), instantTime)),
this.txnManager.beginTransaction(Option.of(inflightInstant),
lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
try {
preCommit(instantTime, metadata);
preCommit(inflightInstant, metadata);
commit(table, commitActionType, instantTime, metadata, stats);
postCommit(table, metadata, instantTime, extraMetadata);
LOG.info("Committed " + instantTime);
Expand Down Expand Up @@ -244,14 +245,15 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String

/**
* Any pre-commit actions like conflict resolution or updating metadata table goes here.
* @param instantTime commit instant time.
* @param inflightInstant instant of inflight operation.
* @param metadata commit metadata for which pre commit is being invoked.
*/
protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) {
// Create a Hoodie table after starting the transaction which encapsulated the commits and files visible.
// Important to create this after the lock to ensure latest commits show up in the timeline without need for reload
HoodieTable table = createTable(config, hadoopConf);
table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime));
table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, inflightInstant.getTimestamp(),
table.isTableServiceAction(inflightInstant.getAction())));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
});

LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
update(commitMetadata, createInstantTime);
update(commitMetadata, createInstantTime, false);
return true;
}

Expand Down Expand Up @@ -523,23 +523,24 @@ private interface ConvertMetadataFunction {
* @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table.
* @param <T> type of commit metadata.
* @param canTriggerTableService true if table services can be triggered. false otherwise.
*/
private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) {
private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
if (enabled && metadata != null) {
List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
}
}

/**
* Update from {@code HoodieCommitMetadata}.
*
* @param commitMetadata {@code HoodieCommitMetadata}
* @param instantTime Timestamp at which the commit was performed
* @param isTableServiceAction {@code true} if commit metadata is pertaining to a table service. {@code false} otherwise.
*/
@Override
public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime));
public void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableServiceAction) {
processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime), !isTableServiceAction);
}

/**
Expand All @@ -550,7 +551,8 @@ public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
*/
@Override
public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(cleanMetadata, instantTime));
processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(cleanMetadata, instantTime),
false);
}

/**
Expand All @@ -562,7 +564,7 @@ public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
@Override
public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(metadataMetaClient.getActiveTimeline(),
restoreMetadata, instantTime, metadata.getSyncedInstantTime()));
restoreMetadata, instantTime, metadata.getSyncedInstantTime()), false);
}

/**
Expand All @@ -588,7 +590,7 @@ public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime)

List<HoodieRecord> records = HoodieTableMetadataUtil.convertMetadataToRecords(metadataMetaClient.getActiveTimeline(), rollbackMetadata, instantTime,
metadata.getSyncedInstantTime(), wasSynced);
commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime);
commit(records, MetadataPartitionType.FILES.partitionPath(), instantTime, false);
}
}

Expand All @@ -601,12 +603,12 @@ public void close() throws Exception {

/**
* Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit.
*
* @param records The list of records to be written.
* @param records The list of records to be written.
* @param partitionName The partition to which the records are to be written.
* @param instantTime The timestamp to use for the deltacommit.
* @param canTriggerTableService true if table services can be scheduled and executed. false otherwise.
*/
protected abstract void commit(List<HoodieRecord> records, String partitionName, String instantTime);
protected abstract void commit(List<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService);

/**
* Perform a compaction on the Metadata Table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {
* Update the metadata table due to a COMMIT operation.
* @param commitMetadata commit metadata of the operation of interest.
* @param instantTime instant time of the commit.
* @param isTableServiceAction true if caller is a table service. false otherwise. Only regular write operations can trigger metadata table services and this argument
* will assist in this.
*/
void update(HoodieCommitMetadata commitMetadata, String instantTime);
void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableServiceAction);

/**
* Update the metadata table due to a CLEAN operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,13 @@ public final Option<HoodieTableMetadataWriter> getMetadataWriter() {
return getMetadataWriter(Option.empty());
}

/**
* Check if action type is a table service.
* @param actionType action type of interest.
* @return true if action represents a table service. false otherwise.
*/
public abstract boolean isTableServiceAction(String actionType);

/**
* Get Table metadata writer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public BaseActionExecutor(HoodieEngineContext context, HoodieWriteConfig config,
* Writes commits metadata to table metadata.
* @param metadata commit metadata of interest.
*/
protected final void writeTableMetadata(HoodieCommitMetadata metadata) {
table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime));
protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) {
table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime, table.isTableServiceAction(actionType)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationTy
HoodieCommitMetadata commitMetadata = super.doWriteOperation(commitTime, operationType, newPartitionsToAdd,
partitionToFilesNameLengthMap, bootstrap, createInflightCommit);
if (writer != null && !createInflightCommit) {
writer.update(commitMetadata, commitTime);
writer.update(commitMetadata, commitTime, false);
}
return commitMetadata;
}
Expand All @@ -86,15 +86,15 @@ public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationTy
public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException {
super.moveInflightCommitToComplete(instantTime, metadata);
if (writer != null) {
writer.update(metadata, instantTime);
writer.update(metadata, instantTime, false);
}
return this;
}

public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata, boolean ignoreWriter) throws IOException {
super.moveInflightCommitToComplete(instantTime, metadata);
if (!ignoreWriter && writer != null) {
writer.update(metadata, instantTime);
writer.update(metadata, instantTime, false);
}
return this;
}
Expand All @@ -103,7 +103,7 @@ public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCo
public HoodieTestTable moveInflightCompactionToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException {
super.moveInflightCompactionToComplete(instantTime, metadata);
if (writer != null) {
writer.update(metadata, instantTime);
writer.update(metadata, instantTime, true);
}
return this;
}
Expand All @@ -120,7 +120,7 @@ public HoodieCleanMetadata doClean(String commitTime, Map<String, Integer> parti
public HoodieTestTable addCompaction(String instantTime, HoodieCommitMetadata commitMetadata) throws Exception {
super.addCompaction(instantTime, commitMetadata);
if (writer != null) {
writer.update(commitMetadata, instantTime);
writer.update(commitMetadata, instantTime, true);
}
return this;
}
Expand Down Expand Up @@ -151,7 +151,7 @@ public HoodieTestTable addReplaceCommit(
HoodieReplaceCommitMetadata completeReplaceMetadata) throws Exception {
super.addReplaceCommit(instantTime, requestedReplaceMetadata, inflightReplaceMetadata, completeReplaceMetadata);
if (writer != null) {
writer.update(completeReplaceMetadata, instantTime);
writer.update(completeReplaceMetadata, instantTime, true);
}
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,10 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp
}

@Override
protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) {
this.metadataWriterOption.ifPresent(w -> {
w.initTableMetadata(); // refresh the timeline
w.update(metadata, instantTime);
w.update(metadata, inflightInstant.getTimestamp(), getHoodieTable().isTableServiceAction(inflightInstant.getAction()));
});
}

Expand Down Expand Up @@ -406,7 +406,7 @@ private void writeTableMetadata(HoodieTable<T, List<HoodieRecord<T>>, List<Hoodi
this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty());
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp()));
table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), table.isTableServiceAction(hoodieInstant.getAction())));
} finally {
this.txnManager.endTransaction();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext eng
}

@Override
protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
protected void commit(List<HoodieRecord> records, String partitionName, String instantTime, boolean canTriggerTableService) {
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
List<HoodieRecord> recordList = prepRecords(records, partitionName, 1);

Expand Down Expand Up @@ -125,8 +125,10 @@ protected void commit(List<HoodieRecord> records, String partitionName, String i

// reload timeline
metadataMetaClient.reloadActiveTimeline();
compactIfNecessary(writeClient, instantTime);
doClean(writeClient, instantTime);
if (canTriggerTableService) {
compactIfNecessary(writeClient, instantTime);
doClean(writeClient, instantTime);
}
}

// Update total size of the metadata and count of base/log files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
Expand Down Expand Up @@ -83,6 +84,11 @@ public HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext
super(config, context, metaClient);
}

@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
}

/**
* Upsert a batch of new records into Hoodie table at the supplied instantTime.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
Expand Down Expand Up @@ -54,6 +55,11 @@ public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
super(config, context, metaClient);
}

@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> upsert(
HoodieEngineContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMeta
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());

writeTableMetadata(metadata);
writeTableMetadata(metadata, actionType);

activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
Expand Down Expand Up @@ -65,6 +66,11 @@ protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config,
super(config, context, metaClient);
}

@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context,
String instantTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.HoodieWriteMetadata;
Expand All @@ -37,6 +38,11 @@ protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineConte
super(config, context, metaClient);
}

@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context,
String instantTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMeta
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());

writeTableMetadata(metadata);
writeTableMetadata(metadata, actionType);

activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,10 @@ private void writeTableMetadata(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD
HoodieInstant hoodieInstant) {
try {
this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty());
boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp()));
table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
} finally {
this.txnManager.endTransaction();
}
Expand Down Expand Up @@ -474,13 +475,14 @@ private HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<Wri
}

@Override
protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) {
// Create a Hoodie table after startTxn which encapsulated the commits and files visible.
// Important to create this after the lock to ensure latest commits show up in the timeline without need for reload
HoodieTable table = createTable(config, hadoopConf);
TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner());
table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime));
table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, inflightInstant.getTimestamp(),
table.isTableServiceAction(inflightInstant.getAction())));
}

@Override
Expand Down
Loading