Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class InstrumentedReadWriteLock implements ReadWriteLock {
private final Lock readLock;
private final Lock writeLock;

InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
public InstrumentedReadWriteLock(boolean fair, String name, Logger logger,
long minLoggingGapMs, long lockWarningThresholdMs) {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair);
readLock = new InstrumentedReadLock(name, logger, readWriteLock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,17 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.lock.suppress.warning.interval";
public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
10000; //ms
public static final String DFS_DATANODE_LOCK_FAIR_KEY =
"dfs.datanode.lock.fair";
public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
public static final String DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY =
"dfs.datanode.lock.read.write.enabled";
public static final Boolean DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT =
true;
public static final String DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
"dfs.datanode.lock-reporting-threshold-ms";
public static final long
DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 300L;

public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ class BlockSender implements java.io.Closeable {
// the append write.
ChunkChecksum chunkChecksum = null;
final long replicaVisibleLength;
try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
replica = getReplica(block, datanode);
replicaVisibleLength = replica.getVisibleLength();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3010,7 +3010,7 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b,
final BlockConstructionStage stage;

//get replica information
try(AutoCloseableLock lock = data.acquireDatasetLock()) {
try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
b.getBlockId());
if (null == storedBlock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ private Map<String, String> getStorageIDToVolumeBasePathMap()
Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
FsDatasetSpi.FsVolumeReferences references;
try {
try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) {
try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) {
references = this.dataset.getFsVolumeReferences();
for (int ndx = 0; ndx < references.size(); ndx++) {
FsVolumeSpi vol = references.get(ndx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,9 +657,19 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
FsVolumeSpi destination) throws IOException;

/**
* Acquire the lock of the data set.
* Acquire the lock of the data set. This prevents other threads from
* modifying the volume map structure inside the datanode, but other changes
* are still possible. For example modifying the genStamp of a block instance.
*/
AutoCloseableLock acquireDatasetLock();

/***
* Acquire the read lock of the data set. This prevents other threads from
* modifying the volume map structure inside the datanode, but other changes
* are still possible. For example modifying the genStamp of a block instance.
* @return The AutoClosable read lock instance.
*/
AutoCloseableLock acquireDatasetReadLock();

Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
import org.slf4j.Logger;
Expand All @@ -66,7 +67,6 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type;
import org.apache.hadoop.util.DiskChecker;
Expand Down Expand Up @@ -874,7 +874,7 @@ void shutdown(BlockListAsLongs blocksListToPersist) {

private boolean readReplicasFromCache(ReplicaMap volumeMap,
final RamDiskReplicaTracker lazyWriteReplicaMap) {
ReplicaMap tmpReplicaMap = new ReplicaMap(new AutoCloseableLock());
ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock());
File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
// Check whether the file exists or not.
if (!replicaFile.exists()) {
Expand Down
Loading