diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 1cfb771dc957..05c93af2620b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -89,6 +89,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat; import org.apache.ratis.util.TaskQueue; import org.apache.ratis.util.function.CheckedSupplier; +import org.apache.ratis.util.JavaUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -519,6 +520,22 @@ public CompletableFuture stream(RaftClientRequest request) { }, executor); } + public CompletableFuture link(DataStream stream, LogEntryProto entry) { + return CompletableFuture.supplyAsync(() -> { + if (stream == null) { + return JavaUtils.completeExceptionally( + new IllegalStateException("DataStream is null")); + } + if (stream.getDataChannel().isOpen()) { + return JavaUtils.completeExceptionally( + new IllegalStateException( + "DataStream: " + stream + " is not closed properly")); + } else { + return CompletableFuture.completedFuture(null); + } + }, executor); + } + private ExecutorService getChunkExecutor(WriteChunkRequestProto req) { int hash = Objects.hashCode(req.getBlockID()); if (hash == Integer.MIN_VALUE) {