Merged

Changes from 4 commits (16 commits total)
36469a4
Make volume space check and reservation an atomic operation in Vol…
peterxcli Apr 26, 2025
1b3cb82
Explicitly reserve/release commit space
peterxcli Apr 28, 2025
55b9ca9
Add coverage of commit space reserve/release cases for container impor…
peterxcli Apr 29, 2025
8e59cf6
Add tests for container creation commit space reservation and excepti…
peterxcli Apr 29, 2025
85ee3bc
Original behaviour didn't consider container state when committing space
peterxcli Apr 29, 2025
f50134f
Set commit right after the volume is chosen and handle release case …
peterxcli May 7, 2025
35f47d8
Container reader commits space after adding container, plus some refactoring
peterxcli May 7, 2025
57f254d
Add test in TestContainerReader
peterxcli May 7, 2025
47cb71d
Addressed comments
peterxcli May 7, 2025
da81fe7
Merge remote-tracking branch 'upstream/master' into hdds12810-make-vo…
peterxcli May 7, 2025
45c0045
Move release commit byte check logic to TestKeyValueHandler
peterxcli May 7, 2025
f52069b
Use ThreadLocalRandom for Capacity and remove AtomicInteger as Capac…
peterxcli May 8, 2025
c9d7470
Release older and commit newer if containers conflict and get swapped
peterxcli May 9, 2025
bb94094
Set commit space to true right after volume is chosen in KeyValueCon…
peterxcli May 13, 2025
53a5b61
Merge remote-tracking branch 'upstream/master' into hdds12810-make-vo…
peterxcli May 13, 2025
83d397b
Set volume first, too, to prevent releaseSpace throwing NPE in getVolume
peterxcli May 14, 2025
@@ -29,7 +29,6 @@
import static org.apache.hadoop.ozone.OzoneConsts.STATE;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import jakarta.annotation.Nullable;
@@ -214,21 +213,18 @@ public synchronized void setState(ContainerDataProto.State state) {
(state != oldState)) {
releaseCommitSpace();
}
}

/**
* commit space when container transitions (back) to Open.
* when? perhaps closing a container threw an exception
*/
if ((state == ContainerDataProto.State.OPEN) &&
(state != oldState)) {
Preconditions.checkState(getMaxSize() > 0);
commitSpace();
}
public boolean isCommittedSpace() {
return committedSpace;
}

@VisibleForTesting
void setCommittedSpace(boolean committedSpace) {
this.committedSpace = committedSpace;
public void setCommittedSpace(boolean committed) {
if (committed) {
//we don't expect duplicate space commit
Preconditions.checkState(!committedSpace);
}
committedSpace = committed;
}

/**
@@ -356,7 +352,7 @@ public synchronized void closeContainer() {
setState(ContainerDataProto.State.CLOSED);
}

private void releaseCommitSpace() {
public void releaseCommitSpace() {
long unused = getMaxSize() - getBytesUsed();

// only if container size < max size
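Taken together, these ContainerData hunks make the reservation lifecycle explicit: setCommittedSpace(true) records a reservation exactly once, and releaseCommitSpace() returns only the still-unused portion to the volume. A stripped-down sketch of that contract with a worked number (the guard and the arithmetic mirror the hunks above; the class itself is a hypothetical stand-in, not the real ContainerData):

import com.google.common.base.Preconditions;

// Hypothetical stand-in for ContainerData's committed-space bookkeeping.
class CommittedSpaceSketch {
  private boolean committedSpace;
  private final long maxSize;   // container size limit, e.g. 5 GB
  private long bytesUsed;       // data already written, e.g. 2 GB

  CommittedSpaceSketch(long maxSize, long bytesUsed) {
    this.maxSize = maxSize;
    this.bytesUsed = bytesUsed;
  }

  void setCommittedSpace(boolean committed) {
    if (committed) {
      // same guard as the diff: committing twice is a bug, fail fast
      Preconditions.checkState(!committedSpace);
    }
    committedSpace = committed;
  }

  // Mirrors releaseCommitSpace(): only the not-yet-written portion was held.
  long unusedToRelease() {
    return maxSize - bytesUsed;  // 5 GB max - 2 GB used = 3 GB handed back
  }
}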
@@ -154,8 +154,6 @@ private boolean addContainer(Container<?> container, boolean overwrite) throws
throw new StorageContainerException(e, ContainerProtos.Result.IO_EXCEPTION);
}
missingContainerSet.remove(containerId);
// wish we could have done this from ContainerData.setState
container.getContainerData().commitSpace();
if (container.getContainerData().getState() == RECOVERING) {
recoveringContainerMap.put(
clock.millis() + recoveringTimeout, containerId);
@@ -48,7 +48,7 @@ public class CapacityVolumeChoosingPolicy implements VolumeChoosingPolicy {
private final Random random = new Random();

@Override
public HddsVolume chooseVolume(List<HddsVolume> volumes,
public synchronized HddsVolume chooseVolume(List<HddsVolume> volumes,
long maxContainerSize) throws IOException {

// No volumes available to choose from
@@ -69,9 +69,8 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,
}

int count = volumesWithEnoughSpace.size();
if (count == 1) {
return volumesWithEnoughSpace.get(0);
} else {
HddsVolume selectedVolume = volumesWithEnoughSpace.get(0);
if (count > 1) {
// Even if we don't have too many volumes in volumesWithEnoughSpace, this
// algorithm will still help us choose the volume with larger
// available space than other volumes.
@@ -93,7 +92,9 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,
- firstVolume.getCommittedBytes();
long secondAvailable = secondVolume.getCurrentUsage().getAvailable()
- secondVolume.getCommittedBytes();
return firstAvailable < secondAvailable ? secondVolume : firstVolume;
selectedVolume = firstAvailable < secondAvailable ? secondVolume : firstVolume;
}
selectedVolume.incCommittedBytes(maxContainerSize);
return selectedVolume;
}
}
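With the reservation folded into a synchronized chooseVolume in both policies, the space check and the committed-bytes increment can no longer interleave across threads; the trade-off is that every caller that fails after choosing now owes a release. A hedged caller-side sketch (chooseVolume, incCommittedBytes, and HddsVolume are the APIs shown in this diff; the import path for VolumeChoosingPolicy is a best-effort assumption, and createOn is a hypothetical placeholder for the caller's follow-up work):

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;

final class ChooseAndReserveSketch {
  static HddsVolume chooseAndUse(VolumeChoosingPolicy policy,
      List<HddsVolume> volumes, long maxContainerSize) throws IOException {
    // chooseVolume now checks space and reserves it in one synchronized step.
    HddsVolume volume = policy.chooseVolume(volumes, maxContainerSize);
    boolean ok = false;
    try {
      createOn(volume);  // hypothetical: whatever the caller does with the volume
      ok = true;
      return volume;
    } finally {
      if (!ok) {
        volume.incCommittedBytes(-maxContainerSize);  // undo the reservation
      }
    }
  }

  private static void createOn(HddsVolume volume) throws IOException {
    // placeholder body for the sketch
  }
}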
@@ -41,7 +41,7 @@ public class RoundRobinVolumeChoosingPolicy implements VolumeChoosingPolicy {
private AtomicInteger nextVolumeIndex = new AtomicInteger(0);

@Override
public HddsVolume chooseVolume(List<HddsVolume> volumes,
public synchronized HddsVolume chooseVolume(List<HddsVolume> volumes,
long maxContainerSize) throws IOException {

// No volumes available to choose from
@@ -68,6 +68,7 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,
if (hasEnoughSpace) {
logIfSomeVolumesOutOfSpace(filter, LOG);
nextVolumeIndex.compareAndSet(nextIndex, currentVolumeIndex);
volume.incCommittedBytes(maxContainerSize);
return volume;
}

@@ -168,6 +168,7 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy
CONTAINER_INTERNAL_ERROR);
}

boolean exceptionThrown = false;
try {
String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
// Set volume before getContainerDBFile(), because we may need the
@@ -207,18 +208,24 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy
File containerFile = getContainerFile();
createContainerFile(containerFile);

// commit space has been reserved by volumeChoosingPolicy
containerData.setCommittedSpace(true);

return;
} catch (StorageContainerException ex) {
exceptionThrown = true;
if (containerMetaDataPath != null
&& containerMetaDataPath.getParentFile().exists()) {
FileUtil.fullyDelete(containerMetaDataPath.getParentFile());
}
throw ex;
} catch (FileAlreadyExistsException ex) {
exceptionThrown = true;
throw new StorageContainerException("Container creation failed " +
"because ContainerFile already exists", ex,
CONTAINER_ALREADY_EXISTS);
} catch (IOException ex) {
exceptionThrown = true;
// This is a general catch all - no space left on device, which should
// not happen as the volume Choosing policy should filter out full
// disks, but it may still be possible if the disk quickly fills,
@@ -236,6 +243,10 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy
"Failed to create " + containerData + " on all volumes: " + volumeSet.getVolumesList(),
ex, CONTAINER_INTERNAL_ERROR);
}
} finally {
if (exceptionThrown) {
containerData.releaseCommitSpace();
}
}
}
} finally {
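In create(), the ordering carries the invariant: the choosing policy has already reserved the bytes before create() runs, setCommittedSpace(true) only records that fact on the container, and the finally block releases the reservation on every failure path. A condensed, hedged sketch of that shape (doCreate is a hypothetical stand-in for the real directory/DB/container-file steps; exception handling collapsed to one catch):

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;

import java.io.IOException;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;

final class CreateSketch {
  static void create(KeyValueContainerData containerData)
      throws StorageContainerException {
    boolean exceptionThrown = false;
    try {
      doCreate();                             // hypothetical stand-in for the real steps
      containerData.setCommittedSpace(true);  // reservation was made by the choosing policy
    } catch (IOException ex) {
      exceptionThrown = true;
      throw new StorageContainerException("Container creation failed", ex,
          CONTAINER_INTERNAL_ERROR);
    } finally {
      if (exceptionThrown) {
        containerData.releaseCommitSpace();   // hand the reserved bytes back
      }
    }
  }

  private static void doCreate() throws IOException {
    // placeholder body for the sketch
  }
}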
@@ -19,6 +19,7 @@

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.DELETED;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.OPEN;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.RECOVERING;

import com.google.common.base.Preconditions;
@@ -202,50 +203,55 @@ public void verifyAndFixupContainerData(ContainerData containerData)
throws IOException {
switch (containerData.getContainerType()) {
case KeyValueContainer:
if (containerData instanceof KeyValueContainerData) {
KeyValueContainerData kvContainerData = (KeyValueContainerData)
containerData;
containerData.setVolume(hddsVolume);
KeyValueContainerUtil.parseKVContainerData(kvContainerData, config);
KeyValueContainer kvContainer = new KeyValueContainer(kvContainerData,
config);
if (kvContainer.getContainerState() == RECOVERING) {
if (shouldDelete) {
// delete Ratis replicated RECOVERING containers
if (kvContainer.getContainerData().getReplicaIndex() == 0) {
cleanupContainer(hddsVolume, kvContainer);
} else {
kvContainer.markContainerUnhealthy();
LOG.info("Stale recovering container {} marked UNHEALTHY",
kvContainerData.getContainerID());
containerSet.addContainer(kvContainer);
}
}
return;
}
if (kvContainer.getContainerState() == DELETED) {
if (shouldDelete) {
cleanupContainer(hddsVolume, kvContainer);
}
return;
}
try {
containerSet.addContainer(kvContainer);
} catch (StorageContainerException e) {
if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) {
throw e;
}
if (shouldDelete) {
resolveDuplicate((KeyValueContainer) containerSet.getContainer(
kvContainer.getContainerData().getContainerID()), kvContainer);
}
}
} else {
if (!(containerData instanceof KeyValueContainerData)) {
throw new StorageContainerException("Container File is corrupted. " +
"ContainerType is KeyValueContainer but cast to " +
"KeyValueContainerData failed. ",
ContainerProtos.Result.CONTAINER_METADATA_ERROR);
}

KeyValueContainerData kvContainerData = (KeyValueContainerData)
containerData;
containerData.setVolume(hddsVolume);
KeyValueContainerUtil.parseKVContainerData(kvContainerData, config);
KeyValueContainer kvContainer = new KeyValueContainer(kvContainerData,
config);
if (kvContainer.getContainerState() == RECOVERING) {
if (shouldDelete) {
// delete Ratis replicated RECOVERING containers
if (kvContainer.getContainerData().getReplicaIndex() == 0) {
cleanupContainer(hddsVolume, kvContainer);
} else {
kvContainer.markContainerUnhealthy();
LOG.info("Stale recovering container {} marked UNHEALTHY",
kvContainerData.getContainerID());
containerSet.addContainer(kvContainer);
}
}
return;
} else if (kvContainer.getContainerState() == DELETED) {
if (shouldDelete) {
cleanupContainer(hddsVolume, kvContainer);
}
return;
}

try {
if (kvContainer.getContainerState() == OPEN) {
// only open container would get new data written in
containerData.commitSpace();
}
containerSet.addContainer(kvContainer);
} catch (StorageContainerException e) {
if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) {
throw e;
}
if (shouldDelete) {
resolveDuplicate((KeyValueContainer) containerSet.getContainer(
kvContainer.getContainerData().getContainerID()), kvContainer);
}
containerData.releaseCommitSpace();
}
break;
default:
throw new StorageContainerException("Unrecognized ContainerType " +
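The scan-time rule above distils to a small decision table: reserve only for replicas that can still grow, and return the reservation if the replica loses the duplicate-ID race. Summarized (same calls as the hunk; flow trimmed to the essentials):

// State handling in verifyAndFixupContainerData, condensed:
//   RECOVERING -> clean up, or mark UNHEALTHY and add (no reservation)
//   DELETED    -> clean up (no reservation)
//   OPEN       -> commitSpace(), then addContainer()
//   other      -> addContainer() without a reservation
// On CONTAINER_EXISTS from addContainer(), releaseCommitSpace() undoes
// the reservation for the losing replica.
if (kvContainer.getContainerState() == OPEN) {
  containerData.commitSpace();  // only an OPEN replica can take new writes
}
containerSet.addContainer(kvContainer);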
@@ -88,7 +88,7 @@ public boolean isAllowedContainerImport(long containerID) {
}

public void importContainer(long containerID, Path tarFilePath,
HddsVolume hddsVolume, CopyContainerCompression compression)
HddsVolume targetVolume, CopyContainerCompression compression)
throws IOException {
if (!importContainerProgress.add(containerID)) {
deleteFileQuietely(tarFilePath);
@@ -106,11 +106,6 @@ public void importContainer(long containerID, Path tarFilePath,
ContainerProtos.Result.CONTAINER_EXISTS);
}

HddsVolume targetVolume = hddsVolume;
if (targetVolume == null) {
targetVolume = chooseNextVolume();
}

KeyValueContainerData containerData;
TarContainerPacker packer = getPacker(compression);

@@ -25,8 +25,8 @@
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status;
import org.slf4j.Logger;
@@ -81,15 +81,6 @@ public void replicate(ReplicationTask task) {

try {
targetVolume = containerImporter.chooseNextVolume();
// Increment committed bytes and verify if it doesn't cross the space left.
targetVolume.incCommittedBytes(containerSize * 2);
StorageLocationReport volumeReport = targetVolume.getReport();
// Already committed bytes increased above, so required space is not required here in AvailableSpaceFilter
if (volumeReport.getUsableSpace() <= 0) {
LOG.warn("Container {} replication was unsuccessful, no space left on volume {}", containerID, volumeReport);
task.setStatus(Status.FAILED);
return;
}
// Wait for the download. This thread pool is limiting the parallel
// downloads, so it's ok to block here and wait for the full download.
Path tarFilePath =
@@ -114,7 +105,7 @@ public void replicate(ReplicationTask task) {
task.setStatus(Status.FAILED);
} finally {
if (targetVolume != null) {
targetVolume.incCommittedBytes(-containerSize * 2);
targetVolume.incCommittedBytes(-HddsServerUtil.requiredReplicationSpace(containerSize));
}
}
}
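Both replication paths now release through HddsServerUtil.requiredReplicationSpace rather than the inline containerSize * 2 they previously used (one containerSize for the downloaded tarball, one for the unpacked replica). A plausible shape for that helper, assuming it simply centralizes the old factor (check HddsServerUtil for the real body):

// Hedged sketch of the helper; the 2x factor is inferred from the code it replaces.
public static long requiredReplicationSpace(long containerSize) {
  // room for the compressed tarball plus the unpacked container
  return 2 * containerSize;
}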
@@ -27,11 +27,10 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.util.DiskChecker;
import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
@@ -87,16 +86,6 @@ public void onNext(SendContainerRequest req) {
if (containerId == -1) {
containerId = req.getContainerID();
volume = importer.chooseNextVolume();
// Increment committed bytes and verify if it doesn't cross the space left.
volume.incCommittedBytes(importer.getDefaultContainerSize() * 2);
StorageLocationReport volumeReport = volume.getReport();
// Already committed bytes increased above, so required space is not required here in AvailableSpaceFilter
if (volumeReport.getUsableSpace() <= 0) {
volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
LOG.warn("Container {} import was unsuccessful, no space left on volume {}", containerId, volumeReport);
volume = null;
throw new DiskChecker.DiskOutOfSpaceException("No more available volumes");
}

Path dir = ContainerImporter.getUntarDirectory(volume);
Files.createDirectories(dir);
@@ -130,7 +119,7 @@ public void onError(Throwable t) {
responseObserver.onError(t);
} finally {
if (volume != null) {
volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
volume.incCommittedBytes(-HddsServerUtil.requiredReplicationSpace(importer.getDefaultContainerSize()));
}
}
}
@@ -159,7 +148,7 @@ public void onCompleted() {
}
} finally {
if (volume != null) {
volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
volume.incCommittedBytes(-HddsServerUtil.requiredReplicationSpace(importer.getDefaultContainerSize()));
}
}
}
@@ -204,10 +204,11 @@ private KeyValueContainer addContainer(ContainerSet cSet, long cID)
data.addMetadata("VOLUME", "shire");
data.addMetadata("owner)", "bilbo");
KeyValueContainer container = new KeyValueContainer(data, conf);
commitBytesBefore = StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()).get(0).getCommittedBytes();

container.create(volumeSet, volumeChoosingPolicy, SCM_ID);
commitBytesBefore = container.getContainerData()
.getVolume().getCommittedBytes();
cSet.addContainer(container);

commitBytesAfter = container.getContainerData()
.getVolume().getCommittedBytes();
commitIncrement = commitBytesAfter - commitBytesBefore;
@@ -149,4 +149,15 @@ public void testVolumeChoosingPolicyFactory()
VolumeChoosingPolicyFactory.getPolicy(CONF).getClass());
}

@Test
public void testVolumeCommittedSpace() throws Exception {
Map<HddsVolume, Long> initialCommittedSpace = new HashMap<>();
volumes.forEach(vol ->
initialCommittedSpace.put(vol, vol.getCommittedBytes()));

HddsVolume selectedVolume = policy.chooseVolume(volumes, 50);

assertEquals(initialCommittedSpace.get(selectedVolume) + 50,
selectedVolume.getCommittedBytes());
}
}
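A natural companion to testVolumeCommittedSpace (hypothetical, not part of this PR) would pin down the release path too, using the same fixtures:

@Test
public void testVolumeCommittedSpaceRelease() throws Exception {
  HddsVolume selectedVolume = policy.chooseVolume(volumes, 50);
  long afterReserve = selectedVolume.getCommittedBytes();

  // undo the reservation the way the failure paths in this PR do
  selectedVolume.incCommittedBytes(-50);

  assertEquals(afterReserve - 50, selectedVolume.getCommittedBytes());
}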