diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index f59cb1b22b29..315905f6d5cb 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -346,6 +346,10 @@ public final class ScmConfigKeys {
"ozone.scm.container.size";
public static final String OZONE_SCM_CONTAINER_SIZE_DEFAULT = "5GB";
+ public static final String OZONE_SCM_CONTAINER_LOCK_STRIPE_SIZE =
+ "ozone.scm.container.lock.stripes";
+ public static final int OZONE_SCM_CONTAINER_LOCK_STRIPE_SIZE_DEFAULT = 512;
+
public static final String OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY =
"ozone.scm.container.placement.impl";
public static final String OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 47c32219c605..f835ed31288c 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1012,6 +1012,14 @@
balances the amount of metadata.
+ <property>
+ <name>ozone.scm.container.lock.stripes</name>
+ <value>512</value>
+ <tag>OZONE, SCM, PERFORMANCE, MANAGEMENT</tag>
+ <description>
+ The number of stripes created for the container state manager lock.
+ </description>
+ </property>
<name>ozone.scm.datanode.address</name>
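
For reference, a minimal sketch of how the new stripe-count setting feeds the striped lock introduced below. The key name and default value (512) come from this patch; the override value of 1024 and the standalone class name are illustrative only.

```java
import com.google.common.util.concurrent.Striped;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;

public final class StripeSizeExample {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Operators may raise the stripe count (e.g. via ozone-site.xml); more
    // stripes mean less contention at the cost of a few extra lock objects.
    conf.setInt("ozone.scm.container.lock.stripes", 1024);

    int stripes = conf.getInt("ozone.scm.container.lock.stripes", 512);
    // Mirrors what ContainerStateManagerImpl does with the configured value.
    Striped<ReadWriteLock> locks = Striped.readWriteLock(stripes);
    System.out.println("created " + locks.size() + " lock stripes");
  }
}
```
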
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
index e978c1b8f50c..72d90abe1f4f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
@@ -30,6 +30,7 @@
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Striped;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -53,10 +54,8 @@
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
-import org.apache.hadoop.ozone.lock.LockManager;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.function.CheckedConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,6 +73,8 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETING;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_LOCK_STRIPE_SIZE;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_LOCK_STRIPE_SIZE_DEFAULT;
/**
@@ -86,7 +87,7 @@
public final class ContainerStateManagerImpl
implements ContainerStateManager {
- private final LockManager<ContainerID> lockManager;
+ private final Striped<ReadWriteLock> stripedLock;
/**
* Logger instance of ContainerStateManagerImpl.
@@ -155,7 +156,6 @@ private ContainerStateManagerImpl(final Configuration conf,
final Table<ContainerID, ContainerInfo> containerStore,
final DBTransactionBuffer buffer,
final ContainerReplicaPendingOps pendingOps) throws IOException {
- ConfigurationSource confSrc = OzoneConfiguration.of(conf);
this.pipelineManager = pipelineManager;
this.containerStore = containerStore;
this.stateMachine = newStateMachine();
@@ -164,8 +164,9 @@ private ContainerStateManagerImpl(final Configuration conf,
this.lastUsedMap = new ConcurrentHashMap<>();
this.containerStateChangeActions = getContainerStateChangeActions();
this.transactionBuffer = buffer;
- this.lockManager =
- new LockManager<>(confSrc, true);
+ this.stripedLock = Striped.readWriteLock(conf.getInt(
+ OZONE_SCM_CONTAINER_LOCK_STRIPE_SIZE,
+ OZONE_SCM_CONTAINER_LOCK_STRIPE_SIZE_DEFAULT));
this.containerReplicaPendingOps = pendingOps;
initialize();
}
@@ -277,31 +278,22 @@ private void initialize() throws IOException {
@Override
public Set<ContainerID> getContainerIDs() {
- lock.readLock().lock();
- try {
+ try (AutoCloseableLock ignored = readLock()) {
return containers.getAllContainerIDs();
- } finally {
- lock.readLock().unlock();
}
}
@Override
public Set<ContainerID> getContainerIDs(final LifeCycleState state) {
- lock.readLock().lock();
- try {
+ try (AutoCloseableLock ignored = readLock()) {
return containers.getContainerIDsByState(state);
- } finally {
- lock.readLock().unlock();
}
}
@Override
public ContainerInfo getContainer(final ContainerID id) {
- lockManager.readLock(id);
- try {
+ try (AutoCloseableLock ignored = readLock(id)) {
return containers.getContainerInfo(id);
- } finally {
- lockManager.readUnlock(id);
}
}
@@ -318,45 +310,35 @@ public void addContainer(final ContainerInfoProto containerInfo)
final ContainerID containerID = container.containerID();
final PipelineID pipelineID = container.getPipelineID();
- lock.writeLock().lock();
- try {
- lockManager.writeLock(containerID);
- try {
- if (!containers.contains(containerID)) {
- ExecutionUtil.create(() -> {
- transactionBuffer.addToBuffer(containerStore,
- containerID, container);
- containers.addContainer(container);
- if (pipelineManager.containsPipeline(pipelineID)) {
- pipelineManager.addContainerToPipeline(pipelineID, containerID);
- } else if (containerInfo.getState().
- equals(LifeCycleState.OPEN)) {
- // Pipeline should exist, but not
- throw new PipelineNotFoundException();
- }
- //recon may receive report of closed container,
- // no corresponding Pipeline can be synced for scm.
- // just only add the container.
- }).onException(() -> {
- containers.removeContainer(containerID);
- transactionBuffer.removeFromBuffer(containerStore, containerID);
- }).execute();
- }
- } finally {
- lockManager.writeUnlock(containerID);
+ try (AutoCloseableLock ignoredGlobal = writeLock();
+ AutoCloseableLock ignored = writeLock(containerID)) {
+ if (!containers.contains(containerID)) {
+ ExecutionUtil.create(() -> {
+ transactionBuffer.addToBuffer(containerStore,
+ containerID, container);
+ containers.addContainer(container);
+ if (pipelineManager.containsPipeline(pipelineID)) {
+ pipelineManager.addContainerToPipeline(pipelineID, containerID);
+ } else if (containerInfo.getState().
+ equals(LifeCycleState.OPEN)) {
+ // Pipeline should exist, but not
+ throw new PipelineNotFoundException();
+ }
+ //recon may receive report of closed container,
+ // no corresponding Pipeline can be synced for scm.
+ // just only add the container.
+ }).onException(() -> {
+ containers.removeContainer(containerID);
+ transactionBuffer.removeFromBuffer(containerStore, containerID);
+ }).execute();
}
- } finally {
- lock.writeLock().unlock();
}
}
@Override
public boolean contains(ContainerID id) {
- lockManager.readLock(id);
- try {
+ try (AutoCloseableLock ignored = readLock(id)) {
return containers.contains(id);
- } finally {
- lockManager.readUnlock(id);
}
}
@@ -367,8 +349,7 @@ public void updateContainerState(final HddsProtos.ContainerID containerID,
// TODO: Remove the protobuf conversion after fixing ContainerStateMap.
final ContainerID id = ContainerID.getFromProtobuf(containerID);
- lockManager.writeLock(id);
- try {
+ try (AutoCloseableLock ignored = writeLock(id)) {
if (containers.contains(id)) {
final ContainerInfo oldInfo = containers.getContainerInfo(id);
final LifeCycleState oldState = oldInfo.getState();
@@ -387,48 +368,37 @@ public void updateContainerState(final HddsProtos.ContainerID containerID,
.accept(oldInfo);
}
}
- } finally {
- lockManager.writeUnlock(id);
}
}
@Override
public Set<ContainerReplica> getContainerReplicas(final ContainerID id) {
- lockManager.readLock(id);
- try {
+ try (AutoCloseableLock ignored = readLock(id)) {
return containers.getContainerReplicas(id);
- } finally {
- lockManager.readUnlock(id);
}
}
@Override
public void updateContainerReplica(final ContainerID id,
final ContainerReplica replica) {
- lockManager.writeLock(id);
- try {
+ try (AutoCloseableLock ignored = writeLock(id)) {
containers.updateContainerReplica(id, replica);
// Clear any pending additions for this replica as we have now seen it.
containerReplicaPendingOps.completeAddReplica(id,
replica.getDatanodeDetails(), replica.getReplicaIndex());
- } finally {
- lockManager.writeUnlock(id);
}
}
@Override
public void removeContainerReplica(final ContainerID id,
final ContainerReplica replica) {
- lockManager.writeLock(id);
- try {
+ try (AutoCloseableLock ignored = writeLock(id)) {
containers.removeContainerReplica(id, replica);
// Remove any pending delete replication operations for the deleted
// replica.
containerReplicaPendingOps.completeDeleteReplica(id,
replica.getDatanodeDetails(), replica.getReplicaIndex());
- } finally {
- lockManager.writeUnlock(id);
}
}
@@ -440,8 +410,7 @@ public void updateDeleteTransactionId(
for (Map.Entry<ContainerID, Long> transaction :
deleteTransactionMap.entrySet()) {
ContainerID containerID = transaction.getKey();
- try {
- lockManager.writeLock(containerID);
+ try (AutoCloseableLock ignored = writeLock(containerID)) {
final ContainerInfo info = containers.getContainerInfo(
transaction.getKey());
if (info == null) {
@@ -451,8 +420,6 @@ public void updateDeleteTransactionId(
}
info.updateDeleteTransactionId(transaction.getValue());
transactionBuffer.addToBuffer(containerStore, info.containerID(), info);
- } finally {
- lockManager.writeUnlock(containerID);
}
}
}
@@ -503,15 +470,12 @@ private ContainerInfo findContainerWithSpace(final long size,
final NavigableSet<ContainerID> searchSet) {
// Get the container with space to meet our request.
for (ContainerID id : searchSet) {
- try {
- lockManager.readLock(id);
+ try (AutoCloseableLock ignored = readLock(id)) {
final ContainerInfo containerInfo = containers.getContainerInfo(id);
if (containerInfo.getUsedBytes() + size <= this.containerSize) {
containerInfo.updateLastUsedTime();
return containerInfo;
}
- } finally {
- lockManager.readUnlock(id);
}
}
return null;
@@ -521,35 +485,25 @@ private ContainerInfo findContainerWithSpace(final long size,
public void removeContainer(final HddsProtos.ContainerID id)
throws IOException {
final ContainerID cid = ContainerID.getFromProtobuf(id);
- lock.writeLock().lock();
- try {
- lockManager.writeLock(cid);
- try {
- final ContainerInfo containerInfo = containers.getContainerInfo(cid);
- ExecutionUtil.create(() -> {
- transactionBuffer.removeFromBuffer(containerStore, cid);
- containers.removeContainer(cid);
- }).onException(() -> containerStore.put(cid, containerInfo)).execute();
- } finally {
- lockManager.writeUnlock(cid);
- }
- } finally {
- lock.writeLock().unlock();
+ try (AutoCloseableLock ignoredGlobal = writeLock();
+ AutoCloseableLock ignored = writeLock(cid)) {
+ final ContainerInfo containerInfo = containers.getContainerInfo(cid);
+ ExecutionUtil.create(() -> {
+ transactionBuffer.removeFromBuffer(containerStore, cid);
+ containers.removeContainer(cid);
+ }).onException(() -> containerStore.put(cid, containerInfo)).execute();
}
}
@Override
public void reinitialize(
Table<ContainerID, ContainerInfo> store) throws IOException {
- lock.writeLock().lock();
- try {
+ try (AutoCloseableLock ignored = writeLock()) {
close();
this.containerStore = store;
this.containers = new ContainerStateMap();
this.lastUsedMap = new ConcurrentHashMap<>();
initialize();
- } finally {
- lock.writeLock().unlock();
}
}
@@ -562,6 +516,22 @@ public void close() throws IOException {
}
}
+ private AutoCloseableLock readLock() {
+ return AutoCloseableLock.acquire(lock.readLock());
+ }
+
+ private AutoCloseableLock writeLock() {
+ return AutoCloseableLock.acquire(lock.writeLock());
+ }
+
+ private AutoCloseableLock readLock(ContainerID id) {
+ return AutoCloseableLock.acquire(stripedLock.get(id).readLock());
+ }
+
+ private AutoCloseableLock writeLock(ContainerID id) {
+ return AutoCloseableLock.acquire(stripedLock.get(id).writeLock());
+ }
+
public static Builder newBuilder() {
return new Builder();
}
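
Taken together, the change replaces the per-container LockManager with a fixed pool of striped read-write locks, acquired through Ratis' AutoCloseableLock so that try-with-resources handles the unlock path. A self-contained sketch of that pattern follows; the class and field names (StripedLockSketch, containerUsage) are illustrative and not part of the patch.

```java
import com.google.common.util.concurrent.Striped;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ratis.util.AutoCloseableLock;

public class StripedLockSketch {
  // Keys hashing to the same stripe share a lock; different stripes can be
  // locked concurrently, unlike a single global ReadWriteLock.
  private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(512);
  private final Map<Long, Long> containerUsage = new HashMap<>();

  private AutoCloseableLock readLock(long containerId) {
    return AutoCloseableLock.acquire(stripedLock.get(containerId).readLock());
  }

  private AutoCloseableLock writeLock(long containerId) {
    return AutoCloseableLock.acquire(stripedLock.get(containerId).writeLock());
  }

  public Long getUsage(long containerId) {
    // try-with-resources releases the stripe's lock even if the body throws,
    // replacing the explicit try/finally unlock blocks removed in the patch.
    try (AutoCloseableLock ignored = readLock(containerId)) {
      return containerUsage.get(containerId);
    }
  }

  public void setUsage(long containerId, long usedBytes) {
    try (AutoCloseableLock ignored = writeLock(containerId)) {
      containerUsage.put(containerId, usedBytes);
    }
  }
}
```
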