@@ -25,12 +25,14 @@
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;

import org.apache.hadoop.fs.FileStatus;
@@ -129,50 +131,18 @@ public String create() throws IOException {
}

@CliCommand(value = "metadata delete", help = "Remove the Metadata Table")
-  public String delete() throws Exception {
-    HoodieCLI.getTableMetaClient();
-    Path metadataPath = new Path(getMetadataTableBasePath(HoodieCLI.basePath));
-    try {
-      FileStatus[] statuses = HoodieCLI.fs.listStatus(metadataPath);
-      if (statuses.length > 0) {
-        HoodieCLI.fs.delete(metadataPath, true);
-      }
-    } catch (FileNotFoundException e) {
-      // Metadata directory does not exist
-    }
-
-    return String.format("Removed Metadata Table from %s", metadataPath);
-  }

@CliCommand(value = "metadata init", help = "Update the metadata table from commits since the creation")
public String init(@CliOption(key = {"readonly"}, unspecifiedDefaultValue = "false",
Contributor comment: Is init being removed?
help = "Open in read-only mode") final boolean readOnly) throws Exception {
HoodieCLI.getTableMetaClient();
Path metadataPath = new Path(getMetadataTableBasePath(HoodieCLI.basePath));
try {
HoodieCLI.fs.listStatus(metadataPath);
} catch (FileNotFoundException e) {
// Metadata directory does not exist
throw new RuntimeException("Metadata directory (" + metadataPath.toString() + ") does not exist.");
}

HoodieTimer timer = new HoodieTimer().startTimer();
if (!readOnly) {
HoodieWriteConfig writeConfig = getWriteConfig();
initJavaSparkContext();
SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc));
}

String action = readOnly ? "Opened" : "Initialized";
return String.format(action + " Metadata Table in %s (duration=%.2fsec)", metadataPath, (timer.endTimer()) / 1000.0);
+  public String delete(@CliOption(key = {"backup"}, help = "Backup the metadata table before delete", mandatory = true) final boolean backup) throws Exception {
+    HoodieTableMetaClient dataMetaClient = HoodieCLI.getTableMetaClient();
+    HoodieTableMetadataUtil.deleteMetadataTable(dataMetaClient, new HoodieSparkEngineContext(jsc), backup);
+    return "Metadata Table has been deleted from " + getMetadataTableBasePath(HoodieCLI.basePath);
}
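
For context: the delete command now takes a mandatory backup flag. Below is a minimal sketch of what a backup-then-delete flow can look like, assuming the metadata table lives under .hoodie/metadata and inventing the backup path name; the actual HoodieTableMetadataUtil.deleteMetadataTable implementation may differ.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MetadataTableDeleteSketch {
  // Deletes basePath/.hoodie/metadata, optionally renaming it aside first.
  // The backup path naming scheme is an assumption for illustration.
  public static void deleteMetadataTable(String basePath, Configuration conf, boolean backup)
      throws IOException {
    Path metadataPath = new Path(basePath, ".hoodie/metadata");
    FileSystem fs = metadataPath.getFileSystem(conf);
    if (!fs.exists(metadataPath)) {
      return; // nothing to delete
    }
    if (backup) {
      // Move the metadata table aside instead of deleting it outright.
      Path backupPath = new Path(basePath, ".hoodie/.metadata_" + System.currentTimeMillis() + ".bak");
      fs.rename(metadataPath, backupPath);
    } else {
      fs.delete(metadataPath, true);
    }
  }
}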

@CliCommand(value = "metadata stats", help = "Print stats about the metadata")
public String stats() throws IOException {
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(HoodieCLI.conf),
config, HoodieCLI.basePath, "/tmp");
+        config, HoodieCLI.basePath);
Map<String, String> stats = metadata.stats();

final List<Comparable[]> rows = new ArrayList<>();
@@ -196,7 +166,7 @@ public String listPartitions() throws IOException {
initJavaSparkContext();
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(jsc), config,
HoodieCLI.basePath, "/tmp");
+        HoodieCLI.basePath);

if (!metadata.enabled()) {
return "[ERROR] Metadata Table not enabled/initialized\n\n";
@@ -224,7 +194,7 @@ public String listFiles(
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(
new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
+        new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath);

if (!metaReader.enabled()) {
return "[ERROR] Metadata Table not enabled/initialized\n\n";
@@ -252,15 +222,15 @@ public String validateFiles(
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieBackedTableMetadata metadataReader = new HoodieBackedTableMetadata(
new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
+        new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath);

if (!metadataReader.enabled()) {
return "[ERROR] Metadata Table not enabled/initialized\n\n";
}

HoodieMetadataConfig fsConfig = HoodieMetadataConfig.newBuilder().enable(false).build();
HoodieBackedTableMetadata fsMetaReader = new HoodieBackedTableMetadata(
new HoodieLocalEngineContext(HoodieCLI.conf), fsConfig, HoodieCLI.basePath, "/tmp");
+        new HoodieLocalEngineContext(HoodieCLI.conf), fsConfig, HoodieCLI.basePath);

HoodieTimer timer = new HoodieTimer().startTimer();
List<String> metadataPartitions = metadataReader.getAllPartitionPaths();
@@ -33,6 +33,7 @@
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -172,13 +173,14 @@ public boolean commit(String instantTime, O writeStatuses, Option<Map<String, St
public abstract boolean commit(String instantTime, O writeStatuses, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>> partitionToReplacedFileIds);

public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
String commitActionType) {
return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap());
public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStatuses, List<HoodieWriteStat> stats,
Contributor comment: Just clarifying - WriteStatus contains only deflated HoodieRecords, right? Making sure we don't hold on to the data in memory until we update the metadata table.
Option<Map<String, String>> extraMetadata, String commitActionType) {
return commitStats(instantTime, writeStatuses, stats, extraMetadata, commitActionType, Collections.emptyMap());
}
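
A sketch of how an engine-specific caller might use the new overload; SparkRDDWriteClient and the HoodieData API are real, but the caller shape below is an assumption for illustration:

import java.util.List;

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

class CommitStatsCallerSketch {
  // The stats list is small driver-side metadata; the HoodieData handle keeps
  // the full write statuses distributed until the metadata-table update consumes them.
  static boolean commit(SparkRDDWriteClient<?> client, String instantTime,
                        HoodieData<WriteStatus> writeStatuses) {
    List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collectAsList();
    return client.commitStats(instantTime, writeStatuses, stats, Option.empty(),
        HoodieTimeline.COMMIT_ACTION);
  }
}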

public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) {
public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStatuses, List<HoodieWriteStat> stats,
Option<Map<String, String>> extraMetadata, String commitActionType,
Map<String, List<String>> partitionToReplaceFileIds) {
// Skip the empty commit if not allowed
if (!config.allowEmptyCommit() && stats.isEmpty()) {
return true;
@@ -194,7 +196,7 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
try {
preCommit(inflightInstant, metadata);
-      commit(table, commitActionType, instantTime, metadata, stats);
+      commit(table, commitActionType, instantTime, metadata, stats, writeStatuses);
postCommit(table, metadata, instantTime, extraMetadata);
LOG.info("Committed " + instantTime);
releaseResources();
@@ -217,22 +219,18 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
}

protected void commit(HoodieTable table, String commitActionType, String instantTime, HoodieCommitMetadata metadata,
List<HoodieWriteStat> stats) throws IOException {
List<HoodieWriteStat> stats, HoodieData<WriteStatus> writeStatuses) throws IOException {
LOG.info("Committing " + instantTime + " action " + commitActionType);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
// Finalize write
finalizeWrite(table, instantTime, stats);
// update Metadata table
-    writeTableMetadata(table, instantTime, commitActionType, metadata);
+    writeTableMetadata(table, instantTime, commitActionType, metadata, writeStatuses);
activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
}

protected HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
return createTable(config, hadoopConf, false);
}

protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);
protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf);

void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
try {
@@ -265,9 +263,10 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met
* @param actionType action type of the commit.
* @param metadata instance of {@link HoodieCommitMetadata}.
*/
-  protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
+  protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata,
+                                    HoodieData<WriteStatus> writeStatuses) {
context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table");
table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, writeStatuses, instantTime,
table.isTableServiceAction(actionType)));
}
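
Inferred from the call site above, the HoodieTableMetadataWriter contract now accepts the write statuses. A sketch of that method signature only (the real org.apache.hudi.metadata.HoodieTableMetadataWriter interface likely declares more methods and overloads):

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieCommitMetadata;

public interface HoodieTableMetadataWriterSketch {
  // Matches the arguments passed in writeTableMetadata above; the writeStatuses
  // handle presumably feeds record-level updates such as the record index
  // configured further down in this PR.
  void update(HoodieCommitMetadata commitMetadata, HoodieData<WriteStatus> writeStatuses,
              String instantTime, boolean isTableServiceAction);
}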

@@ -297,7 +296,7 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
*/
public void rollbackFailedBootstrap() {
LOG.info("Rolling back pending bootstrap if present");
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
Option<String> instant = Option.fromJavaOptional(
inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
@@ -465,7 +464,7 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me

protected void runTableServicesInline(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
if (config.inlineTableServices()) {
-      if (config.isMetadataTableEnabled()) {
+      if (table.getMetaClient().getTableConfig().isMetadataTableEnabled()) {
table.getHoodieView().sync();
}
// Do an inline compaction if enabled
@@ -529,7 +528,7 @@ protected void autoCleanOnCommit() {
* Run any pending compactions.
*/
public void runAnyPendingCompactions() {
-    runAnyPendingCompactions(createTable(config, hadoopConf, config.isMetadataTableEnabled()));
+    runAnyPendingCompactions(createTable(config, hadoopConf));
}

/**
@@ -539,7 +538,7 @@ public void runAnyPendingCompactions() {
* @param comment - Comment for the savepoint
*/
public void savepoint(String user, String comment) {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
if (table.getCompletedCommitsTimeline().empty()) {
throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
}
@@ -563,7 +562,7 @@ public void savepoint(String user, String comment) {
* @param comment - Comment for the savepoint
*/
public void savepoint(String instantTime, String user, String comment) {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
table.savepoint(context, instantTime, user, comment);
}

@@ -575,7 +574,7 @@ public void savepoint(String instantTime, String user, String comment) {
* @return true if the savepoint was deleted successfully
*/
public void deleteSavepoint(String savepointTime) {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
SavepointHelpers.deleteSavepoint(table, savepointTime);
}

@@ -590,7 +589,7 @@ public void deleteSavepoint(String savepointTime) {
* @return true if the savepoint was restored to successfully
*/
public void restoreToSavepoint(String savepointTime) {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
SavepointHelpers.validateSavepointPresence(table, savepointTime);
restoreToInstant(savepointTime);
SavepointHelpers.validateSavepointRestore(table, savepointTime);
@@ -673,7 +672,7 @@ public HoodieRestoreMetadata restoreToInstant(final String instantTime) throws H
final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
Timer.Context timerContext = metrics.getRollbackCtx();
try {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
if (timerContext != null) {
final long durationInMs = metrics.getDurationInMs(timerContext.stop());
@@ -1091,17 +1090,17 @@ private Option<String> scheduleTableServiceInternal(String instantTime, Option<M
switch (tableServiceType) {
case CLUSTER:
LOG.info("Scheduling clustering at instant time :" + instantTime);
Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf)
.scheduleClustering(context, instantTime, extraMetadata);
return clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
case COMPACT:
LOG.info("Scheduling compaction at instant time :" + instantTime);
Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf)
.scheduleCompaction(context, instantTime, extraMetadata);
return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
case CLEAN:
LOG.info("Scheduling cleaning at instant time :" + instantTime);
Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf)
.scheduleCleaning(context, instantTime, extraMetadata);
return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
default:
@@ -193,6 +193,10 @@ public long getTotalErrorRecords() {
return totalErrorRecords;
}

+  public boolean isTrackingSuccessRecords() {
+    return trackSuccessRecords;
+  }

public void setTotalErrorRecords(long totalErrorRecords) {
this.totalErrorRecords = totalErrorRecords;
}
@@ -434,6 +434,15 @@ public class HoodieWriteConfig extends HoodieConfig {
.sinceVersion("0.10.0")
.withDocumentation("File Id Prefix provider class, that implements `org.apache.hudi.fileid.FileIdPrefixProvider`");

+  /**
+   * If a valid location is specified, a copy of the write config is saved before each operation.
+   */
+  public static final ConfigProperty<String> CONFIG_EXPORT_DIR = ConfigProperty
Contributor comment: It's not clear where this is used. Are write configs persisted after each operation?

.key("hoodie.write.config.save.dir")
.defaultValue("/user/hudi/runtime_configs/0.10")
.sinceVersion("0.10.0")
.withDocumentation("The directory where write configs are saved before each operation.");

private ConsistencyGuardConfig consistencyGuardConfig;

// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
@@ -1634,9 +1643,13 @@ public boolean getPushGatewayRandomJobNameSuffix() {
}

public String getMetricReporterMetricsNamePrefix() {
-    return getStringOrDefault(HoodieMetricsConfig.METRICS_REPORTER_PREFIX);
+    String prefix = getStringOrDefault(HoodieMetricsConfig.METRICS_REPORTER_PREFIX);
+    if (prefix.isEmpty()) {
+      prefix = getTableName();
+    }
+    return prefix;
}

/**
* memory configs.
*/
@@ -1839,6 +1852,36 @@ public String getFileIdPrefixProviderClassName() {
return getString(FILEID_PREFIX_PROVIDER_CLASS);
}

+  /**
+   * Record index configs.
+   */
+  public boolean createRecordIndex() {
+    return metadataConfig.createRecordIndex();
+  }
+
+  public int getRecordIndexMinFileGroupCount() {
+    return metadataConfig.getRecordIndexMinFileGroupCount();
+  }
+
+  public int getRecordIndexMaxFileGroupCount() {
+    return metadataConfig.getRecordIndexMaxFileGroupCount();
+  }
+
+  public float getRecordIndexGrowthFactor() {
+    return metadataConfig.getRecordIndexGrowthFactor();
+  }
+
+  public long getMaxMetadataFileGroupSizeBytes() {
+    return metadataConfig.getMaxFileGroupSizeBytes();
+  }
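
These knobs suggest a sizing calculation for the record index file groups along the following lines. This is a hypothetical sketch only, since the actual partitioning logic in the metadata writer is not part of this diff; every name here is illustrative.

class RecordIndexSizingSketch {
  // Estimate an initial file group count from the configured bounds: provision
  // for growth, divide by the per-group size cap, then clamp to [min, max].
  static int estimateFileGroupCount(long recordCount, long avgRecordSizeBytes,
                                    float growthFactor, long maxFileGroupSizeBytes,
                                    int minFileGroupCount, int maxFileGroupCount) {
    long projectedBytes = (long) (recordCount * growthFactor) * avgRecordSizeBytes;
    int needed = (int) Math.ceil((double) projectedBytes / maxFileGroupSizeBytes);
    return Math.max(minFileGroupCount, Math.min(maxFileGroupCount, needed));
  }
}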

+  /**
+   * Directory where write config should be exported before each operation.
+   */
+  public String getConfigExportDir() {
+    return getString(CONFIG_EXPORT_DIR);
+  }

public static class Builder {

protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -2195,6 +2238,11 @@ public Builder withProperties(Properties properties) {
return this;
}

+    public Builder withConfigExportDir(String dir) {
+      writeConfig.setValue(CONFIG_EXPORT_DIR, dir);
+      return this;
+    }
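
A usage sketch for the new option; the table name and paths are placeholders, and the other builder calls are standard HoodieWriteConfig builder methods:

import org.apache.hudi.config.HoodieWriteConfig;

class ConfigExportUsageSketch {
  static HoodieWriteConfig build() {
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/trips")            // placeholder table base path
        .forTable("trips")                      // placeholder table name
        .withConfigExportDir("/user/hudi/runtime_configs/0.10")
        .build();
    // The client can then snapshot the effective config to this directory
    // before each operation.
    String exportDir = cfg.getConfigExportDir();
    return cfg;
  }
}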

protected void setDefaults() {
writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
// Check for mandatory properties