diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java index d431b494d783..7bb59247ca53 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java @@ -29,7 +29,6 @@ import static org.apache.hadoop.ozone.OzoneConsts.STATE; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import jakarta.annotation.Nullable; @@ -214,21 +213,14 @@ public synchronized void setState(ContainerDataProto.State state) { (state != oldState)) { releaseCommitSpace(); } + } - /** - * commit space when container transitions (back) to Open. - * when? perhaps closing a container threw an exception - */ - if ((state == ContainerDataProto.State.OPEN) && - (state != oldState)) { - Preconditions.checkState(getMaxSize() > 0); - commitSpace(); - } + public boolean isCommittedSpace() { + return committedSpace; } - @VisibleForTesting - void setCommittedSpace(boolean committedSpace) { - this.committedSpace = committedSpace; + public void setCommittedSpace(boolean committed) { + committedSpace = committed; } /** @@ -356,7 +348,7 @@ public synchronized void closeContainer() { setState(ContainerDataProto.State.CLOSED); } - private void releaseCommitSpace() { + public void releaseCommitSpace() { long unused = getMaxSize() - getBytesUsed(); // only if container size < max size diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java index 9b5c89e1f73e..0f5c19fd3364 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java @@ -154,8 +154,6 @@ private boolean addContainer(Container container, boolean overwrite) throws throw new StorageContainerException(e, ContainerProtos.Result.IO_EXCEPTION); } missingContainerSet.remove(containerId); - // wish we could have done this from ContainerData.setState - container.getContainerData().commitSpace(); if (container.getContainerData().getState() == RECOVERING) { recoveringContainerMap.put( clock.millis() + recoveringTimeout, containerId); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java index 89c686645eac..e323eeb4b173 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/CapacityVolumeChoosingPolicy.java @@ -22,7 +22,7 @@ import java.io.IOException; import java.util.List; -import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; @@ -44,11 +44,8 @@ public class CapacityVolumeChoosingPolicy implements VolumeChoosingPolicy { private static final Logger LOG = LoggerFactory.getLogger( CapacityVolumeChoosingPolicy.class); - // Stores the index of the next volume to be returned. - private final Random random = new Random(); - @Override - public HddsVolume chooseVolume(List volumes, + public synchronized HddsVolume chooseVolume(List volumes, long maxContainerSize) throws IOException { // No volumes available to choose from @@ -69,9 +66,8 @@ public HddsVolume chooseVolume(List volumes, } int count = volumesWithEnoughSpace.size(); - if (count == 1) { - return volumesWithEnoughSpace.get(0); - } else { + HddsVolume selectedVolume = volumesWithEnoughSpace.get(0); + if (count > 1) { // Even if we don't have too many volumes in volumesWithEnoughSpace, this // algorithm will still help us choose the volume with larger // available space than other volumes. @@ -83,8 +79,8 @@ public HddsVolume chooseVolume(List volumes, // 4. vol2 + vol2: 25%, result is vol2 // So we have a total of 75% chances to choose vol1, which meets our // expectation. - int firstIndex = random.nextInt(count); - int secondIndex = random.nextInt(count); + int firstIndex = ThreadLocalRandom.current().nextInt(count); + int secondIndex = ThreadLocalRandom.current().nextInt(count); HddsVolume firstVolume = volumesWithEnoughSpace.get(firstIndex); HddsVolume secondVolume = volumesWithEnoughSpace.get(secondIndex); @@ -93,7 +89,9 @@ public HddsVolume chooseVolume(List volumes, - firstVolume.getCommittedBytes(); long secondAvailable = secondVolume.getCurrentUsage().getAvailable() - secondVolume.getCommittedBytes(); - return firstAvailable < secondAvailable ? secondVolume : firstVolume; + selectedVolume = firstAvailable < secondAvailable ? secondVolume : firstVolume; } + selectedVolume.incCommittedBytes(maxContainerSize); + return selectedVolume; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java index 9945a3256b32..52c8c599703c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.slf4j.Logger; @@ -38,10 +37,10 @@ public class RoundRobinVolumeChoosingPolicy implements VolumeChoosingPolicy { RoundRobinVolumeChoosingPolicy.class); // Stores the index of the next volume to be returned. - private AtomicInteger nextVolumeIndex = new AtomicInteger(0); + private int nextVolumeIndex = 0; @Override - public HddsVolume chooseVolume(List volumes, + public synchronized HddsVolume chooseVolume(List volumes, long maxContainerSize) throws IOException { // No volumes available to choose from @@ -53,8 +52,7 @@ public HddsVolume chooseVolume(List volumes, // since volumes could've been removed because of the failure // make sure we are not out of bounds - int nextIndex = nextVolumeIndex.get(); - int currentVolumeIndex = nextIndex < volumes.size() ? nextIndex : 0; + int currentVolumeIndex = nextVolumeIndex < volumes.size() ? nextVolumeIndex : 0; int startVolumeIndex = currentVolumeIndex; @@ -67,7 +65,8 @@ public HddsVolume chooseVolume(List volumes, if (hasEnoughSpace) { logIfSomeVolumesOutOfSpace(filter, LOG); - nextVolumeIndex.compareAndSet(nextIndex, currentVolumeIndex); + nextVolumeIndex = currentVolumeIndex; + volume.incCommittedBytes(maxContainerSize); return volume; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index 09cadd5d13fc..030392045d51 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -157,8 +157,15 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy = StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()); while (true) { HddsVolume containerVolume; + String hddsVolumeDir; try { containerVolume = volumeChoosingPolicy.chooseVolume(volumes, maxSize); + hddsVolumeDir = containerVolume.getHddsRootDir().toString(); + // Set volume before getContainerDBFile(), because we may need the + // volume to deduce the db file. + containerData.setVolume(containerVolume); + // commit bytes have been reserved in volumeChoosingPolicy#chooseVolume + containerData.setCommittedSpace(true); } catch (DiskOutOfSpaceException ex) { throw new StorageContainerException("Container creation failed, " + "due to disk out of space", ex, DISK_OUT_OF_SPACE); @@ -169,11 +176,6 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy } try { - String hddsVolumeDir = containerVolume.getHddsRootDir().toString(); - // Set volume before getContainerDBFile(), because we may need the - // volume to deduce the db file. - containerData.setVolume(containerVolume); - long containerID = containerData.getContainerID(); String idDir = VersionedDatanodeFeatures.ScmHA.chooseContainerPathID( containerVolume, clusterId); @@ -206,7 +208,6 @@ public void create(VolumeSet volumeSet, VolumeChoosingPolicy // Create .container file File containerFile = getContainerFile(); createContainerFile(containerFile); - return; } catch (StorageContainerException ex) { if (containerMetaDataPath != null diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index c933dc76cef9..7f192afc29eb 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -434,6 +434,7 @@ ContainerCommandResponseProto handleCreateContainer( LOG.debug("Container already exists. container Id {}", containerID); } } catch (StorageContainerException ex) { + newContainerData.releaseCommitSpace(); return ContainerUtils.logAndReturnError(LOG, ex, request); } finally { containerIdLock.unlock(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java index 90bbb3186ad4..f3b39333e087 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java @@ -202,49 +202,56 @@ public void verifyAndFixupContainerData(ContainerData containerData) throws IOException { switch (containerData.getContainerType()) { case KeyValueContainer: - if (containerData instanceof KeyValueContainerData) { - KeyValueContainerData kvContainerData = (KeyValueContainerData) - containerData; - containerData.setVolume(hddsVolume); - KeyValueContainerUtil.parseKVContainerData(kvContainerData, config); - KeyValueContainer kvContainer = new KeyValueContainer(kvContainerData, - config); - if (kvContainer.getContainerState() == RECOVERING) { - if (shouldDelete) { - // delete Ratis replicated RECOVERING containers - if (kvContainer.getContainerData().getReplicaIndex() == 0) { - cleanupContainer(hddsVolume, kvContainer); - } else { - kvContainer.markContainerUnhealthy(); - LOG.info("Stale recovering container {} marked UNHEALTHY", - kvContainerData.getContainerID()); - containerSet.addContainer(kvContainer); - } - } - return; - } - if (kvContainer.getContainerState() == DELETED) { - if (shouldDelete) { + if (!(containerData instanceof KeyValueContainerData)) { + throw new StorageContainerException("Container File is corrupted. " + + "ContainerType is KeyValueContainer but cast to " + + "KeyValueContainerData failed. ", + ContainerProtos.Result.CONTAINER_METADATA_ERROR); + } + + KeyValueContainerData kvContainerData = (KeyValueContainerData) + containerData; + containerData.setVolume(hddsVolume); + KeyValueContainerUtil.parseKVContainerData(kvContainerData, config); + KeyValueContainer kvContainer = new KeyValueContainer(kvContainerData, + config); + if (kvContainer.getContainerState() == RECOVERING) { + if (shouldDelete) { + // delete Ratis replicated RECOVERING containers + if (kvContainer.getContainerData().getReplicaIndex() == 0) { cleanupContainer(hddsVolume, kvContainer); + } else { + kvContainer.markContainerUnhealthy(); + LOG.info("Stale recovering container {} marked UNHEALTHY", + kvContainerData.getContainerID()); + containerSet.addContainer(kvContainer); } - return; } - try { - containerSet.addContainer(kvContainer); - } catch (StorageContainerException e) { - if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) { - throw e; - } - if (shouldDelete) { - resolveDuplicate((KeyValueContainer) containerSet.getContainer( - kvContainer.getContainerData().getContainerID()), kvContainer); + return; + } else if (kvContainer.getContainerState() == DELETED) { + if (shouldDelete) { + cleanupContainer(hddsVolume, kvContainer); + } + return; + } + + try { + containerSet.addContainer(kvContainer); + // this should be the last step of this block + containerData.commitSpace(); + } catch (StorageContainerException e) { + if (e.getResult() != ContainerProtos.Result.CONTAINER_EXISTS) { + throw e; + } + if (shouldDelete) { + KeyValueContainer existing = (KeyValueContainer) containerSet.getContainer( + kvContainer.getContainerData().getContainerID()); + boolean swapped = resolveDuplicate(existing, kvContainer); + if (swapped) { + existing.getContainerData().releaseCommitSpace(); + kvContainer.getContainerData().commitSpace(); } } - } else { - throw new StorageContainerException("Container File is corrupted. " + - "ContainerType is KeyValueContainer but cast to " + - "KeyValueContainerData failed. ", - ContainerProtos.Result.CONTAINER_METADATA_ERROR); } break; default: @@ -254,7 +261,14 @@ public void verifyAndFixupContainerData(ContainerData containerData) } } - private void resolveDuplicate(KeyValueContainer existing, + /** + * Resolve duplicate containers. + * @param existing + * @param toAdd + * @return true if the container was swapped, false otherwise + * @throws IOException + */ + private boolean resolveDuplicate(KeyValueContainer existing, KeyValueContainer toAdd) throws IOException { if (existing.getContainerData().getReplicaIndex() != 0 || toAdd.getContainerData().getReplicaIndex() != 0) { @@ -268,7 +282,7 @@ private void resolveDuplicate(KeyValueContainer existing, existing.getContainerData().getContainerID(), existing.getContainerData().getContainerPath(), toAdd.getContainerData().getContainerPath()); - return; + return false; } long existingBCSID = existing.getBlockCommitSequenceId(); @@ -288,7 +302,7 @@ private void resolveDuplicate(KeyValueContainer existing, toAdd.getContainerData().getContainerPath(), toAddState); KeyValueContainerUtil.removeContainer(toAdd.getContainerData(), hddsVolume.getConf()); - return; + return false; } else if (toAddState == CLOSED) { LOG.warn("Container {} is present at {} with state CLOSED and at " + "{} with state {}. Removing the latter container.", @@ -296,7 +310,7 @@ private void resolveDuplicate(KeyValueContainer existing, toAdd.getContainerData().getContainerPath(), existing.getContainerData().getContainerPath(), existingState); swapAndRemoveContainer(existing, toAdd); - return; + return true; } } @@ -309,6 +323,7 @@ private void resolveDuplicate(KeyValueContainer existing, toAdd.getContainerData().getContainerPath()); KeyValueContainerUtil.removeContainer(toAdd.getContainerData(), hddsVolume.getConf()); + return false; } else { LOG.warn("Container {} is present at {} with a lesser BCSID " + "than at {}. Removing the former container.", @@ -316,6 +331,7 @@ private void resolveDuplicate(KeyValueContainer existing, existing.getContainerData().getContainerPath(), toAdd.getContainerData().getContainerPath()); swapAndRemoveContainer(existing, toAdd); + return true; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java index f69516f94e17..ff7b1d3b7328 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java @@ -88,7 +88,7 @@ public boolean isAllowedContainerImport(long containerID) { } public void importContainer(long containerID, Path tarFilePath, - HddsVolume hddsVolume, CopyContainerCompression compression) + HddsVolume targetVolume, CopyContainerCompression compression) throws IOException { if (!importContainerProgress.add(containerID)) { deleteFileQuietely(tarFilePath); @@ -106,11 +106,6 @@ public void importContainer(long containerID, Path tarFilePath, ContainerProtos.Result.CONTAINER_EXISTS); } - HddsVolume targetVolume = hddsVolume; - if (targetVolume == null) { - targetVolume = chooseNextVolume(); - } - KeyValueContainerData containerData; TarContainerPacker packer = getPacker(compression); @@ -148,7 +143,7 @@ HddsVolume chooseNextVolume() throws IOException { // Choose volume that can hold both container in tmp and dest directory return volumeChoosingPolicy.chooseVolume( StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()), - HddsServerUtil.requiredReplicationSpace(containerSize)); + getDefaultReplicationSpace()); } public static Path getUntarDirectory(HddsVolume hddsVolume) @@ -171,7 +166,7 @@ protected TarContainerPacker getPacker(CopyContainerCompression compression) { return new TarContainerPacker(compression); } - public long getDefaultContainerSize() { - return containerSize; + public long getDefaultReplicationSpace() { + return HddsServerUtil.requiredReplicationSpace(containerSize); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java index 9a943c633386..240ba9473d3d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java @@ -22,11 +22,8 @@ import java.nio.file.Path; import java.util.List; import org.apache.hadoop.hdds.conf.ConfigurationSource; -import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status; import org.slf4j.Logger; @@ -47,7 +44,6 @@ public class DownloadAndImportReplicator implements ContainerReplicator { private final ContainerDownloader downloader; private final ContainerImporter containerImporter; private final ContainerSet containerSet; - private final long containerSize; public DownloadAndImportReplicator( ConfigurationSource conf, ContainerSet containerSet, @@ -57,9 +53,6 @@ public DownloadAndImportReplicator( this.containerSet = containerSet; this.downloader = downloader; this.containerImporter = containerImporter; - containerSize = (long) conf.getStorageSize( - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); } @Override @@ -81,15 +74,6 @@ public void replicate(ReplicationTask task) { try { targetVolume = containerImporter.chooseNextVolume(); - // Increment committed bytes and verify if it doesn't cross the space left. - targetVolume.incCommittedBytes(containerSize * 2); - StorageLocationReport volumeReport = targetVolume.getReport(); - // Already committed bytes increased above, so required space is not required here in AvailableSpaceFilter - if (volumeReport.getUsableSpace() <= 0) { - LOG.warn("Container {} replication was unsuccessful, no space left on volume {}", containerID, volumeReport); - task.setStatus(Status.FAILED); - return; - } // Wait for the download. This thread pool is limiting the parallel // downloads, so it's ok to block here and wait for the full download. Path tarFilePath = @@ -114,7 +98,7 @@ public void replicate(ReplicationTask task) { task.setStatus(Status.FAILED); } finally { if (targetVolume != null) { - targetVolume.incCommittedBytes(-containerSize * 2); + targetVolume.incCommittedBytes(-containerImporter.getDefaultReplicationSpace()); } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java index 5224498e7274..9cb07a21c5dc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java @@ -29,9 +29,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; -import org.apache.hadoop.util.DiskChecker; import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.slf4j.Logger; @@ -87,16 +85,6 @@ public void onNext(SendContainerRequest req) { if (containerId == -1) { containerId = req.getContainerID(); volume = importer.chooseNextVolume(); - // Increment committed bytes and verify if it doesn't cross the space left. - volume.incCommittedBytes(importer.getDefaultContainerSize() * 2); - StorageLocationReport volumeReport = volume.getReport(); - // Already committed bytes increased above, so required space is not required here in AvailableSpaceFilter - if (volumeReport.getUsableSpace() <= 0) { - volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2); - LOG.warn("Container {} import was unsuccessful, no space left on volume {}", containerId, volumeReport); - volume = null; - throw new DiskChecker.DiskOutOfSpaceException("No more available volumes"); - } Path dir = ContainerImporter.getUntarDirectory(volume); Files.createDirectories(dir); @@ -130,7 +118,7 @@ public void onError(Throwable t) { responseObserver.onError(t); } finally { if (volume != null) { - volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2); + volume.incCommittedBytes(-importer.getDefaultReplicationSpace()); } } } @@ -159,7 +147,7 @@ public void onCompleted() { } } finally { if (volume != null) { - volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2); + volume.incCommittedBytes(-importer.getDefaultReplicationSpace()); } } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java index cf75342efdf2..02f999013e63 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java @@ -202,10 +202,11 @@ private KeyValueContainer addContainer(ContainerSet cSet, long cID) data.addMetadata("VOLUME", "shire"); data.addMetadata("owner)", "bilbo"); KeyValueContainer container = new KeyValueContainer(data, conf); + commitBytesBefore = StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()).get(0).getCommittedBytes(); + container.create(volumeSet, volumeChoosingPolicy, SCM_ID); - commitBytesBefore = container.getContainerData() - .getVolume().getCommittedBytes(); cSet.addContainer(container); + commitBytesAfter = container.getContainerData() .getVolume().getCommittedBytes(); commitIncrement = commitBytesAfter - commitBytesBefore; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestCapacityVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestCapacityVolumeChoosingPolicy.java index d6c97c5f1a36..07ae372a4cd5 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestCapacityVolumeChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestCapacityVolumeChoosingPolicy.java @@ -149,4 +149,15 @@ public void testVolumeChoosingPolicyFactory() VolumeChoosingPolicyFactory.getPolicy(CONF).getClass()); } + @Test + public void testVolumeCommittedSpace() throws Exception { + Map initialCommittedSpace = new HashMap<>(); + volumes.forEach(vol -> + initialCommittedSpace.put(vol, vol.getCommittedBytes())); + + HddsVolume selectedVolume = policy.chooseVolume(volumes, 50); + + assertEquals(initialCommittedSpace.get(selectedVolume) + 50, + selectedVolume.getCommittedBytes()); + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java index 1c07fe7ab7b3..2406011a3d14 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestRoundRobinVolumeChoosingPolicy.java @@ -25,7 +25,9 @@ import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory; import org.apache.hadoop.hdds.fs.MockSpaceUsageSource; @@ -115,4 +117,15 @@ public void throwsDiskOutOfSpaceIfRequestMoreThanAvailable() { "Most available space: 150 bytes"); } + @Test + public void testVolumeCommittedSpace() throws Exception { + Map initialCommittedSpace = new HashMap<>(); + volumes.forEach(vol -> + initialCommittedSpace.put(vol, vol.getCommittedBytes())); + + HddsVolume selectedVolume = policy.chooseVolume(volumes, 50); + + assertEquals(initialCommittedSpace.get(selectedVolume) + 50, + selectedVolume.getCommittedBytes()); + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java index c700b235faf9..51a949e496fc 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java @@ -38,6 +38,8 @@ import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.File; @@ -1097,4 +1099,19 @@ private void testMixedSchemaImport(String dir, assertEquals(pendingDeleteBlockCount, importedContainer.getContainerData().getNumPendingDeletionBlocks()); } + + @ContainerTestVersionInfo.ContainerTest + public void testContainerCreationCommitSpaceReserve( + ContainerTestVersionInfo versionInfo) throws Exception { + init(versionInfo); + keyValueContainerData = spy(keyValueContainerData); + keyValueContainer = new KeyValueContainer(keyValueContainerData, CONF); + keyValueContainer = spy(keyValueContainer); + + keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId); + + // verify that + verify(volumeChoosingPolicy).chooseVolume(anyList(), anyLong()); // this would reserve commit space + assertTrue(keyValueContainerData.isCommittedSpace()); + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index 256ca20e9386..7927864861b7 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -25,11 +25,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -43,6 +45,8 @@ import java.util.HashMap; import java.util.List; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.StorageUnit; @@ -53,6 +57,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.security.token.TokenVerifier; import org.apache.hadoop.ozone.container.common.ContainerTestUtils; @@ -70,17 +75,22 @@ import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.util.Time; +import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.GenericTestUtils.LogCapturer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Unit tests for {@link KeyValueHandler}. */ public class TestKeyValueHandler { + private static final Logger LOG = LoggerFactory.getLogger(TestKeyValueHandler.class); + @TempDir private Path tempDir; @@ -91,9 +101,11 @@ public class TestKeyValueHandler { private HddsDispatcher dispatcher; private KeyValueHandler handler; + private long maxContainerSize; @BeforeEach public void setup() throws StorageContainerException { + OzoneConfiguration conf = new OzoneConfiguration(); // Create mock HddsDispatcher and KeyValueHandler. handler = mock(KeyValueHandler.class); @@ -109,6 +121,10 @@ public void setup() throws StorageContainerException { mock(ContainerMetrics.class), mock(TokenVerifier.class) ); + + maxContainerSize = (long) conf.getStorageSize( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); } /** @@ -337,6 +353,68 @@ public void testCloseInvalidContainer(ContainerLayoutVersion layoutVersion) "Close container should return Invalid container error"); } + @Test + public void testCreateContainerWithFailure() throws Exception { + final String testDir = tempDir.toString(); + final long containerID = 1L; + final String clusterId = UUID.randomUUID().toString(); + final String datanodeId = UUID.randomUUID().toString(); + final ConfigurationSource conf = new OzoneConfiguration(); + final ContainerSet containerSet = spy(newContainerSet()); + final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); + + HddsVolume hddsVolume = new HddsVolume.Builder(testDir).conf(conf) + .clusterID(clusterId).datanodeUuid(datanodeId) + .volumeSet(volumeSet) + .build(); + + hddsVolume.format(clusterId); + hddsVolume.createWorkingDir(clusterId, null); + hddsVolume.createTmpDirs(clusterId); + + when(volumeSet.getVolumesList()) + .thenReturn(Collections.singletonList(hddsVolume)); + + List hddsVolumeList = StorageVolumeUtil + .getHddsVolumesList(volumeSet.getVolumesList()); + + assertEquals(1, hddsVolumeList.size()); + + final ContainerMetrics metrics = ContainerMetrics.create(conf); + + final AtomicInteger icrReceived = new AtomicInteger(0); + + final KeyValueHandler kvHandler = new KeyValueHandler(conf, + datanodeId, containerSet, volumeSet, metrics, + c -> icrReceived.incrementAndGet()); + kvHandler.setClusterID(clusterId); + + final ContainerCommandRequestProto createContainer = + createContainerRequest(datanodeId, containerID); + + Semaphore semaphore = new Semaphore(1); + doAnswer(invocation -> { + semaphore.acquire(); + throw new StorageContainerException(ContainerProtos.Result.IO_EXCEPTION); + }).when(containerSet).addContainer(any()); + + semaphore.acquire(); + CompletableFuture.runAsync(() -> + kvHandler.handleCreateContainer(createContainer, null) + ); + + // commit bytes has been allocated by volumeChoosingPolicy which is called in KeyValueContainer#create + GenericTestUtils.waitFor(() -> hddsVolume.getCommittedBytes() == maxContainerSize, + 1000, 50000); + semaphore.release(); + + LOG.info("Committed bytes: {}", hddsVolume.getCommittedBytes()); + + // release committed bytes as exception is thrown + GenericTestUtils.waitFor(() -> hddsVolume.getCommittedBytes() == 0, + 1000, 50000); + } + @Test public void testDeleteContainer() throws IOException { final String testDir = tempDir.toString(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java index 6a48765c1a91..ec5c6743e729 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java @@ -217,8 +217,18 @@ public void testContainerReader(ContainerTestVersionInfo versionInfo) throws Exception { setLayoutAndSchemaVersion(versionInfo); setup(versionInfo); + + ContainerReader containerReader = new ContainerReader(volumeSet, + hddsVolume, containerSet, conf, true); + Thread thread = new Thread(containerReader); + thread.start(); + thread.join(); + long originalCommittedBytes = hddsVolume.getCommittedBytes(); + ContainerCache.getInstance(conf).shutdownCache(); + + long recoveringContainerId = 10; KeyValueContainerData recoveringContainerData = new KeyValueContainerData( - 10, layout, (long) StorageUnit.GB.toBytes(5), + recoveringContainerId, layout, (long) StorageUnit.GB.toBytes(5), UUID.randomUUID().toString(), datanodeId.toString()); //create a container with recovering state recoveringContainerData.setState(RECOVERING); @@ -229,13 +239,13 @@ public void testContainerReader(ContainerTestVersionInfo versionInfo) recoveringKeyValueContainer.create( volumeSet, volumeChoosingPolicy, clusterId); - ContainerReader containerReader = new ContainerReader(volumeSet, - hddsVolume, containerSet, conf, true); - - Thread thread = new Thread(containerReader); + thread = new Thread(containerReader); thread.start(); thread.join(); + // no change, only open containers have committed space + assertEquals(originalCommittedBytes, hddsVolume.getCommittedBytes()); + // Ratis replicated recovering containers are deleted upon datanode startup if (recoveringKeyValueContainer.getContainerData().getReplicaIndex() == 0) { assertNull(containerSet.getContainer(recoveringContainerData.getContainerID())); @@ -262,6 +272,8 @@ public void testContainerReader(ContainerTestVersionInfo versionInfo) assertEquals(i, keyValueContainerData.getNumPendingDeletionBlocks()); + + assertTrue(keyValueContainerData.isCommittedSpace()); } } @@ -313,6 +325,14 @@ public void testContainerReaderWithLoadException( hddsVolume1, containerSet1, conf, true); containerReader.readVolume(hddsVolume1.getHddsRootDir()); assertEquals(containerCount - 1, containerSet1.containerCount()); + for (Container c : containerSet1.getContainerMap().values()) { + if (c.getContainerData().getContainerID() == 0) { + assertFalse(c.getContainerData().isCommittedSpace()); + } else { + assertTrue(c.getContainerData().isCommittedSpace()); + } + } + assertEquals(hddsVolume1.getCommittedBytes(), (containerCount - 1) * StorageUnit.GB.toBytes(5)); } @ContainerTestVersionInfo.ContainerTest @@ -361,6 +381,7 @@ public void testContainerReaderWithInvalidDbPath( hddsVolume1, containerSet1, conf, true); containerReader.readVolume(hddsVolume1.getHddsRootDir()); assertEquals(0, containerSet1.containerCount()); + assertEquals(0, hddsVolume1.getCommittedBytes()); assertThat(dnLogs.getOutput()).contains("Container DB file is missing"); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestDownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestDownloadAndImportReplicator.java new file mode 100644 index 000000000000..5993e43e6617 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestDownloadAndImportReplicator.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.replication; + +import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.ozone.container.common.impl.ContainerSet; +import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; +import org.apache.hadoop.ozone.container.common.volume.StorageVolume; +import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.io.TempDir; + +/** + * Test for DownloadAndImportReplicator. + */ +@Timeout(300) +public class TestDownloadAndImportReplicator { + + @TempDir + private File tempDir; + + private OzoneConfiguration conf; + private VolumeChoosingPolicy volumeChoosingPolicy; + private ContainerSet containerSet; + private MutableVolumeSet volumeSet; + private ContainerImporter importer; + private SimpleContainerDownloader downloader; + private DownloadAndImportReplicator replicator; + private long containerMaxSize; + + @BeforeEach + void setup() throws IOException { + conf = new OzoneConfiguration(); + conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath()); + volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf); + containerSet = newContainerSet(0); + volumeSet = new MutableVolumeSet("test", conf, null, + StorageVolume.VolumeType.DATA_VOLUME, null); + importer = new ContainerImporter(conf, containerSet, + mock(ContainerController.class), volumeSet, volumeChoosingPolicy); + downloader = mock(SimpleContainerDownloader.class); + replicator = new DownloadAndImportReplicator(conf, containerSet, importer, + downloader); + containerMaxSize = (long) conf.getStorageSize( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + } + + @Test + public void testCommitSpaceReleasedOnReplicationFailure() throws Exception { + long containerId = 1; + HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0); + long initialCommittedBytes = volume.getCommittedBytes(); + + // Mock downloader to throw exception + Semaphore semaphore = new Semaphore(1); + when(downloader.getContainerDataFromReplicas(anyLong(), any(), any(), any())) + .thenAnswer(invocation -> { + semaphore.acquire(); + throw new IOException("Download failed"); + }); + + ReplicationTask task = new ReplicationTask(containerId, + Collections.singletonList(mock(DatanodeDetails.class)), replicator); + + // Acquire semaphore so that container import will pause before downloading. + semaphore.acquire(); + CompletableFuture.runAsync(() -> { + assertThrows(IOException.class, () -> replicator.replicate(task)); + }); + + // Wait such that first container import reserve space + GenericTestUtils.waitFor(() -> + volume.getCommittedBytes() > initialCommittedBytes, + 1000, 50000); + assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize); + semaphore.release(); + + GenericTestUtils.waitFor(() -> + volume.getCommittedBytes() == initialCommittedBytes, + 1000, 50000); + + // Verify commit space is released + assertEquals(initialCommittedBytes, volume.getCommittedBytes()); + } +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 9acb73486a06..1e69eac2ea9e 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -355,11 +355,12 @@ public void testDownloadAndImportReplicatorFailure(ContainerLayoutVersion layout @ContainerLayoutTestInfo.ContainerTest public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) throws IOException, InterruptedException, TimeoutException { + final long containerUsedSize = 100; this.layoutVersion = layout; OzoneConfiguration conf = new OzoneConfiguration(); conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath()); - long containerSize = (long) conf.getStorageSize( + long containerMaxSize = (long) conf.getStorageSize( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); @@ -369,13 +370,16 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) .clock(clock) .build(); + MutableVolumeSet volumeSet = new MutableVolumeSet(datanode.getUuidString(), conf, null, + StorageVolume.VolumeType.DATA_VOLUME, null); + long containerId = 1; // create container KeyValueContainerData containerData = new KeyValueContainerData(containerId, - ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test"); - HddsVolume vol = mock(HddsVolume.class); - containerData.setVolume(vol); - containerData.incrBytesUsed(100); + ContainerLayoutVersion.FILE_PER_BLOCK, containerMaxSize, "test", "test"); + HddsVolume vol1 = (HddsVolume) volumeSet.getVolumesList().get(0); + containerData.setVolume(vol1); + containerData.incrBytesUsed(containerUsedSize); KeyValueContainer container = new KeyValueContainer(containerData, conf); ContainerController controllerMock = mock(ContainerController.class); Semaphore semaphore = new Semaphore(1); @@ -384,8 +388,7 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) semaphore.acquire(); return container; }); - MutableVolumeSet volumeSet = new MutableVolumeSet(datanode.getUuidString(), conf, null, - StorageVolume.VolumeType.DATA_VOLUME, null); + File tarFile = containerTarFile(containerId, containerData); SimpleContainerDownloader moc = @@ -398,14 +401,13 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) ContainerImporter importer = new ContainerImporter(conf, set, controllerMock, volumeSet, volumeChoosingPolicy); - HddsVolume vol1 = (HddsVolume) volumeSet.getVolumesList().get(0); // Initially volume has 0 commit space assertEquals(0, vol1.getCommittedBytes()); long usedSpace = vol1.getCurrentUsage().getUsedSpace(); // Initially volume has 0 used space assertEquals(0, usedSpace); // Increase committed bytes so that volume has only remaining 3 times container size space - long initialCommittedBytes = vol1.getCurrentUsage().getCapacity() - containerSize * 3; + long initialCommittedBytes = vol1.getCurrentUsage().getCapacity() - containerMaxSize * 3; vol1.incCommittedBytes(initialCommittedBytes); ContainerReplicator replicator = new DownloadAndImportReplicator(conf, set, importer, moc); @@ -424,11 +426,11 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) // Wait such that first container import reserve space GenericTestUtils.waitFor(() -> - vol1.getCommittedBytes() > vol1.getCurrentUsage().getCapacity() - containerSize * 3, + vol1.getCommittedBytes() > initialCommittedBytes, 1000, 50000); // Volume has reserved space of 2 * containerSize - assertEquals(vol1.getCommittedBytes(), initialCommittedBytes + 2 * containerSize); + assertEquals(vol1.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize); // Container 2 import will fail as container 1 has reserved space and no space left to import new container // New container import requires at least (2 * container size) long containerId2 = 2; @@ -443,10 +445,11 @@ public void testReplicationImportReserveSpace(ContainerLayoutVersion layout) usedSpace = vol1.getCurrentUsage().getUsedSpace(); // After replication, volume used space should be increased by container used bytes - assertEquals(100, usedSpace); + assertEquals(containerUsedSize, usedSpace); - // Volume committed bytes should become initial committed bytes which was before replication - assertEquals(initialCommittedBytes, vol1.getCommittedBytes()); + // Volume committed bytes used for replication has been released, no need to reserve space for imported container + // only closed container gets replicated, so no new data will be written it + assertEquals(vol1.getCommittedBytes(), initialCommittedBytes); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java index 0d15e265ad91..441bc7890b65 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java @@ -21,18 +21,25 @@ import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; -import static org.mockito.Mockito.any; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import java.io.File; +import java.io.IOException; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.volume.StorageVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory; @@ -48,7 +55,7 @@ /** * Test for {@link SendContainerRequestHandler}. */ -class TestSendContainerRequestHandler { +public class TestSendContainerRequestHandler { @TempDir private File tempDir; @@ -57,38 +64,48 @@ class TestSendContainerRequestHandler { private VolumeChoosingPolicy volumeChoosingPolicy; + private ContainerSet containerSet; + private MutableVolumeSet volumeSet; + private ContainerImporter importer; + private StreamObserver responseObserver; + private SendContainerRequestHandler sendContainerRequestHandler; + private long containerMaxSize; + @BeforeEach - void setup() { + void setup() throws IOException { conf = new OzoneConfiguration(); conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath()); volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(conf); + containerSet = newContainerSet(0); + volumeSet = new MutableVolumeSet("test", conf, null, + StorageVolume.VolumeType.DATA_VOLUME, null); + importer = new ContainerImporter(conf, containerSet, + mock(ContainerController.class), volumeSet, volumeChoosingPolicy); + importer = spy(importer); + responseObserver = mock(StreamObserver.class); + sendContainerRequestHandler = new SendContainerRequestHandler(importer, responseObserver, null); + containerMaxSize = (long) conf.getStorageSize( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); } @Test void testReceiveDataForExistingContainer() throws Exception { long containerId = 1; // create containerImporter - ContainerSet containerSet = newContainerSet(0); - MutableVolumeSet volumeSet = new MutableVolumeSet("test", conf, null, - StorageVolume.VolumeType.DATA_VOLUME, null); - ContainerImporter containerImporter = new ContainerImporter(conf, - newContainerSet(0), mock(ContainerController.class), volumeSet, volumeChoosingPolicy); KeyValueContainerData containerData = new KeyValueContainerData(containerId, ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test"); // add container to container set KeyValueContainer container = new KeyValueContainer(containerData, conf); containerSet.addContainer(container); - StreamObserver observer = mock(StreamObserver.class); doAnswer(invocation -> { Object arg = invocation.getArgument(0); assertInstanceOf(StorageContainerException.class, arg); assertEquals(ContainerProtos.Result.CONTAINER_EXISTS, ((StorageContainerException) arg).getResult()); return null; - }).when(observer).onError(any()); - SendContainerRequestHandler sendContainerRequestHandler - = new SendContainerRequestHandler(containerImporter, observer, null); + }).when(responseObserver).onError(any()); ByteString data = ByteString.copyFromUtf8("test"); ContainerProtos.SendContainerRequest request = ContainerProtos.SendContainerRequest.newBuilder() @@ -99,4 +116,87 @@ void testReceiveDataForExistingContainer() throws Exception { .build(); sendContainerRequestHandler.onNext(request); } + + @Test + public void testSpaceReservedAndReleasedWhenRequestCompleted() throws Exception { + long containerId = 1; + HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0); + long initialCommittedBytes = volume.getCommittedBytes(); + + // Create request + ContainerProtos.SendContainerRequest request = ContainerProtos.SendContainerRequest.newBuilder() + .setContainerID(containerId) + .setData(ByteString.EMPTY) + .setOffset(0) + .setCompression(CopyContainerCompression.NO_COMPRESSION.toProto()) + .build(); + + // Execute request + sendContainerRequestHandler.onNext(request); + + // Verify commit space is reserved + assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize); + + // complete the request + sendContainerRequestHandler.onCompleted(); + + // Verify commit space is released + assertEquals(volume.getCommittedBytes(), initialCommittedBytes); + } + + @Test + public void testSpaceReservedAndReleasedWhenOnNextFails() throws Exception { + long containerId = 1; + HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0); + long initialCommittedBytes = volume.getCommittedBytes(); + + // Create request + ContainerProtos.SendContainerRequest request = createRequest(containerId, ByteString.copyFromUtf8("test"), 0); + + // Execute request + sendContainerRequestHandler.onNext(request); + + // Verify commit space is reserved + assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize); + + // mock the importer is not allowed to import this container + when(importer.isAllowedContainerImport(containerId)).thenReturn(false); + + sendContainerRequestHandler.onNext(request); + + // Verify commit space is released + assertEquals(volume.getCommittedBytes(), initialCommittedBytes); + } + + @Test + public void testSpaceReservedAndReleasedWhenOnCompletedFails() throws Exception { + long containerId = 1; + HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0); + long initialCommittedBytes = volume.getCommittedBytes(); + + // Create request + ContainerProtos.SendContainerRequest request = createRequest(containerId, ByteString.copyFromUtf8("test"), 0); + + // Execute request + sendContainerRequestHandler.onNext(request); + + // Verify commit space is reserved + assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize); + + doThrow(new IOException("Failed")).when(importer).importContainer(anyLong(), any(), any(), any()); + + sendContainerRequestHandler.onCompleted(); + + // Verify commit space is released + assertEquals(volume.getCommittedBytes(), initialCommittedBytes); + } + + private ContainerProtos.SendContainerRequest createRequest(long containerId, ByteString data, int offset) { + return ContainerProtos.SendContainerRequest.newBuilder() + .setContainerID(containerId) + .setData(data) + .setOffset(offset) + .setCompression(CopyContainerCompression.NO_COMPRESSION.toProto()) + .build(); + } }