diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
index e29670d7814c..cad1d0479249 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractDataStreamOutput.java
@@ -29,6 +29,7 @@
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Map;
+import java.util.Objects;
 /**
  * This class is used for error handling methods.
  */
@@ -111,7 +112,7 @@ protected void handleRetry(IOException exception, RetryPolicy retryPolicy)
     if (Thread.currentThread().isInterrupted()) {
       setExceptionAndThrow(exception);
     }
-    Preconditions.checkNotNull(action);
+    Objects.requireNonNull(action);
     Preconditions.checkArgument(
         action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
     if (action.delayMillis > 0) {
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
index 1820416d32fa..8ca70de81684 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
@@ -16,17 +16,13 @@
  * limitations under the License.
  */
 
-/**
- * This class maintains the map of the commitIndexes to be watched for
- * successful replication in the datanodes in a given pipeline. It also releases
- * the buffers associated with the user data back to {@Link BufferPool} once
- * minimum replication criteria is achieved during an ozone key write.
- */
 package org.apache.hadoop.hdds.scm.storage;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +30,9 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -49,13 +48,15 @@ public class StreamCommitWatcher {
       LoggerFactory.getLogger(StreamCommitWatcher.class);
 
   private Map<Long, List<StreamBuffer>> commitIndexMap;
-  private List<StreamBuffer> bufferList;
+  private final List<StreamBuffer> bufferList;
 
   // total data which has been successfully flushed and acknowledged
   // by all servers
   private long totalAckDataLength;
+  private final ConcurrentMap<Long, CompletableFuture<XceiverClientReply>>
+      replies = new ConcurrentHashMap<>();
 
-  private XceiverClientSpi xceiverClient;
+  private final XceiverClientSpi xceiverClient;
 
   public StreamCommitWatcher(XceiverClientSpi xceiverClient,
       List<StreamBuffer> bufferList) {
@@ -130,16 +131,24 @@ public XceiverClientReply streamWatchOnLastIndex()
    */
   public XceiverClientReply streamWatchForCommit(long commitIndex)
       throws IOException {
-    final long index;
+    final MemoizedSupplier<CompletableFuture<XceiverClientReply>> supplier
+        = JavaUtils.memoize(CompletableFuture::new);
+    final CompletableFuture<XceiverClientReply> f = replies.compute(commitIndex,
+        (key, value) -> value != null ? value : supplier.get());
+    if (!supplier.isInitialized()) {
+      // future already exists
+      return f.join();
+    }
+
     try {
       XceiverClientReply reply = xceiverClient.watchForCommit(commitIndex);
-      if (reply == null) {
-        index = 0;
-      } else {
-        index = reply.getLogIndex();
-      }
-      adjustBuffers(index);
+      f.complete(reply);
+      final CompletableFuture<XceiverClientReply> removed
+          = replies.remove(commitIndex);
+      Preconditions.checkState(removed == f);
+
+      adjustBuffers(reply.getLogIndex());
 
       return reply;
     } catch (InterruptedException e) {
       // Re-interrupt the thread while catching InterruptedException
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java
index 6ab5c0300978..43cdfcfc50d8 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyGenerator.java
@@ -24,7 +24,8 @@
 import org.apache.hadoop.hdds.cli.HddsVersionProvider;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
@@ -132,14 +133,13 @@ private void createKey(long counter) throws Exception {
   }
 
   private void createStreamKey(long counter) throws Exception {
-    final ReplicationConfig replicationConfig = ReplicationConfig
-        .fromProtoTypeAndFactor(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE);
+    final ReplicationConfig conf = ReplicationConfig.fromProtoTypeAndFactor(
+        ReplicationType.RATIS, ReplicationFactor.THREE);
     final String key = generateObjectName(counter);
 
     timer.time(() -> {
-      try (OzoneDataStreamOutput stream = bucket
-          .createStreamKey(key, keySize, replicationConfig, metadata)) {
+      try (OzoneDataStreamOutput stream = bucket.createStreamKey(
+          key, keySize, conf, metadata)) {
         contentGenerator.write(stream);
       }
       return null;
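
For reviewers: the StreamCommitWatcher hunk above makes streamWatchForCommit safe for concurrent callers by sharing one CompletableFuture per commit index. The first caller installs a future via ConcurrentMap.compute (the memoized supplier guarantees the future is created at most once), issues the real watchForCommit call, completes the future, and removes it from the map; any concurrent caller for the same index sees supplier.isInitialized() == false and simply joins the existing future instead of issuing a duplicate watch RPC. Below is a minimal, self-contained sketch of that pattern with no Ozone or Ratis dependencies; OnceSupplier is an illustrative stand-in for Ratis' MemoizedSupplier, and all names in the sketch are hypothetical, not from the patch.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.Supplier;

    public final class WatchDeduplicator {
      /** Stand-in for Ratis' MemoizedSupplier: runs the delegate at most once. */
      private static final class OnceSupplier<T> implements Supplier<T> {
        private final Supplier<T> delegate;
        private final AtomicBoolean initialized = new AtomicBoolean();
        private T value;

        OnceSupplier(Supplier<T> delegate) {
          this.delegate = delegate;
        }

        @Override
        public synchronized T get() {
          if (!initialized.get()) {
            value = delegate.get();
            initialized.set(true);
          }
          return value;
        }

        boolean isInitialized() {
          return initialized.get();
        }
      }

      /** One shared future per in-flight commit index, as in the patch. */
      private final ConcurrentMap<Long, CompletableFuture<Long>> inflight
          = new ConcurrentHashMap<>();

      /** watcher simulates the blocking XceiverClientSpi#watchForCommit call. */
      public long watch(long commitIndex, Supplier<Long> watcher) {
        final OnceSupplier<CompletableFuture<Long>> supplier
            = new OnceSupplier<>(CompletableFuture::new);
        // compute is atomic per key: either reuse the existing future
        // or install a fresh one created by the supplier.
        final CompletableFuture<Long> f = inflight.compute(commitIndex,
            (key, value) -> value != null ? value : supplier.get());
        if (!supplier.isInitialized()) {
          // another thread is already watching this index; wait for its result
          return f.join();
        }
        final long logIndex = watcher.get(); // only the first caller gets here
        f.complete(logIndex);                // wake up every joined caller
        inflight.remove(commitIndex, f);     // only our own future is removed
        return logIndex;
      }

      public static void main(String[] args) {
        WatchDeduplicator d = new WatchDeduplicator();
        System.out.println(d.watch(7, () -> 7L)); // prints 7
      }
    }

The remove(commitIndex, f) at the end plays the role of the patch's Preconditions.checkState(removed == f): only the thread that created and completed the future takes it out of the map, so a later watch on the same index starts a fresh call rather than joining a stale, already-completed future.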