Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
36469a4
Make volume space check and reservation as an atomic operation in Vol…
peterxcli Apr 26, 2025
1b3cb82
Explicitly reserve/release commit space
peterxcli Apr 28, 2025
55b9ca9
Add coverage of commit space reserve/release cases for container impor…
peterxcli Apr 29, 2025
8e59cf6
Add tests for container creation commit space reservation and excepti…
peterxcli Apr 29, 2025
85ee3bc
original behaviour didn't consider container state when committing space
peterxcli Apr 29, 2025
f50134f
set commit right after the volume is chosen and handle release case …
peterxcli May 7, 2025
35f47d8
container reader commit space after adding container and some refactor
peterxcli May 7, 2025
57f254d
Add test in TestContainerReader
peterxcli May 7, 2025
47cb71d
Addressed comments
peterxcli May 7, 2025
da81fe7
Merge remote-tracking branch 'upstream/master' into hdds12810-make-vo…
peterxcli May 7, 2025
45c0045
Move release commit byte check logic to TestKeyValueHandler
peterxcli May 7, 2025
f52069b
Use threadlocal random for Capacity and Remove AtomicInteger as Capac…
peterxcli May 8, 2025
c9d7470
Release older and commit newer if container got conflict and swapped
peterxcli May 9, 2025
bb94094
Set commit space to true right after volume is chosen in KeyValueCon…
peterxcli May 13, 2025
53a5b61
Merge remote-tracking branch 'upstream/master' into hdds12810-make-vo…
peterxcli May 13, 2025
83d397b
Set volume first, too, to prevent releaseSpace throw NPE when getVolume
peterxcli May 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import static org.apache.hadoop.ozone.OzoneConsts.STATE;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import jakarta.annotation.Nullable;
Expand Down Expand Up @@ -214,21 +213,14 @@ public synchronized void setState(ContainerDataProto.State state) {
(state != oldState)) {
releaseCommitSpace();
}
}

/**
* commit space when container transitions (back) to Open.
* when? perhaps closing a container threw an exception
*/
if ((state == ContainerDataProto.State.OPEN) &&
(state != oldState)) {
Preconditions.checkState(getMaxSize() > 0);
commitSpace();
}
public boolean isCommittedSpace() {
return committedSpace;
}

@VisibleForTesting
void setCommittedSpace(boolean committedSpace) {
this.committedSpace = committedSpace;
public void setCommittedSpace(boolean committed) {
committedSpace = committed;
}

/**
Expand Down Expand Up @@ -356,7 +348,7 @@ public synchronized void closeContainer() {
setState(ContainerDataProto.State.CLOSED);
}

private void releaseCommitSpace() {
public void releaseCommitSpace() {
long unused = getMaxSize() - getBytesUsed();

// only if container size < max size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,6 @@ private boolean addContainer(Container<?> container, boolean overwrite) throws
throw new StorageContainerException(e, ContainerProtos.Result.IO_EXCEPTION);
}
missingContainerSet.remove(containerId);
// wish we could have done this from ContainerData.setState
container.getContainerData().commitSpace();
if (container.getContainerData().getState() == RECOVERING) {
recoveringContainerMap.put(
clock.millis() + recoveringTimeout, containerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
Expand All @@ -44,11 +44,8 @@ public class CapacityVolumeChoosingPolicy implements VolumeChoosingPolicy {
private static final Logger LOG = LoggerFactory.getLogger(
CapacityVolumeChoosingPolicy.class);

// Stores the index of the next volume to be returned.
private final Random random = new Random();

@Override
public HddsVolume chooseVolume(List<HddsVolume> volumes,
public synchronized HddsVolume chooseVolume(List<HddsVolume> volumes,
long maxContainerSize) throws IOException {

// No volumes available to choose from
Expand All @@ -69,9 +66,8 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,
}

int count = volumesWithEnoughSpace.size();
if (count == 1) {
return volumesWithEnoughSpace.get(0);
} else {
HddsVolume selectedVolume = volumesWithEnoughSpace.get(0);
if (count > 1) {
// Even if we don't have too many volumes in volumesWithEnoughSpace, this
// algorithm will still help us choose the volume with larger
// available space than other volumes.
Expand All @@ -83,8 +79,8 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,
// 4. vol2 + vol2: 25%, result is vol2
// So we have a total of 75% chances to choose vol1, which meets our
// expectation.
int firstIndex = random.nextInt(count);
int secondIndex = random.nextInt(count);
int firstIndex = ThreadLocalRandom.current().nextInt(count);
int secondIndex = ThreadLocalRandom.current().nextInt(count);

HddsVolume firstVolume = volumesWithEnoughSpace.get(firstIndex);
HddsVolume secondVolume = volumesWithEnoughSpace.get(secondIndex);
Expand All @@ -93,7 +89,9 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,
- firstVolume.getCommittedBytes();
long secondAvailable = secondVolume.getCurrentUsage().getAvailable()
- secondVolume.getCommittedBytes();
return firstAvailable < secondAvailable ? secondVolume : firstVolume;
selectedVolume = firstAvailable < secondAvailable ? secondVolume : firstVolume;
}
selectedVolume.incCommittedBytes(maxContainerSize);
return selectedVolume;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.slf4j.Logger;
Expand All @@ -38,10 +37,10 @@ public class RoundRobinVolumeChoosingPolicy implements VolumeChoosingPolicy {
RoundRobinVolumeChoosingPolicy.class);

// Stores the index of the next volume to be returned.
private AtomicInteger nextVolumeIndex = new AtomicInteger(0);
private int nextVolumeIndex = 0;

@Override
public HddsVolume chooseVolume(List<HddsVolume> volumes,
public synchronized HddsVolume chooseVolume(List<HddsVolume> volumes,
long maxContainerSize) throws IOException {

// No volumes available to choose from
Expand All @@ -53,8 +52,7 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,

// since volumes could've been removed because of the failure
// make sure we are not out of bounds
int nextIndex = nextVolumeIndex.get();
int currentVolumeIndex = nextIndex < volumes.size() ? nextIndex : 0;
int currentVolumeIndex = nextVolumeIndex < volumes.size() ? nextVolumeIndex : 0;

int startVolumeIndex = currentVolumeIndex;

Expand All @@ -67,7 +65,8 @@ public HddsVolume chooseVolume(List<HddsVolume> volumes,

if (hasEnoughSpace) {
logIfSomeVolumesOutOfSpace(filter, LOG);
nextVolumeIndex.compareAndSet(nextIndex, currentVolumeIndex);
nextVolumeIndex = currentVolumeIndex;
volume.incCommittedBytes(maxContainerSize);
return volume;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,15 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy
= StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList());
while (true) {
HddsVolume containerVolume;
String hddsVolumeDir;
try {
containerVolume = volumeChoosingPolicy.chooseVolume(volumes, maxSize);
hddsVolumeDir = containerVolume.getHddsRootDir().toString();
// Set volume before getContainerDBFile(), because we may need the
// volume to deduce the db file.
containerData.setVolume(containerVolume);
// commit bytes have been reserved in volumeChoosingPolicy#chooseVolume
containerData.setCommittedSpace(true);
} catch (DiskOutOfSpaceException ex) {
throw new StorageContainerException("Container creation failed, " +
"due to disk out of space", ex, DISK_OUT_OF_SPACE);
Expand All @@ -169,11 +176,6 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy
}

try {
String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
// Set volume before getContainerDBFile(), because we may need the
// volume to deduce the db file.
containerData.setVolume(containerVolume);

long containerID = containerData.getContainerID();
String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID(
containerVolume, clusterId);
Expand Down Expand Up @@ -206,7 +208,6 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy
// Create .container file
File containerFile = getContainerFile();
createContainerFile(containerFile);

return;
} catch (StorageContainerException ex) {
if (containerMetaDataPath != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ ContainerCommandResponseProto handleCreateContainer(
LOG.debug("Container already exists. container Id {}", containerID);
}
} catch (StorageContainerException ex) {
newContainerData.releaseCommitSpace();
return ContainerUtils.logAndReturnError(LOG, ex, request);
} finally {
containerIdLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,49 +202,56 @@ public void verifyAndFixupContainerData(ContainerData containerData)
throws IOException {
switch (containerData.getContainerType()) {
case KeyValueContainer:
if (containerData instanceof KeyValueContainerData) {
KeyValueContainerData kvContainerData = (KeyValueContainerData)
containerData;
containerData.setVolume(hddsVolume);
KeyValueContainerUtil.parseKVContainerData(kvContainerData, config);
KeyValueContainer kvContainer = new KeyValueContainer(kvContainerData,
config);
if (kvContainer.getContainerState() == RECOVERING) {
if (shouldDelete) {
// delete Ratis replicated RECOVERING containers
if (kvContainer.getContainerData().getReplicaIndex() == 0) {
cleanupContainer(hddsVolume, kvContainer);
} else {
kvContainer.markContainerUnhealthy();
LOG.info("Stale recovering container {} marked UNHEALTHY",
kvContainerData.getContainerID());
containerSet.addContainer(kvContainer);
}
}
return;
}
if (kvContainer.getContainerState() == DELETED) {
if (shouldDelete) {
if (!(containerData instanceof KeyValueContainerData)) {
throw new StorageContainerException("Container File is corrupted. " +
"ContainerType is KeyValueContainer but cast to " +
"KeyValueContainerData failed. ",
ContainerProtos.Result.CONTAINER_METADATA_ERROR);
}

KeyValueContainerData kvContainerData = (KeyValueContainerData)
containerData;
containerData.setVolume(hddsVolume);
KeyValueContainerUtil.parseKVContainerData(kvContainerData, config);
KeyValueContainer kvContainer = new KeyValueContainer(kvContainerData,
config);
if (kvContainer.getContainerState() == RECOVERING) {
if (shouldDelete) {
// delete Ratis replicated RECOVERING containers
if (kvContainer.getContainerData().getReplicaIndex() == 0) {
cleanupContainer(hddsVolume, kvContainer);
} else {
kvContainer.markContainerUnhealthy();
LOG.info("Stale recovering container {} marked UNHEALTHY",
kvContainerData.getContainerID());
containerSet.addContainer(kvContainer);
}
return;
}
try {
containerSet.addContainer(kvContainer);
} catch (StorageContainerException e) {
if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) {
throw e;
}
if (shouldDelete) {
resolveDuplicate((KeyValueContainer) containerSet.getContainer(
kvContainer.getContainerData().getContainerID()), kvContainer);
return;
} else if (kvContainer.getContainerState() == DELETED) {
if (shouldDelete) {
cleanupContainer(hddsVolume, kvContainer);
}
return;
}

try {
containerSet.addContainer(kvContainer);
// this should be the last step of this block
containerData.commitSpace();
} catch (StorageContainerException e) {
if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) {
throw e;
}
if (shouldDelete) {
KeyValueContainer existing = (KeyValueContainer) containerSet.getContainer(
kvContainer.getContainerData().getContainerID());
boolean swapped = resolveDuplicate(existing, kvContainer);
if (swapped) {
existing.getContainerData().releaseCommitSpace();
kvContainer.getContainerData().commitSpace();
}
}
} else {
throw new StorageContainerException("Container File is corrupted. " +
"ContainerType is KeyValueContainer but cast to " +
"KeyValueContainerData failed. ",
ContainerProtos.Result.CONTAINER_METADATA_ERROR);
}
break;
default:
Expand All @@ -254,7 +261,14 @@ public void verifyAndFixupContainerData(ContainerData containerData)
}
}

private void resolveDuplicate(KeyValueContainer existing,
/**
* Resolve duplicate containers.
* @param existing
* @param toAdd
* @return true if the container was swapped, false otherwise
* @throws IOException
*/
private boolean resolveDuplicate(KeyValueContainer existing,
KeyValueContainer toAdd) throws IOException {
if (existing.getContainerData().getReplicaIndex() != 0 ||
toAdd.getContainerData().getReplicaIndex() != 0) {
Expand All @@ -268,7 +282,7 @@ private void resolveDuplicate(KeyValueContainer existing,
existing.getContainerData().getContainerID(),
existing.getContainerData().getContainerPath(),
toAdd.getContainerData().getContainerPath());
return;
return false;
}

long existingBCSID = existing.getBlockCommitSequenceId();
Expand All @@ -288,15 +302,15 @@ private void resolveDuplicate(KeyValueContainer existing,
toAdd.getContainerData().getContainerPath(), toAddState);
KeyValueContainerUtil.removeContainer(toAdd.getContainerData(),
hddsVolume.getConf());
return;
return false;
} else if (toAddState == CLOSED) {
LOG.warn("Container {} is present at {} with state CLOSED and at " +
"{} with state {}. Removing the latter container.",
toAdd.getContainerData().getContainerID(),
toAdd.getContainerData().getContainerPath(),
existing.getContainerData().getContainerPath(), existingState);
swapAndRemoveContainer(existing, toAdd);
return;
return true;
}
}

Expand All @@ -309,13 +323,15 @@ private void resolveDuplicate(KeyValueContainer existing,
toAdd.getContainerData().getContainerPath());
KeyValueContainerUtil.removeContainer(toAdd.getContainerData(),
hddsVolume.getConf());
return false;
} else {
LOG.warn("Container {} is present at {} with a lesser BCSID " +
"than at {}. Removing the former container.",
existing.getContainerData().getContainerID(),
existing.getContainerData().getContainerPath(),
toAdd.getContainerData().getContainerPath());
swapAndRemoveContainer(existing, toAdd);
return true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public boolean isAllowedContainerImport(long containerID) {
}

public void importContainer(long containerID, Path tarFilePath,
HddsVolume hddsVolume, CopyContainerCompression compression)
HddsVolume targetVolume, CopyContainerCompression compression)
throws IOException {
if (!importContainerProgress.add(containerID)) {
deleteFileQuietely(tarFilePath);
Expand All @@ -106,11 +106,6 @@ public void importContainer(long containerID, Path tarFilePath,
ContainerProtos.Result.CONTAINER_EXISTS);
}

HddsVolume targetVolume = hddsVolume;
if (targetVolume == null) {
targetVolume = chooseNextVolume();
}

KeyValueContainerData containerData;
TarContainerPacker packer = getPacker(compression);

Expand Down Expand Up @@ -148,7 +143,7 @@ HddsVolume chooseNextVolume() throws IOException {
// Choose volume that can hold both container in tmp and dest directory
return volumeChoosingPolicy.chooseVolume(
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
HddsServerUtil.requiredReplicationSpace(containerSize));
getDefaultReplicationSpace());
}

public static Path getUntarDirectory(HddsVolume hddsVolume)
Expand All @@ -171,7 +166,7 @@ protected TarContainerPacker getPacker(CopyContainerCompression compression) {
return new TarContainerPacker(compression);
}

public long getDefaultContainerSize() {
return containerSize;
public long getDefaultReplicationSpace() {
return HddsServerUtil.requiredReplicationSpace(containerSize);
}
}
Loading