diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 1d2c7c75096b..b499755a0171 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -27,10 +27,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.Lock; import java.util.function.Consumer; import java.util.function.Function; +import com.google.common.util.concurrent.Striped; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.StorageUnit; @@ -48,6 +49,7 @@ import org.apache.hadoop.hdds.scm.ByteStringConversion; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.ChunkBuffer; @@ -77,7 +79,6 @@ import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager; import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures; -import org.apache.hadoop.util.AutoCloseableLock; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -123,8 +124,8 @@ public class KeyValueHandler extends Handler { private final Function byteBufferToByteString; private final boolean validateChunkChecksumData; - // A lock that is held during container creation. - private final AutoCloseableLock containerCreationLock; + // A striped lock that is held during container creation. + private final Striped containerCreationLocks; public KeyValueHandler(ConfigurationSource config, String datanodeId, ContainerSet contSet, VolumeSet volSet, ContainerMetrics metrics, @@ -146,9 +147,17 @@ public KeyValueHandler(ConfigurationSource config, String datanodeId, maxContainerSize = (long) config.getStorageSize( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); - // this handler lock is used for synchronizing createContainer Requests, - // so using a fair lock here. - containerCreationLock = new AutoCloseableLock(new ReentrantLock(true)); + // this striped handler lock is used for synchronizing createContainer + // Requests. + final int threadCountPerDisk = conf.getInt( + OzoneConfigKeys + .DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_PER_VOLUME_KEY, + OzoneConfigKeys + .DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_PER_VOLUME_DEFAULT); + final int numberOfDisks = + HddsServerUtil.getDatanodeStorageDirs(conf).size(); + containerCreationLocks = Striped.lazyWeakLock( + threadCountPerDisk * numberOfDisks); boolean isUnsafeByteBufferConversionEnabled = conf.getBoolean( @@ -268,7 +277,9 @@ ContainerCommandResponseProto handleCreateContainer( newContainerData, conf); boolean created = false; - try (AutoCloseableLock l = containerCreationLock.acquire()) { + Lock containerIdLock = containerCreationLocks.get(containerID); + containerIdLock.lock(); + try { if (containerSet.getContainer(containerID) == null) { newContainer.create(volumeSet, volumeChoosingPolicy, clusterId); created = containerSet.addContainer(newContainer); @@ -280,6 +291,8 @@ ContainerCommandResponseProto handleCreateContainer( } } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); + } finally { + containerIdLock.unlock(); } if (created) {