diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightResizableGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightResizableGSet.java index 0abcf989d1500..c87d7c84be974 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightResizableGSet.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightResizableGSet.java @@ -20,6 +20,9 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; +import java.util.Iterator; +import java.util.function.Consumer; + /** * A low memory footprint {@link GSet} implementation, * which uses an array for storing the elements @@ -87,16 +90,45 @@ public LightWeightResizableGSet(int initCapacity) { @Override public E put(final E element) { - E existing = super.put(element); - expandIfNecessary(); - return existing; + synchronized (this) { + E existing = super.put(element); + expandIfNecessary(); + return existing; + } + } + + @Override + public E get(K key) { + synchronized (this) { + return super.get(key); + } + } + + @Override + public E remove(K key) { + synchronized (this) { + return super.remove(key); + } + } + + @Override + public int size() { + synchronized (this) { + return super.size(); + } + } + + public void getIterator(Consumer> consumer) { + synchronized (this) { + consumer.accept(super.values().iterator()); + } } /** * Resize the internal table to given capacity. */ @SuppressWarnings("unchecked") - protected void resize(int cap) { + protected synchronized void resize(int cap) { int newCapacity = actualArrayLength(cap); if (newCapacity == this.capacity) { return; @@ -121,7 +153,7 @@ protected void resize(int cap) { /** * Checks if we need to expand, and expands if necessary. 
*/ - protected void expandIfNecessary() { + protected synchronized void expandIfNecessary() { if (size > this.threshold && capacity < MAX_ARRAY_LENGTH) { resize(capacity * 2); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 14f9cd7730e2f..4132379f0869b 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1658,6 +1658,20 @@ public class DFSConfigKeys extends CommonConfigurationKeys { DFS_NAMESERVICES_RESOLVER_IMPL = "dfs.datanode.nameservices.resolver.impl"; + public static final String + DFS_DATANODE_BLOCKPOOL_LOCK_FAIR = + "dfs.blockpool.lock.fair"; + + public static final boolean + DFS_DATANODE_BLOCKPOOL_LOCK_FAIR_DEFAULT = false; + + public static final String + DFS_DATANODE_BLOCKPOOL_LOCK_TRACE = + "dfs.blockpool.lock.trace"; + + public static final boolean + DFS_DATANODE_BLOCKPOOL_LOCK_TRACE_DEFAULT = false; + // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/AutoCloseLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/AutoCloseLock.java new file mode 100644 index 0000000000000..bcb8fbbaf527e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/AutoCloseLock.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.common; + +import org.apache.hadoop.util.AutoCloseableLock; +import org.apache.hadoop.util.StringUtils; + +import java.util.concurrent.locks.Lock; + +import static org.apache.hadoop.hdfs.server.datanode.DataSetLockManager.LOG; + +/** + * Auto-release the lock when exiting the try scope. + */ +public class AutoCloseLock extends AutoCloseableLock { + private Lock lock; + private AutoCloseLock parentLock; + private LockManager lockManager; + + public AutoCloseLock(Lock lock) { + this.lock = lock; + } + + @Override + public void close() { + if (lock != null) { + lock.unlock(); + if (lockManager != null) { + lockManager.hook(); + } + } else { + LOG.error("Try to unlock null lock" + + StringUtils.getStackTrace(Thread.currentThread())); + } + if (parentLock != null) { + parentLock.close(); + } + } + + public void lock() { + if (lock != null) { + lock.lock(); + return; + } + LOG.error("Try to lock null lock" + + StringUtils.getStackTrace(Thread.currentThread())); + } + + public void setParentLock(AutoCloseLock parent) { + if (parentLock == null) { + this.parentLock = parent; + } + } + + public void setLockManager(LockManager lockManager) { + this.lockManager = lockManager; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/LockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/LockManager.java new file mode 100644 index 0000000000000..ed2fd89b66800 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/LockManager.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.common; + +/** + * Used to manage a set of locks for a datanode. + */ +public interface LockManager<T extends AutoCloseLock> { + + enum LockLevel { + BLOCK_POOl, + VOLUME + } + + /** + * Acquire readLock and then lock. + */ + T readLock(LockLevel level, String... resources); + + /** + * Acquire writeLock and then lock. + */ + T writeLock(LockLevel level, String... resources); + + /** + * Add a lock to LockManager. + */ + void addLock(LockLevel level, String... resources); + + /** + * Remove a lock from LockManager. + */ + void removeLock(LockLevel level, String... resources); + + /** + * A hook that a released lock calls back into the LockManager. + */ + void hook(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/NoLockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/NoLockManager.java new file mode 100644 index 0000000000000..7fb26ee42cff7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/NoLockManager.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.common; + +import java.util.concurrent.locks.Lock; + +/** + * Some unit tests or temporary ReplicaMaps do not need to lock with the + * DataSetLockManager. + */ +public class NoLockManager implements LockManager<AutoCloseLock> { + private NoLock lock = new NoLock(null); + + private final class NoLock extends AutoCloseLock { + + private NoLock(Lock lock) { + super(lock); + } + + @Override + public void lock() { + } + + @Override + public void close() { + } + } + + public NoLockManager() { + } + + @Override + public AutoCloseLock readLock(LockLevel level, String... resources) { + return lock; + } + + @Override + public AutoCloseLock writeLock(LockLevel level, String... resources) { + return lock; + } + + @Override + public void addLock(LockLevel level, String... resources) { + } + + @Override + public void removeLock(LockLevel level, String... resources) { + } + + @Override + public void hook() { + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index fe83700d6f994..282f29e6f48ee 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; +import org.apache.hadoop.hdfs.server.common.LockManager; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; @@ -307,6 +308,10 @@ private void connectToNNAndHandshake() throws IOException { // info. NamespaceInfo nsInfo = retrieveNamespaceInfo(); + // Init the block pool lock when the actor initializes. + dn.getDataSetLockManager().addLock(LockManager.LockLevel.BLOCK_POOl, + nsInfo.getBlockPoolID()); + // Verify that this matches the other NN in this HA pair. // This also initializes our block pool in the DN if we are // the first NN connection for this BP.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 172a245b3525a..92e60d4bb0e67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.common.LockManager; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; @@ -256,7 +257,8 @@ class BlockSender implements java.io.Closeable { // the append write. ChunkChecksum chunkChecksum = null; final long replicaVisibleLength; - try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) { + try (AutoCloseableLock lock = datanode.getDataSetLockManager().readLock( + LockManager.LockLevel.BLOCK_POOl, block.getBlockPoolId())) { replica = getReplica(block, datanode); replicaVisibleLength = replica.getVisibleLength(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 8009577c06219..8adf7ce293f2b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -121,6 +121,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.common.LockManager; import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker; import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker; import org.apache.hadoop.hdfs.util.DataTransferThrottler; @@ -396,6 +397,7 @@ public static InetSocketAddress createSocketAddr(String target) { .availableProcessors(); private static final double CONGESTION_RATIO = 1.5; private DiskBalancer diskBalancer; + private DataSetLockManager dataSetLockManager; private final ExecutorService xferService; @@ -437,6 +439,7 @@ private static Tracer createTracer(Configuration conf) { this.pipelineSupportSlownode = false; this.socketFactory = NetUtils.getDefaultSocketFactory(conf); this.dnConf = new DNConf(this); + this.dataSetLockManager = new DataSetLockManager(conf); initOOBTimeout(); storageLocationChecker = null; volumeChecker = new DatasetVolumeChecker(conf, new Timer()); @@ -455,6 +458,7 @@ private static Tracer createTracer(Configuration conf) { super(conf); this.tracer = createTracer(conf); this.fileIoProvider = new FileIoProvider(conf, this); + this.dataSetLockManager = new DataSetLockManager(conf); this.blockScanner = new BlockScanner(this); this.lastDiskErrorCheck = 0; this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY, @@ -2265,6 +2269,7 @@ public void shutdown() { notifyAll(); } tracer.close(); + dataSetLockManager.lockLeakCheck(); } /** @@ -3171,7 +3176,9 @@ void 
transferReplicaForPipelineRecovery(final ExtendedBlock b, final BlockConstructionStage stage; //get replica information - try(AutoCloseableLock lock = data.acquireDatasetReadLock()) { + + try(AutoCloseableLock lock = dataSetLockManager.writeLock(LockManager. + LockLevel.BLOCK_POOl, b.getBlockPoolId())) { Block storedBlock = data.getStoredBlock(b.getBlockPoolId(), b.getBlockId()); if (null == storedBlock) { @@ -3888,6 +3895,10 @@ private static boolean isWrite(BlockConstructionStage stage) { || stage == PIPELINE_SETUP_APPEND_RECOVERY); } + public DataSetLockManager getDataSetLockManager() { + return dataSetLockManager; + } + boolean isSlownodeByNameserviceId(String nsId) { return blockPoolManager.isSlownodeByNameserviceId(nsId); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java new file mode 100644 index 0000000000000..900fbf4b0ad69 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataSetLockManager.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.common.AutoCloseLock; +import org.apache.hadoop.hdfs.server.common.LockManager; + +import java.util.HashMap; +import java.util.Stack; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Class for maintaining the set of locks used by FsDatasetImpl. + */ +public class DataSetLockManager implements LockManager<AutoCloseLock> { + public static final Log LOG = LogFactory.getLog(DataSetLockManager.class); + private HashMap<String, TrackLog> threadCountMap = new HashMap<>(); + private LockMap lockMap = new LockMap(); + private boolean isFair = true; + private boolean openLockTrace; + private Exception lastException; + + /** + * Class for maintaining the lock map; it is thread-safe. + */ + private class LockMap { + private HashMap<String, AutoCloseLock> readlockMap = new HashMap<>(); + private HashMap<String, AutoCloseLock> writeLockMap = new HashMap<>(); + + public synchronized void addLock(String name, ReentrantReadWriteLock lock) { + AutoCloseLock readLock = new AutoCloseLock(lock.readLock()); + AutoCloseLock writeLock = new AutoCloseLock(lock.writeLock()); + if (openLockTrace) { + readLock.setLockManager(DataSetLockManager.this); + writeLock.setLockManager(DataSetLockManager.this); + } + readlockMap.putIfAbsent(name, readLock); + writeLockMap.putIfAbsent(name, writeLock); + } + + public synchronized void removeLock(String name) { + if (!readlockMap.containsKey(name) || !writeLockMap.containsKey(name)) { + LOG.error("The lock " + name + " is not in LockMap"); + } + readlockMap.remove(name); + writeLockMap.remove(name); + } + + public synchronized AutoCloseLock getReadLock(String name) { + return readlockMap.get(name); + } + + public synchronized AutoCloseLock getWriteLock(String name) { + return writeLockMap.get(name); + } + } + + private String generateLockName(LockLevel level, String... resources) { + if (resources.length == 1 && level == LockLevel.BLOCK_POOl) { + if (resources[0] == null) { + throw new IllegalArgumentException("acquire a null block pool lock"); + } + return resources[0]; + } else if (resources.length == 2 && level == LockLevel.VOLUME) { + if (resources[0] == null || resources[1] == null) { + throw new IllegalArgumentException("acquire a null bp lock : " + + resources[0] + "volume lock :" + resources[1]); + } + return resources[0] + resources[1]; + } else { + throw new IllegalArgumentException("lock level do not match resource"); + } + } + + /** + * Class for recording the lock-acquisition stack traces and lock count of a thread.
+ */ + private class TrackLog { + private Stack logStack = new Stack<>(); + private int lockCount = 0; + private String threadName; + + TrackLog(String threadName) { + this.threadName = threadName; + incrLockCount(); + } + + public void incrLockCount() { + logStack.push(new Exception("lock stack trace")); + lockCount += 1; + } + + public void decrLockCount() { + logStack.pop(); + lockCount -= 1; + } + + public void showLockMessage() { + LOG.error("hold lock thread name is:" + threadName + + " hold count is:" + lockCount); + while (!logStack.isEmpty()) { + Exception e = logStack.pop(); + LOG.error("lock stack ", e); + } + } + + public boolean shouldClear() { + return lockCount == 1; + } + } + + public DataSetLockManager(Configuration conf) { + this.isFair = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_BLOCKPOOL_LOCK_FAIR, + DFSConfigKeys.DFS_DATANODE_BLOCKPOOL_LOCK_FAIR_DEFAULT); + this.openLockTrace = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_BLOCKPOOL_LOCK_TRACE, + DFSConfigKeys.DFS_DATANODE_BLOCKPOOL_LOCK_TRACE_DEFAULT); + } + + public DataSetLockManager() { + this.openLockTrace = true; + } + + public AutoCloseLock readLock(LockLevel level, String... resources) { + if (level == LockLevel.BLOCK_POOl) { + return getReadLock(level, resources[0]); + } else { + AutoCloseLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]); + AutoCloseLock volLock = getReadLock(level, resources); + volLock.setParentLock(bpLock); + if (openLockTrace) { + LOG.info("Sub lock " + resources[0] + resources[1] + " parent lock " + + resources[0]); + } + return volLock; + } + } + + public AutoCloseLock writeLock(LockLevel level, String... resources) { + if (level == LockLevel.BLOCK_POOl) { + return getWriteLock(level, resources[0]); + } else { + AutoCloseLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]); + AutoCloseLock volLock = getWriteLock(level, resources); + volLock.setParentLock(bpLock); + if (openLockTrace) { + LOG.info("Sub lock " + resources[0] + resources[1] + " parent lock " + + resources[0]); + } + return volLock; + } + } + + /** + * Return a not null ReadLock. + */ + private AutoCloseLock getReadLock(LockLevel level, String... resources) { + String lockName = generateLockName(level, resources); + AutoCloseLock lock = lockMap.getReadLock(lockName); + if (lock == null) { + LOG.warn("Ignore this error during dn restart: Not existing readLock " + + lockName); + lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair)); + lock = lockMap.getReadLock(lockName); + } + lock.lock(); + if (openLockTrace) { + putThreadname(getThreadName()); + } + return lock; + } + + /** + * Return a not null WriteLock. + */ + private AutoCloseLock getWriteLock(LockLevel level, String... resources) { + String lockName = generateLockName(level, resources); + AutoCloseLock lock = lockMap.getWriteLock(lockName); + if (lock == null) { + LOG.warn("Ignore this error during dn restart: Not existing writeLock" + + lockName); + lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair)); + lock = lockMap.getWriteLock(lockName); + } + lock.lock(); + if (openLockTrace) { + putThreadname(getThreadName()); + } + return lock; + } + + public void addLock(LockLevel level, String... 
resources) { + String lockName = generateLockName(level, resources); + if (level == LockLevel.BLOCK_POOl) { + lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair)); + } else { + lockMap.addLock(resources[0], new ReentrantReadWriteLock(isFair)); + lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair)); + } + } + + public void removeLock(LockLevel level, String... resources) { + String lockName = generateLockName(level, resources); + try (AutoCloseLock lock = writeLock(level, resources)) { + lock.lock(); + lockMap.removeLock(lockName); + } + } + + @Override + public void hook() { + if (openLockTrace) { + removeThreadName(getThreadName()); + } + } + + // add when lock a lock + private synchronized void putThreadname(String thread) { + if (threadCountMap.containsKey(thread)) { + TrackLog trackLog = threadCountMap.get(thread); + trackLog.incrLockCount(); + } + threadCountMap.putIfAbsent(thread, new TrackLog(thread)); + } + + public void lockLeakCheck() { + if (!openLockTrace) { + LOG.warn("not open lock leak check func"); + return; + } + if (threadCountMap.isEmpty()) { + LOG.warn("all lock has release"); + return; + } + setLastException(new Exception("lock Leak")); + for (String thread : threadCountMap.keySet()) { + TrackLog trackLog = threadCountMap.get(thread); + trackLog.showLockMessage(); + } + } + + // remove when unlock a lock + private synchronized void removeThreadName(String thread) { + if (threadCountMap.containsKey(thread)) { + TrackLog trackLog = threadCountMap.get(thread); + if (trackLog.shouldClear()) { + threadCountMap.remove(thread); + return; + } + trackLog.decrLockCount(); + } + } + + private void setLastException(Exception e) { + this.lastException = e; + } + + public Exception getLastException() { + return lastException; + } + + private String getThreadName() { + return Thread.currentThread().getName() + Thread.currentThread().getId(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java index 0f710a143ad87..971f355814164 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java @@ -504,15 +504,13 @@ private Map getStorageIDToVolumeBasePathMap() Map storageIDToVolBasePathMap = new HashMap<>(); FsDatasetSpi.FsVolumeReferences references; try { - try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) { - references = this.dataset.getFsVolumeReferences(); - for (int ndx = 0; ndx < references.size(); ndx++) { - FsVolumeSpi vol = references.get(ndx); - storageIDToVolBasePathMap.put(vol.getStorageID(), - vol.getBaseURI().getPath()); - } - references.close(); + references = this.dataset.getFsVolumeReferences(); + for (int ndx = 0; ndx < references.size(); ndx++) { + FsVolumeSpi vol = references.get(ndx); + storageIDToVolBasePathMap.put(vol.getStorageID(), + vol.getBaseURI().getPath()); } + references.close(); } catch (IOException ex) { LOG.error("Disk Balancer - Internal Error.", ex); throw new DiskBalancerException("Internal error", ex, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index f162ea1b3ae15..a4fddaa8992fd 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -35,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.server.common.LockManager; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -657,21 +658,12 @@ ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block, ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block, FsVolumeSpi destination) throws IOException; - /** - * Acquire the lock of the data set. This prevents other threads from - * modifying the volume map structure inside the datanode, but other changes - * are still possible. For example modifying the genStamp of a block instance. - */ - AutoCloseableLock acquireDatasetLock(); - /*** - * Acquire the read lock of the data set. This prevents other threads from - * modifying the volume map structure inside the datanode, but other changes - * are still possible. For example modifying the genStamp of a block instance. + * Acquire lock Manager for the data set. This prevents other threads from + * modifying the volume map structure inside the datanode. * @return The AutoClosable read lock instance. */ - AutoCloseableLock acquireDatasetReadLock(); - + LockManager acquireDatasetLockManager(); /** * Deep copy the replica info belonging to given block pool. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 0ef1d56de34d6..d1c38580e0b75 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -914,7 +914,7 @@ void shutdown(BlockListAsLongs blocksListToPersist) { private boolean readReplicasFromCache(ReplicaMap volumeMap, final RamDiskReplicaTracker lazyWriteReplicaMap) { - ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock()); + ReplicaMap tmpReplicaMap = new ReplicaMap(); File replicaFile = new File(replicaCacheDir, REPLICA_CACHE_FILE); // Check whether the file exists or not. 
if (!replicaFile.exists()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 776e28594f52a..2afcb59bd42a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -32,7 +32,6 @@ import java.nio.file.Files; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -41,9 +40,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; @@ -63,6 +59,10 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.server.common.AutoCloseLock; +import org.apache.hadoop.hdfs.server.common.LockManager; +import org.apache.hadoop.hdfs.server.common.LockManager.LockLevel; +import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager; import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.LocalReplica; @@ -118,7 +118,6 @@ import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; -import org.apache.hadoop.util.InstrumentedReadWriteLock; import org.apache.hadoop.util.Lists; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Sets; @@ -187,7 +186,8 @@ public StorageReport[] getStorageReports(String bpid) @Override public FsVolumeImpl getVolume(final ExtendedBlock b) { - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, + b.getBlockPoolId())) { final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); return r != null ? (FsVolumeImpl) r.getVolume() : null; @@ -197,7 +197,8 @@ public FsVolumeImpl getVolume(final ExtendedBlock b) { @Override // FsDatasetSpi public Block getStoredBlock(String bpid, long blkid) throws IOException { - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, + bpid)) { ReplicaInfo r = volumeMap.get(bpid, blkid); if (r == null) { return null; @@ -209,12 +210,16 @@ public Block getStoredBlock(String bpid, long blkid) @Override public Set deepCopyReplica(String bpid) throws IOException { - Set replicas = null; - try (AutoCloseableLock lock = datasetReadLock.acquire()) { - replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections. 
- EMPTY_SET : volumeMap.replicas(bpid)); + try (AutoCloseLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) { + Set replicas = new HashSet<>(); + volumeMap.replicas(bpid, (iterator) -> { + while (iterator.hasNext()) { + ReplicaInfo b = iterator.next(); + replicas.add(b); + } + }); + return replicas; } - return Collections.unmodifiableSet(replicas); } /** @@ -274,13 +279,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) private boolean blockPinningEnabled; private final int maxDataLength; - @VisibleForTesting - final AutoCloseableLock datasetWriteLock; - @VisibleForTesting - final AutoCloseableLock datasetReadLock; - @VisibleForTesting - final InstrumentedReadWriteLock datasetRWLock; - private final Condition datasetWriteLockCondition; + private final DataSetLockManager lockManager; private static String blockPoolId = ""; // Make limited notify times from DirectoryScanner to NameNode. @@ -299,33 +298,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) this.dataStorage = storage; this.conf = conf; this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf); - this.datasetRWLock = new InstrumentedReadWriteLock( - conf.getBoolean(DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY, - DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT), - "FsDatasetRWLock", LOG, conf.getTimeDuration( - DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY, - DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS), - conf.getTimeDuration( - DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY, - DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT, - TimeUnit.MILLISECONDS)); - this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock()); - boolean enableRL = conf.getBoolean( - DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, - DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT); - // The read lock can be disabled by the above config key. If it is disabled - // then we simply make the both the read and write lock variables hold - // the write lock. All accesses to the lock are via these variables, so that - // effectively disables the read lock. - if (enableRL) { - LOG.info("The datanode lock is a read write lock"); - this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock()); - } else { - LOG.info("The datanode lock is an exclusive write lock"); - this.datasetReadLock = this.datasetWriteLock; - } - this.datasetWriteLockCondition = datasetWriteLock.newCondition(); + this.lockManager = datanode.getDataSetLockManager(); // The number of volumes required for operation is the total number // of volumes minus the number of failed volumes we can tolerate. @@ -364,7 +337,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) } storageMap = new ConcurrentHashMap(); - volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock); + volumeMap = new ReplicaMap(lockManager); ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this); @SuppressWarnings("unchecked") @@ -420,16 +393,6 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) lastDirScannerNotifyTime = System.currentTimeMillis(); } - @Override - public AutoCloseableLock acquireDatasetLock() { - return datasetWriteLock.acquire(); - } - - @Override - public AutoCloseableLock acquireDatasetReadLock() { - return datasetReadLock.acquire(); - } - /** * Gets initial volume failure information for all volumes that failed * immediately at startup. 
The method works by determining the set difference @@ -464,42 +427,40 @@ private static List getInitialVolumeFailureInfos( * Activate a volume to serve requests. * @throws IOException if the storage UUID already exists. */ - private void activateVolume( + private synchronized void activateVolume( ReplicaMap replicaMap, Storage.StorageDirectory sd, StorageType storageType, FsVolumeReference ref) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { - DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid()); - if (dnStorage != null) { - final String errorMsg = String.format( - "Found duplicated storage UUID: %s in %s.", - sd.getStorageUuid(), sd.getVersionFile()); - LOG.error(errorMsg); - throw new IOException(errorMsg); - } - // Check if there is same storage type on the mount. - // Only useful when same disk tiering is turned on. - FsVolumeImpl volumeImpl = (FsVolumeImpl) ref.getVolume(); - FsVolumeReference checkRef = volumes - .getMountVolumeMap() - .getVolumeRefByMountAndStorageType( - volumeImpl.getMount(), volumeImpl.getStorageType()); - if (checkRef != null) { - final String errorMsg = String.format( - "Storage type %s already exists on same mount: %s.", - volumeImpl.getStorageType(), volumeImpl.getMount()); - checkRef.close(); - LOG.error(errorMsg); - throw new IOException(errorMsg); - } - volumeMap.mergeAll(replicaMap); - storageMap.put(sd.getStorageUuid(), - new DatanodeStorage(sd.getStorageUuid(), - DatanodeStorage.State.NORMAL, - storageType)); - asyncDiskService.addVolume(volumeImpl); - volumes.addVolume(ref); - } + DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid()); + if (dnStorage != null) { + final String errorMsg = String.format( + "Found duplicated storage UUID: %s in %s.", + sd.getStorageUuid(), sd.getVersionFile()); + LOG.error(errorMsg); + throw new IOException(errorMsg); + } + // Check if there is same storage type on the mount. + // Only useful when same disk tiering is turned on. + FsVolumeImpl volumeImpl = (FsVolumeImpl) ref.getVolume(); + FsVolumeReference checkRef = volumes + .getMountVolumeMap() + .getVolumeRefByMountAndStorageType( + volumeImpl.getMount(), volumeImpl.getStorageType()); + if (checkRef != null) { + final String errorMsg = String.format( + "Storage type %s already exists on same mount: %s.", + volumeImpl.getStorageType(), volumeImpl.getMount()); + checkRef.close(); + LOG.error(errorMsg); + throw new IOException(errorMsg); + } + volumeMap.mergeAll(replicaMap); + storageMap.put(sd.getStorageUuid(), + new DatanodeStorage(sd.getStorageUuid(), + DatanodeStorage.State.NORMAL, + storageType)); + asyncDiskService.addVolume(volumeImpl); + volumes.addVolume(ref); } private void addVolume(Storage.StorageDirectory sd) throws IOException { @@ -516,8 +477,8 @@ private void addVolume(Storage.StorageDirectory sd) throws IOException { .setConf(this.conf) .build(); FsVolumeReference ref = fsVolume.obtainReference(); - ReplicaMap tempVolumeMap = - new ReplicaMap(datasetReadLock, datasetWriteLock); + // no need to acquire lock. 
+ ReplicaMap tempVolumeMap = new ReplicaMap(); fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker); activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref); @@ -556,13 +517,13 @@ public void addVolume(final StorageLocation location, StorageType storageType = location.getStorageType(); final FsVolumeImpl fsVolume = createFsVolume(sd.getStorageUuid(), sd, location); - final ReplicaMap tempVolumeMap = - new ReplicaMap(new ReentrantReadWriteLock()); + // no need to add lock + final ReplicaMap tempVolumeMap = new ReplicaMap(); ArrayList exceptions = Lists.newArrayList(); for (final NamespaceInfo nsInfo : nsInfos) { String bpid = nsInfo.getBlockPoolID(); - try { + try (AutoCloseLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { fsVolume.addBlockPool(bpid, this.conf, this.timer); fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker); } catch (IOException e) { @@ -602,7 +563,7 @@ public void removeVolumes( new ArrayList<>(storageLocsToRemove); Map> blkToInvalidate = new HashMap<>(); List storageToRemove = new ArrayList<>(); - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + synchronized (this) { for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) { Storage.StorageDirectory sd = dataStorage.getStorageDir(idx); final StorageLocation sdLocation = sd.getStorageLocation(); @@ -614,7 +575,7 @@ public void removeVolumes( // Disable the volume from the service. asyncDiskService.removeVolume(sd.getStorageUuid()); volumes.removeVolume(sdLocation, clearFailure); - volumes.waitVolumeRemoved(5000, datasetWriteLockCondition); + volumes.waitVolumeRemoved(5000, this); // Removed all replica information for the blocks on the volume. // Unlike updating the volumeMap in addVolume(), this operation does @@ -622,18 +583,19 @@ public void removeVolumes( for (String bpid : volumeMap.getBlockPoolList()) { List blocks = blkToInvalidate .computeIfAbsent(bpid, (k) -> new ArrayList<>()); - for (Iterator it = - volumeMap.replicas(bpid).iterator(); it.hasNext();) { - ReplicaInfo block = it.next(); - final StorageLocation blockStorageLocation = - block.getVolume().getStorageLocation(); - LOG.trace("checking for block " + block.getBlockId() + - " with storageLocation " + blockStorageLocation); - if (blockStorageLocation.equals(sdLocation)) { - blocks.add(block); - it.remove(); + volumeMap.replicas(bpid, (iterator) -> { + while (iterator.hasNext()) { + ReplicaInfo block = iterator.next(); + final StorageLocation blockStorageLocation = + block.getVolume().getStorageLocation(); + LOG.trace("checking for block " + block.getBlockId() + + " with storageLocation " + blockStorageLocation); + if (blockStorageLocation.equals(sdLocation)) { + blocks.add(block); + iterator.remove(); + } } - } + }); } storageToRemove.add(sd.getStorageUuid()); storageLocationsToRemove.remove(sdLocation); @@ -661,8 +623,8 @@ public void removeVolumes( } } - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { - for(String storageUuid : storageToRemove) { + synchronized (this) { + for (String storageUuid : storageToRemove) { storageMap.remove(storageUuid); } } @@ -852,7 +814,8 @@ public InputStream getBlockInputStream(ExtendedBlock b, long seekOffset) throws IOException { ReplicaInfo info; - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, + b.getBlockPoolId())) { info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); } @@ -940,7 +903,8 @@ ReplicaInfo getReplicaInfo(String bpid, long blkid) 
@Override // FsDatasetSpi public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkOffset, long metaOffset) throws IOException { - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseLock l = lockManager.readLock(LockLevel.BLOCK_POOl, + b.getBlockPoolId())) { ReplicaInfo info = getReplicaInfo(b); FsVolumeReference ref = info.getVolume().obtainReference(); try { @@ -1116,7 +1080,8 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, targetStorageType, targetStorageId); boolean useVolumeOnSameMount = false; - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, + block.getBlockPoolId())) { if (shouldConsiderSameMountVolume) { volumeRef = volumes.getVolumeByMount(targetStorageType, ((FsVolumeImpl) replicaInfo.getVolume()).getMount(), @@ -1310,7 +1275,8 @@ public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block, FsVolumeSpi FsVolumeReference volumeRef = null; - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, + block.getBlockPoolId())) { volumeRef = destination.obtainReference(); } @@ -1324,6 +1290,11 @@ public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block, FsVolumeSpi return replicaInfo; } + @Override + public LockManager acquireDatasetLockManager() { + return lockManager; + } + /** * Compute and store the checksum for a block file that does not already have * its checksum computed. @@ -1398,7 +1369,8 @@ static void computeChecksum(ReplicaInfo srcReplica, File dstMeta, @Override // FsDatasetSpi public ReplicaHandler append(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, + b.getBlockPoolId())) { // If the block was successfully finalized because all packets // were successfully processed at the Datanode but the ack for // some of the packets were not received by the client. The client @@ -1450,7 +1422,7 @@ public ReplicaHandler append(ExtendedBlock b, private ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo, long newGS, long estimateBlockLen) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { // If the block is cached, start uncaching it. 
if (replicaInfo.getState() != ReplicaState.FINALIZED) { throw new IOException("Only a Finalized replica can be appended to; " @@ -1546,7 +1518,8 @@ public ReplicaHandler recoverAppend( while (true) { try { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, + b.getBlockPoolId())) { ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); FsVolumeReference ref = replicaInfo.getVolume().obtainReference(); ReplicaInPipeline replica; @@ -1578,7 +1551,8 @@ public Replica recoverClose(ExtendedBlock b, long newGS, LOG.info("Recover failed close " + b); while (true) { try { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, + b.getBlockPoolId())) { // check replica's state ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); // bump the replica's GS @@ -1601,7 +1575,8 @@ public ReplicaHandler createRbw( StorageType storageType, String storageId, ExtendedBlock b, boolean allowLazyPersist) throws IOException { long startTimeMs = Time.monotonicNow(); - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, + b.getBlockPoolId())) { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { @@ -1679,7 +1654,8 @@ public ReplicaHandler recoverRbw( try { while (true) { try { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, + b.getBlockPoolId())) { ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); // check the replica's state @@ -1710,7 +1686,8 @@ public ReplicaHandler recoverRbw( private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw, ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, + b.getBlockPoolId())) { // check generation stamp long replicaGenerationStamp = rbw.getGenerationStamp(); if (replicaGenerationStamp < b.getGenerationStamp() || @@ -1771,7 +1748,8 @@ private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw, public ReplicaInPipeline convertTemporaryToRbw( final ExtendedBlock b) throws IOException { long startTimeMs = Time.monotonicNow(); - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, + b.getBlockPoolId())) { final long blockId = b.getBlockId(); final long expectedGs = b.getGenerationStamp(); final long visible = b.getNumBytes(); @@ -1850,7 +1828,8 @@ public ReplicaHandler createTemporary(StorageType storageType, ReplicaInfo lastFoundReplicaInfo = null; boolean isInPipeline = false; do { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, + b.getBlockPoolId())) { ReplicaInfo currentReplicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (currentReplicaInfo == lastFoundReplicaInfo) { @@ -1905,7 +1884,8 @@ public ReplicaHandler createTemporary(StorageType storageType, false); } long startHoldLockTimeMs = Time.monotonicNow(); - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, + b.getBlockPoolId())) { 
FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b .getNumBytes()); FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); @@ -1966,7 +1946,8 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir) ReplicaInfo replicaInfo = null; ReplicaInfo finalizedReplicaInfo = null; long startTimeMs = Time.monotonicNow(); - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, + b.getBlockPoolId())) { if (Thread.interrupted()) { // Don't allow data modifications from interrupted threads throw new IOException("Cannot finalize block from Interrupted Thread"); @@ -2002,7 +1983,7 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir) private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { // Compare generation stamp of old and new replica before finalizing if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp() > replicaInfo.getGenerationStamp()) { @@ -2048,7 +2029,8 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo) @Override // FsDatasetSpi public void unfinalizeBlock(ExtendedBlock b) throws IOException { long startTimeMs = Time.monotonicNow(); - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, + b.getBlockPoolId())) { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); if (replicaInfo != null && @@ -2106,47 +2088,50 @@ public Map getBlockReports(String bpid) { new HashMap(); List curVolumes = null; - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) { curVolumes = volumes.getVolumes(); for (FsVolumeSpi v : curVolumes) { builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength)); } Set missingVolumesReported = new HashSet<>(); - for (ReplicaInfo b : volumeMap.replicas(bpid)) { - // skip PROVIDED replicas. - if (b.getVolume().getStorageType() == StorageType.PROVIDED) { - continue; - } - String volStorageID = b.getVolume().getStorageID(); - switch(b.getState()) { - case FINALIZED: - case RBW: - case RWR: - break; - case RUR: - // use the original replica. - b = b.getOriginalReplica(); - break; - case TEMPORARY: - continue; - default: - assert false : "Illegal ReplicaInfo state."; - continue; - } - BlockListAsLongs.Builder storageBuilder = builders.get(volStorageID); - // a storage in the process of failing will not be in the volumes list - // but will be in the replica map. - if (storageBuilder != null) { - storageBuilder.add(b); - } else { - if (!missingVolumesReported.contains(volStorageID)) { - LOG.warn("Storage volume: " + volStorageID + " missing for the" - + " replica block: " + b + ". Probably being removed!"); - missingVolumesReported.add(volStorageID); + volumeMap.replicas(bpid, (iterator) -> { + while (iterator.hasNext()) { + ReplicaInfo b = iterator.next(); + // skip PROVIDED replicas. + if (b.getVolume().getStorageType() == StorageType.PROVIDED) { + continue; + } + String volStorageID = b.getVolume().getStorageID(); + switch(b.getState()) { + case FINALIZED: + case RBW: + case RWR: + break; + case RUR: + // use the original replica. 
+ b = b.getOriginalReplica(); + break; + case TEMPORARY: + continue; + default: + assert false : "Illegal ReplicaInfo state."; + continue; + } + BlockListAsLongs.Builder storageBuilder = builders.get(volStorageID); + // a storage in the process of failing will not be in the volumes list + // but will be in the replica map. + if (storageBuilder != null) { + storageBuilder.add(b); + } else { + if (!missingVolumesReported.contains(volStorageID)) { + LOG.warn("Storage volume: " + volStorageID + " missing for the" + + " replica block: " + b + ". Probably being removed!"); + missingVolumesReported.add(volStorageID); + } } } - } + }); } for (FsVolumeImpl v : curVolumes) { @@ -2161,7 +2146,7 @@ public Map getBlockReports(String bpid) { * Gets a list of references to the finalized blocks for the given block pool. *

* Callers of this function should call - * {@link FsDatasetSpi#acquireDatasetLock()} to avoid blocks' status being + * {@link FsDatasetSpi#acquireDatasetLockManager()} to avoid blocks' status being + * changed during list iteration. *

* @return a list of references to the finalized blocks for the given block @@ -2169,14 +2154,17 @@ public Map getBlockReports(String bpid) { */ @Override public List getFinalizedBlocks(String bpid) { - try (AutoCloseableLock lock = datasetReadLock.acquire()) { - final List finalized = new ArrayList( - volumeMap.size(bpid)); - for (ReplicaInfo b : volumeMap.replicas(bpid)) { - if (b.getState() == ReplicaState.FINALIZED) { - finalized.add(b); + try (AutoCloseLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) { + ArrayList finalized = + new ArrayList<>(volumeMap.size(bpid)); + volumeMap.replicas(bpid, (iterator) -> { + while (iterator.hasNext()) { + ReplicaInfo b = iterator.next(); + if (b.getState() == ReplicaState.FINALIZED) { + finalized.add(new FinalizedReplica((FinalizedReplica)b)); + } } - } + }); return finalized; } } @@ -2309,7 +2297,7 @@ private void invalidate(String bpid, Block[] invalidBlks, boolean async) for (int i = 0; i < invalidBlks.length; i++) { final ReplicaInfo removing; final FsVolumeImpl v; - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]); if (info == null) { ReplicaInfo infoByBlockId = @@ -2432,7 +2420,7 @@ private void cacheBlock(String bpid, long blockId) { long length, genstamp; Executor volumeExecutor; - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { ReplicaInfo info = volumeMap.get(bpid, blockId); boolean success = false; try { @@ -2500,7 +2488,8 @@ public boolean isCached(String bpid, long blockId) { @Override // FsDatasetSpi public boolean contains(final ExtendedBlock block) { - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, + block.getBlockPoolId())) { final long blockId = block.getLocalBlock().getBlockId(); final String bpid = block.getBlockPoolId(); final ReplicaInfo r = volumeMap.get(bpid, blockId); @@ -2627,7 +2616,7 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo) curDirScannerNotifyCount = 0; lastDirScannerNotifyTime = startTimeMs; } - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { memBlockInfo = volumeMap.get(bpid, blockId); if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) { @@ -2844,7 +2833,7 @@ public ReplicaInfo getReplica(String bpid, long blockId) { @Override public String getReplicaString(String bpid, long blockId) { - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) { final Replica r = volumeMap.get(bpid, blockId); return r == null ? 
"null" : r.toString(); } @@ -2858,12 +2847,26 @@ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) datanode.getDnConf().getXceiverStopTimeout()); } + ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map, + Block block, long recoveryId, long xceiverStopTimeout) throws IOException { + while (true) { + try { + try (AutoCloseLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { + return initReplicaRecoveryImpl(bpid, map, block, recoveryId); + } + } catch (MustStopExistingWriter e) { + e.getReplicaInPipeline().stopWriter(xceiverStopTimeout); + } + } + } + /** static version of {@link #initReplicaRecovery(RecoveringBlock)}. */ static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map, - Block block, long recoveryId, long xceiverStopTimeout) throws IOException { + Block block, long recoveryId, long xceiverStopTimeout, DataSetLockManager + lockManager) throws IOException { while (true) { try { - try (AutoCloseableLock lock = map.getLock().acquire()) { + try (AutoCloseLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { return initReplicaRecoveryImpl(bpid, map, block, recoveryId); } } catch (MustStopExistingWriter e) { @@ -2952,7 +2955,8 @@ public Replica updateReplicaUnderRecovery( final long newBlockId, final long newlength) throws IOException { long startTimeMs = Time.monotonicNow(); - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, + oldBlock.getBlockPoolId())) { //get replica final String bpid = oldBlock.getBlockPoolId(); final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId()); @@ -3071,7 +3075,8 @@ private ReplicaInfo updateReplicaUnderRecovery( @Override // FsDatasetSpi public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException { - try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, + block.getBlockPoolId())) { final Replica replica = getReplicaInfo(block.getBlockPoolId(), block.getBlockId()); if (replica.getGenerationStamp() < block.getGenerationStamp()) { @@ -3088,7 +3093,7 @@ public void addBlockPool(String bpid, Configuration conf) throws IOException { LOG.info("Adding block pool " + bpid); AddBlockPoolException volumeExceptions = new AddBlockPoolException(); - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { try { volumes.addBlockPool(bpid, conf); } catch (AddBlockPoolException e) { @@ -3118,7 +3123,7 @@ public static void setBlockPoolId(String bpid) { @Override public void shutdownBlockPool(String bpid) { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { LOG.info("Removing block pool " + bpid); Map blocksPerVolume = getBlockReports(bpid); @@ -3192,7 +3197,7 @@ public Map getVolumeInfoMap() { @Override //FsDatasetSpi public void deleteBlockPool(String bpid, boolean force) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { List curVolumes = volumes.getVolumes(); if (!force) { for (FsVolumeImpl volume : curVolumes) { @@ -3221,7 +3226,8 @@ public void deleteBlockPool(String bpid, boolean force) @Override // FsDatasetSpi public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block) throws IOException { - 
try (AutoCloseableLock lock = datasetReadLock.acquire()) { + try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, + block.getBlockPoolId())) { final Replica replica = volumeMap.get(block.getBlockPoolId(), block.getBlockId()); if (replica == null) { @@ -3275,7 +3281,7 @@ public void clearRollingUpgradeMarker(String bpid) throws IOException { @Override public void onCompleteLazyPersist(String bpId, long blockId, long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpId)) { ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles); targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length() @@ -3409,7 +3415,8 @@ private boolean saveNextReplica() { try { block = ramDiskReplicaTracker.dequeueNextReplicaToPersist(); if (block != null) { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, + block.getBlockPoolId())) { replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId()); // If replicaInfo is null, the block was either deleted before @@ -3476,7 +3483,7 @@ public void evictBlocks(long bytesNeeded) throws IOException { ReplicaInfo replicaInfo, newReplicaInfo; final String bpid = replicaState.getBlockPoolId(); - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaState.getBlockId()); Preconditions.checkState(replicaInfo.getVolume().isTransientStorage()); @@ -3654,18 +3661,21 @@ public int getVolumeCount() { } void stopAllDataxceiverThreads(FsVolumeImpl volume) { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { - for (String bpid : volumeMap.getBlockPoolList()) { - Collection replicas = volumeMap.replicas(bpid); - for (ReplicaInfo replicaInfo : replicas) { - if ((replicaInfo.getState() == ReplicaState.TEMPORARY - || replicaInfo.getState() == ReplicaState.RBW) - && replicaInfo.getVolume().equals(volume)) { - ReplicaInPipeline replicaInPipeline = - (ReplicaInPipeline) replicaInfo; - replicaInPipeline.interruptThread(); + for (String bpid : volumeMap.getBlockPoolList()) { + try (AutoCloseLock lock = lockManager + .writeLock(LockLevel.BLOCK_POOl, bpid)) { + volumeMap.replicas(bpid, (iterator) -> { + while (iterator.hasNext()) { + ReplicaInfo replicaInfo = iterator.next(); + if ((replicaInfo.getState() == ReplicaState.TEMPORARY + || replicaInfo.getState() == ReplicaState.RBW) + && replicaInfo.getVolume().equals(volume)) { + ReplicaInPipeline replicaInPipeline = + (ReplicaInPipeline) replicaInfo; + replicaInPipeline.interruptThread(); + } } - } + }); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index 95470bb8ffe26..262a24bd3aa45 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -345,6 +345,28 @@ void waitVolumeRemoved(int sleepMillis, Condition condition) { FsDatasetImpl.LOG.info("Volume reference is released."); } + /** + * Wait for the 
reference of the volume removed from a previous + * {@link #removeVolume(FsVolumeImpl)} call to be released. + * + * @param sleepMillis interval to recheck. + */ + void waitVolumeRemoved(int sleepMillis, Object condition) { + while (!checkVolumesRemoved()) { + if (FsDatasetImpl.LOG.isDebugEnabled()) { + FsDatasetImpl.LOG.debug("Waiting for volume reference to be released."); + } + try { + condition.wait(sleepMillis); + } catch (InterruptedException e) { + FsDatasetImpl.LOG.info("Thread interrupted when waiting for " + + "volume reference to be released."); + Thread.currentThread().interrupt(); + } + } + FsDatasetImpl.LOG.info("Volume reference is released."); + } + @Override public String toString() { return volumes.toString(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java index eae119712f7c4..3286140d822a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java @@ -134,7 +134,7 @@ static class ProvidedBlockPoolSlice { ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume, Configuration conf) { this.providedVolume = volume; - bpVolumeMap = new ReplicaMap(new ReentrantReadWriteLock()); + bpVolumeMap = new ReplicaMap(); Class fmt = conf.getClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS, TextFileRegionAliasMap.class, BlockAliasMap.class); @@ -219,7 +219,7 @@ private void incrNumBlocks() { } public boolean isEmpty() { - return bpVolumeMap.replicas(bpid).size() == 0; + return bpVolumeMap.size(bpid) == 0; } public void shutdown(BlockListAsLongs blocksListsAsLongs) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java index c1d103ed50dba..b0c4a78db826d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java @@ -6,58 +6,62 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.util.Collection; -import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.Map; -import java.util.concurrent.locks.ReadWriteLock; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.AutoCloseLock; +import org.apache.hadoop.hdfs.server.common.LockManager; +import org.apache.hadoop.hdfs.server.common.LockManager.LockLevel; +import org.apache.hadoop.hdfs.server.common.NoLockManager; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.util.LightWeightResizableGSet; -import org.apache.hadoop.util.AutoCloseableLock; /** * Maintains the replica map. */ class ReplicaMap { // Lock object to synchronize this instance. - private final AutoCloseableLock readLock; - private final AutoCloseableLock writeLock; - + private LockManager lockManager = null; + // Map of block pool Id to another map of block Id to ReplicaInfo. private final Map> map = - new HashMap<>(); + new ConcurrentHashMap<>(); - ReplicaMap(AutoCloseableLock readLock, AutoCloseableLock writeLock) { - if (readLock == null || writeLock == null) { + ReplicaMap(LockManager manager) { + if (manager == null) { throw new HadoopIllegalArgumentException( - "Lock to synchronize on cannot be null"); + "Object to synchronize on cannot be null"); } - this.readLock = readLock; - this.writeLock = writeLock; + this.lockManager = manager; } - ReplicaMap(ReadWriteLock lock) { - this(new AutoCloseableLock(lock.readLock()), - new AutoCloseableLock(lock.writeLock())); + // Used for unit tests or a temporary ReplicaMap that does not need lock protection. + ReplicaMap() { + this.lockManager = new NoLockManager(); } String[] getBlockPoolList() { - try (AutoCloseableLock l = readLock.acquire()) { - return map.keySet().toArray(new String[map.keySet().size()]); - } + Set bpset = map.keySet(); + return bpset.toArray(new String[bpset.size()]); } private void checkBlockPool(String bpid) { @@ -100,7 +104,7 @@ ReplicaInfo get(String bpid, Block block) { */ ReplicaInfo get(String bpid, long blockId) { checkBlockPool(bpid); - try (AutoCloseableLock l = readLock.acquire()) { + try (AutoCloseLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) { LightWeightResizableGSet m = map.get(bpid); return m != null ?
m.get(new Block(blockId)) : null; } @@ -117,7 +121,7 @@ ReplicaInfo get(String bpid, long blockId) { ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) { checkBlockPool(bpid); checkBlock(replicaInfo); - try (AutoCloseableLock l = writeLock.acquire()) { + try (AutoCloseLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) { LightWeightResizableGSet m = map.get(bpid); if (m == null) { // Add an entry for block pool if it does not exist already @@ -135,7 +139,7 @@ ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) { ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) { checkBlockPool(bpid); checkBlock(replicaInfo); - try (AutoCloseableLock l = writeLock.acquire()) { + try (AutoCloseLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) { LightWeightResizableGSet m = map.get(bpid); if (m == null) { // Add an entry for block pool if it does not exist already @@ -164,13 +168,28 @@ void addAll(ReplicaMap other) { * Merge all entries from the given replica map into the local replica map. */ void mergeAll(ReplicaMap other) { - other.map.forEach( - (bp, replicaInfos) -> { - replicaInfos.forEach( - replicaInfo -> add(bp, replicaInfo) - ); + Set bplist = other.map.keySet(); + for (String bp : bplist) { + checkBlockPool(bp); + try (AutoCloseLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bp)) { + LightWeightResizableGSet replicaInfos = other.map.get(bp); + LightWeightResizableGSet curSet = map.get(bp); + HashSet replicaSet = new HashSet<>(); + // Can't add to a GSet while iterating over another GSet; it may cause an endless loop. + for (ReplicaInfo replicaInfo : replicaInfos) { + replicaSet.add(replicaInfo); } - ); + for (ReplicaInfo replicaInfo : replicaSet) { + checkBlock(replicaInfo); + if (curSet == null) { + // Add an entry for block pool if it does not exist already + curSet = new LightWeightResizableGSet<>(); + map.put(bp, curSet); + } + curSet.put(replicaInfo); + } + } + } } /** @@ -184,7 +203,7 @@ void mergeAll(ReplicaMap other) { ReplicaInfo remove(String bpid, Block block) { checkBlockPool(bpid); checkBlock(block); - try (AutoCloseableLock l = writeLock.acquire()) { + try (AutoCloseLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) { LightWeightResizableGSet m = map.get(bpid); if (m != null) { ReplicaInfo replicaInfo = m.get(block); @@ -206,7 +225,7 @@ ReplicaInfo remove(String bpid, Block block) { */ ReplicaInfo remove(String bpid, long blockId) { checkBlockPool(bpid); - try (AutoCloseableLock l = writeLock.acquire()) { + try (AutoCloseLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) { LightWeightResizableGSet m = map.get(bpid); if (m != null) { return m.remove(new Block(blockId)); @@ -221,7 +240,7 @@ ReplicaInfo remove(String bpid, long blockId) { * @return the number of replicas in the map */ int size(String bpid) { - try (AutoCloseableLock l = readLock.acquire()) { + try (AutoCloseLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) { LightWeightResizableGSet m = map.get(bpid); return m != null ? m.size() : 0; } @@ -229,11 +248,9 @@ int size(String bpid) { /** * Get a collection of the replicas for given block pool - This method is not synchronized. It needs to be synchronized - externally using the lock, both for getting the replicas - values from the map and iterating over it. Mutex can be accessed using - {@link #getLock()} method. - + * This method is not synchronized. For thread-safe access, + * use {@link #replicas(String, Consumer)} instead.
+ * + * @param bpid block pool id * @return a collection of the replicas belonging to the block pool */ @@ -243,9 +260,25 @@ Collection replicas(String bpid) { return m != null ? m.values() : null; } + /** + * Execute the given consumer over the replicas of one block pool, protected by the LockManager. + * This method is synchronized. + * + * @param bpid block pool id + */ + void replicas(String bpid, Consumer> consumer) { + LightWeightResizableGSet m = null; + try (AutoCloseLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) { + m = map.get(bpid); + if (m != null) { + m.getIterator(consumer); + } + } + } + void initBlockPool(String bpid) { checkBlockPool(bpid); - try (AutoCloseableLock l = writeLock.acquire()) { + try (AutoCloseLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { LightWeightResizableGSet m = map.get(bpid); if (m == null) { // Add an entry for block pool if it does not exist already @@ -257,26 +290,8 @@ void initBlockPool(String bpid) { void cleanUpBlockPool(String bpid) { checkBlockPool(bpid); - try (AutoCloseableLock l = writeLock.acquire()) { + try (AutoCloseLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { map.remove(bpid); } } - - /** - * Get the lock object used for synchronizing ReplicasMap - * @return lock object - */ - AutoCloseableLock getLock() { - return writeLock; - } - - /** - * Get the lock object used for synchronizing the ReplicasMap for read only - * operations. - * @return The read lock object - */ - AutoCloseableLock getReadLock() { - return readLock; - } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index e766a13787f22..708b696d82de1 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -6337,4 +6337,25 @@ times, we should mark it as a badnode. + + + dfs.blockpool.lock.fair + false + + If this is true, the Datanode FsDataset lock will be used in Fair + mode, which will help to prevent writer threads from being starved, but can + lower lock throughput. See java.util.concurrent.locks.ReentrantReadWriteLock + for more information on fair/non-fair locks. + + + + + dfs.blockpool.lock.trace + false + + If this is true, the lock manager will print, at datanode shutdown, every + thread that acquired a lock through the manager but never released it. Only + intended for tests or for tracing deadlock problems. Keep the default of false + in production, since the tracing adds a small performance cost.
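For reference, a minimal sketch (not part of the patch) of how the two keys described above could be toggled from client or test code. It assumes only the DFS_DATANODE_BLOCKPOOL_LOCK_* constants added to DFSConfigKeys in this change; the class name is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class BlockPoolLockConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Acquire the per-block-pool locks in fair mode (default is non-fair).
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCKPOOL_LOCK_FAIR, true);
    // Record lock acquisitions so leaked locks can be reported at shutdown.
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCKPOOL_LOCK_TRACE, true);
    System.out.println("fair mode enabled: " + conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_BLOCKPOOL_LOCK_FAIR,
        DFSConfigKeys.DFS_DATANODE_BLOCKPOOL_LOCK_FAIR_DEFAULT));
  }
}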
+ + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 73e8bf7cb618e..706e1b42de5c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -40,6 +40,7 @@ import javax.management.ObjectName; import javax.management.StandardMBean; +import org.apache.hadoop.hdfs.server.common.LockManager; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap; import org.apache.hadoop.thirdparty.com.google.common.math.LongMath; import org.apache.commons.lang3.ArrayUtils; @@ -48,7 +49,6 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImplTestUtils; -import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; @@ -162,7 +162,7 @@ public static byte simulatedByte(Block b, long offsetInBlk) { static final byte[] nullCrcFileData; - private final AutoCloseableLock datasetLock; + private final LockManager datasetLockManager; private final FileIoProvider fileIoProvider; static { @@ -707,6 +707,8 @@ public SimulatedFSDataset(DataStorage storage, Configuration conf) { public SimulatedFSDataset(DataNode datanode, DataStorage storage, Configuration conf) { this.datanode = datanode; + this.datasetLockManager = datanode == null ? new DataSetLockManager() : + datanode.getDataSetLockManager(); int storageCount; if (storage != null && storage.getNumStorageDirs() > 0) { storageCount = storage.getNumStorageDirs(); @@ -721,9 +723,6 @@ public SimulatedFSDataset(DataNode datanode, DataStorage storage, Configuration registerMBean(datanodeUuid); this.fileIoProvider = new FileIoProvider(conf, datanode); - - this.datasetLock = new AutoCloseableLock(); - this.storages = new ArrayList<>(); for (int i = 0; i < storageCount; i++) { this.storages.add(new SimulatedStorage( @@ -1587,14 +1586,8 @@ public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block, } @Override - public AutoCloseableLock acquireDatasetLock() { - return datasetLock.acquire(); - } - - @Override - public AutoCloseableLock acquireDatasetReadLock() { - // No RW lock implementation in simulated dataset currently. 
- return datasetLock.acquire(); + public LockManager acquireDatasetLockManager() { + return null; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 057dd6459fdc2..0c0fe618174c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -128,6 +128,7 @@ public class TestBPOfferService { private final int[] heartbeatCounts = new int[3]; private DataNode mockDn; private FsDatasetSpi mockFSDataset; + private DataSetLockManager dataSetLockManager = new DataSetLockManager(); private boolean isSlownode; @Before @@ -153,6 +154,7 @@ public void setupMocks() throws Exception { // Wire the dataset to the DN. Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset(); + Mockito.doReturn(dataSetLockManager).when(mockDn).getDataSetLockManager(); } /** @@ -508,6 +510,7 @@ public void testPickActiveNameNode() throws Exception { public void testBPInitErrorHandling() throws Exception { final DataNode mockDn = Mockito.mock(DataNode.class); Mockito.doReturn(true).when(mockDn).shouldRun(); + Mockito.doReturn(dataSetLockManager).when(mockDn).getDataSetLockManager(); Configuration conf = new Configuration(); File dnDataDir = new File( new File(TEST_BUILD_DATA, "testBPInitErrorHandling"), "data"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery2.java index 8d2df18711256..c5dfba42d784a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery2.java @@ -252,10 +252,8 @@ public void testRaceBetweenReplicaRecoveryAndFinalizeBlock() final BlockRecoveryCommand.RecoveringBlock recoveringBlock = new BlockRecoveryCommand.RecoveringBlock(block.getBlock(), locations, block.getBlock().getGenerationStamp() + 1); - try(AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) { - Thread.sleep(2000); - dataNode.initReplicaRecovery(recoveringBlock); - } + Thread.sleep(2000); + dataNode.initReplicaRecovery(recoveringBlock); } catch (Exception e) { LOG.error("Something went wrong.", e); recoveryInitResult.set(false); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java new file mode 100644 index 0000000000000..067bb22346f3c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataSetLockManager.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import org.apache.hadoop.hdfs.server.common.AutoCloseLock; +import org.apache.hadoop.hdfs.server.common.LockManager; +import org.apache.hadoop.hdfs.server.common.LockManager.LockLevel; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class TestDataSetLockManager { + private DataSetLockManager manager; + + @Before + public void init() { + manager = new DataSetLockManager(); + } + + @Test(timeout = 5000) + public void testBaseFunc() { + manager.addLock(LockLevel.BLOCK_POOl, "BPtest"); + manager.addLock(LockLevel.VOLUME, "BPtest", "Volumetest"); + + AutoCloseLock lock = manager.writeLock(LockLevel.BLOCK_POOl, "BPtest"); + AutoCloseLock lock1 = manager.readLock(LockLevel.BLOCK_POOl, "BPtest"); + lock1.close(); + lock.close(); + + manager.lockLeakCheck(); + assertNull(manager.getLastException()); + + AutoCloseLock lock2 = manager.writeLock(LockLevel.VOLUME, "BPtest", "Volumetest"); + AutoCloseLock lock3 = manager.readLock(LockLevel.VOLUME, "BPtest", "Volumetest"); + lock3.close(); + lock2.close(); + + manager.lockLeakCheck(); + assertNull(manager.getLastException()); + + AutoCloseLock lock4 = manager.writeLock(LockLevel.BLOCK_POOl, "BPtest"); + AutoCloseLock lock5 = manager.readLock(LockLevel.VOLUME, "BPtest", "Volumetest"); + lock5.close(); + lock4.close(); + + manager.lockLeakCheck(); + assertNull(manager.getLastException()); + + manager.writeLock(LockLevel.VOLUME, "BPtest", "Volumetest"); + manager.lockLeakCheck(); + + Exception lastException = manager.getLastException(); + assertEquals(lastException.getMessage(), "lock Leak"); + } + + @Test(timeout = 5000) + public void testAcquireWriteLockError() throws InterruptedException { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + manager.readLock(LockManager.LockLevel.BLOCK_POOl, "test"); + manager.writeLock(LockLevel.BLOCK_POOl, "test"); + } + }); + t.start(); + Thread.sleep(1000); + manager.lockLeakCheck(); + Exception lastException = manager.getLastException(); + t.stop(); + assertEquals(lastException.getMessage(), "lock Leak"); + } + + @Test(timeout = 5000) + public void testLockLeakCheck() { + manager.writeLock(LockLevel.BLOCK_POOl, "test"); + manager.lockLeakCheck(); + Exception lastException = manager.getLastException(); + assertEquals(lastException.getMessage(), "lock Leak"); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index 7f79778842780..9b01fde22513f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.common.LockManager; 
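A condensed sketch of the call pattern that TestDataSetLockManager above exercises. It uses only calls that appear in this patch (addLock, readLock, writeLock, lockLeakCheck); the block pool id, volume path, and class name are illustrative.

import org.apache.hadoop.hdfs.server.common.AutoCloseLock;
import org.apache.hadoop.hdfs.server.common.LockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;

public class DataSetLockManagerSketch {
  public static void main(String[] args) {
    DataSetLockManager manager = new DataSetLockManager();
    String bpid = "BP-example";
    String volume = "/data/dn1";
    // Register the block pool level lock and one nested volume level lock.
    manager.addLock(LockLevel.BLOCK_POOl, bpid);
    manager.addLock(LockLevel.VOLUME, bpid, volume);

    // Exclusive block pool section, released by try-with-resources.
    try (AutoCloseLock l = manager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
      // mutate block pool wide state here
    }

    // Shared volume level section under the same block pool.
    try (AutoCloseLock l = manager.readLock(LockLevel.VOLUME, bpid, volume)) {
      // read volume scoped state here
    }

    // Reports any lock that was acquired above but never closed.
    manager.lockLeakCheck();
  }
}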
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler; import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics; @@ -130,7 +131,8 @@ private List createFile(String fileNamePrefix, long fileLen, /** Truncate a block file. */ private long truncateBlockFile() throws IOException { - try (AutoCloseableLock lock = fds.acquireDatasetLock()) { + try (AutoCloseableLock lock = fds.acquireDatasetLockManager().writeLock( + LockManager.LockLevel.BLOCK_POOl, bpid)) { for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) { File f = new File(b.getBlockURI()); File mf = new File(b.getMetadataURI()); @@ -155,7 +157,8 @@ private long truncateBlockFile() throws IOException { /** Delete a block file */ private long deleteBlockFile() { - try (AutoCloseableLock lock = fds.acquireDatasetLock()) { + try (AutoCloseableLock lock = fds.acquireDatasetLockManager(). + writeLock(LockManager.LockLevel.BLOCK_POOl, bpid)) { for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) { File f = new File(b.getBlockURI()); File mf = new File(b.getMetadataURI()); @@ -171,7 +174,8 @@ private long deleteBlockFile() { /** Delete block meta file */ private long deleteMetaFile() { - try (AutoCloseableLock lock = fds.acquireDatasetLock()) { + try (AutoCloseableLock lock = fds.acquireDatasetLockManager(). + writeLock(LockManager.LockLevel.BLOCK_POOl, bpid)) { for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) { // Delete a metadata file if (b.metadataExists() && b.deleteMetadata()) { @@ -190,7 +194,8 @@ private long deleteMetaFile() { * @throws IOException */ private void duplicateBlock(long blockId) throws IOException { - try (AutoCloseableLock lock = fds.acquireDatasetLock()) { + try (AutoCloseableLock lock = fds.acquireDatasetLockManager(). 
+ writeLock(LockManager.LockLevel.BLOCK_POOl, bpid)) { ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId); try (FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 3fbd4de721260..21f4bd36dda04 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.server.common.LockManager; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.protocol.Block; @@ -452,12 +453,7 @@ public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block, } @Override - public AutoCloseableLock acquireDatasetLock() { - return null; - } - - @Override - public AutoCloseableLock acquireDatasetReadLock() { + public LockManager acquireDatasetLockManager() { return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java index 67176d8a1a3e6..7e8497e1ebdbc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java @@ -434,7 +434,7 @@ public void changeStoredGenerationStamp( @Override public Iterator getStoredReplicas(String bpid) throws IOException { // Reload replicas from the disk. 
- ReplicaMap replicaMap = new ReplicaMap(dataset.datasetRWLock); + ReplicaMap replicaMap = new ReplicaMap(dataset.acquireDatasetLockManager()); try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) { for (FsVolumeSpi vol : refs) { FsVolumeImpl volume = (FsVolumeImpl) vol; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index f11f4b9d1cdb1..18fe5264eff01 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.DF; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager; import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner; import org.apache.hadoop.hdfs.server.datanode.LocalReplica; @@ -140,6 +141,7 @@ public class TestFsDatasetImpl { private DataNode datanode; private DataStorage storage; private FsDatasetImpl dataset; + private DataSetLockManager manager = new DataSetLockManager(); private final static String BLOCKPOOL = "BP-TEST"; @@ -213,6 +215,7 @@ public void setUp() throws IOException { this.conf.set(DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_ROOT_DIR_KEY, replicaCacheRootDir); + when(datanode.getDataSetLockManager()).thenReturn(manager); when(datanode.getConf()).thenReturn(conf); final DNConf dnConf = new DNConf(datanode); when(datanode.getDnConf()).thenReturn(dnConf); @@ -232,118 +235,6 @@ public void setUp() throws IOException { assertEquals(0, dataset.getNumFailedVolumes()); } - @Test(timeout=10000) - public void testReadLockEnabledByDefault() - throws Exception { - final FsDatasetSpi ds = dataset; - AtomicBoolean accessed = new AtomicBoolean(false); - CountDownLatch latch = new CountDownLatch(1); - CountDownLatch waiterLatch = new CountDownLatch(1); - - Thread holder = new Thread() { - public void run() { - try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { - latch.countDown(); - // wait for the waiter thread to access the lock. - waiterLatch.await(); - } catch (Exception e) { - } - } - }; - - Thread waiter = new Thread() { - public void run() { - try { - latch.await(); - } catch (InterruptedException e) { - waiterLatch.countDown(); - return; - } - try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { - accessed.getAndSet(true); - // signal the holder thread. - waiterLatch.countDown(); - } catch (Exception e) { - } - } - }; - waiter.start(); - holder.start(); - holder.join(); - waiter.join(); - // The holder thread is still holding the lock, but the waiter can still - // run as the lock is a shared read lock. - // Otherwise test will timeout with deadlock. 
- assertEquals(true, accessed.get()); - holder.interrupt(); - } - - @Test(timeout=20000) - public void testReadLockCanBeDisabledByConfig() - throws Exception { - HdfsConfiguration conf = new HdfsConfiguration(); - conf.setBoolean( - DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(1).build(); - try { - AtomicBoolean accessed = new AtomicBoolean(false); - cluster.waitActive(); - DataNode dn = cluster.getDataNodes().get(0); - final FsDatasetSpi ds = DataNodeTestUtils.getFSDataset(dn); - - CountDownLatch latch = new CountDownLatch(1); - CountDownLatch waiterLatch = new CountDownLatch(1); - Thread holder = new Thread() { - public void run() { - try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { - latch.countDown(); - // wait for the waiter thread to access the lock. - waiterLatch.await(); - } catch (Exception e) { - } - } - }; - - Thread waiter = new Thread() { - public void run() { - try { - // Wait for holder to get ds read lock. - latch.await(); - } catch (InterruptedException e) { - waiterLatch.countDown(); - return; - } - try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { - accessed.getAndSet(true); - // signal the holder thread. - waiterLatch.countDown(); - } catch (Exception e) { - } - } - }; - waiter.start(); - holder.start(); - // Wait for sometime to make sure we are in deadlock, - try { - GenericTestUtils.waitFor(() -> - accessed.get(), - 100, 10000); - fail("Waiter thread should not execute."); - } catch (TimeoutException e) { - } - // Release waiterLatch to exit deadlock. - waiterLatch.countDown(); - holder.join(); - waiter.join(); - // After releasing waiterLatch water - // thread will be able to execute. - assertTrue(accessed.get()); - } finally { - cluster.shutdown(); - } - } - @Test public void testAddVolumes() throws IOException { final int numNewVolumes = 3; @@ -413,6 +304,7 @@ public void testAddVolumeWithSameDiskTiering() throws IOException { final ShortCircuitRegistry shortCircuitRegistry = new ShortCircuitRegistry(conf); when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry); + when(datanode.getDataSetLockManager()).thenReturn(manager); createStorageDirs(storage, conf, 1); dataset = new FsDatasetImpl(datanode, storage, conf); @@ -480,6 +372,8 @@ public void testAddVolumeWithCustomizedCapacityRatio() final ShortCircuitRegistry shortCircuitRegistry = new ShortCircuitRegistry(conf); when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry); + when(datanode.getDataSetLockManager()).thenReturn(manager); + createStorageDirs(storage, conf, 0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java index 4d8e0c99980d2..8a813a6c0017a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java @@ -407,7 +407,7 @@ public void run() { fs.close(); FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0) .getFSDataset(); - ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock()); + ReplicaMap volumeMap = new ReplicaMap(fsDataset.acquireDatasetLockManager()); RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker 
.getInstance(conf, fsDataset); FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java index b72b1cd1bb8bd..0727c53f5fb1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java @@ -47,10 +47,12 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.Replica; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery; +import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; @@ -236,7 +238,8 @@ public void testInitReplicaRecovery() throws IOException { final long firstblockid = 10000L; final long gs = 7777L; final long length = 22L; - final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock()); + DataSetLockManager manager = new DataSetLockManager(); + final ReplicaMap map = new ReplicaMap(manager); String bpid = "BP-TEST"; final Block[] blocks = new Block[5]; for(int i = 0; i < blocks.length; i++) { @@ -252,7 +255,7 @@ public void testInitReplicaRecovery() throws IOException { final long recoveryid = gs + 1; final ReplicaRecoveryInfo recoveryInfo = FsDatasetImpl .initReplicaRecovery(bpid, map, blocks[0], recoveryid, - DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT); + DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT, manager); assertEquals(originalInfo, recoveryInfo); final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(bpid, b); @@ -263,7 +266,7 @@ public void testInitReplicaRecovery() throws IOException { final long recoveryid2 = gs + 2; final ReplicaRecoveryInfo recoveryInfo2 = FsDatasetImpl .initReplicaRecovery(bpid, map, blocks[0], recoveryid2, - DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT); + DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT, manager); assertEquals(originalInfo, recoveryInfo2); final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(bpid, b); @@ -273,7 +276,7 @@ public void testInitReplicaRecovery() throws IOException { //case RecoveryInProgressException try { FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid, - DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT); + DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT, manager); Assert.fail(); } catch(RecoveryInProgressException ripe) { @@ -286,7 +289,7 @@ public void testInitReplicaRecovery() throws IOException { final Block b = new Block(firstblockid - 1, length, gs); ReplicaRecoveryInfo r = FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid, - 
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT); + DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT, manager); Assert.assertNull("Data-node should not have this replica.", r); } @@ -295,7 +298,7 @@ public void testInitReplicaRecovery() throws IOException { final Block b = new Block(firstblockid + 1, length, gs); try { FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid, - DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT); + DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT, manager); Assert.fail(); } catch(IOException ioe) { @@ -309,7 +312,7 @@ public void testInitReplicaRecovery() throws IOException { final Block b = new Block(firstblockid, length, gs+1); try { FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid, - DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT); + DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT, manager); fail("InitReplicaRecovery should fail because replica's " + "gs is less than the block's gs"); } catch (IOException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java index afd816864a1d7..e23de378a8b48 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.datanode.BlockScanner; import org.apache.hadoop.hdfs.server.datanode.DNConf; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner; import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfoVolumeReport; @@ -109,6 +110,7 @@ public class TestProvidedImpl { private DataNode datanode; private DataStorage storage; private FsDatasetImpl dataset; + private DataSetLockManager manager = new DataSetLockManager(); private static Map blkToPathMap; private static List providedVolumes; private static long spaceUsed = 0; @@ -319,6 +321,7 @@ public void setUp() throws IOException { conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0); when(datanode.getConf()).thenReturn(conf); + when(datanode.getDataSetLockManager()).thenReturn(manager); final DNConf dnConf = new DNConf(datanode); when(datanode.getDnConf()).thenReturn(dnConf); // reset the space used @@ -400,7 +403,7 @@ public void testProvidedVolumeImpl() throws IOException { public void testBlockLoad() throws IOException { for (int i = 0; i < providedVolumes.size(); i++) { FsVolumeImpl vol = providedVolumes.get(i); - ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock()); + ReplicaMap volumeMap = new ReplicaMap(); vol.getVolumeMap(volumeMap, null); assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length); @@ -476,7 +479,7 @@ private int getBlocksInProvidedVolumes(String basePath, int numBlocks, vol.setFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID], new TestFileRegionBlockAliasMap(fileRegionIterator, minBlockId, numBlocks)); - ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock()); + ReplicaMap volumeMap = new ReplicaMap(); vol.getVolumeMap(BLOCK_POOL_IDS[CHOSEN_BP_ID], 
volumeMap, null); totalBlocks += volumeMap.size(BLOCK_POOL_IDS[CHOSEN_BP_ID]); } @@ -586,7 +589,7 @@ public void testProvidedReplicaSuffixExtraction() { public void testProvidedReplicaPrefix() throws Exception { for (int i = 0; i < providedVolumes.size(); i++) { FsVolumeImpl vol = providedVolumes.get(i); - ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock()); + ReplicaMap volumeMap = new ReplicaMap(); vol.getVolumeMap(volumeMap, null); Path expectedPrefix = new Path( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java index 59203bb7d3468..483295f80f275 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java @@ -32,7 +32,7 @@ * Unit test for ReplicasMap class */ public class TestReplicaMap { - private final ReplicaMap map = new ReplicaMap(new ReentrantReadWriteLock()); + private final ReplicaMap map = new ReplicaMap(); private final String bpid = "BP-TEST"; private final Block block = new Block(1234, 1234, 1234); @@ -112,7 +112,7 @@ public void testRemove() { @Test public void testMergeAll() { - ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock()); + ReplicaMap temReplicaMap = new ReplicaMap(); Block tmpBlock = new Block(5678, 5678, 5678); temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null)); @@ -123,7 +123,7 @@ public void testMergeAll() { @Test public void testAddAll() { - ReplicaMap temReplicaMap = new ReplicaMap(new ReentrantReadWriteLock()); + ReplicaMap temReplicaMap = new ReplicaMap(); Block tmpBlock = new Block(5678, 5678, 5678); temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java index e939389624130..bc9faaa2cf78c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java @@ -550,7 +550,7 @@ public void testReplicaMapAfterDatanodeRestart() throws Exception { bpList.size() == 2); createReplicas(bpList, volumes, cluster.getFsDatasetTestUtils(dn)); - ReplicaMap oldReplicaMap = new ReplicaMap(new ReentrantReadWriteLock()); + ReplicaMap oldReplicaMap = new ReplicaMap(); oldReplicaMap.addAll(dataSet.volumeMap); cluster.restartDataNode(0);
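Finally, a short sketch of the two ReplicaMap constructors and the consumer-based traversal that the tests above now rely on. ReplicaMap is package-private, so the sketch is assumed to live in the fsdataset.impl package; the block pool id and the replica are illustrative.

package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;

public class ReplicaMapUsageSketch {
  public static void main(String[] args) {
    // Lock-managed map, as FsDatasetImpl now builds it from the DataNode's manager.
    ReplicaMap lockedMap = new ReplicaMap(new DataSetLockManager());
    // Unsynchronized map, as the tests use for temporary copies.
    ReplicaMap scratchMap = new ReplicaMap();

    String bpid = "BP-TEST";
    lockedMap.add(bpid, new FinalizedReplica(new Block(1234, 1234, 1234), null, null));

    // The consumer runs under the per-block-pool read lock; the iterator
    // must not escape the callback.
    lockedMap.replicas(bpid, iterator -> {
      while (iterator.hasNext()) {
        ReplicaInfo replica = iterator.next();
        scratchMap.add(bpid, replica);
      }
    });
    System.out.println("copied replicas: " + scratchMap.size(bpid));
  }
}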