Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met
* @param metadata instance of {@link HoodieCommitMetadata}.
*/
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime,
table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
table.isTableServiceAction(actionType)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@

import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;

/**
Expand Down Expand Up @@ -712,7 +713,8 @@ protected void doClean(AbstractHoodieWriteClient writeClient, String instantTime
*
*/
protected void bootstrapCommit(List<DirectoryInfo> partitionInfoList, String createInstantTime) {
List<String> partitions = partitionInfoList.stream().map(p -> p.getRelativePath()).collect(Collectors.toList());
List<String> partitions = partitionInfoList.stream().map(p ->
p.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME : p.getRelativePath()).collect(Collectors.toList());
final int totalFiles = partitionInfoList.stream().mapToInt(p -> p.getTotalFiles()).sum();

// Record which saves the list of all partitions
Expand All @@ -727,7 +729,7 @@ protected void bootstrapCommit(List<DirectoryInfo> partitionInfoList, String cre
HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
// Record which saves files within a partition
return HoodieMetadataPayload.createPartitionFilesRecord(
partitionInfo.getRelativePath(), Option.of(partitionInfo.getFileNameToSizeMap()), Option.empty());
partitionInfo.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME : partitionInfo.getRelativePath(), Option.of(partitionInfo.getFileNameToSizeMap()), Option.empty());
});
partitionRecords = partitionRecords.union(fileListRecords);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,10 +737,11 @@ public HoodieEngineContext getContext() {
/**
* Get Table metadata writer.
*
* @param triggeringInstantTimestamp - The instant that is triggering this metadata write
* @return instance of {@link HoodieTableMetadataWriter}
*/
public final Option<HoodieTableMetadataWriter> getMetadataWriter() {
return getMetadataWriter(Option.empty());
public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp) {
return getMetadataWriter(triggeringInstantTimestamp, Option.empty());
}

/**
Expand All @@ -752,10 +753,19 @@ public final Option<HoodieTableMetadataWriter> getMetadataWriter() {

/**
* Get Table metadata writer.
* <p>
* Note:
* Get the metadata writer for the conf. If the metadata table doesn't exist,
this will trigger the creation of the table and the initial bootstrapping.
* Since this call is under the transaction lock, other concurrent writers
* are blocked from doing the similar initial metadata table creation and
* the bootstrapping.
*
* @param triggeringInstantTimestamp - The instant that is triggering this metadata write
* @return instance of {@link HoodieTableMetadataWriter}
*/
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(Option<T> actionMetadata) {
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
Option<T> actionMetadata) {
// Each engine is expected to override this and
// provide the actual metadata writer, if enabled.
return Option.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,30 +57,31 @@ public BaseActionExecutor(HoodieEngineContext context, HoodieWriteConfig config,
* @param metadata commit metadata of interest.
*/
protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) {
table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime, table.isTableServiceAction(actionType)));
table.getMetadataWriter(instantTime).ifPresent(w -> w.update(
metadata, instantTime, table.isTableServiceAction(actionType)));
}

/**
* Writes clean metadata to table metadata.
* @param metadata clean metadata of interest.
*/
protected final void writeTableMetadata(HoodieCleanMetadata metadata) {
table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime));
table.getMetadataWriter(instantTime).ifPresent(w -> w.update(metadata, instantTime));
}

/**
* Writes rollback metadata to table metadata.
* @param metadata rollback metadata of interest.
*/
protected final void writeTableMetadata(HoodieRollbackMetadata metadata) {
table.getMetadataWriter(Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime));
table.getMetadataWriter(instantTime, Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime));
}

/**
* Writes restore metadata to table metadata.
* @param metadata restore metadata of interest.
*/
protected final void writeTableMetadata(HoodieRestoreMetadata metadata) {
table.getMetadataWriter(Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime));
table.getMetadataWriter(instantTime, Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,8 @@ public void completeCompaction(
// commit to data table after committing to metadata table.
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
table.getMetadataWriter().ifPresent(w -> w.update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction())));
table.getMetadataWriter(compactionInstant.getTimestamp()).ifPresent(
[Review comment — Contributor]
Have filed a ticket to fix Flink around metadata table instantiation:
https://issues.apache.org/jira/browse/HUDI-2866
I don't see concurrency support in Flink, so this may not be as critical as it is for Spark, but filed anyway.

[Review comment — Member]
cc @danny0405 FYI
w -> w.update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction())));
LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,23 @@ public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Co
HoodieWriteConfig writeConfig,
HoodieEngineContext context,
Option<T> actionMetadata) {
return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata);
return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata, Option.empty());
}

/**
 * Creates a Flink engine backed metadata table writer.
 *
 * @param conf                     Hadoop configuration used to access the table's storage
 * @param writeConfig              write config of the data table whose metadata table is maintained
 * @param context                  engine context used for parallelized operations
 * @param actionMetadata           optional avro metadata of the action (e.g. rollback/restore) driving this write
 * @param inFlightInstantTimestamp optional timestamp of the in-flight instant triggering this metadata write;
 *                                 NOTE(review): presumably used by the base writer to exclude that instant
 *                                 during metadata table bootstrap — confirm against the superclass contract
 * @return a new {@link FlinkHoodieBackedTableMetadataWriter} wrapping the given configs
 */
public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf,
                                                                              HoodieWriteConfig writeConfig,
                                                                              HoodieEngineContext context,
                                                                              Option<T> actionMetadata,
                                                                              Option<String> inFlightInstantTimestamp) {
  return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata, inFlightInstantTimestamp);
}

<T extends SpecificRecordBase> FlinkHoodieBackedTableMetadataWriter(Configuration hadoopConf,
HoodieWriteConfig writeConfig,
HoodieEngineContext engineContext,
Option<T> actionMetadata) {
super(hadoopConf, writeConfig, engineContext, actionMetadata, Option.empty());
Option<T> actionMetadata,
Option<String> inFlightInstantTimestamp) {
super(hadoopConf, writeConfig, engineContext, actionMetadata, inFlightInstantTimestamp);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,12 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.index.FlinkHoodieIndexFactory;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.List;

import static org.apache.hudi.common.data.HoodieList.getList;
Expand All @@ -50,9 +45,6 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
implements ExplicitWriteHandleTable<T> {

private boolean isMetadataAvailabilityUpdated = false;
private boolean isMetadataTableAvailable;

protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
super(config, context, metaClient);
}
Expand Down Expand Up @@ -108,22 +100,11 @@ protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext con
* @return instance of {@link HoodieTableMetadataWriter}
*/
@Override
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(Option<T> actionMetadata) {
synchronized (this) {
if (!isMetadataAvailabilityUpdated) {
// This code assumes that if metadata availability is updated once it will not change.
// Please revisit this logic if that's not the case. This is done to avoid repeated calls to fs.exists().
try {
isMetadataTableAvailable = config.isMetadataTableEnabled()
&& metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())));
} catch (IOException e) {
throw new HoodieMetadataException("Checking existence of metadata table failed", e);
}
isMetadataAvailabilityUpdated = true;
}
}
if (isMetadataTableAvailable) {
return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context));
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
Option<T> actionMetadata) {
if (config.isMetadataTableEnabled()) {
return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
context, actionMetadata, Option.of(triggeringInstantTimestamp)));
} else {
return Option.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,6 @@ public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeC
public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService);
initializeMetadataTable(Option.empty());
}

private void initializeMetadataTable(Option<String> inflightInstantTimestamp) {
if (config.isMetadataTableEnabled()) {
// Defer bootstrap if upgrade / downgrade is pending
HoodieTableMetaClient metaClient = createMetaClient(true);
UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(
metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance());
if (!upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
// TODO: Check if we can remove this requirement - auto bootstrap on commit
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context, Option.empty(),
inflightInstantTimestamp);
}
}
}

/**
Expand Down Expand Up @@ -431,45 +416,54 @@ private void writeTableMetadataForTableServices(HoodieTable<T, JavaRDD<HoodieRec
boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
table.getMetadataWriter(hoodieInstant.getTimestamp()).ifPresent(
w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
}

@Override
protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(
metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance());
if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
this.txnManager.beginTransaction();
try {
// Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
List<String> instantsToRollback = getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime));
Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(metaClient);
instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
this.rollbackFailedWrites(pendingRollbacks, true);
new UpgradeDowngrade(
metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance())
.run(HoodieTableVersion.current(), instantTime);
} finally {
this.txnManager.endTransaction();
}
} else {
upgradeDowngrade.run(HoodieTableVersion.current(), instantTime);
try {
this.txnManager.beginTransaction();
if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
// Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
List<String> instantsToRollback = getInstantsToRollback(
metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime));
Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(metaClient);
instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
this.rollbackFailedWrites(pendingRollbacks, true);
new UpgradeDowngrade(
metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance())
.run(HoodieTableVersion.current(), instantTime);
metaClient.reloadActiveTimeline();
initializeMetadataTable(Option.of(instantTime));
}
metaClient.reloadActiveTimeline();

// re-bootstrap metadata table if required
initializeMetadataTable(Option.of(instantTime));
} finally {
this.txnManager.endTransaction();
}
metaClient.validateTableProperties(config.getProps(), operationType);
return getTableAndInitCtx(metaClient, operationType, instantTime);
}

/**
 * Initialize the metadata table if needed. Creating the metadata table writer
 * will trigger the initial bootstrapping from the data table.
 *
 * @param inFlightInstantTimestamp - The in-flight action responsible for the metadata table initialization
 */
private void initializeMetadataTable(Option<String> inFlightInstantTimestamp) {
  if (config.isMetadataTableEnabled()) {
    // The returned writer is intentionally discarded: constructing it is what
    // performs the metadata table creation/bootstrap as a side effect.
    SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
        context, Option.empty(), inFlightInstantTimestamp);
  }
}

// TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy
private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String commitInstant) {
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String commitInstant) {

switch (tableServiceType) {
case CLUSTER:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext eng
}

protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partitionName, String instantTime, boolean canTriggerTableService) {
ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is not fully initialized yet.");
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) hoodieDataRecords.get();
JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);
Expand Down
Loading