Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.hdds.scm.container;

import jakarta.annotation.Nullable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -189,8 +190,10 @@ default ContainerInfo getMatchingContainer(long size, String owner,
* @param owner - the user which requires space in its owned container
* @param pipeline - pipeline to which the container should belong.
* @param excludedContainerIDS - containerIds to be excluded.
* @return ContainerInfo for the matching container.
* @return ContainerInfo for the matching container, or null if a container could not be found and could not be
* allocated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use @Nullable, which is very straightforward.

*/
@Nullable
ContainerInfo getMatchingContainer(long size, String owner,
Pipeline pipeline,
Set<ContainerID> excludedContainerIDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
Expand Down Expand Up @@ -80,6 +81,8 @@ public class ContainerManagerImpl implements ContainerManager {
@SuppressWarnings("java:S2245") // no need for secure random
private final Random random = new Random();

private final long maxContainerSize;

/**
*
*/
Expand Down Expand Up @@ -109,6 +112,9 @@ public ContainerManagerImpl(
.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);

maxContainerSize = (long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);

this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create();
}

Expand Down Expand Up @@ -346,14 +352,24 @@ public ContainerInfo getMatchingContainer(final long size, final String owner,
synchronized (pipeline.getId()) {
containerIDs = getContainersForOwner(pipeline, owner);
if (containerIDs.size() < getOpenContainerCountPerPipeline(pipeline)) {
allocateContainer(pipeline, owner);
containerIDs = getContainersForOwner(pipeline, owner);
if (pipelineManager.hasEnoughSpace(pipeline, maxContainerSize)) {
allocateContainer(pipeline, owner);
containerIDs = getContainersForOwner(pipeline, owner);
} else {
LOG.debug("Cannot allocate a new container because pipeline {} does not have the required space {}.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the if (LOG.isDebugEnabled) check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK the LOG.isDebugEnabled check isn't needed when using slf4j with "{}" style logging. The log string is evaluated after the debug method itself calls isDebugEnabled().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if the log parameters are just simple references to be passed, the isDebugEnabled() check is not needed, as the logger does it internally.

Copy link
Contributor

@ChenSammi ChenSammi Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to know that slf4j with "{}" will delay the string construction, thanks!

pipeline, maxContainerSize);
}
}
containerIDs.removeAll(excludedContainerIDs);
containerInfo = containerStateManager.getMatchingContainer(
size, owner, pipeline.getId(), containerIDs);
if (containerInfo == null) {
containerInfo = allocateContainer(pipeline, owner);
if (pipelineManager.hasEnoughSpace(pipeline, maxContainerSize)) {
containerInfo = allocateContainer(pipeline, owner);
} else {
LOG.debug("Cannot allocate a new container because pipeline {} does not have the required space {}.",
pipeline, maxContainerSize);
}
}
return containerInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,9 @@ Map<SCMCommandProto.Type, Integer> getTotalDatanodeCommandCounts(
/** @return the datanode of the given id if it exists; otherwise, return null. */
@Nullable DatanodeDetails getNode(@Nullable DatanodeID id);

/**
 * Returns the {@link DatanodeInfo} corresponding to the given datanode.
 *
 * @param datanodeDetails the datanode to look up
 * @return the matching {@link DatanodeInfo}, or null if none is available
 */
@Nullable
DatanodeInfo getDatanodeInfo(DatanodeDetails datanodeDetails);

/**
* Given datanode address(Ipaddress or hostname), returns a list of
* DatanodeDetails for the datanodes running at that address.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import jakarta.annotation.Nullable;
import java.io.IOException;
import java.math.RoundingMode;
import java.net.InetAddress;
Expand Down Expand Up @@ -1713,6 +1714,15 @@ public DatanodeInfo getNode(DatanodeID id) {
}
}

/**
 * Looks up the {@link DatanodeInfo} for the given datanode by its ID.
 *
 * @param datanodeDetails the datanode to look up; may be null
 * @return whatever {@code getNode} returns for the datanode's ID, or null when
 *         {@code datanodeDetails} itself is null
 */
@Override
@Nullable
public DatanodeInfo getDatanodeInfo(DatanodeDetails datanodeDetails) {
  return datanodeDetails == null ? null : getNode(datanodeDetails.getID());
}

/**
* Given datanode address(Ipaddress or hostname), return a list of
* DatanodeDetails for the datanodes registered on that address.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,13 @@ void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
* Release write lock.
*/
void releaseWriteLock();

/**
 * Checks whether every Datanode in the specified pipeline has more than the specified amount of space,
 * {@code containerSize}, available.
 *
 * @param pipeline pipeline to check
 * @param containerSize the required amount of space
 * @return false if, on any Datanode in the pipeline, all the volumes have space less than or equal to the
 *         specified containerSize; otherwise true
 */
boolean hasEnoughSpace(Pipeline pipeline, long containerSize);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
Expand All @@ -53,6 +54,7 @@
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
Expand Down Expand Up @@ -633,6 +635,20 @@ private boolean isOpenWithUnregisteredNodes(Pipeline pipeline) {
return false;
}

/**
 * Checks every node of the pipeline with
 * {@code SCMCommonPlacementPolicy.hasEnoughSpace} and reports whether all of them pass.
 *
 * <p>Nodes that are not already {@link DatanodeInfo} instances are resolved through the node manager first.
 *
 * @param pipeline pipeline whose nodes are checked
 * @param containerSize the required amount of data space, passed through to the placement-policy check
 * @return true only if every node could be resolved and passed the space check; false otherwise
 */
@Override
public boolean hasEnoughSpace(Pipeline pipeline, long containerSize) {
  for (DatanodeDetails node : pipeline.getNodes()) {
    DatanodeDetails resolved = node;
    if (!(resolved instanceof DatanodeInfo)) {
      resolved = nodeManager.getDatanodeInfo(node);
    }
    // getDatanodeInfo is @Nullable. Without storage information for the node its free space cannot be
    // verified, so treat it as not having enough space instead of passing null on to the space check.
    if (resolved == null) {
      return false;
    }
    if (!SCMCommonPlacementPolicy.hasEnoughSpace(resolved, 0, containerSize, null)) {
      return false;
    }
  }

  return true;
}

/**
* Schedules a fixed interval job to create pipelines.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,14 @@ private ContainerInfo allocateContainer(ReplicationConfig repConfig,

Pipeline newPipeline = pipelineManager.createPipeline(repConfig,
excludedNodes, Collections.emptyList());
// the returned ContainerInfo should not be null (due to not enough space in the Datanodes specifically) because
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could newPipeline be null when DN is close to full?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I checked, pipeline will not be null. There's a checkPipeline(pipeline) method that checks for null and throws an exception instead.

// this is a new pipeline and pipeline creation checks for sufficient space in the Datanodes
ContainerInfo container =
containerManager.getMatchingContainer(size, owner, newPipeline);
if (container == null) {
// defensive null handling
throw new IOException("Could not allocate a new container");
}
pipelineManager.openPipeline(newPipeline.getId());
LOG.info("Created and opened new pipeline {}", newPipeline);
return container;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -253,6 +254,19 @@ public static StorageReportProto createStorageReport(DatanodeID nodeId, String p
StorageTypeProto.DISK);
}

/**
 * Builds a single-element list holding one storage report for the given datanode.
 *
 * <p>The report uses the node ID as storage UUID and the fixed storage location {@code "test"}. The used
 * space is derived as {@code capacity - remaining} (never negative) so that it stays consistent with the
 * other values instead of depending on a hard-coded capacity.
 *
 * @param nodeID ID of the datanode the report belongs to
 * @param capacity capacity to report
 * @param remaining remaining space to report
 * @param committed committed space to report
 * @return an immutable list containing exactly one storage report
 */
public static List<StorageReportProto> createStorageReports(DatanodeID nodeID, long capacity, long remaining,
    long committed) {
  // Clamp at zero so an inconsistent (remaining > capacity) input cannot produce a negative used value.
  final long used = Math.max(0L, capacity - remaining);
  return Collections.singletonList(
      StorageReportProto.newBuilder()
          .setStorageUuid(nodeID.toString())
          .setStorageLocation("test")
          .setCapacity(capacity)
          .setRemaining(remaining)
          .setCommitted(committed)
          .setScmUsed(used)
          .build());
}

public static StorageReportProto createStorageReport(DatanodeID nodeId, String path,
long capacity, long used, long remaining, StorageTypeProto type) {
return createStorageReport(nodeId, path, capacity, used, remaining,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;

import jakarta.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -833,6 +834,21 @@ public DatanodeDetails getNode(DatanodeID id) {
return node == null ? null : (DatanodeDetails)node;
}

/**
 * Returns a freshly built {@link DatanodeInfo} for a datanode known to this mock.
 *
 * <p>The returned info is in-service/healthy and carries a single storage report whose capacity and
 * remaining space are both 50 GB with nothing committed.
 *
 * @param datanodeDetails the datanode to look up; may be null
 * @return a new {@link DatanodeInfo}, or null if {@code datanodeDetails} is null or {@code getNode} does
 *         not know its ID
 */
@Nullable
@Override
public DatanodeInfo getDatanodeInfo(DatanodeDetails datanodeDetails) {
  // Null-safe like the production node manager: a null argument yields null rather than an NPE.
  if (datanodeDetails == null || getNode(datanodeDetails.getID()) == null) {
    return null;
  }

  final long capacity = 50L * 1024 * 1024 * 1024;
  final DatanodeInfo datanodeInfo = new DatanodeInfo(datanodeDetails, NodeStatus.inServiceHealthy(), null);
  datanodeInfo.updateStorageReports(
      HddsTestUtils.createStorageReports(datanodeInfo.getID(), capacity, capacity, 0L));
  return datanodeInfo;
}

@Override
public List<DatanodeDetails> getNodesByAddress(String address) {
List<DatanodeDetails> results = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.hdds.scm.container;

import jakarta.annotation.Nullable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -347,6 +348,12 @@ public DatanodeDetails getNode(DatanodeID id) {
return null;
}

/**
 * Stub implementation: this mock never provides a {@link DatanodeInfo}.
 *
 * @param datanodeDetails ignored
 * @return always null
 */
@Override
@Nullable
public DatanodeInfo getDatanodeInfo(DatanodeDetails datanodeDetails) {
  return null;
}

@Override
public List<DatanodeDetails> getNodesByAddress(String address) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,21 @@
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.OPEN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
Expand All @@ -49,6 +55,7 @@
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.MockPipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
Expand Down Expand Up @@ -77,6 +84,7 @@ public class TestContainerManagerImpl {
private SequenceIdGenerator sequenceIdGen;
private NodeManager nodeManager;
private ContainerReplicaPendingOps pendingOpsMock;
private PipelineManager pipelineManager;

@BeforeAll
static void init() {
Expand All @@ -92,8 +100,7 @@ void setUp() throws Exception {
nodeManager = new MockNodeManager(true, 10);
sequenceIdGen = new SequenceIdGenerator(
conf, scmhaManager, SCMDBDefinition.SEQUENCE_ID.getTable(dbStore));
final PipelineManager pipelineManager =
new MockPipelineManager(dbStore, scmhaManager, nodeManager);
pipelineManager = new MockPipelineManager(dbStore, scmhaManager, nodeManager);
pipelineManager.createPipeline(RatisReplicationConfig.getInstance(
ReplicationFactor.THREE));
pendingOpsMock = mock(ContainerReplicaPendingOps.class);
Expand Down Expand Up @@ -121,6 +128,58 @@ void testAllocateContainer() throws Exception {
container.containerID()));
}

/**
 * getMatchingContainer may allocate a new container on demand. Verifies that no container is allocated
 * (and null is returned) when the pipeline's nodes do not have enough space for a new container.
 */
@Test
public void testGetMatchingContainerReturnsNullWhenNotEnoughSpaceInDatanodes() throws IOException {
  final long requiredBytes = 256L * 1024 * 1024; // 256 MB

  // MockPipelineManager#hasEnoughSpace always returns false. The pipeline has no containers yet, so
  // getMatchingContainer would have to allocate one, which must be refused.
  final Pipeline firstPipeline = pipelineManager.getPipelines().iterator().next();
  final ContainerInfo fromFirstPipeline = containerManager
      .getMatchingContainer(requiredBytes, "test", firstPipeline, Collections.emptySet());
  assertNull(fromFirstPipeline);

  // Repeat with an EC pipeline to cover EC containers.
  final ECReplicationConfig ecConfig = new ECReplicationConfig(3, 2);
  pipelineManager.createPipeline(ecConfig);
  final Pipeline ecPipeline = pipelineManager.getPipelines(ecConfig).iterator().next();
  final ContainerInfo fromEcPipeline = containerManager
      .getMatchingContainer(requiredBytes, "test", ecPipeline, Collections.emptySet());
  assertNull(fromEcPipeline);
}

/**
 * Counterpart of the not-enough-space test: when the pipeline manager reports enough space,
 * getMatchingContainer allocates and returns a container for both the existing pipeline and an EC pipeline.
 */
@Test
public void testGetMatchingContainerReturnsContainerWhenEnoughSpaceInDatanodes() throws IOException {
  final long requiredBytes = 256L * 1024 * 1024; // 256 MB

  // Spy on the pipeline manager so that hasEnoughSpace always answers true.
  final PipelineManager spiedPipelineManager = spy(pipelineManager);
  doReturn(true).when(spiedPipelineManager).hasEnoughSpace(any(Pipeline.class), anyLong());

  // Build a separate ContainerManager wired to the spy.
  final OzoneConfiguration spyConf = SCMTestUtils.getConf(new File(testDir, "tempDir"));
  final ContainerManager managerUnderTest = new ContainerManagerImpl(spyConf,
      scmhaManager, sequenceIdGen, spiedPipelineManager,
      SCMDBDefinition.CONTAINERS.getTable(dbStore), pendingOpsMock);

  // The pipeline has no containers yet, so getMatchingContainer has to allocate one.
  final Pipeline firstPipeline = spiedPipelineManager.getPipelines().iterator().next();
  final ContainerInfo fromFirstPipeline = managerUnderTest
      .getMatchingContainer(requiredBytes, "test", firstPipeline, Collections.emptySet());
  assertNotNull(fromFirstPipeline);

  // Repeat with an EC pipeline to cover EC containers.
  final ECReplicationConfig ecConfig = new ECReplicationConfig(3, 2);
  spiedPipelineManager.createPipeline(ecConfig);
  final Pipeline ecPipeline = spiedPipelineManager.getPipelines(ecConfig).iterator().next();
  final ContainerInfo fromEcPipeline = managerUnderTest
      .getMatchingContainer(requiredBytes, "test", ecPipeline, Collections.emptySet());
  assertNotNull(fromEcPipeline);
}

@Test
void testUpdateContainerState() throws Exception {
final ContainerInfo container = containerManager.allocateContainer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,4 +349,9 @@ public void releaseWriteLock() {
public boolean isPipelineCreationFrozen() {
return false;
}

/**
 * Mock implementation that performs no real space check.
 *
 * @param pipeline ignored
 * @param containerSize ignored
 * @return always false
 */
@Override
public boolean hasEnoughSpace(Pipeline pipeline, long containerSize) {
  return false;
}
}
Loading