Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,10 @@ public final class ScmConfigKeys {
"ozone.scm.container.size";
public static final String OZONE_SCM_CONTAINER_SIZE_DEFAULT = "5GB";

public static final String OZONE_SCM_CONTAINER_LOCK_STRIPE_SIZE =
"ozone.scm.container.lock.stripes";
public static final int OZONE_SCM_CONTAINER_LOCK_STRIPE_SIZE_DEFAULT = 512;

public static final String OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY =
"ozone.scm.container.placement.impl";
public static final String OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY =
Expand Down
8 changes: 8 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,14 @@
balances the amount of metadata.
</description>
</property>
<property>
<name>ozone.scm.container.lock.stripes</name>
<value>512</value>
<tag>OZONE, SCM, PERFORMANCE, MANAGEMENT</tag>
<description>
The number of stripes created for the container state manager lock.
</description>
</property>
<property>
<name>ozone.scm.datanode.address</name>
<value/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import com.google.common.base.Preconditions;

import com.google.common.util.concurrent.Striped;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
Expand All @@ -53,10 +54,8 @@
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
import org.apache.hadoop.ozone.lock.LockManager;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;

import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.function.CheckedConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -74,6 +73,8 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETING;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_LOCK_STRIPE_SIZE;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_LOCK_STRIPE_SIZE_DEFAULT;


/**
Expand All @@ -86,7 +87,7 @@
public final class ContainerStateManagerImpl
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adoroszlai , do we need similar striped lock in org.apache.hadoop.hdds.scm.container.ContainerManagerImpl class to be changed as well ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing out ContainerManagerImpl.

ContainerManagerImpl uses a single lock, not a set of "managed" locks. It may be possible to improve that by adding a striped lock for some operations that apply to a single container, but that's a different task.

implements ContainerStateManager {

private final LockManager<ContainerID> lockManager;
private final Striped<ReadWriteLock> stripedLock;

/**
* Logger instance of ContainerStateManagerImpl.
Expand Down Expand Up @@ -155,7 +156,6 @@ private ContainerStateManagerImpl(final Configuration conf,
final Table<ContainerID, ContainerInfo> containerStore,
final DBTransactionBuffer buffer,
final ContainerReplicaPendingOps pendingOps) throws IOException {
ConfigurationSource confSrc = OzoneConfiguration.of(conf);
this.pipelineManager = pipelineManager;
this.containerStore = containerStore;
this.stateMachine = newStateMachine();
Expand All @@ -164,8 +164,9 @@ private ContainerStateManagerImpl(final Configuration conf,
this.lastUsedMap = new ConcurrentHashMap<>();
this.containerStateChangeActions = getContainerStateChangeActions();
this.transactionBuffer = buffer;
this.lockManager =
new LockManager<>(confSrc, true);
this.stripedLock = Striped.readWriteLock(conf.getInt(
OZONE_SCM_CONTAINER_LOCK_STRIPE_SIZE,
OZONE_SCM_CONTAINER_LOCK_STRIPE_SIZE_DEFAULT));
this.containerReplicaPendingOps = pendingOps;
initialize();
}
Expand Down Expand Up @@ -277,31 +278,22 @@ private void initialize() throws IOException {

@Override
public Set<ContainerID> getContainerIDs() {
lock.readLock().lock();
try {
try (AutoCloseableLock ignored = readLock()) {
return containers.getAllContainerIDs();
} finally {
lock.readLock().unlock();
}
}

@Override
public Set<ContainerID> getContainerIDs(final LifeCycleState state) {
lock.readLock().lock();
try {
try (AutoCloseableLock ignored = readLock()) {
return containers.getContainerIDsByState(state);
} finally {
lock.readLock().unlock();
}
}

@Override
public ContainerInfo getContainer(final ContainerID id) {
lockManager.readLock(id);
try {
try (AutoCloseableLock ignored = readLock(id)) {
return containers.getContainerInfo(id);
} finally {
lockManager.readUnlock(id);
}
}

Expand All @@ -318,45 +310,35 @@ public void addContainer(final ContainerInfoProto containerInfo)
final ContainerID containerID = container.containerID();
final PipelineID pipelineID = container.getPipelineID();

lock.writeLock().lock();
try {
lockManager.writeLock(containerID);
try {
if (!containers.contains(containerID)) {
ExecutionUtil.create(() -> {
transactionBuffer.addToBuffer(containerStore,
containerID, container);
containers.addContainer(container);
if (pipelineManager.containsPipeline(pipelineID)) {
pipelineManager.addContainerToPipeline(pipelineID, containerID);
} else if (containerInfo.getState().
equals(LifeCycleState.OPEN)) {
// Pipeline should exist, but not
throw new PipelineNotFoundException();
}
//recon may receive report of closed container,
// no corresponding Pipeline can be synced for scm.
// just only add the container.
}).onException(() -> {
containers.removeContainer(containerID);
transactionBuffer.removeFromBuffer(containerStore, containerID);
}).execute();
}
} finally {
lockManager.writeUnlock(containerID);
try (AutoCloseableLock ignoredGlobal = writeLock();
AutoCloseableLock ignored = writeLock(containerID)) {
if (!containers.contains(containerID)) {
ExecutionUtil.create(() -> {
transactionBuffer.addToBuffer(containerStore,
containerID, container);
containers.addContainer(container);
if (pipelineManager.containsPipeline(pipelineID)) {
pipelineManager.addContainerToPipeline(pipelineID, containerID);
} else if (containerInfo.getState().
equals(LifeCycleState.OPEN)) {
// Pipeline should exist, but not
throw new PipelineNotFoundException();
}
//recon may receive report of closed container,
// no corresponding Pipeline can be synced for scm.
// just only add the container.
}).onException(() -> {
containers.removeContainer(containerID);
transactionBuffer.removeFromBuffer(containerStore, containerID);
}).execute();
}
} finally {
lock.writeLock().unlock();
}
}

@Override
public boolean contains(ContainerID id) {
lockManager.readLock(id);
try {
try (AutoCloseableLock ignored = readLock(id)) {
return containers.contains(id);
} finally {
lockManager.readUnlock(id);
}
}

Expand All @@ -367,8 +349,7 @@ public void updateContainerState(final HddsProtos.ContainerID containerID,
// TODO: Remove the protobuf conversion after fixing ContainerStateMap.
final ContainerID id = ContainerID.getFromProtobuf(containerID);

lockManager.writeLock(id);
try {
try (AutoCloseableLock ignored = writeLock(id)) {
if (containers.contains(id)) {
final ContainerInfo oldInfo = containers.getContainerInfo(id);
final LifeCycleState oldState = oldInfo.getState();
Expand All @@ -387,48 +368,37 @@ public void updateContainerState(final HddsProtos.ContainerID containerID,
.accept(oldInfo);
}
}
} finally {
lockManager.writeUnlock(id);
}
}


@Override
public Set<ContainerReplica> getContainerReplicas(final ContainerID id) {
lockManager.readLock(id);
try {
try (AutoCloseableLock ignored = readLock(id)) {
return containers.getContainerReplicas(id);
} finally {
lockManager.readUnlock(id);
}
}

@Override
public void updateContainerReplica(final ContainerID id,
final ContainerReplica replica) {
lockManager.writeLock(id);
try {
try (AutoCloseableLock ignored = writeLock(id)) {
containers.updateContainerReplica(id, replica);
// Clear any pending additions for this replica as we have now seen it.
containerReplicaPendingOps.completeAddReplica(id,
replica.getDatanodeDetails(), replica.getReplicaIndex());
} finally {
lockManager.writeUnlock(id);
}
}

@Override
public void removeContainerReplica(final ContainerID id,
final ContainerReplica replica) {
lockManager.writeLock(id);
try {
try (AutoCloseableLock ignored = writeLock(id)) {
containers.removeContainerReplica(id, replica);
// Remove any pending delete replication operations for the deleted
// replica.
containerReplicaPendingOps.completeDeleteReplica(id,
replica.getDatanodeDetails(), replica.getReplicaIndex());
} finally {
lockManager.writeUnlock(id);
}
}

Expand All @@ -440,8 +410,7 @@ public void updateDeleteTransactionId(
for (Map.Entry<ContainerID, Long> transaction :
deleteTransactionMap.entrySet()) {
ContainerID containerID = transaction.getKey();
try {
lockManager.writeLock(containerID);
try (AutoCloseableLock ignored = writeLock(containerID)) {
final ContainerInfo info = containers.getContainerInfo(
transaction.getKey());
if (info == null) {
Expand All @@ -451,8 +420,6 @@ public void updateDeleteTransactionId(
}
info.updateDeleteTransactionId(transaction.getValue());
transactionBuffer.addToBuffer(containerStore, info.containerID(), info);
} finally {
lockManager.writeUnlock(containerID);
}
}
}
Expand Down Expand Up @@ -503,15 +470,12 @@ private ContainerInfo findContainerWithSpace(final long size,
searchSet) {
// Get the container with space to meet our request.
for (ContainerID id : searchSet) {
try {
lockManager.readLock(id);
try (AutoCloseableLock ignored = readLock(id)) {
final ContainerInfo containerInfo = containers.getContainerInfo(id);
if (containerInfo.getUsedBytes() + size <= this.containerSize) {
containerInfo.updateLastUsedTime();
return containerInfo;
}
} finally {
lockManager.readUnlock(id);
}
}
return null;
Expand All @@ -521,35 +485,25 @@ private ContainerInfo findContainerWithSpace(final long size,
public void removeContainer(final HddsProtos.ContainerID id)
throws IOException {
final ContainerID cid = ContainerID.getFromProtobuf(id);
lock.writeLock().lock();
try {
lockManager.writeLock(cid);
try {
final ContainerInfo containerInfo = containers.getContainerInfo(cid);
ExecutionUtil.create(() -> {
transactionBuffer.removeFromBuffer(containerStore, cid);
containers.removeContainer(cid);
}).onException(() -> containerStore.put(cid, containerInfo)).execute();
} finally {
lockManager.writeUnlock(cid);
}
} finally {
lock.writeLock().unlock();
try (AutoCloseableLock ignoredGlobal = writeLock();
AutoCloseableLock ignored = writeLock(cid)) {
final ContainerInfo containerInfo = containers.getContainerInfo(cid);
ExecutionUtil.create(() -> {
transactionBuffer.removeFromBuffer(containerStore, cid);
containers.removeContainer(cid);
}).onException(() -> containerStore.put(cid, containerInfo)).execute();
}
}

@Override
public void reinitialize(
Table<ContainerID, ContainerInfo> store) throws IOException {
lock.writeLock().lock();
try {
try (AutoCloseableLock ignored = writeLock()) {
close();
this.containerStore = store;
this.containers = new ContainerStateMap();
this.lastUsedMap = new ConcurrentHashMap<>();
initialize();
} finally {
lock.writeLock().unlock();
}
}

Expand All @@ -562,6 +516,22 @@ public void close() throws IOException {
}
}

private AutoCloseableLock readLock() {
return AutoCloseableLock.acquire(lock.readLock());
}

private AutoCloseableLock writeLock() {
return AutoCloseableLock.acquire(lock.writeLock());
}

private AutoCloseableLock readLock(ContainerID id) {
return AutoCloseableLock.acquire(stripedLock.get(id).readLock());
}

private AutoCloseableLock writeLock(ContainerID id) {
return AutoCloseableLock.acquire(stripedLock.get(id).writeLock());
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down