-
Notifications
You must be signed in to change notification settings - Fork 588
HDDS-12235. Reserve space on DN during container import operation. #7981
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
22b5d41
cdeae20
34894d8
c829ef9
f8f650c
af50c25
55c8835
7dca4f8
4be1147
5d10a21
822abaa
dce8897
6319b80
29f1cae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,7 +22,9 @@ | |
| import java.nio.file.Path; | ||
| import java.util.List; | ||
| import org.apache.hadoop.hdds.conf.ConfigurationSource; | ||
| import org.apache.hadoop.hdds.conf.StorageUnit; | ||
| import org.apache.hadoop.hdds.protocol.DatanodeDetails; | ||
| import org.apache.hadoop.hdds.scm.ScmConfigKeys; | ||
| import org.apache.hadoop.ozone.container.common.impl.ContainerSet; | ||
| import org.apache.hadoop.ozone.container.common.volume.HddsVolume; | ||
| import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status; | ||
|
|
@@ -44,6 +46,7 @@ public class DownloadAndImportReplicator implements ContainerReplicator { | |
| private final ContainerDownloader downloader; | ||
| private final ContainerImporter containerImporter; | ||
| private final ContainerSet containerSet; | ||
| private final long containerSize; | ||
|
|
||
| public DownloadAndImportReplicator( | ||
| ConfigurationSource conf, ContainerSet containerSet, | ||
|
|
@@ -53,6 +56,9 @@ public DownloadAndImportReplicator( | |
| this.containerSet = containerSet; | ||
| this.downloader = downloader; | ||
| this.containerImporter = containerImporter; | ||
| containerSize = (long) conf.getStorageSize( | ||
| ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, | ||
| ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -70,9 +76,16 @@ public void replicate(ReplicationTask task) { | |
|
|
||
| LOG.info("Starting replication of container {} from {} using {}", | ||
| containerID, sourceDatanodes, compression); | ||
| HddsVolume targetVolume = null; | ||
|
|
||
| try { | ||
| HddsVolume targetVolume = containerImporter.chooseNextVolume(); | ||
| targetVolume = containerImporter.chooseNextVolume(); | ||
errose28 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| targetVolume.incImportContainerCommitBytes(containerSize * 2); | ||
|
||
| if (targetVolume.getCurrentUsage().getAvailable() - targetVolume.getCommittedBytes() | ||
|
||
| - targetVolume.getContainerImportCommittedBytes() <= 0) { | ||
| task.setStatus(Status.FAILED); | ||
ashishkumar50 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return; | ||
| } | ||
| // Wait for the download. This thread pool is limiting the parallel | ||
| // downloads, so it's ok to block here and wait for the full download. | ||
| Path tarFilePath = | ||
|
|
@@ -95,6 +108,10 @@ public void replicate(ReplicationTask task) { | |
| } catch (IOException e) { | ||
| LOG.error("Container {} replication was unsuccessful.", containerID, e); | ||
| task.setStatus(Status.FAILED); | ||
| } finally { | ||
| if (targetVolume != null) { | ||
| targetVolume.incImportContainerCommitBytes(-containerSize * 2); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,7 @@ | |
| import org.apache.hadoop.hdds.utils.IOUtils; | ||
| import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; | ||
| import org.apache.hadoop.ozone.container.common.volume.HddsVolume; | ||
| import org.apache.hadoop.util.DiskChecker; | ||
| import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller; | ||
| import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; | ||
| import org.slf4j.Logger; | ||
|
|
@@ -50,7 +51,7 @@ class SendContainerRequestHandler | |
| private long containerId = -1; | ||
| private long nextOffset; | ||
| private OutputStream output; | ||
| private HddsVolume volume; | ||
| private HddsVolume volume = null; | ||
| private Path path; | ||
| private CopyContainerCompression compression; | ||
| private final ZeroCopyMessageMarshaller<SendContainerRequest> marshaller; | ||
|
|
@@ -85,6 +86,14 @@ public void onNext(SendContainerRequest req) { | |
| if (containerId == -1) { | ||
| containerId = req.getContainerID(); | ||
| volume = importer.chooseNextVolume(); | ||
| volume.incImportContainerCommitBytes(importer.getDefaultContainerSize() * 2); | ||
| if (volume.getCurrentUsage().getAvailable() - volume.getCommittedBytes() | ||
| - volume.getContainerImportCommittedBytes() <= 0) { | ||
| volume.incImportContainerCommitBytes(-importer.getDefaultContainerSize() * 2); | ||
| volume = null; | ||
| throw new DiskChecker.DiskOutOfSpaceException("No more available volumes"); | ||
| } | ||
|
|
||
| Path dir = ContainerImporter.getUntarDirectory(volume); | ||
| Files.createDirectories(dir); | ||
| path = dir.resolve(ContainerUtils.getContainerTarName(containerId)); | ||
|
|
@@ -110,32 +119,44 @@ public void onNext(SendContainerRequest req) { | |
|
|
||
| @Override | ||
| public void onError(Throwable t) { | ||
| LOG.warn("Error receiving container {} at {}", containerId, nextOffset, t); | ||
| closeOutput(); | ||
| deleteTarball(); | ||
| responseObserver.onError(t); | ||
| try { | ||
| LOG.warn("Error receiving container {} at {}", containerId, nextOffset, t); | ||
| closeOutput(); | ||
| deleteTarball(); | ||
| responseObserver.onError(t); | ||
| } finally { | ||
| if (volume != null) { | ||
| volume.incImportContainerCommitBytes(-importer.getDefaultContainerSize() * 2); | ||
|
||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void onCompleted() { | ||
| if (output == null) { | ||
| LOG.warn("Received container without any parts"); | ||
| return; | ||
| } | ||
|
|
||
| LOG.info("Container {} is downloaded with size {}, starting to import.", | ||
| containerId, nextOffset); | ||
| closeOutput(); | ||
|
|
||
| try { | ||
| importer.importContainer(containerId, path, volume, compression); | ||
| LOG.info("Container {} is replicated successfully", containerId); | ||
| responseObserver.onNext(SendContainerResponse.newBuilder().build()); | ||
| responseObserver.onCompleted(); | ||
| } catch (Throwable t) { | ||
| LOG.warn("Failed to import container {}", containerId, t); | ||
| deleteTarball(); | ||
| responseObserver.onError(t); | ||
| if (output == null) { | ||
| LOG.warn("Received container without any parts"); | ||
| return; | ||
| } | ||
|
|
||
| LOG.info("Container {} is downloaded with size {}, starting to import.", | ||
| containerId, nextOffset); | ||
| closeOutput(); | ||
|
|
||
| try { | ||
| importer.importContainer(containerId, path, volume, compression); | ||
| LOG.info("Container {} is replicated successfully", containerId); | ||
| responseObserver.onNext(SendContainerResponse.newBuilder().build()); | ||
| responseObserver.onCompleted(); | ||
| } catch (Throwable t) { | ||
| LOG.warn("Failed to import container {}", containerId, t); | ||
| deleteTarball(); | ||
| responseObserver.onError(t); | ||
| } | ||
| } finally { | ||
| if (volume != null) { | ||
| volume.incImportContainerCommitBytes(-importer.getDefaultContainerSize() * 2); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please reuse the existing committedBytes counter, as there is no difference from HddsVolume's point of view.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done