@@ -260,6 +260,7 @@ public ContainerDataProto getProtoBufMessage() {
builder.setContainerID(this.getContainerID());
builder.setContainerPath(this.getContainerPath());
builder.setState(this.getState());
builder.setBlockCount(this.getBlockCount());

for (Map.Entry<String, String> entry : getMetadata().entrySet()) {
ContainerProtos.KeyValue.Builder keyValBuilder =
@@ -181,7 +181,8 @@ public OzoneContainer(
controller,
conf.getObject(ReplicationConfig.class),
secConf,
certClient);
certClient,
hddsDispatcher);

readChannel = new XceiverServerGrpc(
datanodeDetails, config, hddsDispatcher, certClient);
@@ -22,13 +22,31 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ListBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkVersion;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc;
import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceStub;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
@@ -37,6 +55,7 @@
import org.apache.hadoop.ozone.OzoneConsts;

import com.google.common.base.Preconditions;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
@@ -110,6 +129,108 @@ public CompletableFuture<Path> download(long containerId) {
return response;
}

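/**
 * Lists up to {@code count} blocks of the given container on the given
 * datanode, starting from the first block.
 */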
public CompletableFuture<List<BlockData>> listBlock(
long containerId, String datanodeUUID, int count) {
return listBlock(containerId, datanodeUUID, -1, count);
}

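/**
 * Lists up to {@code count} blocks starting from local ID {@code start};
 * a negative {@code start} leaves the start ID unset, so listing begins
 * at the first block. Implemented on top of the intra-datanode send RPC.
 */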
public CompletableFuture<List<BlockData>> listBlock(
long containerId, String datanodeUUID, long start, int count) {

ListBlockRequestProto.Builder requestBuilder =
ListBlockRequestProto.newBuilder()
.setCount(count);
if (start >= 0) {
requestBuilder.setStartLocalID(start);
}
ListBlockRequestProto request = requestBuilder.build();

ContainerCommandRequestProto command =
ContainerCommandRequestProto.newBuilder()
.setCmdType(Type.ListBlock)
.setContainerID(containerId)
.setDatanodeUuid(datanodeUUID)
.setListBlock(request)
.build();

CompletableFuture<List<ContainerCommandResponseProto>> future =
new CompletableFuture<>();

client.send(command, new ContainerCommandObserver(containerId, future));

return future.thenApply(responses -> responses.stream()
.filter(r -> r.getCmdType() == Type.ListBlock)
.flatMap(r -> r.getListBlock().getBlockDataList().stream())
.collect(Collectors.toList()));
}

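/**
 * Reads the container metadata from the given datanode; the future
 * completes with an empty Optional if no ReadContainer response arrives.
 */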
public CompletableFuture<Optional<ContainerDataProto>> readContainer(
long containerId, String datanodeUUID) {

ContainerCommandRequestProto command =
ContainerCommandRequestProto.newBuilder()
.setCmdType(Type.ReadContainer)
.setContainerID(containerId)
.setDatanodeUuid(datanodeUUID)
.setReadContainer(ReadContainerRequestProto.newBuilder().build())
.build();

CompletableFuture<List<ContainerCommandResponseProto>> future =
new CompletableFuture<>();

client.send(command, new ContainerCommandObserver(containerId, future));

return future.thenApply(responses -> responses.stream()
.filter(r -> r.getCmdType() == Type.ReadContainer)
.findFirst()
.map(r -> r.getReadContainer().getContainerData()));
}

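/**
 * Reads a single chunk of a block from the given datanode; the future
 * completes with the chunk data, or an empty Optional if no ReadChunk
 * response arrives.
 */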
public CompletableFuture<Optional<ByteBuffer>> readChunk(long containerId,
String datanodeUUID, DatanodeBlockID blockID, ChunkInfo chunkInfo) {

ReadChunkRequestProto request =
ReadChunkRequestProto.newBuilder()
.setBlockID(blockID)
.setChunkData(chunkInfo)
.setReadChunkVersion(ReadChunkVersion.V0)
.build();

ContainerCommandRequestProto command =
ContainerCommandRequestProto.newBuilder()
.setCmdType(Type.ReadChunk)
.setContainerID(containerId)
.setDatanodeUuid(datanodeUUID)
.setReadChunk(request)
.build();

CompletableFuture<List<ContainerCommandResponseProto>> future =
new CompletableFuture<>();

client.send(command, new ContainerCommandObserver(containerId, future));

return future.thenApply(responses -> responses.stream()
.filter(r -> r.getCmdType() == Type.ReadChunk)
.findFirst()
.map(r -> r.getReadChunk().getData().asReadOnlyByteBuffer()));
}

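/**
 * Pushes a single recovered chunk to the target datanode via the push
 * RPC; the future completes with the number of responses received.
 */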
public CompletableFuture<Long> writeChunk(DatanodeBlockID blockID,
ChunkInfo chunkInfo, ByteBuffer chunkData) {

WriteChunkRequestProto request = WriteChunkRequestProto.newBuilder()
.setBlockID(blockID)
.setChunkData(chunkInfo)
.setData(ByteString.copyFrom(chunkData))
.build();

CompletableFuture<Long> future = new CompletableFuture<>();

client.push(request, new PushChunkObserver(future));

return future;
}

private Path getWorkingDirectory() {
return workingDirectory;
}
@@ -214,4 +335,70 @@ private void deleteOutputOnFailure() {
}
}
}

/**
 * gRPC stream observer that collects container command responses and
 * completes a CompletableFuture with all of them.
 */
public static class ContainerCommandObserver
implements StreamObserver<ContainerCommandResponseProto> {

private final List<ContainerCommandResponseProto> result;
private final CompletableFuture<List<ContainerCommandResponseProto>> future;
private final long containerId;

public ContainerCommandObserver(long containerId,
CompletableFuture<List<ContainerCommandResponseProto>> future) {
this.future = future;
this.containerId = containerId;
this.result = new ArrayList<>();
}

@Override
public void onNext(ContainerCommandResponseProto response) {
result.add(response);
}

@Override
public void onError(Throwable throwable) {
LOG.error("Command to container {} failed", containerId, throwable);
future.completeExceptionally(throwable);
}

@Override
public void onCompleted() {
future.complete(result);
}

}

/**
 * gRPC stream observer that completes a CompletableFuture with the
 * number of write-chunk responses received.
 */
public static class PushChunkObserver
implements StreamObserver<WriteChunkResponseProto> {

private final CompletableFuture<Long> future;
private long count;

PushChunkObserver(CompletableFuture<Long> future) {
this.future = future;
this.count = 0;
}

@Override
public void onNext(WriteChunkResponseProto response) {
count++;
}

@Override
public void onError(Throwable throwable) {
future.completeExceptionally(throwable);
}

@Override
public void onCompleted() {
future.complete(count);
}
}

}
@@ -20,10 +20,15 @@

import java.io.IOException;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc;

import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,8 +46,12 @@ public class GrpcReplicationService extends

private final ContainerReplicationSource source;

public GrpcReplicationService(ContainerReplicationSource source) {
private final HddsDispatcher dispatcher;

public GrpcReplicationService(ContainerReplicationSource source,
HddsDispatcher dispatcher) {
this.source = source;
this.dispatcher = dispatcher;
}

@Override
@@ -60,4 +69,28 @@ public void download(CopyContainerRequestProto request,
}
}

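/**
 * Handles a read-only container command from another datanode by
 * dispatching it locally and streaming the response back.
 */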
@Override
public void send(ContainerCommandRequestProto request,
StreamObserver<ContainerCommandResponseProto> observer) {
observer.onNext(dispatcher.dispatch(request, null));
observer.onCompleted();
}

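/**
 * Receives recovered chunks pushed by a coordinator datanode; storing
 * the chunks and reconstructing the container are still TODO.
 */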
@Override
public void push(WriteChunkRequestProto request,
StreamObserver<WriteChunkResponseProto> observer) {
long containerID = request.getBlockID().getContainerID();
if (request.hasData()) {
String chunkName = request.getChunkData().getChunkName();
LOG.debug("Received chunk {} of container {}", chunkName, containerID);
// TODO: Store chunk data for reconstruction
// ByteBuffer data = request.getData().asReadOnlyByteBuffer();
} else {
LOG.info("Starting to reconstruct container {}", containerID);
// TODO: Start container reconstruction
}
observer.onNext(WriteChunkResponseProto.newBuilder().build());
observer.onCompleted();
}

}
@@ -29,6 +29,7 @@
import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
import org.apache.hadoop.hdds.utils.HAUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;

import org.apache.ratis.thirdparty.io.grpc.Server;
@@ -59,27 +60,33 @@ public class ReplicationServer {

private ContainerController controller;

private final HddsDispatcher dispatcher;

private int port;

public ReplicationServer(
ContainerController controller,
ReplicationConfig replicationConfig,
SecurityConfig secConf,
CertificateClient caClient
CertificateClient caClient,
HddsDispatcher dispatcher
) {
this.secConf = secConf;
this.caClient = caClient;
this.controller = controller;
this.dispatcher = dispatcher;
this.port = replicationConfig.getPort();
init();
}

public void init() {
NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
.addService(ServerInterceptors.intercept(new GrpcReplicationService(
new OnDemandContainerReplicationSource(controller)
), new GrpcServerInterceptor()));
.addService(ServerInterceptors.intercept(
new GrpcReplicationService(
new OnDemandContainerReplicationSource(controller),
dispatcher),
new GrpcServerInterceptor()));

if (secConf.isSecurityEnabled()) {
try {
@@ -495,4 +495,8 @@ service XceiverClientProtocolService {
service IntraDatanodeProtocolService {
// An intradatanode service to copy the raw container data between nodes
rpc download (CopyContainerRequestProto) returns (stream CopyContainerResponseProto);
// Sends read-only commands from the CoordinatorDN to a HealthyDN to fetch container status and data.
rpc send(ContainerCommandRequestProto) returns (stream ContainerCommandResponseProto);
// Pushes recovered chunks from the CoordinatorDN to the TargetDN.
rpc push(WriteChunkRequestProto) returns (stream WriteChunkResponseProto);
}
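To show how the two new RPCs compose, here is a minimal coordinator-side sketch built only on the GrpcReplicationClient methods added in this change. The method name, the source/target client variables, the batch size of 100, and the 30-second timeouts are illustrative assumptions, not part of the patch:

// Sketch: a coordinator datanode reads blocks from a healthy datanode
// via send (ListBlock/ReadChunk) and pushes them to a target via push.
// `source`, `target`, `containerId` and `sourceUuid` are assumed inputs.
void copyContainerBlocks(GrpcReplicationClient source,
    GrpcReplicationClient target, long containerId, String sourceUuid)
    throws Exception {
  List<BlockData> blocks =
      source.listBlock(containerId, sourceUuid, 100).get(30, TimeUnit.SECONDS);
  for (BlockData block : blocks) {
    for (ChunkInfo chunk : block.getChunksList()) {
      Optional<ByteBuffer> data = source
          .readChunk(containerId, sourceUuid, block.getBlockID(), chunk)
          .get(30, TimeUnit.SECONDS);
      if (data.isPresent()) {
        // Push the recovered chunk; the future completes with the number
        // of responses received from the target datanode.
        target.writeChunk(block.getBlockID(), chunk, data.get())
            .get(30, TimeUnit.SECONDS);
      }
    }
  }
}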
@@ -33,6 +33,7 @@
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.container.replication.GrpcReplicationClient;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.recon.ReconServer;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
@@ -186,6 +187,18 @@ void waitForPipelineTobeReady(HddsProtos.ReplicationFactor factor,
*/
OzoneClient getRpcClient() throws IOException;

/**
 * Creates a {@link GrpcReplicationClient} to access the replication
 * service of the given datanode.
 *
 * @param datanode the datanode the replication client will connect to
 * @return a {@link GrpcReplicationClient} connected to the datanode; the
 * caller should close() it when finished
 * @throws IOException if the client cannot be created
 */
GrpcReplicationClient getReplicationClient(DatanodeDetails datanode)
throws IOException;
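// Usage sketch (illustrative; assumes a running cluster and a known
// container ID and datanode UUID):
//
//   GrpcReplicationClient client = cluster.getReplicationClient(
//       cluster.getHddsDatanodes().get(0).getDatanodeDetails());
//   try {
//     client.listBlock(containerId, datanodeUuid, 10).get();
//   } finally {
//     client.close();
//   }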

/**
* Returns StorageContainerLocationClient to communicate with
* {@link StorageContainerManager} associated with the MiniOzoneCluster.