Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieBuildPlan;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
Expand All @@ -41,6 +42,7 @@
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBuildCommitMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
Expand Down Expand Up @@ -141,6 +143,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
protected transient Timer.Context writeTimer = null;
protected transient Timer.Context compactionTimer;
protected transient Timer.Context clusteringTimer;
protected transient Timer.Context buildTimer;

protected transient AsyncCleanerService asyncCleanerService;
protected transient AsyncArchiveService asyncArchiveService;
Expand Down Expand Up @@ -1291,15 +1294,37 @@ protected boolean scheduleCleaningAtInstant(String instantTime, Option<Map<Strin

/**
 * Ensures clustering instant is in expected state and performs clustering for the plan stored in metadata.
 *
 * @param clusteringInstant Clustering Instant Time
 * @param shouldComplete Whether to commit the clustering result immediately after execution
 * @return Collection of Write Status wrapped in {@code HoodieWriteMetadata}
 */
public abstract HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete);

/**
 * Schedule a new build instant with passed-in instant time.
 *
 * @param instantTime Build instant time
 * @param extraMetadata Extra metadata to be stored
 * @return True if build instant scheduled successfully
 * @throws HoodieIOException if the build plan cannot be persisted to the timeline
 */
public boolean scheduleBuildAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
  // Delegate to the generic table-service scheduler; a present instant means the plan was written.
  Option<String> scheduledInstant = scheduleTableService(instantTime, extraMetadata, TableServiceType.BUILD);
  return scheduledInstant.isPresent();
}

/**
 * Perform build for the plan stored in metadata.
 *
 * @param instantTime Build instant time
 * @param shouldComplete Whether to commit the build result immediately after execution
 * @return Commit metadata describing the completed build action
 */
public abstract HoodieBuildCommitMetadata build(String instantTime, boolean shouldComplete);

/**
* Schedule table services such as clustering, compaction & cleaning.
*
* @param extraMetadata Metadata to pass onto the scheduled service instant
* @param extraMetadata Metadata to pass onto the scheduled service instant
* @param tableServiceType Type of table service to schedule
* @return
*/
Expand Down Expand Up @@ -1353,6 +1378,11 @@ private Option<String> scheduleTableServiceInternal(String instantTime, Option<M
Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf)
.scheduleCleaning(context, instantTime, extraMetadata);
return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
case BUILD:
LOG.info("Scheduling build at instant time: " + instantTime);
Option<HoodieBuildPlan> buildPlan = createTable(config, hadoopConf)
.scheduleBuild(context, instantTime, extraMetadata);
return buildPlan.isPresent() ? Option.of(instantTime) : Option.empty();
default:
throw new IllegalArgumentException("Invalid TableService " + tableServiceType);
}
Expand Down Expand Up @@ -1383,9 +1413,9 @@ protected Option<String> inlineScheduleClustering(Option<Map<String, String>> ex
/**
* Finalize Write operation.
*
* @param table HoodieTable
* @param table HoodieTable
* @param instantTime Instant Time
* @param stats Hoodie Write Stat
* @param stats Hoodie Write Stat
*/
protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieBuildConfig;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
Expand Down Expand Up @@ -2142,6 +2143,10 @@ public boolean areReleaseResourceEnabled() {
return getBooleanOrDefault(RELEASE_RESOURCE_ENABLE);
}

/**
 * Returns the partitions selected for the build table service,
 * as configured by {@link HoodieBuildConfig#PARTITION_SELECTED}.
 */
public String getBuildPartitionSelected() {
  String selectedPartitions = getString(HoodieBuildConfig.PARTITION_SELECTED);
  return selectedPartitions;
}

/**
* Layout configs.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class HoodieMetrics {
public String finalizeTimerName = null;
public String compactionTimerName = null;
public String indexTimerName = null;
public String buildTimerName = null;
private HoodieWriteConfig config;
private String tableName;
private Timer rollbackTimer = null;
Expand All @@ -53,6 +54,7 @@ public class HoodieMetrics {
private Timer compactionTimer = null;
private Timer clusteringTimer = null;
private Timer indexTimer = null;
private Timer buildTimer = null;

public HoodieMetrics(HoodieWriteConfig config) {
this.config = config;
Expand All @@ -67,6 +69,7 @@ public HoodieMetrics(HoodieWriteConfig config) {
this.finalizeTimerName = getMetricsName("timer", "finalize");
this.compactionTimerName = getMetricsName("timer", HoodieTimeline.COMPACTION_ACTION);
this.indexTimerName = getMetricsName("timer", "index");
this.buildTimerName = getMetricsName("timer", HoodieTimeline.BUILD_ACTION);
}
}

Expand Down Expand Up @@ -130,6 +133,13 @@ public Timer.Context getIndexCtx() {
return indexTimer == null ? null : indexTimer.time();
}

/**
 * Returns a started timer context for the build action, or {@code null} when metrics are disabled.
 * The underlying timer is created lazily on first use.
 */
public Timer.Context getBuildCtx() {
  if (config.isMetricsOn() && buildTimer == null) {
    // BUG FIX: previously created with indexTimerName (copy-paste from getIndexCtx),
    // which reported build timings under the index metric name.
    buildTimer = createTimer(buildTimerName);
  }
  return buildTimer == null ? null : buildTimer.time();
}

public void updateMetricsForEmptyData(String actionType) {
if (!config.isMetricsOn() || !config.getMetricsReporterType().equals(MetricsReporterType.PROMETHEUS_PUSHGATEWAY)) {
// No-op if metrics are not of type PROMETHEUS_PUSHGATEWAY.
Expand Down Expand Up @@ -234,6 +244,13 @@ public void updateIndexMetrics(final String action, final long durationInMs) {
}
}

/**
 * Publishes the duration of a build action as a gauge, named
 * {@code <prefix>.build.<action>.duration}. No-op when metrics are disabled.
 *
 * @param action Name of the build action being reported
 * @param durationInMs Duration of the action in milliseconds
 */
public void updateBuildMetrics(final String action, final long durationInMs) {
  if (!config.isMetricsOn()) {
    return;
  }
  LOG.info(String.format("Sending build metrics (%s.duration, %d)", action, durationInMs));
  String gaugeName = getMetricsName(HoodieTimeline.BUILD_ACTION, String.format("%s.duration", action));
  Metrics.registerGauge(gaugeName, durationInMs);
}

/**
 * Builds a fully-qualified metric name of the form {@code <prefix>.<action>.<metric>},
 * or {@code null} when no write config is available.
 */
String getMetricsName(String action, String metric) {
  if (config == null) {
    return null;
  }
  return String.format("%s.%s.%s", config.getMetricReporterMetricsNamePrefix(), action, metric);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.table;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieBuildPlan;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
Expand All @@ -41,6 +42,7 @@
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
import org.apache.hudi.common.model.HoodieBuildCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
Expand Down Expand Up @@ -533,8 +535,29 @@ public abstract HoodieRestoreMetadata restore(HoodieEngineContext context,
* Schedules Restore for the table to the given instant.
*/
public abstract Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context,
String restoreInstantTime,
String instantToRestore);
String restoreInstantTime,
String instantToRestore);

/**
 * Schedule build at passed-in instant time.
 *
 * @param context HoodieEngineContext
 * @param instantTime Instant time for scheduling build
 * @param extraMetadata Additional metadata to write into the plan
 * @return Build plan if one was scheduled, empty otherwise
 */
public abstract Option<HoodieBuildPlan> scheduleBuild(HoodieEngineContext context,
                                                      String instantTime,
                                                      Option<Map<String, String>> extraMetadata);

/**
 * Execute build for the table at the given instant.
 *
 * @param context HoodieEngineContext
 * @param instantTime Build instant time
 * @return HoodieBuildCommitMetadata describing the completed build
 */
public abstract HoodieBuildCommitMetadata build(HoodieEngineContext context, String instantTime);

public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
rollbackInflightCompaction(inflightInstant, s -> Option.empty());
Expand Down Expand Up @@ -564,6 +587,18 @@ public void rollbackInflightClustering(HoodieInstant inflightInstant,
rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
}

/**
 * Rolls an inflight build instant back to the requested state.
 *
 * @param inflightInstant Inflight build instant
 * @param getPendingRollbackInstantFunc Function to get rollback instant
 */
public void rollbackInflightBuild(HoodieInstant inflightInstant,
                                  Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
  // Guard against being handed a non-build instant before delegating to the shared rollback path.
  boolean isBuildAction = inflightInstant.getAction().equals(HoodieTimeline.BUILD_ACTION);
  ValidationUtils.checkArgument(isBuildAction);
  rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
}

/**
* Rollback inflight instant to requested instant
*
Expand Down
Loading