Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -783,9 +783,14 @@ protected void archive(HoodieTable table) {
return;
}
try {
final Timer.Context timerContext = metrics.getArchiveCtx();
// We cannot have unbounded commit files. Archive commits if we have to archive
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
archiver.archiveIfRequired(context, true);
int instantsToArchive = archiver.archiveIfRequired(context, true);
if (timerContext != null) {
long durationMs = metrics.getDurationInMs(timerContext.stop());
this.metrics.updateArchiveMetrics(durationMs, instantsToArchive);
}
} catch (IOException ioe) {
throw new HoodieIOException("Failed to archive", ioe);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.marker.WriteMarkers;
Expand Down Expand Up @@ -74,6 +75,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
private final TransactionManager txnManager;

private final LSMTimelineWriter timelineWriter;
private final HoodieMetrics metrics;

public HoodieTimelineArchiver(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
this.config = config;
Expand All @@ -84,24 +86,24 @@ public HoodieTimelineArchiver(HoodieWriteConfig config, HoodieTable<T, I, K, O>
Pair<Integer, Integer> minAndMaxInstants = getMinAndMaxInstantsToKeep(table, metaClient);
this.minInstantsToKeep = minAndMaxInstants.getLeft();
this.maxInstantsToKeep = minAndMaxInstants.getRight();
this.metrics = new HoodieMetrics(config);
}

public boolean archiveIfRequired(HoodieEngineContext context) throws IOException {
public int archiveIfRequired(HoodieEngineContext context) throws IOException {
return archiveIfRequired(context, false);
}

/**
* Check if commits need to be archived. If yes, archive commits.
*/
public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException {
public int archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException {
try {
if (acquireLock) {
// there is no owner or instant time per se for archival.
txnManager.beginTransaction(Option.empty(), Option.empty());
}
// Sort again because the cleaning and rollback instants could break the sequence.
List<ActiveAction> instantsToArchive = getInstantsToArchive().sorted().collect(Collectors.toList());
boolean success = true;
if (!instantsToArchive.isEmpty()) {
LOG.info("Archiving instants " + instantsToArchive);
Consumer<Exception> exceptionHandler = e -> {
Expand All @@ -111,13 +113,13 @@ public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLoc
};
this.timelineWriter.write(instantsToArchive, Option.of(action -> deleteAnyLeftOverMarkers(context, action)), Option.of(exceptionHandler));
LOG.info("Deleting archived instants " + instantsToArchive);
success = deleteArchivedInstants(instantsToArchive, context);
deleteArchivedInstants(instantsToArchive, context);
// triggers compaction and cleaning only after archiving action
this.timelineWriter.compactAndClean(context);
} else {
LOG.info("No Instants to archive");
}
return success;
return instantsToArchive.size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also include the success flag also as a metric?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the current archive code, it seems that success is always set to true. The success variable is initialized as true, and deleteArchivedInstants always returns true unless it fails. However, if there is a failure, the current implementation should throw an exception and terminate without returning any value. Therefore, I believe the success variable is meaningless in the current context. Alternatively, should we catch these exceptions and return false? I'm not sure if this would be reasonable.

Copy link
Contributor

@danny0405 danny0405 Nov 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can address it in another PR, we kind of do not have atomicity for current 2 steps:

  • flush of archived timeline
  • deletion of active metadata files

My roughly thought is we can fix the left over active metadata files (should be deleted from active timeline) in the next round of archiving, imagine the latest instant time in archived timeline is t10 and the oldest instant in active timeline is t7, we should retry the deletion of instant metadat files from t7 ~ 10 at the very beginning.

} finally {
if (acquireLock) {
txnManager.endTransaction(Option.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,28 @@ public class HoodieMetrics {
public static final String TOTAL_RECORDS_DELETED = "totalRecordsDeleted";
public static final String TOTAL_CORRUPTED_LOG_BLOCKS_STR = "totalCorruptedLogBlocks";
public static final String TOTAL_ROLLBACK_LOG_BLOCKS_STR = "totalRollbackLogBlocks";

public static final String DURATION_STR = "duration";
public static final String DELETE_FILES_NUM_STR = "numFilesDeleted";
public static final String DELETE_INSTANTS_NUM_STR = "numInstantsArchived";
public static final String FINALIZED_FILES_NUM_STR = "numFilesFinalized";
public static final String CONFLICT_RESOLUTION_STR = "conflict_resolution";
public static final String COMMIT_LATENCY_IN_MS_STR = "commitLatencyInMs";
public static final String COMMIT_FRESHNESS_IN_MS_STR = "commitFreshnessInMs";
public static final String COMMIT_TIME_STR = "commitTime";
public static final String SUCCESS_EXTENSION = ".success";
public static final String FAILURE_EXTENSION = ".failure";

public static final String TIMER_ACTION = "timer";
public static final String COUNTER_ACTION = "counter";
public static final String ARCHIVE_ACTION = "archive";
public static final String FINALIZE_ACTION = "finalize";
public static final String INDEX_ACTION = "index";

private Metrics metrics;
// Some timers
public String rollbackTimerName = null;
public String cleanTimerName = null;
public String archiveTimerName = null;
public String commitTimerName = null;
public String logCompactionTimerName = null;
public String deltaCommitTimerName = null;
Expand All @@ -74,6 +91,7 @@ public class HoodieMetrics {
private String tableName;
private Timer rollbackTimer = null;
private Timer cleanTimer = null;
private Timer archiveTimer = null;
private Timer commitTimer = null;
private Timer deltaCommitTimer = null;
private Timer finalizeTimer = null;
Expand All @@ -92,20 +110,21 @@ public HoodieMetrics(HoodieWriteConfig config) {
this.tableName = config.getTableName();
if (config.isMetricsOn()) {
metrics = Metrics.getInstance(config);
this.rollbackTimerName = getMetricsName("timer", HoodieTimeline.ROLLBACK_ACTION);
this.cleanTimerName = getMetricsName("timer", HoodieTimeline.CLEAN_ACTION);
this.commitTimerName = getMetricsName("timer", HoodieTimeline.COMMIT_ACTION);
this.deltaCommitTimerName = getMetricsName("timer", HoodieTimeline.DELTA_COMMIT_ACTION);
this.replaceCommitTimerName = getMetricsName("timer", HoodieTimeline.REPLACE_COMMIT_ACTION);
this.finalizeTimerName = getMetricsName("timer", "finalize");
this.compactionTimerName = getMetricsName("timer", HoodieTimeline.COMPACTION_ACTION);
this.logCompactionTimerName = getMetricsName("timer", HoodieTimeline.LOG_COMPACTION_ACTION);
this.indexTimerName = getMetricsName("timer", "index");
this.conflictResolutionTimerName = getMetricsName("timer", "conflict_resolution");
this.conflictResolutionSuccessCounterName = getMetricsName("counter", "conflict_resolution.success");
this.conflictResolutionFailureCounterName = getMetricsName("counter", "conflict_resolution.failure");
this.compactionRequestedCounterName = getMetricsName("counter", "compaction.requested");
this.compactionCompletedCounterName = getMetricsName("counter", "compaction.completed");
this.rollbackTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.ROLLBACK_ACTION);
this.cleanTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.CLEAN_ACTION);
this.archiveTimerName = getMetricsName(TIMER_ACTION, ARCHIVE_ACTION);
this.commitTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.COMMIT_ACTION);
this.deltaCommitTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION);
this.replaceCommitTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION);
this.finalizeTimerName = getMetricsName(TIMER_ACTION, FINALIZE_ACTION);
this.compactionTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.COMPACTION_ACTION);
this.logCompactionTimerName = getMetricsName(TIMER_ACTION, HoodieTimeline.LOG_COMPACTION_ACTION);
this.indexTimerName = getMetricsName(TIMER_ACTION, INDEX_ACTION);
this.conflictResolutionTimerName = getMetricsName(TIMER_ACTION, CONFLICT_RESOLUTION_STR);
this.conflictResolutionSuccessCounterName = getMetricsName(COUNTER_ACTION, CONFLICT_RESOLUTION_STR + SUCCESS_EXTENSION);
this.conflictResolutionFailureCounterName = getMetricsName(COUNTER_ACTION, CONFLICT_RESOLUTION_STR + FAILURE_EXTENSION);
this.compactionRequestedCounterName = getMetricsName(COUNTER_ACTION, HoodieTimeline.COMPACTION_ACTION + HoodieTimeline.REQUESTED_EXTENSION);
this.compactionCompletedCounterName = getMetricsName(COUNTER_ACTION, HoodieTimeline.COMPACTION_ACTION + HoodieTimeline.COMPLETED_EXTENSION);
}
}

Expand Down Expand Up @@ -152,6 +171,13 @@ public Timer.Context getCleanCtx() {
return cleanTimer == null ? null : cleanTimer.time();
}

public Timer.Context getArchiveCtx() {
if (config.isMetricsOn() && archiveTimer == null) {
archiveTimer = createTimer(archiveTimerName);
}
return archiveTimer == null ? null : archiveTimer.time();
}

public Timer.Context getCommitCtx() {
if (config.isMetricsOn() && commitTimer == null) {
commitTimer = createTimer(commitTimerName);
Expand Down Expand Up @@ -255,48 +281,60 @@ private void updateCommitTimingMetrics(long commitEpochTimeInMs, long durationIn
Pair<Option<Long>, Option<Long>> eventTimePairMinMax = metadata.getMinAndMaxEventTime();
if (eventTimePairMinMax.getLeft().isPresent()) {
long commitLatencyInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getLeft().get();
metrics.registerGauge(getMetricsName(actionType, "commitLatencyInMs"), commitLatencyInMs);
metrics.registerGauge(getMetricsName(actionType, COMMIT_LATENCY_IN_MS_STR), commitLatencyInMs);
}
if (eventTimePairMinMax.getRight().isPresent()) {
long commitFreshnessInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getRight().get();
metrics.registerGauge(getMetricsName(actionType, "commitFreshnessInMs"), commitFreshnessInMs);
metrics.registerGauge(getMetricsName(actionType, COMMIT_FRESHNESS_IN_MS_STR), commitFreshnessInMs);
}
metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs);
metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs);
metrics.registerGauge(getMetricsName(actionType, COMMIT_TIME_STR), commitEpochTimeInMs);
metrics.registerGauge(getMetricsName(actionType, DURATION_STR), durationInMs);
}
}

public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) {
if (config.isMetricsOn()) {
LOG.info(
String.format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs);
metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted);
String.format("Sending rollback metrics (%s=%d, %s=%d)", DURATION_STR, durationInMs,
DELETE_FILES_NUM_STR, numFilesDeleted));
metrics.registerGauge(getMetricsName(HoodieTimeline.ROLLBACK_ACTION, DURATION_STR), durationInMs);
metrics.registerGauge(getMetricsName(HoodieTimeline.ROLLBACK_ACTION, DELETE_FILES_NUM_STR), numFilesDeleted);
}
}

public void updateCleanMetrics(long durationInMs, int numFilesDeleted) {
if (config.isMetricsOn()) {
LOG.info(
String.format("Sending clean metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs);
metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted);
String.format("Sending clean metrics (%s=%d, %s=%d)", DURATION_STR, durationInMs,
DELETE_FILES_NUM_STR, numFilesDeleted));
metrics.registerGauge(getMetricsName(HoodieTimeline.CLEAN_ACTION, DURATION_STR), durationInMs);
metrics.registerGauge(getMetricsName(HoodieTimeline.CLEAN_ACTION, DELETE_FILES_NUM_STR), numFilesDeleted);
}
}

public void updateArchiveMetrics(long durationInMs, int numInstantsArchived) {
if (config.isMetricsOn()) {
LOG.info(
String.format("Sending archive metrics (%s=%d, %s=%d)", DURATION_STR, durationInMs,
DELETE_INSTANTS_NUM_STR, numInstantsArchived));
metrics.registerGauge(getMetricsName(ARCHIVE_ACTION, DURATION_STR), durationInMs);
metrics.registerGauge(getMetricsName(ARCHIVE_ACTION, DELETE_INSTANTS_NUM_STR), numInstantsArchived);
}
}

public void updateFinalizeWriteMetrics(long durationInMs, long numFilesFinalized) {
if (config.isMetricsOn()) {
LOG.info(String.format("Sending finalize write metrics (duration=%d, numFilesFinalized=%d)", durationInMs,
numFilesFinalized));
metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs);
metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized);
LOG.info(String.format("Sending finalize write metrics (%s=%d, %s=%d)", DURATION_STR, durationInMs,
FINALIZED_FILES_NUM_STR, numFilesFinalized));
metrics.registerGauge(getMetricsName(FINALIZE_ACTION, DURATION_STR), durationInMs);
metrics.registerGauge(getMetricsName(FINALIZE_ACTION, FINALIZED_FILES_NUM_STR), numFilesFinalized);
}
}

public void updateIndexMetrics(final String action, final long durationInMs) {
if (config.isMetricsOn()) {
LOG.info(String.format("Sending index metrics (%s.duration, %d)", action, durationInMs));
metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs);
LOG.info(String.format("Sending index metrics (%s.%s, %d)", action, DURATION_STR, durationInMs));
metrics.registerGauge(getMetricsName(INDEX_ACTION, String.format("%s.%s", action, DURATION_STR)), durationInMs);
}
}

Expand All @@ -306,7 +344,7 @@ public String getMetricsName(String action, String metric) {
}

public void updateClusteringFileCreationMetrics(long durationInMs) {
reportMetrics("replacecommit", "fileCreationTime", durationInMs);
reportMetrics(HoodieTimeline.REPLACE_COMMIT_ACTION, "fileCreationTime", durationInMs);
}

/**
Expand Down
Loading