diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 5275c047d43b..1bd888c84a61 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -206,7 +206,7 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,
         new SimpleContainerDownloader(conf, certClient));
     ContainerReplicator pushReplicator = new PushReplicator(conf,
         new OnDemandContainerReplicationSource(container.getController()),
-        new GrpcContainerUploader(conf, certClient)
+        new GrpcContainerUploader(conf, certClient, container.getController())
     );
     pullReplicatorWithMetrics = new MeasuredReplicator(pullReplicator, "pull");
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
index 66aa19d5580a..7b42006b2293 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
@@ -60,7 +60,7 @@ public class ContainerImporter {
   private final ContainerController controller;
   private final MutableVolumeSet volumeSet;
   private final VolumeChoosingPolicy volumeChoosingPolicy;
-  private final long containerSize;
+  private final long defaultContainerSize;
   private final Set<Long> importContainerProgress
       = Collections.synchronizedSet(new HashSet<>());

@@ -76,7 +76,7 @@ public ContainerImporter(@Nonnull ConfigurationSource conf,
     this.controller = controller;
     this.volumeSet = volumeSet;
     this.volumeChoosingPolicy = volumeChoosingPolicy;
-    containerSize = (long) conf.getStorageSize(
+    defaultContainerSize = (long) conf.getStorageSize(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
     this.conf = conf;
@@ -146,11 +146,12 @@ private static void deleteFileQuietely(Path tarFilePath) {
     }
   }

-  HddsVolume chooseNextVolume() throws IOException {
+  HddsVolume chooseNextVolume(long spaceToReserve) throws IOException {
     // Choose volume that can hold both container in tmp and dest directory
+    LOG.debug("Choosing volume to reserve space : {}", spaceToReserve);
     return volumeChoosingPolicy.chooseVolume(
         StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
-        getDefaultReplicationSpace());
+        spaceToReserve);
   }

   public static Path getUntarDirectory(HddsVolume hddsVolume)
@@ -174,6 +175,33 @@ protected TarContainerPacker getPacker(CopyContainerCompression compression) {
   }

   public long getDefaultReplicationSpace() {
-    return HddsServerUtil.requiredReplicationSpace(containerSize);
+    return HddsServerUtil.requiredReplicationSpace(defaultContainerSize);
+  }
+
+  /**
+   * Calculate required replication space based on actual container size.
+   *
+   * @param actualContainerSize the actual size of the container in bytes
+   * @return required space for replication (2 * actualContainerSize)
+   */
+  public long getRequiredReplicationSpace(long actualContainerSize) {
+    return HddsServerUtil.requiredReplicationSpace(actualContainerSize);
+  }
+
+  /**
+   * Get space to reserve for replication. If replicateSize is provided,
+   * calculate required space based on that, otherwise return default
+   * replication space.
+   *
+   * @param replicateSize the size of the container to replicate in bytes
+   *                      (can be null)
+   * @return space to reserve for replication
+   */
+  public long getSpaceToReserve(Long replicateSize) {
+    if (replicateSize != null) {
+      return getRequiredReplicationSpace(replicateSize);
+    } else {
+      return getDefaultReplicationSpace();
+    }
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index 240ba9473d3d..2457b592b141 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -73,7 +73,9 @@ public void replicate(ReplicationTask task) {

     HddsVolume targetVolume = null;
     try {
-      targetVolume = containerImporter.chooseNextVolume();
+      targetVolume = containerImporter.chooseNextVolume(
+          containerImporter.getDefaultReplicationSpace());
+
       // Wait for the download. This thread pool is limiting the parallel
       // downloads, so it's ok to block here and wait for the full download.
       Path tarFilePath =
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
index bf381e3715be..64adcb6c6168 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
@@ -30,6 +30,8 @@
 import org.apache.hadoop.hdds.security.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
@@ -45,17 +47,29 @@ public class GrpcContainerUploader implements ContainerUploader {

   private final SecurityConfig securityConfig;
   private final CertificateClient certClient;
+  private final ContainerController containerController;

   public GrpcContainerUploader(
-      ConfigurationSource conf, CertificateClient certClient) {
+      ConfigurationSource conf, CertificateClient certClient,
+      ContainerController containerController) {
     this.certClient = certClient;
+    this.containerController = containerController;
     securityConfig = new SecurityConfig(conf);
   }

   @Override
   public OutputStream startUpload(long containerId, DatanodeDetails target,
       CompletableFuture<Void> callback, CopyContainerCompression compression)
       throws IOException {
+
+    // Get container size from the local datanode instead of using a passed-in replicateSize
+    Long containerSize = null;
+    Container container = containerController.getContainer(containerId);
+    if (container != null) {
+      LOG.debug("Starting upload of container {} to {} with size {}",
+          containerId, target, container.getContainerData().getBytesUsed());
+      containerSize = container.getContainerData().getBytesUsed();
+    }
+
     GrpcReplicationClient client = createReplicationClient(target, compression);
     try {
       // gRPC runtime always provides implementation of CallStreamObserver
@@ -68,7 +82,7 @@ public OutputStream startUpload(long containerId, DatanodeDetails target,
           (CallStreamObserver<SendContainerRequest>) client.upload(
               responseObserver), responseObserver);
       return new SendContainerOutputStream(requestStream, containerId,
-          GrpcReplicationService.BUFFER_SIZE, compression) {
+          GrpcReplicationService.BUFFER_SIZE, compression, containerSize) {
         @Override
         public void close() throws IOException {
           try {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
index 5824e2d9dfa8..3bb7e463d9d3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
@@ -27,22 +27,29 @@
 class SendContainerOutputStream extends GrpcOutputStream<SendContainerRequest> {

   private final CopyContainerCompression compression;
+  private final Long size;

   SendContainerOutputStream(
       CallStreamObserver<SendContainerRequest> streamObserver,
-      long containerId, int bufferSize, CopyContainerCompression compression) {
+      long containerId, int bufferSize, CopyContainerCompression compression,
+      Long size) {
     super(streamObserver, containerId, bufferSize);
     this.compression = compression;
+    this.size = size;
   }

   @Override
   protected void sendPart(boolean eof, int length, ByteString data) {
-    SendContainerRequest request = SendContainerRequest.newBuilder()
+    SendContainerRequest.Builder requestBuilder = SendContainerRequest.newBuilder()
         .setContainerID(getContainerId())
         .setData(data)
         .setOffset(getWrittenBytes())
-        .setCompression(compression.toProto())
-        .build();
-    getStreamObserver().onNext(request);
+        .setCompression(compression.toProto());
+
+    // Include container size in the first request
+    if (getWrittenBytes() == 0 && size != null) {
+      requestBuilder.setSize(size);
+    }
+    getStreamObserver().onNext(requestBuilder.build());
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
index 9cb07a21c5dc..0824341127c3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
@@ -54,6 +54,7 @@ class SendContainerRequestHandler
   private Path path;
   private CopyContainerCompression compression;
   private final ZeroCopyMessageMarshaller<SendContainerRequest> marshaller;
+  private long spaceToReserve = 0;

   SendContainerRequestHandler(
       ContainerImporter importer,
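
[Editor's note, not part of the patch] The importer-side arithmetic above is worth making concrete. Below is a minimal standalone sketch, assuming (per the getRequiredReplicationSpace javadoc) that HddsServerUtil.requiredReplicationSpace(size) returns 2 * size, once for the tarball staged under the volume's tmp directory and once for the unpacked destination, and assuming the stock 5 GB ozone.scm.container.size default. The class and constant names are hypothetical.

// Editorial sketch of the space math behind ContainerImporter.getSpaceToReserve().
public final class ReservationMathSketch {

  private static final long GB = 1024L * 1024 * 1024;
  private static final long DEFAULT_CONTAINER_SIZE = 5 * GB;

  // Mirrors getSpaceToReserve(Long): actual size when known, default otherwise.
  static long spaceToReserve(Long replicateSize) {
    long base = (replicateSize != null) ? replicateSize : DEFAULT_CONTAINER_SIZE;
    return 2 * base; // tmp tarball + unpacked destination directory
  }

  public static void main(String[] args) {
    System.out.println(spaceToReserve(null));    // 10 GB: size unknown, fall back to default
    System.out.println(spaceToReserve(2 * GB));  // 4 GB: small container reserves less
    System.out.println(spaceToReserve(20 * GB)); // 40 GB: over-allocated container
  }
}

The last case is the motivating one: with only the 10 GB default reserved, importing a 20 GB over-allocated container could exhaust the volume mid-import.
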
@@ -84,7 +85,12 @@ public void onNext(SendContainerRequest req) { if (containerId == -1) { containerId = req.getContainerID(); - volume = importer.chooseNextVolume(); + + // Use container size if available, otherwise fall back to default + spaceToReserve = importer.getSpaceToReserve( + req.hasSize() ? req.getSize() : null); + + volume = importer.chooseNextVolume(spaceToReserve); Path dir = ContainerImporter.getUntarDirectory(volume); Files.createDirectories(dir); @@ -117,8 +123,8 @@ public void onError(Throwable t) { deleteTarball(); responseObserver.onError(t); } finally { - if (volume != null) { - volume.incCommittedBytes(-importer.getDefaultReplicationSpace()); + if (volume != null && spaceToReserve > 0) { + volume.incCommittedBytes(-spaceToReserve); } } } @@ -146,8 +152,8 @@ public void onCompleted() { responseObserver.onError(t); } } finally { - if (volume != null) { - volume.incCommittedBytes(-importer.getDefaultReplicationSpace()); + if (volume != null && spaceToReserve > 0) { + volume.incCommittedBytes(-spaceToReserve); } } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcContainerUploader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcContainerUploader.java index b8df5c18e8c9..b10b412b12f0 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcContainerUploader.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcContainerUploader.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.junit.jupiter.api.Test; @@ -116,8 +117,8 @@ void immediateError() throws Exception { private static GrpcContainerUploader createSubject( GrpcReplicationClient client) { - - return new GrpcContainerUploader(new InMemoryConfiguration(), null) { + return new GrpcContainerUploader(new InMemoryConfiguration(), null, + mock(ContainerController.class)) { @Override protected GrpcReplicationClient createReplicationClient( DatanodeDetails target, CopyContainerCompression compression) { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java index 8d0ed0401b71..8b831fa06466 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java @@ -150,7 +150,7 @@ public void init() throws Exception { }).when(importer).importContainer(anyLong(), any(), any(), any()); doReturn(true).when(importer).isAllowedContainerImport(eq( CONTAINER_ID)); - when(importer.chooseNextVolume()).thenReturn(new HddsVolume.Builder( + when(importer.chooseNextVolume(anyLong())).thenReturn(new HddsVolume.Builder( Files.createDirectory(tempDir.resolve("ImporterDir")).toString()).conf( conf).build()); 
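
[Editor's note, not part of the patch] Taken together, the sender and receiver changes above implement one wire convention: SendContainerOutputStream attaches size only to the first request of the stream (getWrittenBytes() == 0), and SendContainerRequestHandler consumes it only while initializing state for a new container (the containerId == -1 branch), reserving space once and releasing it exactly once in either terminal callback. A condensed sketch of that lifecycle follows; Sender/Receiver are hypothetical stand-ins, and the 10 GB fallback assumes the 5 GB default container size.

// Editorial sketch of the first-message convention used above.
import java.util.OptionalLong;

public final class FirstMessageConventionSketch {

  // Sender side: the size rides along only on the very first part of the
  // stream, i.e. while nothing has been written yet.
  static OptionalLong sizeForPart(long writtenBytes, Long knownSize) {
    return (writtenBytes == 0 && knownSize != null)
        ? OptionalLong.of(knownSize) : OptionalLong.empty();
  }

  // Receiver side: reserve once on the first part, release exactly once.
  static final class Receiver {
    private long reserved; // 0 until the first part arrives

    void onFirstPart(OptionalLong size) {
      reserved = size.isPresent()
          ? 2 * size.getAsLong()          // reservation from the actual size
          : 10L * 1024 * 1024 * 1024;     // default fallback (2 * 5 GB)
      // the real handler calls volume.incCommittedBytes(reserved) here
    }

    void onTerminal() { // covers both the onError and onCompleted paths
      if (reserved > 0) {
        // the real handler calls volume.incCommittedBytes(-reserved)
        reserved = 0;
      }
    }
  }

  public static void main(String[] args) {
    Receiver r = new Receiver();
    r.onFirstPart(sizeForPart(0, 20L * 1024 * 1024 * 1024)); // reserves 40 GB
    r.onTerminal();                                          // releases it
  }
}

Carrying the size on the first message only keeps the per-chunk overhead at zero while still letting the receiver reserve space before any bytes hit disk.
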
@@ -193,7 +193,7 @@ public void testUpload() { ContainerReplicationSource source = new OnDemandContainerReplicationSource(containerController); - GrpcContainerUploader uploader = new GrpcContainerUploader(conf, null); + GrpcContainerUploader uploader = new GrpcContainerUploader(conf, null, containerController); PushReplicator pushReplicator = new PushReplicator(conf, source, uploader); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerOutputStream.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerOutputStream.java index c688b6495103..716bf4d3aebc 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerOutputStream.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerOutputStream.java @@ -40,14 +40,14 @@ class TestSendContainerOutputStream @Override protected OutputStream createSubject() { return new SendContainerOutputStream(getObserver(), - getContainerId(), getBufferSize(), NO_COMPRESSION); + getContainerId(), getBufferSize(), NO_COMPRESSION, null); } @ParameterizedTest @EnumSource void usesCompression(CopyContainerCompression compression) throws Exception { OutputStream subject = new SendContainerOutputStream( - getObserver(), getContainerId(), getBufferSize(), compression); + getObserver(), getContainerId(), getBufferSize(), compression, null); byte[] bytes = getRandomBytes(16); subject.write(bytes, 0, bytes.length); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java index 961ed9b48c73..4fb801532f0b 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; @@ -31,6 +32,7 @@ import java.io.File; import java.io.IOException; +import java.util.stream.Stream; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; @@ -51,6 +53,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; /** * Test for {@link SendContainerRequestHandler}. @@ -87,6 +92,18 @@ void setup() throws IOException { ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); } + /** + * Provides stream of different container sizes for tests. 
+   */
+  public static Stream<Arguments> sizeProvider() {
+    return Stream.of(
+        Arguments.of("Null replicate size (fallback to default)", null),
+        Arguments.of("Zero Size", 0L),
+        Arguments.of("Normal 2GB", 2L * 1024L * 1024L * 1024L),
+        Arguments.of("Overallocated 20GB", 20L * 1024L * 1024L * 1024L)
+    );
+  }
+
   @Test
   void testReceiveDataForExistingContainer() throws Exception {
     long containerId = 1;
@@ -104,36 +121,27 @@ void testReceiveDataForExistingContainer() throws Exception {
         ((StorageContainerException) arg).getResult());
       return null;
     }).when(responseObserver).onError(any());
-    ByteString data = ByteString.copyFromUtf8("test");
-    ContainerProtos.SendContainerRequest request
-        = ContainerProtos.SendContainerRequest.newBuilder()
-        .setContainerID(containerId)
-        .setData(data)
-        .setOffset(0)
-        .setCompression(NO_COMPRESSION.toProto())
-        .build();
-    sendContainerRequestHandler.onNext(request);
+
+    sendContainerRequestHandler.onNext(createRequest(containerId,
+        ByteString.copyFromUtf8("test"), 0, null));
   }

-  @Test
-  public void testSpaceReservedAndReleasedWhenRequestCompleted() throws Exception {
+  @ParameterizedTest(name = "for {0}")
+  @MethodSource("sizeProvider")
+  public void testSpaceReservedAndReleasedWhenRequestCompleted(String testName, Long size) throws Exception {
     long containerId = 1;
     HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0);
     long initialCommittedBytes = volume.getCommittedBytes();
+    long expectedReservedSpace = size != null ?
+        importer.getRequiredReplicationSpace(size) :
+        importer.getDefaultReplicationSpace();

-    // Create request
-    ContainerProtos.SendContainerRequest request = ContainerProtos.SendContainerRequest.newBuilder()
-        .setContainerID(containerId)
-        .setData(ByteString.EMPTY)
-        .setOffset(0)
-        .setCompression(CopyContainerCompression.NO_COMPRESSION.toProto())
-        .build();
-
-    // Execute request
-    sendContainerRequestHandler.onNext(request);
+    // Create and execute the first request to reserve space
+    sendContainerRequestHandler.onNext(
+        createRequest(containerId, ByteString.EMPTY, 0, size));

     // Verify commit space is reserved
-    assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize);
+    assertEquals(volume.getCommittedBytes(), initialCommittedBytes + expectedReservedSpace);

     // complete the request
     sendContainerRequestHandler.onCompleted();
@@ -142,44 +150,50 @@ public void testSpaceReservedAndReleasedWhenRequestCompleted() throws Exception
     assertEquals(volume.getCommittedBytes(), initialCommittedBytes);
   }

-  @Test
-  public void testSpaceReservedAndReleasedWhenOnNextFails() throws Exception {
+  @ParameterizedTest(name = "for {0}")
+  @MethodSource("sizeProvider")
+  public void testSpaceReservedAndReleasedWhenOnNextFails(String testName, Long size) throws Exception {
     long containerId = 1;
     HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0);
     long initialCommittedBytes = volume.getCommittedBytes();
+    long expectedReservedSpace = size != null ?
+ importer.getRequiredReplicationSpace(size) : + importer.getDefaultReplicationSpace(); - // Create request - ContainerProtos.SendContainerRequest request = createRequest(containerId, ByteString.copyFromUtf8("test"), 0); - - // Execute request - sendContainerRequestHandler.onNext(request); + ByteString data = ByteString.copyFromUtf8("test"); + // Execute first request to reserve space + sendContainerRequestHandler.onNext( + createRequest(containerId, data, 0, size)); // Verify commit space is reserved - assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize); + assertEquals(volume.getCommittedBytes(), initialCommittedBytes + expectedReservedSpace); // mock the importer is not allowed to import this container when(importer.isAllowedContainerImport(containerId)).thenReturn(false); - sendContainerRequestHandler.onNext(request); + sendContainerRequestHandler.onNext(createRequest(containerId, data, 0, + size)); // Verify commit space is released assertEquals(volume.getCommittedBytes(), initialCommittedBytes); } - @Test - public void testSpaceReservedAndReleasedWhenOnCompletedFails() throws Exception { + @ParameterizedTest(name = "for {0}") + @MethodSource("sizeProvider") + public void testSpaceReservedAndReleasedWhenOnCompletedFails(String testName, Long size) throws Exception { long containerId = 1; HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0); long initialCommittedBytes = volume.getCommittedBytes(); - - // Create request - ContainerProtos.SendContainerRequest request = createRequest(containerId, ByteString.copyFromUtf8("test"), 0); + long expectedReservedSpace = size != null ? + importer.getRequiredReplicationSpace(size) : + importer.getDefaultReplicationSpace(); // Execute request - sendContainerRequestHandler.onNext(request); + sendContainerRequestHandler.onNext(createRequest(containerId, + ByteString.copyFromUtf8("test"), 0, size)); // Verify commit space is reserved - assertEquals(volume.getCommittedBytes(), initialCommittedBytes + 2 * containerMaxSize); + assertEquals(volume.getCommittedBytes(), initialCommittedBytes + expectedReservedSpace); doThrow(new IOException("Failed")).when(importer).importContainer(anyLong(), any(), any(), any()); @@ -189,12 +203,51 @@ public void testSpaceReservedAndReleasedWhenOnCompletedFails() throws Exception assertEquals(volume.getCommittedBytes(), initialCommittedBytes); } - private ContainerProtos.SendContainerRequest createRequest(long containerId, ByteString data, int offset) { - return ContainerProtos.SendContainerRequest.newBuilder() - .setContainerID(containerId) - .setData(data) - .setOffset(offset) - .setCompression(CopyContainerCompression.NO_COMPRESSION.toProto()) - .build(); + /** + * Test that verifies the actual space calculation difference between + * overallocated containers and default containers. 
+ */ + @Test + public void testOverAllocatedReservesMoreSpace() { + long containerId1 = 1; + long containerId2 = 2; + long overallocatedSize = containerMaxSize * 2; // 10GB + HddsVolume volume = (HddsVolume) volumeSet.getVolumesList().get(0); + long initialCommittedBytes = volume.getCommittedBytes(); + // Test overallocated container (10GB) + SendContainerRequestHandler handler1 = new SendContainerRequestHandler(importer, responseObserver, null); + handler1.onNext(createRequest(containerId1, ByteString.EMPTY, 0, overallocatedSize)); + + long overallocatedReservation = volume.getCommittedBytes() - initialCommittedBytes; + handler1.onCompleted(); // Release space + + // Test default container (null size) + SendContainerRequestHandler handler2 = new SendContainerRequestHandler(importer, responseObserver, null); + handler2.onNext(createRequest(containerId2, ByteString.EMPTY, 0, null)); + + long defaultReservation = volume.getCommittedBytes() - initialCommittedBytes; + handler2.onCompleted(); // Release space + + // Verify overallocated container reserves more space + assertTrue(overallocatedReservation > defaultReservation); + + // Verify specific calculations + assertEquals(2 * overallocatedSize, overallocatedReservation); + assertEquals(2 * containerMaxSize, defaultReservation); + } + + private ContainerProtos.SendContainerRequest createRequest( + long containerId, ByteString data, int offset, Long size) { + ContainerProtos.SendContainerRequest.Builder builder = + ContainerProtos.SendContainerRequest.newBuilder() + .setContainerID(containerId) + .setData(data) + .setOffset(offset) + .setCompression(NO_COMPRESSION.toProto()); + + if (size != null) { + builder.setSize(size); + } + return builder.build(); } } diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto index bdba99cffd08..99fcc6e271a6 100644 --- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto +++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto @@ -549,6 +549,7 @@ message SendContainerRequest { required bytes data = 3; optional int64 checksum = 4; optional CopyContainerCompressProto compression = 5; + optional int64 size = 6; } message SendContainerResponse { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplication.java index 68fecdb52d3e..968e331103e3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplication.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplication.java @@ -24,6 +24,7 @@ import static org.apache.hadoop.hdds.scm.pipeline.MockPipeline.createPipeline; import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.createContainer; import static org.apache.ozone.test.GenericTestUtils.waitFor; +import static org.junit.jupiter.api.Assertions.assertEquals; import com.google.common.collect.ImmutableList; import java.io.IOException; @@ -34,9 +35,14 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.ToLongFunction; import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import 
org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -44,14 +50,22 @@
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.event.Level;

 /**
  * Tests ozone containers replication.
@@ -157,6 +171,70 @@ void pushUnknownContainer() throws Exception {
         ReplicationSupervisor::getReplicationFailureCount);
   }

+  /**
+   * Provides stream of different container sizes for tests.
+   */
+  public static Stream<Arguments> sizeProvider() {
+    return Stream.of(
+        Arguments.of("Normal 2MB", 2L * 1024L * 1024L),
+        Arguments.of("Overallocated 6MB", 6L * 1024L * 1024L)
+    );
+  }
+
+  /**
+   * Tests push replication of a container with over-allocated size.
+   * The target datanode must reserve double the actual container size,
+   * which in the over-allocated case exceeds the default reservation
+   * derived from the configured max container size.
+ */ + @ParameterizedTest(name = "for {0}") + @MethodSource("sizeProvider") + void testPushWithOverAllocatedContainer(String testName, Long containerSize) + throws Exception { + GenericTestUtils.setLogLevel(GrpcContainerUploader.class, Level.DEBUG); + GenericTestUtils.setLogLevel(ContainerImporter.class, Level.DEBUG); + LogCapturer grpcLog = LogCapturer.captureLogs(GrpcContainerUploader.class); + LogCapturer containerImporterLog = LogCapturer.captureLogs(ContainerImporter.class); + + DatanodeDetails source = cluster.getHddsDatanodes().get(0) + .getDatanodeDetails(); + + long containerID = createOverAllocatedContainer(source, containerSize); + + DatanodeDetails target = selectOtherNode(source); + + // Get the original container size from source + Container sourceContainer = getContainer(source, containerID); + long originalSize = sourceContainer.getContainerData().getBytesUsed(); + + // Verify container is created with expected size + assertEquals(originalSize, containerSize); + + // Create replication command to push container to target + ReplicateContainerCommand cmd = + ReplicateContainerCommand.toTarget(containerID, target); + + // Execute push replication + queueAndWaitForCompletion(cmd, source, + ReplicationSupervisor::getReplicationSuccessCount); + + GenericTestUtils.waitFor(() -> { + String grpcLogs = grpcLog.getOutput(); + String containerImporterLogOutput = containerImporterLog.getOutput(); + + return grpcLogs.contains("Starting upload of container " + + containerID + " to " + target + " with size " + originalSize) && + containerImporterLogOutput.contains("Choosing volume to reserve space : " + + originalSize * 2); + }, 100, 1000); + + // Verify container was successfully replicated to target + Container targetContainer = getContainer(target, containerID); + long replicatedSize = targetContainer.getContainerData().getBytesUsed(); + + // verify sizes match exactly + assertEquals(originalSize, replicatedSize); + } + /** * Queues {@code cmd} in {@code dn}'s state machine, and waits until the * command is completed, as indicated by {@code counter} having been @@ -194,6 +272,8 @@ private static OzoneConfiguration createConfiguration() { OzoneConfiguration conf = new OzoneConfiguration(); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, TimeUnit.SECONDS); + conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, 5, StorageUnit.MB); + conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 1, StorageUnit.MB); ReplicationManagerConfiguration repConf = conf.getObject(ReplicationManagerConfiguration.class); @@ -212,4 +292,71 @@ private static long createNewClosedContainer(DatanodeDetails dn) } } + private static long createOverAllocatedContainer(DatanodeDetails dn, Long targetDataSize) throws Exception { + long containerID = CONTAINER_ID.incrementAndGet(); + try (XceiverClientSpi client = clientFactory.acquireClient( + createPipeline(singleton(dn)))) { + + // Create the container + createContainer(client, containerID, null); + + int chunkSize = 1 * 1024 * 1024; // 1MB chunks + long totalBytesWritten = 0; + + // Write data in chunks until we reach target size + while (totalBytesWritten < targetDataSize) { + BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); + + // Calculate remaining bytes and adjust chunk size if needed + long remainingBytes = targetDataSize - totalBytesWritten; + int currentChunkSize = (int) Math.min(chunkSize, remainingBytes); + + // Create a write chunk request with current 
chunk size + ContainerProtos.ContainerCommandRequestProto writeChunkRequest = + ContainerTestHelper.getWriteChunkRequest( + createPipeline(singleton(dn)), blockID, currentChunkSize); + + // Send write chunk command + client.sendCommand(writeChunkRequest); + + // Create and send put block command + ContainerProtos.ContainerCommandRequestProto putBlockRequest = + ContainerTestHelper.getPutBlockRequest(writeChunkRequest); + client.sendCommand(putBlockRequest); + + totalBytesWritten += currentChunkSize; + } + + // Close the container + ContainerProtos.CloseContainerRequestProto closeRequest = + ContainerProtos.CloseContainerRequestProto.newBuilder().build(); + ContainerProtos.ContainerCommandRequestProto closeContainerRequest = + ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.CloseContainer) + .setContainerID(containerID) + .setCloseContainer(closeRequest) + .setDatanodeUuid(dn.getUuidString()) + .build(); + client.sendCommand(closeContainerRequest); + + return containerID; + } + } + + /** + * Gets the container from the specified datanode. + */ + private Container getContainer(DatanodeDetails datanode, long containerID) { + for (HddsDatanodeService datanodeService : cluster.getHddsDatanodes()) { + if (datanode.equals(datanodeService.getDatanodeDetails())) { + Container container = datanodeService.getDatanodeStateMachine().getContainer() + .getContainerSet().getContainer(containerID); + if (container != null) { + return container; + } + } + } + throw new AssertionError("Container " + containerID + " not found on " + datanode); + } + }
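
[Editor's note, not part of the patch] Because size is declared optional in DatanodeClientProtocol.proto, the change is wire compatible in both directions: a pre-upgrade sender never sets the field and the handler falls back to the default reservation (the req.hasSize() check above), while a pre-upgrade receiver simply ignores it. The integration test's numbers make the payoff visible; below is a back-of-the-envelope check under the test's configuration (ozone.scm.container.size = 5 MB, so the default reservation is 10 MB). The class name is hypothetical.

// Editorial sketch of the arithmetic behind testPushWithOverAllocatedContainer.
public final class IntegrationTestMathSketch {

  private static final long MB = 1024L * 1024;

  public static void main(String[] args) {
    long defaultReserve = 2 * 5 * MB; // fallback used when size is absent

    long normal = 2 * MB; // the "Normal 2MB" case
    System.out.println(2 * normal);                   // 4 MB actually needed
    System.out.println(2 * normal <= defaultReserve); // true: default would suffice

    long overAllocated = 6 * MB; // the "Overallocated 6MB" case
    System.out.println(2 * overAllocated);                  // 12 MB actually needed
    System.out.println(2 * overAllocated > defaultReserve); // true: default under-reserves
  }
}

For the 2 MB container the 10 MB default would merely have been a harmless over-reservation; for the 6 MB over-allocated container it would have under-reserved by 2 MB, which is exactly the failure mode this patch closes.
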