diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
index 6ebae44fd467c..0e7e7fbf2e476 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java
@@ -18,9 +18,7 @@
 
 package org.apache.hudi.client.transaction.lock;
 
-import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics;
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.lock.LockProvider;
@@ -28,9 +26,14 @@
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieLockException;
+
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
 import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY;
 import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
 
@@ -45,6 +48,7 @@ public class LockManager implements Serializable, AutoCloseable {
   private final SerializableConfiguration hadoopConf;
   private final int maxRetries;
   private final long maxWaitTimeInMs;
+  private transient HoodieLockMetrics metrics;
   private volatile LockProvider lockProvider;
 
   public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
@@ -55,6 +59,7 @@ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
         Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
     maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
         Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
+    metrics = new HoodieLockMetrics(writeConfig);
   }
 
   public void lock() {
@@ -64,13 +69,17 @@ public void lock() {
       boolean acquired = false;
       while (retryCount <= maxRetries) {
         try {
+          metrics.startLockApiTimerContext();
           acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
           if (acquired) {
+            metrics.updateLockAcquiredMetric();
             break;
           }
+          metrics.updateLockNotAcquiredMetric();
           LOG.info("Retrying to acquire lock...");
           Thread.sleep(maxWaitTimeInMs);
         } catch (HoodieLockException | InterruptedException e) {
+          metrics.updateLockNotAcquiredMetric();
           if (retryCount >= maxRetries) {
             throw new HoodieLockException("Unable to acquire lock, lock object ", e);
           }
@@ -96,6 +105,7 @@ public void lock() {
   public void unlock() {
     if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
       getLockProvider().unlock();
+      metrics.updateLockHeldTimerMetrics();
     }
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
new file mode 100644
index 0000000000000..6ea7a1ae141ca
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/metrics/HoodieLockMetrics.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.metrics;
+
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metrics.Metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Timer;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Publishes lock acquisition and lock hold metrics to the shared metric registry.
+ *
+ * <p>Every public method is a no-op when lock metrics are disabled in the write config.
+ * Outcome updates that are not paired with a preceding start call are ignored, so that a
+ * caller reporting the same failed attempt twice does not skew the counters or timers.
+ * Instances keep plain mutable state and are not synchronized.
+ */
+public class HoodieLockMetrics {
+
+  public static final String LOCK_ACQUIRE_ATTEMPTS_COUNTER_NAME = "lock.acquire.attempts";
+  public static final String LOCK_ACQUIRE_SUCCESS_COUNTER_NAME = "lock.acquire.success";
+  public static final String LOCK_ACQUIRE_FAILURES_COUNTER_NAME = "lock.acquire.failure";
+  public static final String LOCK_ACQUIRE_DURATION_TIMER_NAME = "lock.acquire.duration";
+  public static final String LOCK_REQUEST_LATENCY_TIMER_NAME = "lock.request.latency";
+
+  private final HoodieWriteConfig writeConfig;
+  private final boolean isMetricsEnabled;
+  private final int keepLastNtimes = 100;
+  private final transient HoodieTimer lockDurationTimer = HoodieTimer.create();
+  private final transient HoodieTimer lockApiRequestDurationTimer = HoodieTimer.create();
+  private transient Counter lockAttempts;
+  private transient Counter successfulLockAttempts;
+  private transient Counter failedLockAttempts;
+  private transient Timer lockDuration;
+  private transient Timer lockApiRequestDuration;
+  // True from startLockApiTimerContext() until the attempt's outcome is reported.
+  private transient boolean lockApiRequestInFlight;
+  // True from a reported acquisition until updateLockHeldTimerMetrics().
+  private transient boolean lockHeld;
+
+  /**
+   * Registers (or looks up) the lock counters and timers when lock metrics are enabled.
+   *
+   * @param writeConfig write config supplying the enable flag and the metric name prefix
+   */
+  public HoodieLockMetrics(HoodieWriteConfig writeConfig) {
+    this.isMetricsEnabled = writeConfig.isLockingMetricsEnabled();
+    this.writeConfig = writeConfig;
+
+    if (isMetricsEnabled) {
+      MetricRegistry registry = Metrics.getInstance().getRegistry();
+
+      lockAttempts = registry.counter(getMetricsName(LOCK_ACQUIRE_ATTEMPTS_COUNTER_NAME));
+      successfulLockAttempts = registry.counter(getMetricsName(LOCK_ACQUIRE_SUCCESS_COUNTER_NAME));
+      failedLockAttempts = registry.counter(getMetricsName(LOCK_ACQUIRE_FAILURES_COUNTER_NAME));
+
+      lockDuration = createTimerForMetrics(registry, LOCK_ACQUIRE_DURATION_TIMER_NAME);
+      lockApiRequestDuration = createTimerForMetrics(registry, LOCK_REQUEST_LATENCY_TIMER_NAME);
+    }
+  }
+
+  private String getMetricsName(String metric) {
+    return writeConfig == null ? null : String.format("%s.%s", writeConfig.getMetricReporterMetricsNamePrefix(), metric);
+  }
+
+  /**
+   * Returns the timer registered under the prefixed name, registering a new one backed by a
+   * sliding window of the last {@code keepLastNtimes} samples if none exists yet.
+   */
+  private Timer createTimerForMetrics(MetricRegistry registry, String metric) {
+    String metricName = getMetricsName(metric);
+    Timer existing = (Timer) registry.getMetrics().get(metricName);
+    if (existing != null) {
+      return existing;
+    }
+    // Use a local: assigning a field here would make both timer fields alias one Timer.
+    Timer created = new Timer(new SlidingWindowReservoir(keepLastNtimes));
+    registry.register(metricName, created);
+    return created;
+  }
+
+  /** Starts timing one lock acquisition request. */
+  public void startLockApiTimerContext() {
+    if (isMetricsEnabled) {
+      lockApiRequestDurationTimer.startTimer();
+      lockApiRequestInFlight = true;
+    }
+  }
+
+  /** Records a successful attempt and starts timing how long the lock is held. */
+  public void updateLockAcquiredMetric() {
+    if (finishLockApiRequest()) {
+      successfulLockAttempts.inc();
+      lockDurationTimer.startTimer();
+      lockHeld = true;
+    }
+  }
+
+  /** Records a failed attempt; repeated reports for the same attempt are ignored. */
+  public void updateLockNotAcquiredMetric() {
+    if (finishLockApiRequest()) {
+      failedLockAttempts.inc();
+    }
+  }
+
+  /** Records how long the lock was held; ignored if no acquisition was reported. */
+  public void updateLockHeldTimerMetrics() {
+    if (isMetricsEnabled && lockHeld) {
+      lockHeld = false;
+      long lockDurationInMs = lockDurationTimer.endTimer();
+      lockDuration.update(lockDurationInMs, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  /**
+   * Closes the in-flight request: records its latency and counts the attempt.
+   *
+   * @return {@code false} if metrics are disabled or no request is in flight
+   */
+  private boolean finishLockApiRequest() {
+    if (!isMetricsEnabled || !lockApiRequestInFlight) {
+      return false;
+    }
+    lockApiRequestInFlight = false;
+    long durationMs = lockApiRequestDurationTimer.endTimer();
+    lockApiRequestDuration.update(durationMs, TimeUnit.MILLISECONDS);
+    lockAttempts.inc();
+    return true;
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 6178e63e3606c..fae20c4cbac2d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1745,6 +1745,10 @@ public boolean isExecutorMetricsEnabled() {
         getStringOrDefault(HoodieMetricsConfig.EXECUTOR_METRICS_ENABLE, "false"));
   }
 
+  public boolean isLockingMetricsEnabled() {
+    return getBoolean(HoodieMetricsConfig.LOCK_METRICS_ENABLE);
+  }
+
   public MetricsReporterType getMetricsReporterType() {
     return MetricsReporterType.valueOf(getString(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE));
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
index a515eb702b8cc..957b439051a81 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
@@ -83,6 +83,17 @@ public class HoodieMetricsConfig extends HoodieConfig {
       .sinceVersion("0.7.0")
       .withDocumentation("");
 
+  public static final ConfigProperty<Boolean> LOCK_METRICS_ENABLE = ConfigProperty
+      .key(METRIC_PREFIX + ".lock.enable")
+      .defaultValue(false)
+      .withInferFunction(cfg -> {
+        if (cfg.contains(TURN_METRICS_ON)) {
+          return Option.of(cfg.getBoolean(TURN_METRICS_ON));
+        }
+        return Option.empty();
+      })
+      .withDocumentation("Enable metrics for locking infra. Useful when operating in multiwriter mode");
+
   /**
    * @deprecated Use {@link #TURN_METRICS_ON} and its methods instead
    */
@@ -163,6 +174,11 @@ public Builder withExecutorMetrics(boolean enable) {
       return this;
     }
 
+    public Builder withLockingMetrics(boolean enable) {
+      hoodieMetricsConfig.setValue(LOCK_METRICS_ENABLE, String.valueOf(enable));
+      return this;
+    }
+
     public HoodieMetricsConfig build() {
 
       hoodieMetricsConfig.setDefaults(HoodieMetricsConfig.class.getName());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index d13110feef228..69ef7917b284f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 
+import com.codahale.metrics.Counter;
 import com.codahale.metrics.Timer;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -43,6 +44,9 @@ public class HoodieMetrics {
   public String finalizeTimerName = null;
   public String compactionTimerName = null;
   public String indexTimerName = null;
+  private String conflictResolutionTimerName = null;
+  private String conflictResolutionSuccessCounterName = null;
+  private String conflictResolutionFailureCounterName = null;
   private HoodieWriteConfig config;
   private String tableName;
   private Timer rollbackTimer = null;
@@ -53,6 +57,9 @@ public class HoodieMetrics {
   private Timer compactionTimer = null;
   private Timer clusteringTimer = null;
   private Timer indexTimer = null;
+  private Timer conflictResolutionTimer = null;
+  private Counter conflictResolutionSuccessCounter = null;
+  private Counter conflictResolutionFailureCounter = null;
 
   public HoodieMetrics(HoodieWriteConfig config) {
     this.config = config;
@@ -67,6 +74,9 @@ public HoodieMetrics(HoodieWriteConfig config) {
       this.finalizeTimerName = getMetricsName("timer", "finalize");
       this.compactionTimerName = getMetricsName("timer", HoodieTimeline.COMPACTION_ACTION);
       this.indexTimerName = getMetricsName("timer", "index");
+      this.conflictResolutionTimerName = getMetricsName("timer", "conflict_resolution");
+      this.conflictResolutionSuccessCounterName = getMetricsName("counter", "conflict_resolution.success");
+      this.conflictResolutionFailureCounterName = getMetricsName("counter", "conflict_resolution.failure");
     }
   }
 
@@ -130,6 +140,13 @@ public Timer.Context getIndexCtx() {
     return indexTimer == null ? null : indexTimer.time();
   }
 
+  public Timer.Context getConflictResolutionCtx() {
+    if (config.isLockingMetricsEnabled() && conflictResolutionTimer == null) {
+      conflictResolutionTimer = createTimer(conflictResolutionTimerName);
+    }
+    return conflictResolutionTimer == null ? null : conflictResolutionTimer.time();
+  }
+
   public void updateMetricsForEmptyData(String actionType) {
     if (!config.isMetricsOn() || !config.getMetricsReporterType().equals(MetricsReporterType.PROMETHEUS_PUSHGATEWAY)) {
       // No-op if metrics are not of type PROMETHEUS_PUSHGATEWAY.
@@ -244,4 +261,27 @@ String getMetricsName(String action, String metric) {
   public long getDurationInMs(long ctxDuration) {
     return ctxDuration / 1000000;
   }
+
+  public void emitConflictResolutionSuccessful() {
+    if (config.isLockingMetricsEnabled()) {
+      LOG.info("Sending conflict resolution success metric");
+      conflictResolutionSuccessCounter = getCounter(conflictResolutionSuccessCounter, conflictResolutionSuccessCounterName);
+      conflictResolutionSuccessCounter.inc();
+    }
+  }
+
+  public void emitConflictResolutionFailed() {
+    if (config.isLockingMetricsEnabled()) {
+      LOG.info("Sending conflict resolution failure metric");
+      conflictResolutionFailureCounter = getCounter(conflictResolutionFailureCounter, conflictResolutionFailureCounterName);
+      conflictResolutionFailureCounter.inc();
+    }
+  }
+
+  private Counter getCounter(Counter counter, String name) {
+    if (counter == null) {
+      return Metrics.getInstance().getRegistry().counter(name);
+    }
+    return counter;
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index a142fd80d4bf8..0462267f180ea 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -41,6 +41,7 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -460,8 +461,19 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met
     // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
     // Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
     HoodieTable table = createTable(config, hadoopConf);
-    TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
-        Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), false, this.pendingInflightAndRequestedInstants);
+    Timer.Context conflictResolutionTimer = metrics.getConflictResolutionCtx();
+    try {
+      TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
+          Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), false, this.pendingInflightAndRequestedInstants);
+      metrics.emitConflictResolutionSuccessful();
+    } catch (HoodieWriteConflictException e) {
+      metrics.emitConflictResolutionFailed();
+      throw e;
+    } finally {
+      if (conflictResolutionTimer != null) {
+        conflictResolutionTimer.stop();
+      }
+    }
   }
 
   @Override