Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ default ContainerInfo getMatchingContainer(long size, String owner,
* @param owner - the user which requires space in its owned container
* @param pipeline - pipeline to which the container should belong.
* @param excludedContainerIDS - containerIds to be excluded.
* @return ContainerInfo for the matching container.
* @return ContainerInfo for the matching container, or null if a container could not be found and could not be
* allocated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the @Nullable annotation, which is very straightforward.

*/
ContainerInfo getMatchingContainer(long size, String owner,
Pipeline pipeline,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
Expand Down Expand Up @@ -80,6 +81,8 @@ public class ContainerManagerImpl implements ContainerManager {
@SuppressWarnings("java:S2245") // no need for secure random
private final Random random = new Random();

private final long containerSize;

/**
*
*/
Expand Down Expand Up @@ -109,6 +112,9 @@ public ContainerManagerImpl(
.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);

containerSize = (long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);

this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create();
}

Expand Down Expand Up @@ -346,14 +352,24 @@ public ContainerInfo getMatchingContainer(final long size, final String owner,
synchronized (pipeline.getId()) {
containerIDs = getContainersForOwner(pipeline, owner);
if (containerIDs.size() < getOpenContainerCountPerPipeline(pipeline)) {
allocateContainer(pipeline, owner);
containerIDs = getContainersForOwner(pipeline, owner);
if (pipelineManager.pipelineHasEnoughSpaceForNewContainer(pipeline, containerSize)) {
allocateContainer(pipeline, owner);
containerIDs = getContainersForOwner(pipeline, owner);
} else {
LOG.debug("Cannot allocate a new container because pipeline {} does not have the required space {}.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the if (LOG.isDebugEnabled()) check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK the LOG.isDebugEnabled check isn't needed when using slf4j with "{}" style logging. The log string is evaluated after the debug method itself calls isDebugEnabled().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, if the log parameters are just simple references to be passed, the isDebugEnabled() check is not needed, as the logger does it internally.

Copy link
Contributor

@ChenSammi ChenSammi Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to know that slf4j with "{}" will delay the string construction, thanks!

pipeline, containerSize);
}
}
containerIDs.removeAll(excludedContainerIDs);
containerInfo = containerStateManager.getMatchingContainer(
size, owner, pipeline.getId(), containerIDs);
if (containerInfo == null) {
containerInfo = allocateContainer(pipeline, owner);
if (pipelineManager.pipelineHasEnoughSpaceForNewContainer(pipeline, containerSize)) {
containerInfo = allocateContainer(pipeline, owner);
} else {
LOG.debug("Cannot allocate a new container because pipeline {} does not have the required space {}.",
pipeline, containerSize);
}
}
return containerInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,13 @@ void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
* Release write lock.
*/
void releaseWriteLock();

/**
* Checks whether every Datanode in the specified pipeline has more than the specified amount of space,
* containerSize, available for a new container.
* @param pipeline pipeline to check
* @param containerSize the required amount of space
* @return false if, on any Datanode in the pipeline, all the volumes have space less than or equal to the
* specified containerSize; otherwise true
*/
boolean pipelineHasEnoughSpaceForNewContainer(Pipeline pipeline, long containerSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pipelineHasEnoughSpaceForNewContainer -> HasEnoughSpace

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
Expand All @@ -53,6 +54,7 @@
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
Expand Down Expand Up @@ -633,6 +635,20 @@ private boolean isOpenWithUnregisteredNodes(Pipeline pipeline) {
return false;
}

/**
 * Reports whether every node of the given pipeline passes the placement-policy space check for a new container.
 *
 * @param pipeline pipeline whose nodes are inspected
 * @param containerSize data size handed to the per-node space check
 * @return {@code false} as soon as one node fails the check, {@code true} if none does
 */
@Override
public boolean pipelineHasEnoughSpaceForNewContainer(Pipeline pipeline, long containerSize) {
  for (DatanodeDetails member : pipeline.getNodes()) {
    // A node that is not already a DatanodeInfo is looked up through the NodeManager by its ID.
    final DatanodeDetails candidate =
        member instanceof DatanodeInfo ? member : nodeManager.getNode(member.getID());
    final boolean fits = SCMCommonPlacementPolicy.hasEnoughSpace(candidate, 0, containerSize, null);
    if (!fits) {
      return false;
    }
  }
  return true;
}

/**
* Schedules a fixed interval job to create pipelines.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,14 @@ private ContainerInfo allocateContainer(ReplicationConfig repConfig,

Pipeline newPipeline = pipelineManager.createPipeline(repConfig,
excludedNodes, Collections.emptyList());
// the returned ContainerInfo should not be null (due to not enough space in the Datanodes specifically) because
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could newPipeline be null when DN is close to full?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I checked, pipeline will not be null. There's a checkPipeline(pipeline) method that checks for null and throws an exception instead.

// this is a new pipeline and pipeline creation checks for sufficient space in the Datanodes
ContainerInfo container =
containerManager.getMatchingContainer(size, owner, newPipeline);
if (container == null) {
// defensive null handling
throw new IOException("Could not allocate a new container");
}
pipelineManager.openPipeline(newPipeline.getId());
LOG.info("Created and opened new pipeline {}", newPipeline);
return container;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,21 @@
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.OPEN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
Expand All @@ -49,6 +55,7 @@
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.MockPipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
Expand Down Expand Up @@ -77,6 +84,7 @@ public class TestContainerManagerImpl {
private SequenceIdGenerator sequenceIdGen;
private NodeManager nodeManager;
private ContainerReplicaPendingOps pendingOpsMock;
private PipelineManager pipelineManager;

@BeforeAll
static void init() {
Expand All @@ -92,8 +100,7 @@ void setUp() throws Exception {
nodeManager = new MockNodeManager(true, 10);
sequenceIdGen = new SequenceIdGenerator(
conf, scmhaManager, SCMDBDefinition.SEQUENCE_ID.getTable(dbStore));
final PipelineManager pipelineManager =
new MockPipelineManager(dbStore, scmhaManager, nodeManager);
pipelineManager = new MockPipelineManager(dbStore, scmhaManager, nodeManager);
pipelineManager.createPipeline(RatisReplicationConfig.getInstance(
ReplicationFactor.THREE));
pendingOpsMock = mock(ContainerReplicaPendingOps.class);
Expand Down Expand Up @@ -125,6 +132,58 @@ void testAllocateContainer() throws Exception {
container.containerID()));
}

/**
 * getMatchingContainer may allocate a new container. This test checks that it returns null instead of allocating
 * one when the pipeline manager reports that the pipeline's nodes lack space for a new container.
 */
@Test
public void testGetMatchingContainerReturnsNullWhenNotEnoughSpaceInDatanodes() throws IOException {
  final long requestedBytes = 256 * 1024 * 1024; // 256 MB
  final String owner = "test";

  // MockPipelineManager#pipelineHasEnoughSpaceForNewContainer always returns false.
  // The pipeline has no existing containers, so getMatchingContainer has to attempt an allocation.
  Pipeline firstPipeline = pipelineManager.getPipelines().iterator().next();
  ContainerInfo matched =
      containerManager.getMatchingContainer(requestedBytes, owner, firstPipeline, Collections.emptySet());
  assertNull(matched);

  // repeat the check with an EC pipeline to cover EC containers
  ECReplicationConfig ecConfig = new ECReplicationConfig(3, 2);
  pipelineManager.createPipeline(ecConfig);
  Pipeline ecPipeline = pipelineManager.getPipelines(ecConfig).iterator().next();
  matched = containerManager.getMatchingContainer(requestedBytes, owner, ecPipeline, Collections.emptySet());
  assertNull(matched);
}

/**
 * Counterpart of the not-enough-space test: when the pipeline manager reports sufficient space,
 * getMatchingContainer returns a container for both a pre-existing pipeline and a newly created EC pipeline.
 */
@Test
public void testGetMatchingContainerReturnsContainerWhenEnoughSpaceInDatanodes() throws IOException {
  final long requestedBytes = 256 * 1024 * 1024; // 256 MB
  final String owner = "test";

  // spy on the shared pipeline manager so that the space check always answers true
  PipelineManager permissivePipelineManager = spy(pipelineManager);
  doReturn(true).when(permissivePipelineManager)
      .pipelineHasEnoughSpaceForNewContainer(any(Pipeline.class), anyLong());

  // build a separate ContainerManager that talks to the spy
  File spyDir = new File(testDir, "tempDir");
  OzoneConfiguration spyConf = SCMTestUtils.getConf(spyDir);
  ContainerManager spyBackedManager = new ContainerManagerImpl(
      spyConf,
      scmhaManager,
      sequenceIdGen,
      permissivePipelineManager,
      SCMDBDefinition.CONTAINERS.getTable(dbStore),
      pendingOpsMock);

  // the pipeline has no existing containers, so getMatchingContainer has to allocate a new one
  Pipeline firstPipeline = permissivePipelineManager.getPipelines().iterator().next();
  ContainerInfo matched =
      spyBackedManager.getMatchingContainer(requestedBytes, owner, firstPipeline, Collections.emptySet());
  assertNotNull(matched);

  // repeat the check with an EC pipeline to cover EC containers
  ECReplicationConfig ecConfig = new ECReplicationConfig(3, 2);
  permissivePipelineManager.createPipeline(ecConfig);
  Pipeline ecPipeline = permissivePipelineManager.getPipelines(ecConfig).iterator().next();
  matched = spyBackedManager.getMatchingContainer(requestedBytes, owner, ecPipeline, Collections.emptySet());
  assertNotNull(matched);
}

@Test
void testUpdateContainerState() throws Exception {
final ContainerInfo container = containerManager.allocateContainer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,4 +349,9 @@ public void releaseWriteLock() {
public boolean isPipelineCreationFrozen() {
return false;
}

/**
* Always returns false, regardless of the pipeline or container size passed in.
* NOTE(review): presumably the MockPipelineManager stub that the ContainerManager tests rely on to simulate
* "not enough space" - confirm against the enclosing class.
*/
@Override
public boolean pipelineHasEnoughSpaceForNewContainer(Pipeline pipeline, long containerSize) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.hdds.scm.pipeline;

import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT;
Expand Down Expand Up @@ -49,6 +50,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
Expand All @@ -59,17 +61,21 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
Expand All @@ -88,6 +94,7 @@
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy;
Expand All @@ -111,6 +118,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

/**
* Tests for PipelineManagerImpl.
Expand Down Expand Up @@ -933,6 +941,79 @@ public void testCreatePipelineForRead() throws IOException {
}
}

/**
* {@link PipelineManager#pipelineHasEnoughSpaceForNewContainer(Pipeline, long)} should return false if, on any
* Datanode in the pipeline, all the volumes have less than or equal to the space required for creating a new
* container.
*/
@Test
public void testPipelineHasEnoughSpaceForNewContainer() throws IOException {
// create a Mockito mock of NodeManager; the MockNodeManager class doesn't work for this test
NodeManager mockedNodeManager = Mockito.mock(NodeManager.class);
PipelineManagerImpl pipelineManager = PipelineManagerImpl.newPipelineManager(conf,
SCMHAManagerStub.getInstance(true),
mockedNodeManager,
SCMDBDefinition.PIPELINES.getTable(dbStore),
new EventQueue(),
scmContext,
serviceManager,
testClock);

// build a three-node OPEN Ratis pipeline directly, without going through pipelineManager
Pipeline pipeline = Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setNodes(ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails(),
MockDatanodeDetails.randomDatanodeDetails(),
MockDatanodeDetails.randomDatanodeDetails()))
.setState(OPEN)
.setReplicationConfig(ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, THREE))
.build();
List<DatanodeDetails> nodes = pipeline.getNodes();
assertEquals(3, nodes.size());

long containerSize = 100L;

// Case 1: All nodes have enough space (remaining 200, committed 10).
List<DatanodeInfo> datanodeInfoList = new ArrayList<>();
for (DatanodeDetails dn : nodes) {
// the method being tested needs NodeManager to return DatanodeInfo because DatanodeInfo has storage
// information (it extends DatanodeDetails)
DatanodeInfo info = new DatanodeInfo(dn, null, null);
info.updateStorageReports(createStorageReports(200L, 200L, 10L));
doReturn(info).when(mockedNodeManager).getNode(dn.getID());
// keep a handle on each DatanodeInfo so the later cases can replace its storage reports in place
datanodeInfoList.add(info);
}
assertTrue(pipelineManager.pipelineHasEnoughSpaceForNewContainer(pipeline, containerSize));

// Case 2: One node does not have enough space (remaining 120, committed 20).
/*
Interestingly, SCMCommonPlacementPolicy#hasEnoughSpace returns false if exactly the required amount of space
is available. This means it won't allow creating a pipeline on a node if all volumes have exactly 5 GB
available. We follow the same behavior here in the case of a new replica.

So here, remaining - committed == containerSize, and pipelineHasEnoughSpaceForNewContainer returns false.
TODO should this return true instead?
*/
datanodeInfoList.get(0).updateStorageReports(createStorageReports(200L, 120L, 20L));
assertFalse(pipelineManager.pipelineHasEnoughSpaceForNewContainer(pipeline, containerSize));

// Case 3: None of the nodes has enough space (remaining 100, committed 20).
for (DatanodeInfo info : datanodeInfoList) {
info.updateStorageReports(createStorageReports(200L, 100L, 20L));
}
assertFalse(pipelineManager.pipelineHasEnoughSpaceForNewContainer(pipeline, containerSize));
}

/**
 * Builds a single-element list holding one storage report with a random storage UUID and the location "test".
 *
 * @param capacity value stored as the report's capacity
 * @param remaining value stored as the report's remaining space
 * @param committed value stored as the report's committed space
 * @return an immutable list containing exactly one report; its scmUsed is {@code capacity - remaining}
 */
private List<StorageReportProto> createStorageReports(long capacity, long remaining, long committed) {
  StorageReportProto report = StorageReportProto.newBuilder()
      .setStorageUuid(UUID.randomUUID().toString())
      .setStorageLocation("test")
      .setCapacity(capacity)
      .setRemaining(remaining)
      .setCommitted(committed)
      // derive the used space from the capacity argument instead of a hard-coded 200L, so the report stays
      // self-consistent for any capacity (all existing callers pass 200L, so their results are unchanged)
      .setScmUsed(capacity - remaining)
      .build();
  return Collections.singletonList(report);
}

private Set<ContainerReplica> createContainerReplicasList(
List <DatanodeDetails> dns) {
Set<ContainerReplica> replicas = new HashSet<>();
Expand Down