HoodieBackedTableMetadataWriter.java
@@ -18,7 +18,9 @@

package org.apache.hudi.metadata;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -67,6 +69,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -98,8 +101,19 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWriter {
protected SerializableConfiguration hadoopConf;
protected final transient HoodieEngineContext engineContext;

protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig,
HoodieEngineContext engineContext) {
/**
* Hudi backed table metadata writer.
*
* @param hadoopConf - Hadoop configuration to use for the metadata writer
* @param writeConfig - Writer config
* @param engineContext - Engine context
* @param actionMetadata - Optional action metadata to help decide bootstrap operations
* @param <T> - Action metadata types extending Avro generated SpecificRecordBase
*/
protected <T extends SpecificRecordBase> HoodieBackedTableMetadataWriter(Configuration hadoopConf,
HoodieWriteConfig writeConfig,
HoodieEngineContext engineContext,
Option<T> actionMetadata) {
this.dataWriteConfig = writeConfig;
this.engineContext = engineContext;
this.hadoopConf = new SerializableConfiguration(hadoopConf);
@@ -110,15 +124,20 @@ protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig,
enabled = true;

// Inline compaction and auto clean are managed internally, as we don't expose this table outside
ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(), "Cleaning is controlled internally for Metadata table.");
ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(), "Compaction is controlled internally for metadata table.");
ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(),
"Cleaning is controlled internally for Metadata table.");
ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(),
"Compaction is controlled internally for metadata table.");
// Metadata Table cannot have metadata listing turned on. (infinite loop, much?)
ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(), "Auto commit is required for Metadata Table");
ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(), "File listing cannot be used for Metadata Table");
ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(),
"Auto commit is required for Metadata Table");
ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(),
"File listing cannot be used for Metadata Table");

initRegistry();
this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
initialize(engineContext);
this.dataMetaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
initialize(engineContext, actionMetadata);
initTableMetadata();
} else {
enabled = false;
@@ -215,10 +234,11 @@ public HoodieBackedTableMetadata metadata() {

/**
* Initialize the metadata table if it does not exist.
*
* If the metadata table did not exist, then file and partition listing is used to bootstrap the table.
* <p>
* If the metadata table does not exist, then file and partition listing is used to bootstrap the table.
*/
protected abstract void initialize(HoodieEngineContext engineContext);
protected abstract <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext,
Option<T> actionMetadata);

public void initTableMetadata() {
try {
@@ -233,26 +253,33 @@ public void initTableMetadata() {
}
}

protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient) throws IOException {
/**
* Bootstrap the metadata table if needed.
*
* @param engineContext - Engine context
* @param dataMetaClient - Meta client for the data table
* @param actionMetadata - Optional action metadata
* @param <T> - Action metadata types extending Avro generated SpecificRecordBase
* @throws IOException on failures while checking or bootstrapping the metadata table
*/
protected <T extends SpecificRecordBase> void bootstrapIfNeeded(HoodieEngineContext engineContext,
HoodieTableMetaClient dataMetaClient,
Option<T> actionMetadata) throws IOException {
HoodieTimer timer = new HoodieTimer().startTimer();
boolean exists = dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME));

boolean exists = dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(),
HoodieTableMetaClient.METAFOLDER_NAME));
boolean rebootstrap = false;

// If the un-synced instants have been archived, then
// the metadata table will need to be bootstrapped again.
if (exists) {
// If the un-synched instants have been archived then the metadata table will need to be bootstrapped again
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get())
final HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get())
.setBasePath(metadataWriteConfig.getBasePath()).build();
Option<HoodieInstant> latestMetadataInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
if (!latestMetadataInstant.isPresent()) {
LOG.warn("Metadata Table will need to be re-bootstrapped as no instants were found");
rebootstrap = true;
} else if (!latestMetadataInstant.get().getTimestamp().equals(SOLO_COMMIT_TIMESTAMP)
&& dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(latestMetadataInstant.get().getTimestamp())) {
// TODO: Revisit this logic and validate that filtering for all commits timeline is the right thing to do
LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived."
+ " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp()
+ ", latestDataInstant=" + dataMetaClient.getActiveTimeline().firstInstant().get().getTimestamp());
rebootstrap = true;
}
final Option<HoodieInstant> latestMetadataInstant =
metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();

rebootstrap = isBootstrapNeeded(latestMetadataInstant, actionMetadata);
}

if (rebootstrap) {
@@ -270,6 +297,52 @@ protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient) throws IOException {
}
}

/**
* Whether a bootstrap operation is needed for this metadata table.
* <p>
* Rollback of the first commit would look like un-synced instants in the metadata table.
* Action metadata is needed to verify the instant time and avoid erroneous bootstrapping.
* <p>
* TODO: Revisit this logic and validate that filtering for all
* commits timeline is the right thing to do
*
* @return True if bootstrap is needed, False otherwise
*/
private <T extends SpecificRecordBase> boolean isBootstrapNeeded(Option<HoodieInstant> latestMetadataInstant,
Option<T> actionMetadata) {
if (!latestMetadataInstant.isPresent()) {
LOG.warn("Metadata Table will need to be re-bootstrapped as no instants were found");
return true;
}

final String latestMetadataInstantTimestamp = latestMetadataInstant.get().getTimestamp();
if (latestMetadataInstantTimestamp.equals(SOLO_COMMIT_TIMESTAMP)) {
return false;
}

boolean isRollbackAction = false;
List<String> rollbackedTimestamps = Collections.emptyList();
if (actionMetadata.isPresent() && actionMetadata.get() instanceof HoodieRollbackMetadata) {
isRollbackAction = true;
List<HoodieInstantInfo> rollbackedInstants =
((HoodieRollbackMetadata) actionMetadata.get()).getInstantsRollback();
rollbackedTimestamps = rollbackedInstants.stream()
.map(instant -> instant.getCommitTime().toString())
.collect(Collectors.toList());
}

if (dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(
latestMetadataInstant.get().getTimestamp())
&& (!isRollbackAction || !rollbackedTimestamps.contains(latestMetadataInstantTimestamp))) {
LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived."
+ " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp()
+ ", latestDataInstant=" + dataMetaClient.getActiveTimeline().firstInstant().get().getTimestamp());
return true;
}

return false;
}
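
To make the new guard concrete, here is a minimal, self-contained sketch of the same decision rule over plain Java types. The timeline and rollback arguments are simplified stand-ins for Hudi's timeline APIs (Hudi instant timestamps are lexicographically ordered strings, so a plain string compare models isBeforeTimelineStarts), and the SOLO_COMMIT_TIMESTAMP early exit is omitted for brevity:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class BootstrapCheckSketch {

  // Mirrors isBootstrapNeeded(): re-bootstrap only when the latest metadata-table
  // instant has been archived out of the data timeline AND it is not one of the
  // instants that the in-flight rollback action just removed.
  static boolean isBootstrapNeeded(Optional<String> latestMetadataInstant,
                                   List<String> activeDataTimeline,
                                   List<String> rolledBackTimestamps) {
    if (!latestMetadataInstant.isPresent()) {
      return true; // no completed instants at all: bootstrap from file listings
    }
    String ts = latestMetadataInstant.get();
    boolean archived = !activeDataTimeline.isEmpty()
        && ts.compareTo(activeDataTimeline.get(0)) < 0; // "before timeline starts"
    return archived && !rolledBackTimestamps.contains(ts);
  }

  public static void main(String[] args) {
    // Rollback of the first commit: "001" is gone from the data timeline, but the
    // rollback metadata names it, so no erroneous re-bootstrap is triggered.
    System.out.println(isBootstrapNeeded(Optional.of("001"),
        Arrays.asList("002", "003"), Arrays.asList("001"))); // false
    // A genuinely archived, un-synced instant does trigger a re-bootstrap.
    System.out.println(isBootstrapNeeded(Optional.of("001"),
        Arrays.asList("002", "003"), Collections.emptyList())); // true
  }
}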

/**
* Initialize the Metadata Table by listing files and partitions from the file system.
*
HoodieTable.java
@@ -18,6 +18,7 @@

package org.apache.hudi.table;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -706,11 +707,23 @@ public HoodieEngineContext getContext() {
}

/**
* Fetch instance of {@link HoodieTableMetadataWriter}.
* Get Table metadata writer.
*
* @return instance of {@link HoodieTableMetadataWriter}
*/
public final Option<HoodieTableMetadataWriter> getMetadataWriter() {
Contributor: One minor question: should we pass actionMetadata only for rollback? If we are going to pass it for every operation, may I know why we need this method?

Contributor (author): Except for CommitMetadata, all other actions extend SpecificRecordBase and are good to be moved. So the getMetadataWriter() version has to stay around till CommitMetadata is solved. I can file a new ticket to take this on if you are OK with it.
return getMetadataWriter(Option.empty());
}

/**
* Get Table metadata writer.
*
* @return instance of {@link HoodieTableMetadataWriter}
*/
public Option<HoodieTableMetadataWriter> getMetadataWriter() {
// Each engine is expected to override this and provide the actual metadata writer if enabled.
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(Option<T> actionMetadata) {
// Each engine is expected to override this and
// provide the actual metadata writer, if enabled.
return Option.empty();
}

}
BaseActionExecutor.java
@@ -27,6 +27,7 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

@@ -72,7 +73,7 @@ protected final void writeTableMetadata(HoodieCleanMetadata metadata) {
* @param metadata rollback metadata of interest.
*/
protected final void writeTableMetadata(HoodieRollbackMetadata metadata) {
table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime));
table.getMetadataWriter(Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime));
}
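
For contrast with the clean path above: only rollback hands its action metadata to the writer lookup, so that the bootstrap check can recognize instants the rollback itself removed. A sketch of the two call shapes, where cleanMetadata and rollbackMetadata are illustrative locals:

// Clean: no action metadata; the no-arg overload supplies Option.empty().
table.getMetadataWriter().ifPresent(w -> w.update(cleanMetadata, instantTime));

// Rollback: pass the metadata so isBootstrapNeeded() can see rolled-back instants.
table.getMetadataWriter(Option.of(rollbackMetadata))
    .ifPresent(w -> w.update(rollbackMetadata, instantTime));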

/**
FlinkHoodieBackedTableMetadataWriter.java
@@ -18,6 +18,7 @@

package org.apache.hudi.metadata;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -45,12 +46,23 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {

private static final Logger LOG = LogManager.getLogger(FlinkHoodieBackedTableMetadataWriter.class);

public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context);
public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig,
HoodieEngineContext context) {
return create(conf, writeConfig, context, Option.empty());
}

FlinkHoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
super(hadoopConf, writeConfig, engineContext);
public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf,
HoodieWriteConfig writeConfig,
HoodieEngineContext context,
Option<T> actionMetadata) {
return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata);
}

<T extends SpecificRecordBase> FlinkHoodieBackedTableMetadataWriter(Configuration hadoopConf,
HoodieWriteConfig writeConfig,
HoodieEngineContext engineContext,
Option<T> actionMetadata) {
super(hadoopConf, writeConfig, engineContext, actionMetadata);
}
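
Call sites can then pick either factory overload. A sketch, assuming conf, writeConfig, context, and a HoodieRollbackMetadata value named rollbackMetadata are already in scope:

// Regular path (e.g. commits): no action metadata; delegates to Option.empty().
HoodieTableMetadataWriter plain =
    FlinkHoodieBackedTableMetadataWriter.create(conf, writeConfig, context);

// Rollback path: hand over the rollback metadata so the bootstrap check can
// distinguish rolled-back instants from genuinely un-synced, archived ones.
HoodieTableMetadataWriter withAction =
    FlinkHoodieBackedTableMetadataWriter.create(conf, writeConfig, context,
        Option.of(rollbackMetadata));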

@Override
@@ -65,10 +77,11 @@ protected void initRegistry() {
}

@Override
protected void initialize(HoodieEngineContext engineContext) {
protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext,
Option<T> actionMetadata) {
try {
if (enabled) {
bootstrapIfNeeded(engineContext, dataMetaClient);
bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata);
}
} catch (IOException e) {
LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
HoodieFlinkTable.java
@@ -18,6 +18,7 @@

package org.apache.hudi.table;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -98,11 +99,11 @@ protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
* @return instance of {@link HoodieTableMetadataWriter}
*/
@Override
public Option<HoodieTableMetadataWriter> getMetadataWriter() {
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(Option<T> actionMetadata) {
synchronized (this) {
if (!isMetadataAvailabilityUpdated) {
// this code assumes that if metadata availability is updated once it will not change. please revisit this logic if that's not the case.
// this is done to avoid repeated calls to fs.exists().
// This code assumes that if metadata availability is updated once it will not change.
// Please revisit this logic if that's not the case. This is done to avoid repeated calls to fs.exists().

Contributor: Where is the passed-in actionMetadata used?

try {
isMetadataTableAvailable = config.isMetadataTableEnabled()
&& metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())));
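
The comments above describe a compute-once availability flag. A generic, standalone sketch of that idiom (the names are hypothetical, not Hudi API):

import java.util.function.BooleanSupplier;

// Memoizes an expensive existence check: the supplier runs at most once,
// and the cached result is returned on every later call.
class AvailabilityCache {
  private boolean updated = false;
  private boolean available = false;

  synchronized boolean isAvailable(BooleanSupplier expensiveCheck) {
    if (!updated) {
      available = expensiveCheck.getAsBoolean(); // e.g. fs.exists(...)
      updated = true;
    }
    return available;
  }
}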
SparkHoodieBackedTableMetadataWriter.java
@@ -18,6 +18,7 @@

package org.apache.hudi.metadata;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -47,12 +48,23 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {

private static final Logger LOG = LogManager.getLogger(SparkHoodieBackedTableMetadataWriter.class);

public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context);
public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig,
HoodieEngineContext context) {
return create(conf, writeConfig, context, Option.empty());
}

SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
super(hadoopConf, writeConfig, engineContext);
public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf,
HoodieWriteConfig writeConfig,
HoodieEngineContext context,
Option<T> actionMetadata) {
return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata);
}

<T extends SpecificRecordBase> SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf,
HoodieWriteConfig writeConfig,
HoodieEngineContext engineContext,
Option<T> actionMetadata) {
super(hadoopConf, writeConfig, engineContext, actionMetadata);
}

@Override
@@ -71,7 +83,8 @@ protected void initRegistry() {
}

@Override
protected void initialize(HoodieEngineContext engineContext) {
protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext,
Option<T> actionMetadata) {
try {
metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> {
if (registry instanceof DistributedRegistry) {
Expand All @@ -81,7 +94,7 @@ protected void initialize(HoodieEngineContext engineContext) {
});

if (enabled) {
bootstrapIfNeeded(engineContext, dataMetaClient);
bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata);
}
} catch (IOException e) {
LOG.error("Failed to initialize metadata table. Disabling the writer.", e);