Merged: changes from all commits

AvailableSpaceFilter.java
@@ -33,7 +33,7 @@ public class AvailableSpaceFilter implements Predicate<HddsVolume> {
       new HashMap<>();
   private long mostAvailableSpace = Long.MIN_VALUE;

-  AvailableSpaceFilter(long requiredSpace) {
+  public AvailableSpaceFilter(long requiredSpace) {
     this.requiredSpace = requiredSpace;
   }

ContainerImporter.java
@@ -128,6 +128,8 @@ public void importContainer(long containerID, Path tarFilePath,
     try (InputStream input = Files.newInputStream(tarFilePath)) {
       Container container = controller.importContainer(
           containerData, input, packer);
+      // After a successful import, record the container's bytes as used space on the volume
+      targetVolume.incrementUsedSpace(container.getContainerData().getBytesUsed());
       containerSet.addContainerByOverwriteMissingContainer(container);
     }
   } finally {

@@ -172,4 +174,7 @@ protected TarContainerPacker getPacker(CopyContainerCompression compression) {
     return new TarContainerPacker(compression);
   }

+  public long getDefaultContainerSize() {
+    return containerSize;
+  }
 }
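
Taken together, the two hunks above define the lifecycle of an import reservation: the caller reserves committed bytes sized from getDefaultContainerSize(), and on success the importer immediately converts the container's real size into used space. A minimal, runnable sketch of that bookkeeping, using stand-in counters rather than the real HddsVolume (the 5 GB figure is an assumed default):

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy model, not Ozone code: the committed-vs-used bookkeeping around importContainer.
public class UsedSpaceHandoffSketch {
  private static final AtomicLong USED = new AtomicLong();
  private static final AtomicLong COMMITTED = new AtomicLong();

  public static void main(String[] args) {
    long containerSize = 5L * 1024 * 1024 * 1024; // assumed 5 GB default container size
    COMMITTED.addAndGet(2 * containerSize);       // caller reserves before the import

    // importContainer(...) succeeds: usage is recorded right away instead of
    // waiting for the periodic disk-usage refresh.
    long bytesUsed = 100;                         // container.getContainerData().getBytesUsed()
    USED.addAndGet(bytesUsed);

    COMMITTED.addAndGet(-2 * containerSize);      // caller releases the reservation
    System.out.println("used=" + USED.get() + " committed=" + COMMITTED.get());
  }
}
```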
DownloadAndImportReplicator.java
@@ -22,8 +22,11 @@
 import java.nio.file.Path;
 import java.util.List;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.volume.AvailableSpaceFilter;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status;
 import org.slf4j.Logger;
@@ -44,6 +47,7 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
   private final ContainerDownloader downloader;
   private final ContainerImporter containerImporter;
   private final ContainerSet containerSet;
+  private final long containerSize;

   public DownloadAndImportReplicator(
       ConfigurationSource conf, ContainerSet containerSet,

@@ -53,6 +57,9 @@ public DownloadAndImportReplicator(
     this.containerSet = containerSet;
     this.downloader = downloader;
     this.containerImporter = containerImporter;
+    containerSize = (long) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
   }

   @Override
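
The reservation size used below is derived from this configured container size. A small, self-contained illustration of the arithmetic; the 5 GB value is an assumption standing in for ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT:

```java
// Hypothetical numbers only: getStorageSize(...) converts the configured
// container size (for example "5GB") into bytes.
public class ContainerSizeSketch {
  public static void main(String[] args) {
    long containerSize = 5L * 1024 * 1024 * 1024; // assumed 5 GB default
    long reservation = 2 * containerSize;         // space reserved per in-flight import
    System.out.println(containerSize + " bytes per container, "
        + reservation + " bytes reserved");
  }
}
```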
@@ -70,9 +77,19 @@ public void replicate(ReplicationTask task) {

     LOG.info("Starting replication of container {} from {} using {}",
         containerID, sourceDatanodes, compression);
+    HddsVolume targetVolume = null;

     try {
-      HddsVolume targetVolume = containerImporter.chooseNextVolume();
+      targetVolume = containerImporter.chooseNextVolume();
+      // Reserve space up front and verify that the reservation still fits on the volume.
+      targetVolume.incCommittedBytes(containerSize * 2);
+      // Committed bytes already cover the reservation, so AvailableSpaceFilter needs no extra required space.
+      AvailableSpaceFilter filter = new AvailableSpaceFilter(0);
+      if (!filter.test(targetVolume)) {
+        LOG.warn("Container {} replication was unsuccessful due to no space left", containerID);
+        task.setStatus(Status.FAILED);
+        return;
+      }
       // Wait for the download. This thread pool is limiting the parallel
       // downloads, so it's ok to block here and wait for the full download.
       Path tarFilePath =
@@ -95,6 +112,10 @@ public void replicate(ReplicationTask task) {
     } catch (IOException e) {
       LOG.error("Container {} replication was unsuccessful.", containerID, e);
       task.setStatus(Status.FAILED);
+    } finally {
+      if (targetVolume != null) {
+        targetVolume.incCommittedBytes(-containerSize * 2);
+      }
     }
   }
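
The hunks above implement a reserve, check, release pattern: commit the reservation first, re-run the space check with zero extra required space, and release in finally on every exit path. A self-contained model of that control flow, with simplified stand-ins for HddsVolume and AvailableSpaceFilter:

```java
// Toy model, not Ozone code: the reserve -> check -> release flow in replicate().
public class ReserveCheckReleaseSketch {
  private static long capacity = 16L << 30; // 16 GiB volume, illustrative
  private static long used = 0;
  private static long committed = 0;

  // Analogue of new AvailableSpaceFilter(0).test(volume): the reservation is
  // already committed, so only a positive leftover passes.
  private static boolean hasSpaceLeft() {
    return capacity - used - committed > 0;
  }

  public static void main(String[] args) {
    long containerSize = 5L << 30;  // assumed 5 GB default
    committed += 2 * containerSize; // incCommittedBytes(containerSize * 2)
    try {
      if (!hasSpaceLeft()) {
        System.out.println("replication failed: no space left"); // Status.FAILED
        return;
      }
      // ... download the tarball and import the container here ...
    } finally {
      committed -= 2 * containerSize; // released on success, failure, or early return
    }
    System.out.println("imported; committed=" + committed);
  }
}
```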

SendContainerRequestHandler.java
@@ -29,7 +29,9 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.volume.AvailableSpaceFilter;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
@@ -50,7 +52,7 @@ class SendContainerRequestHandler
   private long containerId = -1;
   private long nextOffset;
   private OutputStream output;
-  private HddsVolume volume;
+  private HddsVolume volume = null;
   private Path path;
   private CopyContainerCompression compression;
   private final ZeroCopyMessageMarshaller<SendContainerRequest> marshaller;
@@ -85,6 +87,17 @@ public void onNext(SendContainerRequest req) {
     if (containerId == -1) {
       containerId = req.getContainerID();
       volume = importer.chooseNextVolume();
+      // Reserve space up front and verify that the reservation still fits on the volume.
+      volume.incCommittedBytes(importer.getDefaultContainerSize() * 2);
+      // Committed bytes already cover the reservation, so AvailableSpaceFilter needs no extra required space.
+      AvailableSpaceFilter filter = new AvailableSpaceFilter(0);
+      if (!filter.test(volume)) {
+        volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
+        LOG.warn("Container {} import was unsuccessful due to no space left", containerId);
+        volume = null;
+        throw new DiskChecker.DiskOutOfSpaceException("No more available volumes");
+      }
+
       Path dir = ContainerImporter.getUntarDirectory(volume);
       Files.createDirectories(dir);
       path = dir.resolve(ContainerUtils.getContainerTarName(containerId));

Review comment (Member): Is there any test covering this? It seems to me the modification added in TestReplicationSupervisor only covers the changes in DownloadAndImportReplicator. Just want to make sure, because I'm adding more coverage of committed space on the success and failure paths with HDDS-12810.
@@ -110,32 +123,44 @@ public void onNext(SendContainerRequest req) {

   @Override
   public void onError(Throwable t) {
-    LOG.warn("Error receiving container {} at {}", containerId, nextOffset, t);
-    closeOutput();
-    deleteTarball();
-    responseObserver.onError(t);
+    try {
+      LOG.warn("Error receiving container {} at {}", containerId, nextOffset, t);
+      closeOutput();
+      deleteTarball();
+      responseObserver.onError(t);
+    } finally {
+      if (volume != null) {
+        volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
+      }
+    }
   }

   @Override
   public void onCompleted() {
-    if (output == null) {
-      LOG.warn("Received container without any parts");
-      return;
-    }
-
-    LOG.info("Container {} is downloaded with size {}, starting to import.",
-        containerId, nextOffset);
-    closeOutput();
-
     try {
-      importer.importContainer(containerId, path, volume, compression);
-      LOG.info("Container {} is replicated successfully", containerId);
-      responseObserver.onNext(SendContainerResponse.newBuilder().build());
-      responseObserver.onCompleted();
-    } catch (Throwable t) {
-      LOG.warn("Failed to import container {}", containerId, t);
-      deleteTarball();
-      responseObserver.onError(t);
+      if (output == null) {
+        LOG.warn("Received container without any parts");
+        return;
+      }
+
+      LOG.info("Container {} is downloaded with size {}, starting to import.",
+          containerId, nextOffset);
+      closeOutput();
+
+      try {
+        importer.importContainer(containerId, path, volume, compression);
+        LOG.info("Container {} is replicated successfully", containerId);
+        responseObserver.onNext(SendContainerResponse.newBuilder().build());
+        responseObserver.onCompleted();
+      } catch (Throwable t) {
+        LOG.warn("Failed to import container {}", containerId, t);
+        deleteTarball();
+        responseObserver.onError(t);
+      }
+    } finally {
+      if (volume != null) {
+        volume.incCommittedBytes(-importer.getDefaultContainerSize() * 2);
+      }
     }
   }
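
A subtlety in this handler: the reservation must be released exactly once. When the space check fails, onNext releases immediately and nulls out volume before throwing, so the finally blocks in onError and onCompleted skip the second release. A runnable sketch of that invariant (illustrative names, not the real gRPC handler):

```java
public class SingleReleaseSketch {
  private static long committed = 0;
  private static Object volume;                  // null means "nothing to release"
  private static final long RESERVATION = 2 * (5L << 30); // assumed 5 GB default

  // Analogue of the onNext path: reserve, fail the space check, release, forget.
  private static void reserveOrFail() throws Exception {
    volume = new Object();
    committed += RESERVATION;
    boolean enoughSpace = false;                 // pretend the filter rejected the volume
    if (!enoughSpace) {
      committed -= RESERVATION;                  // release here ...
      volume = null;                             // ... and forget the volume
      throw new Exception("No more available volumes");
    }
  }

  public static void main(String[] args) {
    try {
      reserveOrFail();
    } catch (Exception e) {
      // Analogue of onError: release only if the reservation is still held.
      if (volume != null) {
        committed -= RESERVATION;
      }
    }
    System.out.println("committed=" + committed); // 0: released exactly once
  }
}
```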

TestReplicationSupervisor.java
@@ -42,6 +42,9 @@
 import jakarta.annotation.Nonnull;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Clock;
@@ -52,31 +55,44 @@
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCommandInfo;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinatorTask;
@@ -100,6 +116,9 @@ public class TestReplicationSupervisor {

   private static final long CURRENT_TERM = 1;

+  @TempDir
+  private File tempDir;
+
   private final ContainerReplicator noopReplicator = task -> { };
   private final ContainerReplicator throwingReplicator = task -> {
     throw new RuntimeException("testing replication failure");

@@ -328,6 +347,126 @@ public void testDownloadAndImportReplicatorFailure(ContainerLayoutVersion layout)
         .contains("Container 1 replication was unsuccessful.");
   }

+  @ContainerLayoutTestInfo.ContainerTest
+  public void testReplicationImportReserveSpace(ContainerLayoutVersion layout)
+      throws IOException, InterruptedException, TimeoutException {
+    this.layoutVersion = layout;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, tempDir.getAbsolutePath());
+
+    long containerSize = (long) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+
+    ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder()
+        .stateContext(context)
+        .executor(newDirectExecutorService())
+        .clock(clock)
+        .build();
+
+    long containerId = 1;
+    // Create a container that reports 100 bytes of used space
+    KeyValueContainerData containerData = new KeyValueContainerData(containerId,
+        ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test");
+    HddsVolume vol = mock(HddsVolume.class);
+    containerData.setVolume(vol);
+    containerData.incrBytesUsed(100);
+    KeyValueContainer container = new KeyValueContainer(containerData, conf);
+    ContainerController controllerMock = mock(ContainerController.class);
+    // The semaphore lets the test pause the import after space has been reserved
+    Semaphore semaphore = new Semaphore(1);
+    when(controllerMock.importContainer(any(), any(), any()))
+        .thenAnswer((invocation) -> {
+          semaphore.acquire();
+          return container;
+        });
+    MutableVolumeSet volumeSet = new MutableVolumeSet(datanode.getUuidString(), conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+    File tarFile = containerTarFile(containerId, containerData);
+
+    SimpleContainerDownloader downloaderMock =
+        mock(SimpleContainerDownloader.class);
+    when(
+        downloaderMock.getContainerDataFromReplicas(anyLong(), anyList(),
+            any(Path.class), any()))
+        .thenReturn(tarFile.toPath());
+
+    ContainerImporter importer =
+        new ContainerImporter(conf, set, controllerMock, volumeSet);
+
+    HddsVolume vol1 = (HddsVolume) volumeSet.getVolumesList().get(0);
+    // Initially the volume has no committed space
+    assertEquals(0, vol1.getCommittedBytes());
+    long usedSpace = vol1.getCurrentUsage().getUsedSpace();
+    // Initially the volume has no used space
+    assertEquals(0, usedSpace);
+    // Increase committed bytes so that only 3 * containerSize of free space remains
+    long initialCommittedBytes = vol1.getCurrentUsage().getCapacity() - containerSize * 3;
+    vol1.incCommittedBytes(initialCommittedBytes);
+    ContainerReplicator replicator =
+        new DownloadAndImportReplicator(conf, set, importer, downloaderMock);
+    replicatorRef.set(replicator);
+
+    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+        .captureLogs(DownloadAndImportReplicator.LOG);
+
+    // Acquire the semaphore so that the container import pauses after reserving space
+    semaphore.acquire();
+    CompletableFuture.runAsync(() -> {
+      try {
+        supervisor.addTask(createTask(containerId));
+      } catch (Exception ex) {
+        // Ignored: a failure is surfaced through the supervisor's failure count
+      }
+    });
+
+    // Wait until the first container import has reserved its space
+    GenericTestUtils.waitFor(() ->
+        vol1.getCommittedBytes() > vol1.getCurrentUsage().getCapacity() - containerSize * 3,
+        1000, 50000);

Review comment (Contributor): The space should be moved from committed to used after this. Can we add a test for this?

Reply (Author): Added a test for this. Used space was not incremented during container import; it relied on the periodic DU refresh, which runs every hour by default. The volume's used space is now increased after a successful container import.

+    // The volume now holds a reservation of 2 * containerSize
+    assertEquals(initialCommittedBytes + 2 * containerSize, vol1.getCommittedBytes());
+    // Importing container 2 fails: container 1 still holds its reservation, and a
+    // new import needs at least 2 * containerSize of free space
+    long containerId2 = 2;
+    supervisor.addTask(createTask(containerId2));
+    GenericTestUtils.waitFor(() -> 1 == supervisor.getReplicationFailureCount(),
+        1000, 50000);
+    assertThat(logCapturer.getOutput()).contains("No volumes have enough space for a new container");
+    // Release the semaphore so that the first container import can finish
+    semaphore.release();
+    GenericTestUtils.waitFor(() ->
+        1 == supervisor.getReplicationSuccessCount(), 1000, 50000);
+
+    usedSpace = vol1.getCurrentUsage().getUsedSpace();
+    // After replication, used space grows by the container's used bytes
+    assertEquals(100, usedSpace);
+
+    // Committed bytes return to their pre-replication value
+    assertEquals(initialCommittedBytes, vol1.getCommittedBytes());
+  }
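
For reference, the space arithmetic this test exercises, as a runnable sketch with an illustrative capacity value:

```java
// Hypothetical numbers: containerSize assumes the stock 5 GB default,
// and capacity is an arbitrary illustrative volume size.
public class TestSpaceArithmetic {
  public static void main(String[] args) {
    long containerSize = 5L << 30;                          // assumed 5 GB default
    long capacity = 100L << 30;                             // illustrative capacity
    long initialCommitted = capacity - 3 * containerSize;   // test setup: 3 container sizes left
    long committed = initialCommitted + 2 * containerSize;  // first import reserves 2x
    long remaining = capacity - committed;                  // used space is still 0
    long needed = 2 * containerSize;                        // a second import must reserve 2x
    System.out.println("remaining=" + remaining + ", needed=" + needed
        + ", secondImportFits=" + (remaining >= needed));   // false: the filter rejects it
  }
}
```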


+  private File containerTarFile(
+      long containerId, ContainerData containerData) throws IOException {
+    File yamlFile = new File(tempDir, "container.yaml");
+    ContainerDataYaml.createContainerFile(containerData, yamlFile);
+    File tarFile = new File(tempDir,
+        ContainerUtils.getContainerTarName(containerId));
+    // Close the archive stream as well, so the tar trailer is written out
+    try (OutputStream output = Files.newOutputStream(tarFile.toPath());
+        ArchiveOutputStream<TarArchiveEntry> archive =
+            new TarArchiveOutputStream(output)) {
+      TarArchiveEntry entry = archive.createArchiveEntry(yamlFile, "container.yaml");
+      archive.putArchiveEntry(entry);
+      try (InputStream input = Files.newInputStream(yamlFile.toPath())) {
+        IOUtils.copy(input, archive);
+      }
+      archive.closeArchiveEntry();
+    }
+    return tarFile;
+  }

@ContainerLayoutTestInfo.ContainerTest
public void testTaskBeyondDeadline(ContainerLayoutVersion layout) {
this.layoutVersion = layout;