Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,22 @@

package org.apache.hudi.client.transaction.lock;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieLockException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;

import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;

Expand All @@ -45,6 +48,7 @@ public class LockManager implements Serializable, AutoCloseable {
private final SerializableConfiguration hadoopConf;
private final int maxRetries;
private final long maxWaitTimeInMs;
private transient HoodieLockMetrics metrics;
private volatile LockProvider lockProvider;

public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
Expand All @@ -55,6 +59,7 @@ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
metrics = new HoodieLockMetrics(writeConfig);
}

public void lock() {
Expand All @@ -64,13 +69,18 @@ public void lock() {
boolean acquired = false;
while (retryCount <= maxRetries) {
try {
metrics.startLockApiTimerContext();
acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
if (acquired) {
metrics.updateLockAcquiredMetric();
metrics.startLockHeldTimerContext();
break;
}
metrics.updateLockNotAcquiredMetric();
LOG.info("Retrying to acquire lock...");
Thread.sleep(maxWaitTimeInMs);
} catch (HoodieLockException | InterruptedException e) {
metrics.updateLockNotAcquiredMetric();
if (retryCount >= maxRetries) {
throw new HoodieLockException("Unable to acquire lock, lock object ", e);
}
Expand All @@ -96,6 +106,7 @@ public void lock() {
public void unlock() {
if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
getLockProvider().unlock();
metrics.updateLockHeldTimerMetrics();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction.lock.metrics;

import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metrics.Metrics;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SlidingWindowReservoir;
import com.codahale.metrics.Timer;

import java.util.concurrent.TimeUnit;

public class HoodieLockMetrics {

private final HoodieWriteConfig writeConfig;
private final boolean isMetricsEnabled;
private final int keepLastNtimes = 100;
private final transient HoodieTimer lockDurationTimer = HoodieTimer.create();
private final transient HoodieTimer lockApiRequestDurationTimer = HoodieTimer.create();
private transient Counter lockAttempts;
private transient Counter succesfulLockAttempts;
private transient Counter failedLockAttempts;
private transient Timer lockDuration;
private transient Timer lockApiRequestDuration;

public HoodieLockMetrics(HoodieWriteConfig writeConfig) {
this.isMetricsEnabled = writeConfig.isLockingMetricsEnabled();
this.writeConfig = writeConfig;

if (isMetricsEnabled) {
Metrics.init(writeConfig);
MetricRegistry registry = Metrics.getInstance().getRegistry();

lockAttempts = registry.counter(getMetricsName("acquire.attempts"));
succesfulLockAttempts = registry.counter(getMetricsName("acquire.success"));
failedLockAttempts = registry.counter(getMetricsName("acquire.failure"));

lockDuration = createTimerForMetrics(registry, "acquire.duration");
lockApiRequestDuration = createTimerForMetrics(registry, "request.latency");
}
}

private String getMetricsName(String metric) {
return writeConfig == null ? null : String.format("%s.%s.%s", writeConfig.getMetricReporterMetricsNamePrefix(), "lock", metric);
}

private Timer createTimerForMetrics(MetricRegistry registry, String metric) {
String metricName = getMetricsName(metric);
if (registry.getMetrics().get(metricName) == null) {
lockDuration = new Timer(new SlidingWindowReservoir(keepLastNtimes));
registry.register(metricName, lockDuration);
return lockDuration;
}
return (Timer) registry.getMetrics().get(metricName);
}

public void startLockApiTimerContext() {
if (isMetricsEnabled) {
lockApiRequestDurationTimer.startTimer();
}
}

public void updateLockAcquiredMetric() {
if (isMetricsEnabled) {
long durationMs = lockApiRequestDurationTimer.endTimer();
lockApiRequestDuration.update(durationMs, TimeUnit.MILLISECONDS);
lockAttempts.inc();
}
}

public void startLockHeldTimerContext() {
if (isMetricsEnabled) {
succesfulLockAttempts.inc();
lockDurationTimer.startTimer();
}
}

public void updateLockNotAcquiredMetric() {
if (isMetricsEnabled) {
long durationMs = lockApiRequestDurationTimer.endTimer();
lockApiRequestDuration.update(durationMs, TimeUnit.MILLISECONDS);
failedLockAttempts.inc();
}
}

public void updateLockHeldTimerMetrics() {
if (isMetricsEnabled && lockDurationTimer != null) {
long lockDurationInMs = lockDurationTimer.endTimer();
lockDuration.update(lockDurationInMs, TimeUnit.MILLISECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1745,6 +1745,10 @@ public boolean isExecutorMetricsEnabled() {
getStringOrDefault(HoodieMetricsConfig.EXECUTOR_METRICS_ENABLE, "false"));
}

public boolean isLockingMetricsEnabled() {
return getBoolean(HoodieMetricsConfig.LOCK_METRICS_ENABLE);
}

public MetricsReporterType getMetricsReporterType() {
return MetricsReporterType.valueOf(getString(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ public class HoodieMetricsConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("");

public static final ConfigProperty<Boolean> LOCK_METRICS_ENABLE = ConfigProperty
.key(METRIC_PREFIX + ".lock.enable")
.defaultValue(false)
.withInferFunction(cfg -> {
if (cfg.contains(TURN_METRICS_ON)) {
return Option.of(cfg.getBoolean(TURN_METRICS_ON));
}
return Option.empty();
})
.withDocumentation("Enable metrics for locking infra. Useful when operating in multiwriter mode");

/**
* @deprecated Use {@link #TURN_METRICS_ON} and its methods instead
*/
Expand Down Expand Up @@ -163,6 +174,11 @@ public Builder withExecutorMetrics(boolean enable) {
return this;
}

public Builder withLockingMetrics(boolean enable) {
hoodieMetricsConfig.setValue(LOCK_METRICS_ENABLE, String.valueOf(enable));
return this;
}

public HoodieMetricsConfig build() {

hoodieMetricsConfig.setDefaults(HoodieMetricsConfig.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
Expand All @@ -43,6 +44,9 @@ public class HoodieMetrics {
public String finalizeTimerName = null;
public String compactionTimerName = null;
public String indexTimerName = null;
private String conflictResolutionTimerName = null;
private String conflictResolutionSuccessCounterName = null;
private String conflictResolutionFailureCounterName = null;
private HoodieWriteConfig config;
private String tableName;
private Timer rollbackTimer = null;
Expand All @@ -53,6 +57,9 @@ public class HoodieMetrics {
private Timer compactionTimer = null;
private Timer clusteringTimer = null;
private Timer indexTimer = null;
private Timer conflictResolutionTimer = null;
private Counter conflictResolutionSuccessCounter = null;
private Counter conflictResolutionFailureCounter = null;

public HoodieMetrics(HoodieWriteConfig config) {
this.config = config;
Expand All @@ -67,6 +74,9 @@ public HoodieMetrics(HoodieWriteConfig config) {
this.finalizeTimerName = getMetricsName("timer", "finalize");
this.compactionTimerName = getMetricsName("timer", HoodieTimeline.COMPACTION_ACTION);
this.indexTimerName = getMetricsName("timer", "index");
this.conflictResolutionTimerName = getMetricsName("timer", "conflict_resolution");
this.conflictResolutionSuccessCounterName = getMetricsName("counter", "conflict_resolution.success");
this.conflictResolutionFailureCounterName = getMetricsName("counter", "conflict_resolution.failure");
}
}

Expand Down Expand Up @@ -130,6 +140,13 @@ public Timer.Context getIndexCtx() {
return indexTimer == null ? null : indexTimer.time();
}

public Timer.Context getConflictResolutionCtx() {
if (config.isLockingMetricsEnabled() && conflictResolutionTimer == null) {
conflictResolutionTimer = createTimer(conflictResolutionTimerName);
}
return conflictResolutionTimer == null ? null : conflictResolutionTimer.time();
}

public void updateMetricsForEmptyData(String actionType) {
if (!config.isMetricsOn() || !config.getMetricsReporterType().equals(MetricsReporterType.PROMETHEUS_PUSHGATEWAY)) {
// No-op if metrics are not of type PROMETHEUS_PUSHGATEWAY.
Expand Down Expand Up @@ -244,4 +261,27 @@ String getMetricsName(String action, String metric) {
public long getDurationInMs(long ctxDuration) {
return ctxDuration / 1000000;
}

public void emitConflictResolutionSuccessful() {
if (config.isLockingMetricsEnabled()) {
LOG.info("Sending conflict resolution success metric");
conflictResolutionSuccessCounter = getCounter(conflictResolutionSuccessCounter, conflictResolutionSuccessCounterName);
conflictResolutionSuccessCounter.inc();
}
}

public void emitConflictResolutionFailed() {
if (config.isLockingMetricsEnabled()) {
LOG.info("Sending conflict resolution failure metric");
conflictResolutionFailureCounter = getCounter(conflictResolutionFailureCounter, conflictResolutionFailureCounterName);
conflictResolutionFailureCounter.inc();
}
}

private Counter getCounter(Counter counter, String name) {
if (counter == null) {
return Metrics.getInstance().getRegistry().counter(name);
}
return counter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
Expand Down Expand Up @@ -460,8 +461,19 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met
// Create a Hoodie table after startTxn which encapsulated the commits and files visible.
// Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
HoodieTable table = createTable(config, hadoopConf);
TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), false, this.pendingInflightAndRequestedInstants);
Timer.Context conflictResolutionTimer = metrics.getConflictResolutionCtx();
try {
TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), false, this.pendingInflightAndRequestedInstants);
metrics.emitConflictResolutionSuccessful();
} catch (HoodieWriteConflictException e) {
metrics.emitConflictResolutionFailed();
throw e;
} finally {
if (conflictResolutionTimer != null) {
conflictResolutionTimer.stop();
}
}
}

@Override
Expand Down