diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
index 0bc6c3ae223a..c313477e376f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
@@ -260,6 +260,7 @@ public ContainerDataProto getProtoBufMessage() {
     builder.setContainerID(this.getContainerID());
     builder.setContainerPath(this.getContainerPath());
     builder.setState(this.getState());
+    builder.setBlockCount(this.getBlockCount());
 
     for (Map.Entry<String, String> entry : getMetadata().entrySet()) {
       ContainerProtos.KeyValue.Builder keyValBuilder =
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 1af9c882a110..3a0f214bf85b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -181,7 +181,8 @@ public OzoneContainer(
         controller,
         conf.getObject(ReplicationConfig.class),
         secConf,
-        certClient);
+        certClient,
+        hddsDispatcher);
 
     readChannel = new XceiverServerGrpc(
         datanodeDetails, config, hddsDispatcher, certClient);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
index 023b251a524f..5d4f1f55bb39 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -22,13 +22,31 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ListBlockRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkVersion;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc;
 import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceStub;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
@@ -37,6 +55,7 @@
 import org.apache.hadoop.ozone.OzoneConsts;
 
 import com.google.common.base.Preconditions;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
@@ -110,6 +129,108 @@ public CompletableFuture<Path> download(long containerId) {
     return response;
   }
 
+  public CompletableFuture<List<BlockData>> listBlock(
+      long containerId, String datanodeUUID, int count) {
+    return listBlock(containerId, datanodeUUID, -1, count);
+  }
+
+  public CompletableFuture<List<BlockData>> listBlock(
+      long containerId, String datanodeUUID, long start, int count) {
+
+    ListBlockRequestProto.Builder requestBuilder =
+        ListBlockRequestProto.newBuilder()
+            .setCount(count);
+    if (start >= 0) {
+      requestBuilder.setStartLocalID(start);
+    }
+    ListBlockRequestProto request = requestBuilder.build();
+
+    ContainerCommandRequestProto command =
+        ContainerCommandRequestProto.newBuilder()
+            .setCmdType(Type.ListBlock)
+            .setContainerID(containerId)
+            .setDatanodeUuid(datanodeUUID)
+            .setListBlock(request)
+            .build();
+
+    CompletableFuture<List<ContainerCommandResponseProto>> future =
+        new CompletableFuture<>();
+
+    client.send(command, new ContainerCommandObserver(containerId, future));
+
+    return future.thenApply(responses -> responses.stream()
+        .filter(r -> r.getCmdType() == Type.ListBlock)
+        .flatMap(r -> r.getListBlock().getBlockDataList().stream())
+        .collect(Collectors.toList()));
+  }
+
+  public CompletableFuture<Optional<ContainerDataProto>> readContainer(
+      long containerId, String datanodeUUID) {
+
+    ContainerCommandRequestProto command =
+        ContainerCommandRequestProto.newBuilder()
+            .setCmdType(Type.ReadContainer)
+            .setContainerID(containerId)
+            .setDatanodeUuid(datanodeUUID)
+            .setReadContainer(ReadContainerRequestProto.newBuilder().build())
+            .build();
+
+    CompletableFuture<List<ContainerCommandResponseProto>> future =
+        new CompletableFuture<>();
+
+    client.send(command, new ContainerCommandObserver(containerId, future));
+
+    return future.thenApply(responses -> responses.stream()
+        .filter(r -> r.getCmdType() == Type.ReadContainer)
+        .findFirst()
+        .map(r -> r.getReadContainer().getContainerData()));
+  }
+
+  public CompletableFuture<Optional<ByteBuffer>> readChunk(long containerId,
+      String datanodeUUID, DatanodeBlockID blockID, ChunkInfo chunkInfo) {
+
+    ReadChunkRequestProto request =
+        ReadChunkRequestProto.newBuilder()
+            .setBlockID(blockID)
+            .setChunkData(chunkInfo)
+            .setReadChunkVersion(ReadChunkVersion.V0)
+            .build();
+
+    ContainerCommandRequestProto command =
+        ContainerCommandRequestProto.newBuilder()
+            .setCmdType(Type.ReadChunk)
+            .setContainerID(containerId)
+            .setDatanodeUuid(datanodeUUID)
+            .setReadChunk(request)
+            .build();
+
+    CompletableFuture<List<ContainerCommandResponseProto>> future =
+        new CompletableFuture<>();
+
+    client.send(command, new ContainerCommandObserver(containerId, future));
+
+    return future.thenApply(responses -> responses.stream()
+        .filter(r -> r.getCmdType() == Type.ReadChunk)
+        .findFirst()
+        .map(r -> r.getReadChunk().getData().asReadOnlyByteBuffer()));
+  }
+
+  public CompletableFuture<Long> writeChunk(DatanodeBlockID blockID,
+      ChunkInfo chunkInfo, ByteBuffer chunkData) {
+
+    WriteChunkRequestProto request = WriteChunkRequestProto.newBuilder()
+        .setBlockID(blockID)
+        .setChunkData(chunkInfo)
+        .setData(ByteString.copyFrom(chunkData))
+        .build();
+
+    CompletableFuture<Long> future = new CompletableFuture<>();
+
+    client.push(request, new PushChunkObserver(future));
+
+    return future;
+  }
+
   private Path getWorkingDirectory() {
     return workingDirectory;
   }
@@ -214,4 +335,70 @@ private void deleteOutputOnFailure() {
       }
     }
   }
+
+  /**
+   * gRPC stream observer to CompletableFuture adapter.
+   */
+  public static class ContainerCommandObserver
+      implements StreamObserver<ContainerCommandResponseProto> {
+
+    private final List<ContainerCommandResponseProto> result;
+    private final CompletableFuture<List<ContainerCommandResponseProto>>
+        future;
+    private final long containerId;
+
+    public ContainerCommandObserver(long containerId,
+        CompletableFuture<List<ContainerCommandResponseProto>> future) {
+      this.future = future;
+      this.containerId = containerId;
+      this.result = new ArrayList<>();
+    }
+
+    @Override
+    public void onNext(ContainerCommandResponseProto response) {
+      result.add(response);
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+      LOG.error("Command to container {} failed", containerId, throwable);
+      future.completeExceptionally(throwable);
+    }
+
+    @Override
+    public void onCompleted() {
+      future.complete(result);
+    }
+
+  }
+
+  /**
+   * gRPC stream observer to CompletableFuture adapter.
+   */
+  public static class PushChunkObserver
+      implements StreamObserver<WriteChunkResponseProto> {
+
+    private final CompletableFuture<Long> future;
+    private long count;
+
+    PushChunkObserver(CompletableFuture<Long> future) {
+      this.future = future;
+      this.count = 0;
+    }
+
+    @Override
+    public void onNext(WriteChunkResponseProto response) {
+      count++;
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+      future.completeExceptionally(throwable);
+    }
+
+    @Override
+    public void onCompleted() {
+      future.complete(count);
+    }
+  }
 }
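How the new read APIs compose, as a minimal usage sketch (not part of the patch): it assumes an already-constructed GrpcReplicationClient `client`, a known `containerId`, the datanode's UUID string `dnUuid`, and a hypothetical `process()` consumer.

    // List every block in the container, then read each block's first chunk.
    List<BlockData> blocks =
        client.listBlock(containerId, dnUuid, Integer.MAX_VALUE).get();
    for (BlockData block : blocks) {
      Optional<ByteBuffer> chunk = client.readChunk(containerId, dnUuid,
          block.getBlockID(), block.getChunks(0)).get();
      chunk.ifPresent(data -> process(data)); // process() is hypothetical
    }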
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
index 60897a5db897..66b9f9b1d45e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
@@ -20,10 +20,15 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc;
+import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,8 +46,12 @@ public class GrpcReplicationService extends
 
   private final ContainerReplicationSource source;
 
-  public GrpcReplicationService(ContainerReplicationSource source) {
+  private final HddsDispatcher dispatcher;
+
+  public GrpcReplicationService(ContainerReplicationSource source,
+      HddsDispatcher dispatcher) {
     this.source = source;
+    this.dispatcher = dispatcher;
   }
 
   @Override
@@ -60,4 +69,28 @@ public void download(CopyContainerRequestProto request,
     }
   }
 
+  @Override
+  public void send(ContainerCommandRequestProto request,
+      StreamObserver<ContainerCommandResponseProto> observer) {
+    observer.onNext(dispatcher.dispatch(request, null));
+    observer.onCompleted();
+  }
+
+  @Override
+  public void push(WriteChunkRequestProto request,
+      StreamObserver<WriteChunkResponseProto> observer) {
+    long containerID = request.getBlockID().getContainerID();
+    if (request.hasData()) {
+      String chunkName = request.getChunkData().getChunkName();
+      LOG.debug("Received chunk {} of container {}", chunkName, containerID);
+      // TODO: Store chunk data for reconstruction
+      // ByteBuffer data = request.getData().asReadOnlyByteBuffer();
+    } else {
+      LOG.info("Starting to reconstruct container {}", containerID);
+      // TODO: Start to reconstruct container
+    }
+    observer.onNext(WriteChunkResponseProto.newBuilder().build());
+    observer.onCompleted();
+  }
+
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index bf8d6f102565..d45f8d20ecc2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
 import org.apache.hadoop.hdds.utils.HAUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 
 import org.apache.ratis.thirdparty.io.grpc.Server;
@@ -59,17 +60,21 @@ public class ReplicationServer {
 
   private ContainerController controller;
 
+  private final HddsDispatcher dispatcher;
+
   private int port;
 
   public ReplicationServer(
       ContainerController controller,
       ReplicationConfig replicationConfig,
       SecurityConfig secConf,
-      CertificateClient caClient
+      CertificateClient caClient,
+      HddsDispatcher dispatcher
   ) {
     this.secConf = secConf;
     this.caClient = caClient;
     this.controller = controller;
+    this.dispatcher = dispatcher;
     this.port = replicationConfig.getPort();
     init();
   }
@@ -77,9 +82,11 @@ public ReplicationServer(
   public void init() {
     NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
         .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
-        .addService(ServerInterceptors.intercept(new GrpcReplicationService(
-            new OnDemandContainerReplicationSource(controller)
-        ), new GrpcServerInterceptor()));
+        .addService(ServerInterceptors.intercept(
+            new GrpcReplicationService(
+                new OnDemandContainerReplicationSource(controller),
+                dispatcher),
+            new GrpcServerInterceptor()));
 
     if (secConf.isSecurityEnabled()) {
       try {
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 3f6e9996d272..45fc50261664 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -495,4 +495,8 @@ service XceiverClientProtocolService {
 service IntraDatanodeProtocolService {
   // An intradatanode service to copy the raw container data between nodes
   rpc download (CopyContainerRequestProto) returns (stream CopyContainerResponseProto);
+  // Send read-only commands to get status and data from the coordinator DN to a healthy DN.
+  rpc send (ContainerCommandRequestProto) returns (stream ContainerCommandResponseProto);
+  // Push recovered chunks from the coordinator DN to a target DN.
+  rpc push (WriteChunkRequestProto) returns (stream WriteChunkResponseProto);
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 556a7487c5c3..8c747309778f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.container.replication.GrpcReplicationClient;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.recon.ReconServer;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
@@ -186,6 +187,18 @@ void waitForPipelineTobeReady(HddsProtos.ReplicationFactor factor,
    */
  OzoneClient getRpcClient() throws IOException;
 
+  /**
+   * Creates a {@link GrpcReplicationClient} to access the replication
+   * service of the given datanode.
+   *
+   * @param datanode the datanode which the replication client will connect to
+   * @return a {@link GrpcReplicationClient} connected to the given datanode;
+   *         callers should close() it when done
+   * @throws IOException if the client cannot be created
+   */
+  GrpcReplicationClient getReplicationClient(DatanodeDetails datanode)
+      throws IOException;
+
  /**
   * Returns StorageContainerLocationClient to communicate with
   * {@link StorageContainerManager} associated with the MiniOzoneCluster.
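The write path has no test coverage yet; the following is a minimal sketch (not part of the patch) of how a coordinator datanode could push one recovered chunk over the new push rpc via the client's writeChunk(). It assumes `targetDn`, `blockId`, `chunkInfo`, and a `recovered` byte array coming out of the reconstruction step; on the server side the chunk is currently only logged (see the TODOs in GrpcReplicationService#push).

    GrpcReplicationClient target = cluster.getReplicationClient(targetDn);
    try {
      // The future completes with the number of WriteChunkResponseProto
      // messages received from the target datanode.
      long responses = target.writeChunk(
          blockId, chunkInfo, ByteBuffer.wrap(recovered)).get();
    } finally {
      target.close();
    }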
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 6920dd4df954..eabcefc5e982 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -58,6 +58,7 @@
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.client.OzoneClient;
@@ -65,6 +66,7 @@
 import org.apache.hadoop.ozone.common.Storage.StorageState;
 import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
 import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
+import org.apache.hadoop.ozone.container.replication.GrpcReplicationClient;
 import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMStorage;
@@ -329,6 +331,16 @@ public OzoneClient getRpcClient() throws IOException {
     return OzoneClientFactory.getRpcClient(conf);
   }
 
+  @Override
+  public GrpcReplicationClient getReplicationClient(DatanodeDetails datanode)
+      throws IOException {
+    String workdir = conf.get(OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR,
+        System.getProperty("java.io.tmpdir"));
+    return new GrpcReplicationClient(datanode.getIpAddress(),
+        datanode.getPort(DatanodeDetails.Port.Name.REPLICATION).getValue(),
+        Paths.get(workdir), new SecurityConfig(conf), caClient);
+  }
+
   /**
    * Returns an RPC proxy connected to this cluster's StorageContainerManager
    * for accessing container location information. Callers take ownership of
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java
new file mode 100644
index 000000000000..12d093a97650
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+
+/**
+ * Tests for {@link GrpcReplicationClient} and {@link GrpcReplicationService}.
+ */
+public class TestReplicationService {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneManager om;
+  private static StorageContainerManager scm;
+  private static OzoneClient client;
+  private static ObjectStore store;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+  private static final String SCM_ID = UUID.randomUUID().toString();
+  private static final String CLUSTER_ID = UUID.randomUUID().toString();
+  private static final int NUM_KEYS = 2;
+  private static final byte[] VALUE =
+      UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
+  private static long containerID;
+
+  @BeforeAll
+  public static void init() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
+    conf.setBoolean(OzoneConfigKeys.OZONE_ACL_ENABLED, true);
+    conf.set(OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS,
+        OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
+    startCluster(conf);
+    prepareData(NUM_KEYS);
+  }
+
+  @AfterAll
+  public static void stop() throws IOException {
+    stopCluster();
+  }
+
+  @Test
+  public void testReadContainer() throws Exception {
+    DatanodeDetails dn = cluster.getHddsDatanodes().get(0).getDatanodeDetails();
+    GrpcReplicationClient rc = cluster.getReplicationClient(dn);
+    Optional<ContainerDataProto> maybeProto =
+        rc.readContainer(containerID, dn.getUuidString()).get();
+    Assertions.assertTrue(maybeProto.isPresent());
+    ContainerDataProto proto = maybeProto.get();
+    Assertions.assertEquals(containerID, proto.getContainerID());
+    Assertions.assertEquals(NUM_KEYS * VALUE.length, proto.getBytesUsed());
+    Assertions.assertEquals(NUM_KEYS, proto.getBlockCount());
+    rc.close();
+  }
+
+  @Test
+  public void testListBlock() throws Exception {
+    DatanodeDetails dn = cluster.getHddsDatanodes().get(0).getDatanodeDetails();
+    GrpcReplicationClient rc = cluster.getReplicationClient(dn);
+    List<BlockData> blocks =
+        rc.listBlock(containerID, dn.getUuidString(), NUM_KEYS + 1).get();
+    Assertions.assertEquals(NUM_KEYS, blocks.size());
+    Assertions.assertEquals(VALUE.length, blocks.get(0).getSize());
+    rc.close();
+  }
+
+  @Test
+  public void testReadChunk() throws Exception {
+    DatanodeDetails dn = cluster.getHddsDatanodes().get(0).getDatanodeDetails();
+    GrpcReplicationClient rc = cluster.getReplicationClient(dn);
+    List<BlockData> blocks =
+        rc.listBlock(containerID, dn.getUuidString(), NUM_KEYS).get();
+
+    for (BlockData block : blocks) {
+      Assertions.assertEquals(1, block.getChunksCount());
+      Optional<ByteBuffer> maybeChunk = rc.readChunk(containerID,
+          dn.getUuidString(), block.getBlockID(), block.getChunks(0)).get();
+      Assertions.assertTrue(maybeChunk.isPresent());
+      ByteBuffer chunk = maybeChunk.get();
+      byte[] chunkData = new byte[chunk.remaining()];
+      chunk.get(chunkData);
+      Assertions.assertArrayEquals(VALUE, chunkData);
+    }
+    rc.close();
+  }
+
+  public static void startCluster(OzoneConfiguration conf) throws Exception {
+    // Use a short stale node interval to reduce the wait time in
+    // MiniOzoneClusterImpl#waitForHddsDatanodesStop during teardown.
+    conf.set("ozone.scm.stale.node.interval", "10s");
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(3)
+        .setTotalPipelineNumLimit(10)
+        .setScmId(SCM_ID)
+        .setClusterId(CLUSTER_ID)
+        .build();
+    cluster.waitForClusterToBeReady();
+    om = cluster.getOzoneManager();
+    scm = cluster.getStorageContainerManager();
+    client = OzoneClientFactory.getRpcClient(conf);
+    store = client.getObjectStore();
+    storageContainerLocationClient =
+        cluster.getStorageContainerLocationClient();
+  }
+
+  public static void prepareData(int numKeys) throws Exception {
+    final String volumeName = UUID.randomUUID().toString();
+    final String bucketName = UUID.randomUUID().toString();
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    final ReplicationConfig repConfig = ReplicationConfig
+        .fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.THREE);
+    for (int i = 0; i < numKeys; i++) {
+      final String keyName = UUID.randomUUID().toString();
+      try (OutputStream out = bucket.createKey(
+          keyName, VALUE.length, repConfig, new HashMap<>())) {
+        out.write(VALUE);
+      }
+    }
+    Optional<Long> maybeContainerID = scm.getContainerManager()
+        .getContainerIDs().stream().findAny().map(ContainerID::getId);
+    Assertions.assertTrue(maybeContainerID.isPresent());
+    containerID = maybeContainerID.get();
+  }
+
+  public static void stopCluster() throws IOException {
+    if (client != null) {
+      client.close();
+    }
+
+    if (storageContainerLocationClient != null) {
+      storageContainerLocationClient.close();
+    }
+
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+}