diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
index c1445bde58ca..5d8a86bc930a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.ozone.container.replication;
 
-import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.cert.X509Certificate;
@@ -26,7 +25,6 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -35,6 +33,7 @@
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,8 +53,10 @@ public class SimpleContainerDownloader implements ContainerDownloader {
   private final SecurityConfig securityConfig;
   private final X509Certificate caCert;
 
-  public SimpleContainerDownloader(ConfigurationSource conf,
-      X509Certificate caCert) {
+  public SimpleContainerDownloader(
+      ConfigurationSource conf,
+      X509Certificate caCert
+  ) {
 
     String workDirString =
         conf.get(OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR);
@@ -71,38 +72,31 @@ public SimpleContainerDownloader(ConfigurationSource conf,
   }
 
   @Override
-  public CompletableFuture<Path> getContainerDataFromReplicas(long containerId,
-      List<DatanodeDetails> sourceDatanodes) {
+  public CompletableFuture<Path> getContainerDataFromReplicas(
+      long containerId,
+      List<DatanodeDetails> sourceDatanodes
+  ) {
 
     CompletableFuture<Path> result = null;
 
-    //There is a chance for the download is successful but import is failed,
-    //due to data corruption. We need a random selected datanode to have a
-    //chance to succeed next time.
-    final ArrayList<DatanodeDetails> shuffledDatanodes =
-        new ArrayList<>(sourceDatanodes);
-    Collections.shuffle(shuffledDatanodes);
+    final List<DatanodeDetails> shuffledDatanodes =
+        shuffleDatanodes(sourceDatanodes);
 
     for (DatanodeDetails datanode : shuffledDatanodes) {
       try {
         if (result == null) {
           result = downloadContainer(containerId, datanode);
         } else {
-          result = result.thenApply(CompletableFuture::completedFuture)
-              .exceptionally(t -> {
-                LOG.error("Error on replicating container: " + containerId, t);
-                try {
-                  GrpcReplicationClient grpcReplicationClient =
-                      new GrpcReplicationClient(datanode.getIpAddress(),
-                          datanode.getPort(Name.STANDALONE).getValue(),
-                          workingDirectory, securityConfig, caCert);
-                  return grpcReplicationClient.download(containerId);
-                } catch (IOException e) {
-                  LOG.error("Error on replicating container: " + containerId,
-                      t);
-                  return null;
-                }
-              }).thenCompose(Function.identity());
+          result = result.exceptionally(t -> {
+            LOG.error("Error on replicating container: " + containerId, t);
+            try {
+              return downloadContainer(containerId, datanode).join();
+            } catch (Exception e) {
+              LOG.error("Error on replicating container: " + containerId,
+                  e);
+              return null;
+            }
+          });
         }
       } catch (Exception ex) {
         LOG.error(String.format(
@@ -114,18 +108,42 @@ public CompletableFuture<Path> getContainerDataFromReplicas(long containerId,
   }
 
+  //There is a chance for the download is successful but import is failed,
+  //due to data corruption. We need a random selected datanode to have a
+  //chance to succeed next time.
+  @NotNull
+  protected List<DatanodeDetails> shuffleDatanodes(
+      List<DatanodeDetails> sourceDatanodes
+  ) {
+
+    final ArrayList<DatanodeDetails> shuffledDatanodes =
+        new ArrayList<>(sourceDatanodes);
+
+    Collections.shuffle(shuffledDatanodes);
+
+    return shuffledDatanodes;
+  }
+
   @VisibleForTesting
   protected CompletableFuture<Path> downloadContainer(
       long containerId, DatanodeDetails datanode
   ) throws Exception {
     CompletableFuture<Path> result;
-    try (GrpcReplicationClient grpcReplicationClient =
+    GrpcReplicationClient grpcReplicationClient =
         new GrpcReplicationClient(datanode.getIpAddress(),
             datanode.getPort(Name.STANDALONE).getValue(),
-            workingDirectory, securityConfig, caCert)) {
-      result = grpcReplicationClient.download(containerId);
-    }
+            workingDirectory, securityConfig, caCert);
+    result = grpcReplicationClient.download(containerId)
+        .thenApply(r -> {
+          try {
+            grpcReplicationClient.close();
+          } catch (Exception e) {
+            LOG.error("Couldn't close Grpc replication client", e);
+          }
+          return r;
+        });
+
     return result;
   }
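Note on the production change above, with a standalone sketch. The refactored getContainerDataFromReplicas starts the first download eagerly and chains each remaining datanode as a fallback through CompletableFuture.exceptionally, joining the fallback download inside the callback; downloadContainer now closes the GrpcReplicationClient in a thenApply stage instead of try-with-resources, so the client stays open until the async download finishes. The sketch below is not part of the patch: the class name, the "dn-*" source names and the download() helper are invented for illustration, it only mirrors the fallback-chaining pattern in isolation.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Standalone sketch of the fallback chaining used in the patch above;
// class name, "dn-*" sources and download() are illustrative only.
public final class FallbackChainSketch {

  public static void main(String[] args) {
    List<String> sources = Arrays.asList("dn-1", "dn-2", "dn-3");

    CompletableFuture<String> result = null;
    for (String source : sources) {
      if (result == null) {
        // First attempt is started eagerly.
        result = download(source);
      } else {
        // Further attempts run only if the previous future failed;
        // join() blocks inside the callback until the fallback finishes.
        result = result.exceptionally(t -> download(source).join());
      }
    }

    // Prints "dn-2": dn-1 fails, the first fallback succeeds, and the
    // second fallback never runs because the future already completed.
    System.out.println(result.join());
  }

  // Fails for dn-1 only, so the fallback chain has work to do.
  private static CompletableFuture<String> download(String source) {
    if ("dn-1".equals(source)) {
      CompletableFuture<String> failed = new CompletableFuture<>();
      failed.completeExceptionally(
          new RuntimeException("unavailable: " + source));
      return failed;
    }
    return CompletableFuture.completedFuture(source);
  }
}

As in the patch, the exceptionally callback blocks on join() while the fallback download runs, and a source whose predecessor succeeded is never contacted.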
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
index 3f0f5a9faaeb..f29b1579198a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
@@ -21,10 +21,13 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
@@ -32,17 +35,74 @@
 import org.junit.Assert;
 import org.junit.Test;
 
-/**
- * Test container downloader.
+/*
+ * Test SimpleContainerDownloader.
  */
 public class TestSimpleContainerDownloader {
 
   private static final String SUCCESS_PATH = "downloaded";
 
+  @Test
+  public void testGetContainerDataFromReplicasHappyPath() throws Exception {
+
+    //GIVEN
+    List<DatanodeDetails> datanodes = createDatanodes();
+
+    SimpleContainerDownloader downloader =
+        createDownloaderWithPredefinedFailures(true);
+
+    //WHEN
+    final Path result =
+        downloader.getContainerDataFromReplicas(1L, datanodes)
+            .get(1L, TimeUnit.SECONDS);
+
+    //THEN
+    Assert.assertEquals(datanodes.get(0).getUuidString(), result.toString());
+  }
+
+  @Test
+  public void testGetContainerDataFromReplicasDirectFailure()
+      throws Exception {
+
+    //GIVEN
+    List<DatanodeDetails> datanodes = createDatanodes();
+
+    SimpleContainerDownloader downloader =
+        createDownloaderWithPredefinedFailures(true, datanodes.get(0));
+
+    //WHEN
+    final Path result =
+        downloader.getContainerDataFromReplicas(1L, datanodes)
+            .get(1L, TimeUnit.SECONDS);
+
+    //THEN
+    //first datanode is failed, second worked
+    Assert.assertEquals(datanodes.get(1).getUuidString(), result.toString());
+  }
+
+  @Test
+  public void testGetContainerDataFromReplicasAsyncFailure() throws Exception {
+
+    //GIVEN
+    List<DatanodeDetails> datanodes = createDatanodes();
+
+    SimpleContainerDownloader downloader =
+        createDownloaderWithPredefinedFailures(false, datanodes.get(0));
+
+    //WHEN
+    final Path result =
+        downloader.getContainerDataFromReplicas(1L, datanodes)
+            .get(1L, TimeUnit.SECONDS);
+
+    //THEN
+    //first datanode is failed, second worked
+    Assert.assertEquals(datanodes.get(1).getUuidString(), result.toString());
+  }
+
   /**
    * Test if different datanode is used for each download attempt.
    */
-  @Test(timeout = 1000L)
+  @Test(timeout = 10_000L)
   public void testRandomSelection()
       throws ExecutionException, InterruptedException {
 
@@ -62,24 +122,79 @@ protected CompletableFuture<Path> downloadContainer(
       }
     };
 
-    //WHEN executed, THEN at least once the second datanode should be returned.
+    //WHEN executed, THEN at least once the second datanode should be
+    //returned.
     for (int i = 0; i < 10000; i++) {
-      Path path = downloader.getContainerDataFromReplicas(1L, datanodes).get();
+      Path path =
+          downloader.getContainerDataFromReplicas(1L, datanodes).get();
       if (path.toString().equals(datanodes.get(1).getUuidString())) {
         return;
       }
     }
 
-    //there is 1/2^10_000 chance for false positive, which is practically 0.
+    //there is 1/3^10_000 chance for false positive, which is practically 0.
     Assert.fail(
         "Datanodes are selected 10000 times but second datanode was never "
            + "used.");
   }
 
+  /**
+   * Creates downloader which fails with datanodes in the arguments.
+   *
+   * @param directException if false the exception will be wrapped in the
+   *                        returning future.
+   */
+  private SimpleContainerDownloader createDownloaderWithPredefinedFailures(
+      boolean directException,
+      DatanodeDetails... failedDatanodes
+  ) {
+
+    ConfigurationSource conf = new OzoneConfiguration();
+
+    final List<DatanodeDetails> datanodes =
+        Arrays.asList(failedDatanodes);
+
+    return new SimpleContainerDownloader(conf, null) {
+
+      //for retry testing we use predictable list of datanodes.
+      @Override
+      protected List<DatanodeDetails> shuffleDatanodes(
+          List<DatanodeDetails> sourceDatanodes
+      ) {
+        //turn off randomization
+        return sourceDatanodes;
+      }
+
+      @Override
+      protected CompletableFuture<Path> downloadContainer(
+          long containerId,
+          DatanodeDetails datanode
+      ) throws Exception {
+
+        if (datanodes.contains(datanode)) {
+          if (directException) {
+            throw new RuntimeException("Unavailable datanode");
+          } else {
+            return CompletableFuture.supplyAsync(() -> {
+              throw new RuntimeException("Unavailable datanode");
+            });
+          }
+        } else {
+
+          //path includes the dn id to make it possible to assert.
+          return CompletableFuture.completedFuture(
+              Paths.get(datanode.getUuidString()));
+        }
+
+      }
+    };
+  }
+
   private List<DatanodeDetails> createDatanodes() {
     List<DatanodeDetails> datanodes = new ArrayList<>();
     datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
     datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
+    datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
     return datanodes;
   }
 }
\ No newline at end of file
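Note on the new tests, with a standalone sketch. createDownloaderWithPredefinedFailures distinguishes a datanode that fails directly (downloadContainer itself throws) from one that fails asynchronously (the returned future completes exceptionally), and the three new tests cover the happy path plus both failure modes. The sketch below is not part of the patch; the class and method names are invented for illustration and only show the two failure modes side by side.

import java.util.concurrent.CompletableFuture;

// Standalone sketch of the two failure modes exercised by the new tests;
// class and method names are illustrative only.
public final class FailureModeSketch {

  // "Direct" failure: the method itself throws, like the
  // directException = true case in the test.
  static CompletableFuture<String> directFailure() {
    throw new RuntimeException("Unavailable datanode");
  }

  // "Async" failure: the method returns normally but the future completes
  // exceptionally, like the directException = false case.
  static CompletableFuture<String> asyncFailure() {
    return CompletableFuture.supplyAsync(() -> {
      throw new RuntimeException("Unavailable datanode");
    });
  }

  public static void main(String[] args) {
    try {
      directFailure();
    } catch (RuntimeException e) {
      // Caught immediately by an ordinary try/catch around the call.
      System.out.println("direct: " + e.getMessage());
    }

    // Surfaces only through the future; exceptionally() sees the failure
    // wrapped in a CompletionException.
    String recovered = asyncFailure()
        .exceptionally(t -> "async: " + t.getCause().getMessage())
        .join();
    System.out.println(recovered);
  }
}

In the patch, the direct case is caught by the try/catch around downloadContainer inside getContainerDataFromReplicas, while the async case is picked up by the exceptionally fallback on the chained future.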