diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 1e9e64749a09..6becf62eaaa2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -44,6 +44,7 @@ public class ReplicationSupervisor {
   private final ContainerSet containerSet;
   private final ContainerReplicator replicator;
   private final ExecutorService executor;
+  private final AtomicLong requestCounter = new AtomicLong();
   private final AtomicLong successCounter = new AtomicLong();
   private final AtomicLong failureCounter = new AtomicLong();

@@ -116,10 +117,10 @@ int getInFlightReplications() {
     return containersInFlight.size();
   }

-  private final class TaskRunner implements Runnable {
+  public final class TaskRunner implements Runnable {
     private final ReplicationTask task;

-    private TaskRunner(ReplicationTask task) {
+    public TaskRunner(ReplicationTask task) {
       this.task = task;
     }

diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
index 04bc95043df5..bee6e65f2783 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/OzoneClientProducer.java
@@ -25,7 +25,6 @@
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;

-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
@@ -36,6 +35,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;

+import com.google.common.annotations.VisibleForTesting;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMTokenProto.Type.S3AUTHINFO;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INTERNAL_ERROR;
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
index 8bd410b55a15..65096a6cf641 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java
@@ -491,4 +491,12 @@ protected OzoneClient createOzoneClient(String omServiceID,
       return OzoneClientFactory.getRpcClient(conf);
     }
   }
+
+  public void setTestNo(long testNo) {
+    this.testNo = testNo;
+  }
+
+  public void setThreadNo(int threadNo) {
+    this.threadNo = threadNo;
+  }
 }
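Note: the two changes above (the now-public TaskRunner and the new setTestNo/setThreadNo setters on BaseFreonGenerator) are what let the new Freon generator below run replication tasks synchronously on its own worker threads instead of submitting them to the supervisor's executor. A minimal illustrative sketch of that pattern follows; the helper class and method names here are invented for illustration, only the constructors and the inner-class instantiation visible in this diff are assumed:

    import java.util.List;

    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
    import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
    import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
    import org.apache.hadoop.ozone.container.replication.ReplicationTask;

    /** Illustrative helper; not part of this change. */
    public final class InlineReplicationSketch {

      private InlineReplicationSketch() {
      }

      /**
       * Replicates one closed container on the calling thread. The supervisor
       * is used only as a factory for its (now public) inner TaskRunner.
       */
      public static void replicateInline(ContainerSet containerSet,
          ContainerReplicator replicator, long containerId,
          List<DatanodeDetails> sources) {

        // Same constructor call as in ClosedContainerReplicator below; the
        // pool of 10 worker threads stays idle because the task runs inline.
        ReplicationSupervisor supervisor =
            new ReplicationSupervisor(containerSet, replicator, 10);

        ReplicationTask task = new ReplicationTask(containerId, sources);

        // Download + import runs synchronously on the calling thread.
        supervisor.new TaskRunner(task).run();
      }
    }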
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
new file mode 100644
index 000000000000..ad2810a21660
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
+import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
+import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
+import org.apache.hadoop.ozone.container.replication.ReplicationTask;
+import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
+
+import com.codahale.metrics.Timer;
+import org.jetbrains.annotations.NotNull;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+
+/**
+ * Utility to replicate closed containers with datanode code.
+ */
+@Command(name = "cr",
+    aliases = "container-replicator",
+    description = "Replicate / download closed containers.",
+    versionProvider = HddsVersionProvider.class,
+    mixinStandardHelpOptions = true,
+    showDefaultValues = true)
+public class ClosedContainerReplicator extends BaseFreonGenerator implements
+    Callable<Void> {
+
+  @Option(names = {"--datanode"},
+      description = "Replicate only containers on this specific datanode.",
+      defaultValue = "")
+  private String datanode;
+
+  private ReplicationSupervisor supervisor;
+
+  private Timer timer;
+
+  private List<ReplicationTask> replicationTasks;
+
+  @Override
+  public Void call() throws Exception {
+
+    OzoneConfiguration conf = createOzoneConfiguration();
+
+    final Collection<String> datanodeStorageDirs =
+        MutableVolumeSet.getDatanodeStorageDirs(conf);
+
+    for (String dir : datanodeStorageDirs) {
+      checkDestinationDirectory(dir);
+    }
+
+    //logic same as the download+import on the destination datanode
+    initializeReplicationSupervisor(conf);
+
+    final ContainerOperationClient containerOperationClient =
+        new ContainerOperationClient(conf);
+
+    final List<ContainerInfo> containerInfos =
+        containerOperationClient.listContainer(0L, 1_000_000);
+
+    replicationTasks = new ArrayList<>();
+
+    for (ContainerInfo container : containerInfos) {
+
+      final ContainerWithPipeline containerWithPipeline =
+          containerOperationClient
+              .getContainerWithPipeline(container.getContainerID());
+
+      if (container.getState() == LifeCycleState.CLOSED) {
+
+        final List<DatanodeDetails> datanodesWithContainer =
+            containerWithPipeline.getPipeline().getNodes();
+
+        final List<String> datanodeUUIDs =
+            datanodesWithContainer
+                .stream().map(DatanodeDetails::getUuidString)
+                .collect(Collectors.toList());
+
+        //if a datanode is specified, replicate a container only if it has a
+        //replica on that datanode.
+        if (datanode.isEmpty() || datanodeUUIDs.contains(datanode)) {
+          replicationTasks.add(new ReplicationTask(container.getContainerID(),
+              datanodesWithContainer));
+        }
+      }
+
+    }
+
+    //important: override the max number of tasks.
+    setTestNo(replicationTasks.size());
+
+    init();
+
+    timer = getMetrics().timer("replicate-container");
+    runTests(this::replicateContainer);
+    return null;
+  }
+
+  /**
+   * Check if the target directory is not re-used.
+ */
+  private void checkDestinationDirectory(String dirUrl) throws IOException {
+    final StorageLocation storageLocation = StorageLocation.parse(dirUrl);
+    final Path dirPath = Paths.get(storageLocation.getUri().getPath());
+
+    if (Files.notExists(dirPath)) {
+      return;
+    }
+
+    if (Files.list(dirPath).count() == 0) {
+      return;
+    }
+
+    throw new IllegalArgumentException(
+        "Configured storage directory " + dirUrl
+            + " (used as destination) should be empty");
+  }
+
+  @NotNull
+  private void initializeReplicationSupervisor(ConfigurationSource conf)
+      throws IOException {
+    String fakeDatanodeUuid = datanode;
+
+    if (fakeDatanodeUuid.isEmpty()) {
+      fakeDatanodeUuid = UUID.randomUUID().toString();
+    }
+
+    ContainerSet containerSet = new ContainerSet();
+
+    ContainerMetrics metrics = ContainerMetrics.create(conf);
+
+    MutableVolumeSet volumeSet = new MutableVolumeSet(fakeDatanodeUuid, conf);
+
+    Map<ContainerType, Handler> handlers = new HashMap<>();
+
+    for (ContainerType containerType : ContainerType.values()) {
+      final Handler handler =
+          Handler.getHandlerForContainerType(
+              containerType,
+              conf,
+              fakeDatanodeUuid,
+              containerSet,
+              volumeSet,
+              metrics,
+              containerReplicaProto -> {
+              });
+      handler.setScmID(UUID.randomUUID().toString());
+      handlers.put(containerType, handler);
+    }
+
+    ContainerController controller =
+        new ContainerController(containerSet, handlers);
+
+    ContainerReplicator replicator =
+        new DownloadAndImportReplicator(containerSet,
+            controller,
+            new SimpleContainerDownloader(conf, null),
+            new TarContainerPacker());
+
+    supervisor = new ReplicationSupervisor(containerSet, replicator, 10);
+  }
+
+  private void replicateContainer(long counter) throws Exception {
+    timer.time(() -> {
+      final ReplicationTask replicationTask =
+          replicationTasks.get((int) counter);
+      supervisor.new TaskRunner(replicationTask).run();
+      return null;
+    });
+  }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
index 1b03540019bc..d3c5ae6f8438 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
@@ -52,7 +52,8 @@
         DatanodeBlockPutter.class,
         FollowerAppendLogEntryGenerator.class,
         ChunkManagerDiskWrite.class,
-        LeaderAppendLogEntryGenerator.class},
+        LeaderAppendLogEntryGenerator.class,
+        ClosedContainerReplicator.class},
     versionProvider = HddsVersionProvider.class,
     mixinStandardHelpOptions = true)
 public class Freon extends GenericCli {
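With ClosedContainerReplicator registered as a Freon subcommand, the tool should be reachable through the usual Freon entry point. An invocation sketch (the datanode UUID placeholder and any cluster-specific flags depend on the environment):

    ozone freon cr --datanode=<datanode-uuid>

Per the command definition above, "cr" (alias "container-replicator") lists containers from SCM and, with --datanode set, replicates only the closed containers that have a replica on the given datanode; with the option left empty it downloads and imports every closed container into the locally configured, empty datanode storage directories.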