diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
index 330cc1d9f39..b6356934ca3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/AvailableSpaceFilter.java
@@ -33,7 +33,7 @@ public class AvailableSpaceFilter implements Predicate {
       new HashMap<>();
   private long mostAvailableSpace = Long.MIN_VALUE;
 
-  AvailableSpaceFilter(long requiredSpace) {
+  public AvailableSpaceFilter(long requiredSpace) {
    this.requiredSpace = requiredSpace;
  }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
index 46bbb666201..10567475e3a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
@@ -128,6 +128,8 @@ public void importContainer(long containerID, Path tarFilePath,
     try (InputStream input = Files.newInputStream(tarFilePath)) {
       Container container = controller.importContainer(
           containerData, input, packer);
+      // After the container import succeeds, add its used bytes to the volume's used space
+      targetVolume.incrementUsedSpace(container.getContainerData().getBytesUsed());
       containerSet.addContainerByOverwriteMissingContainer(container);
     }
   } finally {
@@ -172,4 +174,7 @@ protected TarContainerPacker getPacker(CopyContainerCompression compression) {
     return new TarContainerPacker(compression);
   }
 
+  public long getDefaultContainerSize() {
+    return containerSize;
+  }
 }
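Taken together, the two ContainerImporter additions give callers both halves of the space accounting: getDefaultContainerSize() sizes a worst-case reservation before an import starts, and incrementUsedSpace(...) records the real usage once it succeeds. Below is a minimal sketch of the intended call pattern, not code from this PR; it reuses only classes and methods touched in this patch, assumes package-level access to ContainerImporter, and tarPath/compression are placeholder parameters. The 2x factor covers the downloaded tarball and the unpacked container data, which coexist on disk until the import finishes.

    import java.nio.file.Path;
    import org.apache.hadoop.ozone.container.common.volume.AvailableSpaceFilter;
    import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
    import org.apache.hadoop.util.DiskChecker;

    final class ReserveAndImportSketch {
      static void importWithReservation(ContainerImporter importer, long containerId,
          Path tarPath, CopyContainerCompression compression) throws Exception {
        long reserve = importer.getDefaultContainerSize() * 2;
        HddsVolume volume = importer.chooseNextVolume();
        // Reserve before any bytes hit the disk.
        volume.incCommittedBytes(reserve);
        try {
          // The reservation is already in committed bytes, so ask for 0 extra bytes.
          if (!new AvailableSpaceFilter(0).test(volume)) {
            throw new DiskChecker.DiskOutOfSpaceException("No more available volumes");
          }
          importer.importContainer(containerId, tarPath, volume, compression);
        } finally {
          // Always release: on success the real size is now tracked as used space,
          // on failure nothing should stay reserved.
          volume.incCommittedBytes(-reserve);
        }
      }
    }

The two production callers patched next, DownloadAndImportReplicator (pull path) and SendContainerRequestHandler (push path), follow exactly this reserve, check, import, release shape.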
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index 69f375b06c1..8c44d0d0781 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -22,8 +22,11 @@
 import java.nio.file.Path;
 import java.util.List;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.volume.AvailableSpaceFilter;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status;
 import org.slf4j.Logger;
@@ -44,6 +47,7 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
   private final ContainerDownloader downloader;
   private final ContainerImporter containerImporter;
   private final ContainerSet containerSet;
+  private final long containerSize;
 
   public DownloadAndImportReplicator(
       ConfigurationSource conf, ContainerSet containerSet,
@@ -53,6 +57,9 @@ public DownloadAndImportReplicator(
     this.containerSet = containerSet;
     this.downloader = downloader;
     this.containerImporter = containerImporter;
+    containerSize = (long) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
   }
 
   @Override
@@ -70,9 +77,19 @@ public void replicate(ReplicationTask task) {
     LOG.info("Starting replication of container {} from {} using {}",
         containerID, sourceDatanodes, compression);
 
+    HddsVolume targetVolume = null;
     try {
-      HddsVolume targetVolume = containerImporter.chooseNextVolume();
+      targetVolume = containerImporter.chooseNextVolume();
+      // Reserve space by incrementing committed bytes, then verify the volume still has room.
+      targetVolume.incCommittedBytes(containerSize * 2);
+      // The reservation is already counted in committed bytes, so the filter needs no extra space.
+      AvailableSpaceFilter filter = new AvailableSpaceFilter(0);
+      if (!filter.test(targetVolume)) {
+        LOG.warn("Container {} replication was unsuccessful due to no space left", containerID);
+        task.setStatus(Status.FAILED);
+        return;
+      }
       // Wait for the download. This thread pool is limiting the parallel
       // downloads, so it's ok to block here and wait for the full download.
       Path tarFilePath =
@@ -95,6 +112,10 @@ public void replicate(ReplicationTask task) {
     } catch (IOException e) {
       LOG.error("Container {} replication was unsuccessful.", containerID, e);
       task.setStatus(Status.FAILED);
+    } finally {
+      if (targetVolume != null) {
+        targetVolume.incCommittedBytes(-containerSize * 2);
+      }
     }
   }
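The reservation is sized from configuration rather than from the replica's actual size, which is unknown until the download completes, so every in-flight import reserves the same amount on its target volume. A small self-contained illustration of that arithmetic, using the same calls as the constructor above (5 GB is the stock ozone.scm.container.size default at the time of writing; verify it for your version):

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.conf.StorageUnit;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;

    public final class ReservationSizeSketch {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        long containerSize = (long) conf.getStorageSize(
            ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
            ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
        // With the 5 GB default this prints 10737418240, i.e. 10 GiB per in-flight import.
        System.out.println("bytes reserved per import: " + containerSize * 2);
      }
    }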
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
index a89e274f9c6..e76a44e680d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
@@ -29,7 +29,9 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.volume.AvailableSpaceFilter;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
@@ -50,7 +52,7 @@ class SendContainerRequestHandler
   private long containerId = -1;
   private long nextOffset;
   private OutputStream output;
-  private HddsVolume volume;
+  private HddsVolume volume = null;
   private Path path;
   private CopyContainerCompression compression;
   private final ZeroCopyMessageMarshaller marshaller;
@@ -85,6 +87,17 @@ public void onNext(SendContainerRequest req) {
       if (containerId == -1) {
         containerId = req.getContainerID();
         volume = importer.chooseNextVolume();
+        // Reserve space by incrementing committed bytes, then verify the volume still has room.
+        volume.incCommittedBytes(importer.getDefaultContainerSize() * 2);
+        // The reservation is already counted in committed bytes, so the filter needs no extra space.
+        AvailableSpaceFilter filter = new AvailableSpaceFilter(0);
+        if (!filter.test(volume)) {
+          volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
+          LOG.warn("Container {} import was unsuccessful due to no space left", containerId);
+          volume = null;
+          throw new DiskChecker.DiskOutOfSpaceException("No more available volumes");
+        }
+
         Path dir = ContainerImporter.getUntarDirectory(volume);
         Files.createDirectories(dir);
         path = dir.resolve(ContainerUtils.getContainerTarName(containerId));
@@ -110,32 +123,44 @@ public void onNext(SendContainerRequest req) {
 
   @Override
   public void onError(Throwable t) {
-    LOG.warn("Error receiving container {} at {}", containerId, nextOffset, t);
-    closeOutput();
-    deleteTarball();
-    responseObserver.onError(t);
+    try {
+      LOG.warn("Error receiving container {} at {}", containerId, nextOffset, t);
+      closeOutput();
+      deleteTarball();
+      responseObserver.onError(t);
+    } finally {
+      if (volume != null) {
+        volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
+      }
+    }
   }
 
   @Override
   public void onCompleted() {
-    if (output == null) {
-      LOG.warn("Received container without any parts");
-      return;
-    }
-
-    LOG.info("Container {} is downloaded with size {}, starting to import.",
-        containerId, nextOffset);
-    closeOutput();
-    try {
-      importer.importContainer(containerId, path, volume, compression);
-      LOG.info("Container {} is replicated successfully", containerId);
-      responseObserver.onNext(SendContainerResponse.newBuilder().build());
-      responseObserver.onCompleted();
-    } catch (Throwable t) {
-      LOG.warn("Failed to import container {}", containerId, t);
-      deleteTarball();
-      responseObserver.onError(t);
+    try {
+      if (output == null) {
+        LOG.warn("Received container without any parts");
+        return;
+      }
+
+      LOG.info("Container {} is downloaded with size {}, starting to import.",
+          containerId, nextOffset);
+      closeOutput();
+
+      try {
+        importer.importContainer(containerId, path, volume, compression);
+        LOG.info("Container {} is replicated successfully", containerId);
+        responseObserver.onNext(SendContainerResponse.newBuilder().build());
+        responseObserver.onCompleted();
+      } catch (Throwable t) {
+        LOG.warn("Failed to import container {}", containerId, t);
+        deleteTarball();
+        responseObserver.onError(t);
+      }
+    } finally {
+      if (volume != null) {
+        volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
+      }
     }
   }
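Note how the early-failure branch in onNext releases the reservation itself and then nulls out volume: gRPC still invokes onError after the thrown exception, and its finally block would otherwise subtract the same bytes a second time. A possible follow-up, not part of this PR, would be to centralize that bookkeeping in one idempotent helper; a hypothetical sketch:

    // Hypothetical helper, not in the PR: releases the reservation at most once,
    // however many callback paths reach it.
    private void releaseReservedSpace() {
      if (volume != null) {
        volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
        volume = null; // subsequent calls become no-ops
      }
    }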
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index b6517f4fead..fb606459942 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -42,6 +42,9 @@
 import jakarta.annotation.Nonnull;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Clock;
@@ -52,24 +55,36 @@
 import java.util.SortedMap;
 import java.util.UUID;
 import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
@@ -77,6 +92,7 @@
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCommandInfo;
 import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
 import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinatorTask;
@@ -100,6 +116,9 @@ public class TestReplicationSupervisor {
 
   private static final long CURRENT_TERM = 1;
 
+  @TempDir
+  private File tempDir;
+
   private final ContainerReplicator noopReplicator = task -> { };
   private final ContainerReplicator throwingReplicator = task -> {
     throw new RuntimeException("testing replication failure");
   };
@@ -328,6 +347,126 @@ public void testDownloadAndImportReplicatorFailure(ContainerLayoutVersion layout
         .contains("Container 1 replication was unsuccessful.");
   }
 
+  @ContainerLayoutTestInfo.ContainerTest
+  public void testReplicationImportReserveSpace(ContainerLayoutVersion layout)
+      throws IOException, InterruptedException, TimeoutException {
+    this.layoutVersion = layout;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath());
+
+    long containerSize = (long) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+
+    ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder()
+        .stateContext(context)
+        .executor(newDirectExecutorService())
+        .clock(clock)
+        .build();
+
+    long containerId = 1;
+    // Create the container that the mocked import will return
+    KeyValueContainerData containerData = new KeyValueContainerData(containerId,
+        ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test");
+    HddsVolume vol = mock(HddsVolume.class);
+    containerData.setVolume(vol);
+    containerData.incrBytesUsed(100);
+    KeyValueContainer container = new KeyValueContainer(containerData, conf);
+    ContainerController controllerMock = mock(ContainerController.class);
+    Semaphore semaphore = new Semaphore(1);
+    when(controllerMock.importContainer(any(), any(), any()))
+        .thenAnswer((invocation) -> {
+          semaphore.acquire();
+          return container;
+        });
+    MutableVolumeSet volumeSet = new MutableVolumeSet(datanode.getUuidString(), conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+    File tarFile = containerTarFile(containerId, containerData);
+
+    SimpleContainerDownloader moc =
+        mock(SimpleContainerDownloader.class);
+    when(moc.getContainerDataFromReplicas(anyLong(), anyList(),
+        any(Path.class), any()))
+        .thenReturn(tarFile.toPath());
+
+    ContainerImporter importer =
+        new ContainerImporter(conf, set, controllerMock, volumeSet);
+
+    HddsVolume vol1 = (HddsVolume) volumeSet.getVolumesList().get(0);
+    // Initially the volume has no committed bytes
+    assertEquals(0, vol1.getCommittedBytes());
+    long usedSpace = vol1.getCurrentUsage().getUsedSpace();
+    // ... and no used space
+    assertEquals(0, usedSpace);
+    // Commit bytes so that the volume only has 3 * containerSize of space left
+    long initialCommittedBytes = vol1.getCurrentUsage().getCapacity() - containerSize * 3;
+    vol1.incCommittedBytes(initialCommittedBytes);
+    ContainerReplicator replicator =
+        new DownloadAndImportReplicator(conf, set, importer, moc);
+    replicatorRef.set(replicator);
+
+    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+        .captureLogs(DownloadAndImportReplicator.LOG);
+
+    // Hold the semaphore so that the first container import pauses after reserving space.
+    semaphore.acquire();
+    CompletableFuture.runAsync(() -> {
+      try {
+        supervisor.addTask(createTask(containerId));
+      } catch (Exception ex) {
+        // Ignored: the outcome is asserted via the supervisor counters below.
+      }
+    });
+
+    // Wait until the first container import has reserved its space
+    GenericTestUtils.waitFor(() ->
+        vol1.getCommittedBytes() > vol1.getCurrentUsage().getCapacity() - containerSize * 3,
+        1000, 50000);
+
+    // The volume now carries a reservation of 2 * containerSize
+    assertEquals(initialCommittedBytes + 2 * containerSize, vol1.getCommittedBytes());
+    // Importing container 2 must fail: container 1 holds the reservation, and a
+    // new import needs at least 2 * containerSize of free space
+    long containerId2 = 2;
+    supervisor.addTask(createTask(containerId2));
+    GenericTestUtils.waitFor(() -> 1 == supervisor.getReplicationFailureCount(),
+        1000, 50000);
+    assertThat(logCapturer.getOutput()).contains("No volumes have enough space for a new container");
+    // Release the semaphore so that the first container import can finish
+    semaphore.release();
+    GenericTestUtils.waitFor(() ->
+        1 == supervisor.getReplicationSuccessCount(), 1000, 50000);
+
+    usedSpace = vol1.getCurrentUsage().getUsedSpace();
+    // After replication, used space grows by the container's used bytes
+    assertEquals(100, usedSpace);
+
+    // Committed bytes return to the pre-replication value
+    assertEquals(initialCommittedBytes, vol1.getCommittedBytes());
+  }
+
+  private File containerTarFile(
+      long containerId, ContainerData containerData) throws IOException {
+    File yamlFile = new File(tempDir, "container.yaml");
+    ContainerDataYaml.createContainerFile(containerData, yamlFile);
+    File tarFile = new File(tempDir,
+        ContainerUtils.getContainerTarName(containerId));
+    try (OutputStream output = Files.newOutputStream(tarFile.toPath());
+        ArchiveOutputStream<TarArchiveEntry> archive = new TarArchiveOutputStream(output)) {
+      TarArchiveEntry entry = archive.createArchiveEntry(yamlFile, "container.yaml");
+      archive.putArchiveEntry(entry);
+      try (InputStream input = Files.newInputStream(yamlFile.toPath())) {
+        IOUtils.copy(input, archive);
+      }
+      archive.closeArchiveEntry();
+    }
+    return tarFile;
+  }
+
   @ContainerLayoutTestInfo.ContainerTest
   public void testTaskBeyondDeadline(ContainerLayoutVersion layout) {
     this.layoutVersion = layout;