Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,
new SimpleContainerDownloader(conf, certClient));
ContainerReplicator pushReplicator = new PushReplicator(conf,
new OnDemandContainerReplicationSource(container.getController()),
new GrpcContainerUploader(conf, certClient)
new GrpcContainerUploader(conf, certClient, container.getController())
);

pullReplicatorWithMetrics = new MeasuredReplicator(pullReplicator, "pull");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class ContainerImporter {
private final ContainerController controller;
private final MutableVolumeSet volumeSet;
private final VolumeChoosingPolicy volumeChoosingPolicy;
private final long containerSize;
private final long defaultContainerSize;

private final Set<Long> importContainerProgress
= Collections.synchronizedSet(new HashSet<>());
Expand All @@ -76,7 +76,7 @@ public ContainerImporter(@Nonnull ConfigurationSource conf,
this.controller = controller;
this.volumeSet = volumeSet;
this.volumeChoosingPolicy = volumeChoosingPolicy;
containerSize = (long) conf.getStorageSize(
defaultContainerSize = (long) conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
this.conf = conf;
Expand Down Expand Up @@ -146,11 +146,12 @@ private static void deleteFileQuietely(Path tarFilePath) {
}
}

HddsVolume chooseNextVolume() throws IOException {
HddsVolume chooseNextVolume(long spaceToReserve) throws IOException {
// Choose volume that can hold both container in tmp and dest directory
LOG.debug("Choosing volume to reserve space : {}", spaceToReserve);
return volumeChoosingPolicy.chooseVolume(
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
getDefaultReplicationSpace());
spaceToReserve);
}

public static Path getUntarDirectory(HddsVolume hddsVolume)
Expand All @@ -174,6 +175,33 @@ protected TarContainerPacker getPacker(CopyContainerCompression compression) {
}

public long getDefaultReplicationSpace() {
return HddsServerUtil.requiredReplicationSpace(containerSize);
return HddsServerUtil.requiredReplicationSpace(defaultContainerSize);
}

/**
 * Calculate the volume space that must be reserved to replicate a
 * container of the given size. The sizing policy is delegated to
 * {@code HddsServerUtil.requiredReplicationSpace}; presumably it accounts
 * for both the staged tarball and the final container directory
 * (roughly 2 * actualContainerSize — confirm against that utility).
 *
 * @param actualContainerSize the actual size of the container in bytes
 * @return required space in bytes for replication, as computed by
 *         {@code HddsServerUtil.requiredReplicationSpace}
 */
public long getRequiredReplicationSpace(long actualContainerSize) {
return HddsServerUtil.requiredReplicationSpace(actualContainerSize);
}

/**
 * Determine how many bytes of volume space to reserve for an incoming
 * container replica. When the replica's size is known it drives the
 * calculation; a {@code null} size falls back to the configured default
 * container size.
 *
 * @param replicateSize size in bytes of the container to replicate,
 *                      or {@code null} when the sender did not report one
 * @return space in bytes to reserve for replication
 */
public long getSpaceToReserve(Long replicateSize) {
  return replicateSize == null
      ? getDefaultReplicationSpace()
      : getRequiredReplicationSpace(replicateSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ public void replicate(ReplicationTask task) {
HddsVolume targetVolume = null;

try {
targetVolume = containerImporter.chooseNextVolume();
targetVolume = containerImporter.chooseNextVolume(
containerImporter.getDefaultReplicationSpace());

// Wait for the download. This thread pool is limiting the parallel
// downloads, so it's ok to block here and wait for the full download.
Path tarFilePath =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
Expand All @@ -45,17 +47,29 @@ public class GrpcContainerUploader implements ContainerUploader {

private final SecurityConfig securityConfig;
private final CertificateClient certClient;
private final ContainerController containerController;

public GrpcContainerUploader(
ConfigurationSource conf, CertificateClient certClient) {
ConfigurationSource conf, CertificateClient certClient,
ContainerController containerController) {
this.certClient = certClient;
this.containerController = containerController;
securityConfig = new SecurityConfig(conf);
}

@Override
public OutputStream startUpload(long containerId, DatanodeDetails target,
CompletableFuture<Void> callback, CopyContainerCompression compression)
throws IOException {
CompletableFuture<Void> callback, CopyContainerCompression compression) throws IOException {

// Get container size from local datanode instead of using passed replicateSize
Long containerSize = null;
Container<?> container = containerController.getContainer(containerId);
if (container != null) {
LOG.debug("Starting upload of container {} to {} with size {}",
containerId, target, container.getContainerData().getBytesUsed());
containerSize = container.getContainerData().getBytesUsed();
}

GrpcReplicationClient client = createReplicationClient(target, compression);
try {
// gRPC runtime always provides implementation of CallStreamObserver
Expand All @@ -68,7 +82,7 @@ public OutputStream startUpload(long containerId, DatanodeDetails target,
(CallStreamObserver<SendContainerRequest>) client.upload(
responseObserver), responseObserver);
return new SendContainerOutputStream(requestStream, containerId,
GrpcReplicationService.BUFFER_SIZE, compression) {
GrpcReplicationService.BUFFER_SIZE, compression, containerSize) {
@Override
public void close() throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,29 @@
class SendContainerOutputStream extends GrpcOutputStream<SendContainerRequest> {

private final CopyContainerCompression compression;
private final Long size;

SendContainerOutputStream(
CallStreamObserver<SendContainerRequest> streamObserver,
long containerId, int bufferSize, CopyContainerCompression compression) {
long containerId, int bufferSize, CopyContainerCompression compression,
Long size) {
super(streamObserver, containerId, bufferSize);
this.compression = compression;
this.size = size;
}

@Override
protected void sendPart(boolean eof, int length, ByteString data) {
SendContainerRequest request = SendContainerRequest.newBuilder()
SendContainerRequest.Builder requestBuilder = SendContainerRequest.newBuilder()
.setContainerID(getContainerId())
.setData(data)
.setOffset(getWrittenBytes())
.setCompression(compression.toProto())
.build();
getStreamObserver().onNext(request);
.setCompression(compression.toProto());

// Include container size in the first request
if (getWrittenBytes() == 0 && size != null) {
requestBuilder.setSize(size);
}
getStreamObserver().onNext(requestBuilder.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class SendContainerRequestHandler
private Path path;
private CopyContainerCompression compression;
private final ZeroCopyMessageMarshaller<SendContainerRequest> marshaller;
private long spaceToReserve = 0;

SendContainerRequestHandler(
ContainerImporter importer,
Expand Down Expand Up @@ -84,7 +85,12 @@ public void onNext(SendContainerRequest req) {

if (containerId == -1) {
containerId = req.getContainerID();
volume = importer.chooseNextVolume();

// Use container size if available, otherwise fall back to default
spaceToReserve = importer.getSpaceToReserve(
req.hasSize() ? req.getSize() : null);

volume = importer.chooseNextVolume(spaceToReserve);

Path dir = ContainerImporter.getUntarDirectory(volume);
Files.createDirectories(dir);
Expand Down Expand Up @@ -117,8 +123,8 @@ public void onError(Throwable t) {
deleteTarball();
responseObserver.onError(t);
} finally {
if (volume != null) {
volume.incCommittedBytes(-importer.getDefaultReplicationSpace());
if (volume != null && spaceToReserve > 0) {
volume.incCommittedBytes(-spaceToReserve);
}
}
}
Expand Down Expand Up @@ -146,8 +152,8 @@ public void onCompleted() {
responseObserver.onError(t);
}
} finally {
if (volume != null) {
volume.incCommittedBytes(-importer.getDefaultReplicationSpace());
if (volume != null && spaceToReserve > 0) {
volume.incCommittedBytes(-spaceToReserve);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -116,8 +117,8 @@ void immediateError() throws Exception {

private static GrpcContainerUploader createSubject(
GrpcReplicationClient client) {

return new GrpcContainerUploader(new InMemoryConfiguration(), null) {
return new GrpcContainerUploader(new InMemoryConfiguration(), null,
mock(ContainerController.class)) {
@Override
protected GrpcReplicationClient createReplicationClient(
DatanodeDetails target, CopyContainerCompression compression) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void init() throws Exception {
}).when(importer).importContainer(anyLong(), any(), any(), any());
doReturn(true).when(importer).isAllowedContainerImport(eq(
CONTAINER_ID));
when(importer.chooseNextVolume()).thenReturn(new HddsVolume.Builder(
when(importer.chooseNextVolume(anyLong())).thenReturn(new HddsVolume.Builder(
Files.createDirectory(tempDir.resolve("ImporterDir")).toString()).conf(
conf).build());

Expand Down Expand Up @@ -193,7 +193,7 @@ public void testUpload() {
ContainerReplicationSource source =
new OnDemandContainerReplicationSource(containerController);

GrpcContainerUploader uploader = new GrpcContainerUploader(conf, null);
GrpcContainerUploader uploader = new GrpcContainerUploader(conf, null, containerController);

PushReplicator pushReplicator = new PushReplicator(conf, source, uploader);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ class TestSendContainerOutputStream
@Override
protected OutputStream createSubject() {
return new SendContainerOutputStream(getObserver(),
getContainerId(), getBufferSize(), NO_COMPRESSION);
getContainerId(), getBufferSize(), NO_COMPRESSION, null);
}

@ParameterizedTest
@EnumSource
void usesCompression(CopyContainerCompression compression) throws Exception {
OutputStream subject = new SendContainerOutputStream(
getObserver(), getContainerId(), getBufferSize(), compression);
getObserver(), getContainerId(), getBufferSize(), compression, null);

byte[] bytes = getRandomBytes(16);
subject.write(bytes, 0, bytes.length);
Expand Down
Loading