diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
index abeaf03c1667..275321da844b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -28,18 +28,14 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .CopyContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .CopyContainerResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto
-    .IntraDatanodeProtocolServiceGrpc;
-import org.apache.hadoop.hdds.protocol.datanode.proto
-    .IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceStub;
-
-import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc;
+import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceStub;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.ozone.OzoneConsts;
+
+import com.google.common.base.Preconditions;
 import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
@@ -51,7 +47,7 @@
 /**
  * Client to read container data from gRPC.
  */
-public class GrpcReplicationClient {
+public class GrpcReplicationClient implements AutoCloseable {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(GrpcReplicationClient.class);
@@ -118,6 +114,15 @@ public void shutdown() {
     }
   }
 
+  /**
+   * Closes this client by delegating to {@link #shutdown()}; no checked
+   * exception is declared, so callers need no extra exception handling.
+   */
+  @Override
+  public void close() {
+    shutdown();
+  }
+
   /**
    * gRPC stream observer to CompletableFuture adapter.
    */
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
index 9d7b5516a5c3..c1445bde58ca 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -22,6 +22,8 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
@@ -32,6 +34,7 @@
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +56,7 @@ public class SimpleContainerDownloader implements ContainerDownloader {
 
   public SimpleContainerDownloader(ConfigurationSource conf,
       X509Certificate caCert) {
+
     String workDirString =
         conf.get(OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR);
 
@@ -71,14 +75,18 @@ public CompletableFuture<Path> getContainerDataFromReplicas(long containerId,
       List<DatanodeDetails> sourceDatanodes) {
 
     CompletableFuture<Path> result = null;
-    for (DatanodeDetails datanode : sourceDatanodes) {
+
+    // A download can succeed while the subsequent import fails (e.g. because
+    // the replica is corrupt). Shuffling the sources gives a retry the chance
+    // to start from a different datanode.
+    final List<DatanodeDetails> shuffledDatanodes =
+        new ArrayList<>(sourceDatanodes);
+    Collections.shuffle(shuffledDatanodes);
+
+    for (DatanodeDetails datanode : shuffledDatanodes) {
       try {
         if (result == null) {
-          GrpcReplicationClient grpcReplicationClient =
-              new GrpcReplicationClient(datanode.getIpAddress(),
-                  datanode.getPort(Name.STANDALONE).getValue(),
-                  workingDirectory, securityConfig, caCert);
-          result = grpcReplicationClient.download(containerId);
+          result = downloadContainer(containerId, datanode);
         } else {
           result = result.thenApply(CompletableFuture::completedFuture)
               .exceptionally(t -> {
@@ -101,12 +109,44 @@ public CompletableFuture<Path> getContainerDataFromReplicas(long containerId,
             "Container %s download from datanode %s was unsuccessful. "
                 + "Trying the next datanode", containerId, datanode), ex);
       }
-
     }
     return result;
 
   }
 
+  /**
+   * Starts the download of a container from a single datanode.
+   *
+   * <p>The gRPC client is closed only once the returned future completes:
+   * the result is delivered through a future, so closing the client as soon
+   * as {@code download} returns could shut it down while the transfer is
+   * still in progress.
+   *
+   * @param containerId id of the container to download
+   * @param datanode the datanode to download the container from
+   * @return future that completes with the path returned by the client
+   * @throws Exception if the client cannot be created or the download
+   *                   cannot be started
+   */
+  @VisibleForTesting
+  protected CompletableFuture<Path> downloadContainer(
+      long containerId,
+      DatanodeDetails datanode
+  ) throws Exception {
+    final GrpcReplicationClient grpcReplicationClient =
+        new GrpcReplicationClient(datanode.getIpAddress(),
+            datanode.getPort(Name.STANDALONE).getValue(),
+            workingDirectory, securityConfig, caCert);
+    try {
+      return grpcReplicationClient.download(containerId)
+          .whenComplete((path, error) -> grpcReplicationClient.close());
+    } catch (Exception ex) {
+      // The download could not be started, so no callback will close it.
+      grpcReplicationClient.close();
+      throw ex;
+    }
+  }
+
   @Override
   public void close() {
     // noop
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
new file mode 100644
index 000000000000..3f0f5a9faaeb
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test container downloader.
+ */
+public class TestSimpleContainerDownloader {
+
+  /**
+   * Verifies that sources are tried in random order: out of two datanodes
+   * the second one has to be used first at least once.
+   */
+  @Test(timeout = 1000L)
+  public void testRandomSelection()
+      throws ExecutionException, InterruptedException {
+
+    //GIVEN
+    final List<DatanodeDetails> datanodes = createDatanodes();
+
+    SimpleContainerDownloader downloader =
+        new SimpleContainerDownloader(new OzoneConfiguration(), null) {
+
+          @Override
+          protected CompletableFuture<Path> downloadContainer(
+              long containerId, DatanodeDetails datanode
+          ) throws Exception {
+            //download is always successful.
+            return CompletableFuture
+                .completedFuture(Paths.get(datanode.getUuidString()));
+          }
+        };
+
+    //WHEN executed, THEN at least once the second datanode should be returned.
+    for (int i = 0; i < 10000; i++) {
+      Path path = downloader.getContainerDataFromReplicas(1L, datanodes).get();
+      if (path.toString().equals(datanodes.get(1).getUuidString())) {
+        return;
+      }
+    }
+
+    // The chance of never starting with the second of two datanodes in
+    // 10000 attempts is 1/2^10000, so a spurious failure is not a concern.
+    Assert.fail(
+        "Datanodes are selected 10000 times but second datanode was never "
+            + "used.");
+  }
+
+  private List<DatanodeDetails> createDatanodes() {
+    List<DatanodeDetails> datanodes = new ArrayList<>();
+    datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
+    datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
+    return datanodes;
+  }
+}