@@ -29,6 +29,7 @@
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Map;
+import java.util.Objects;

 /**
  * This class is used for error handling methods.
@@ -111,7 +112,7 @@ protected void handleRetry(IOException exception, RetryPolicy retryPolicy)
     if (Thread.currentThread().isInterrupted()) {
       setExceptionAndThrow(exception);
     }
-    Preconditions.checkNotNull(action);
+    Objects.requireNonNull(action);
     Preconditions.checkArgument(
         action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
     if (action.delayMillis > 0) {
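For reference, the two null checks behave the same here: both throw a NullPointerException when the argument is null, but Objects.requireNonNull comes from the JDK, so this check no longer depends on Guava. A minimal illustration (class and method names are hypothetical, not from the patch):

```java
import java.util.Objects;

// Objects.requireNonNull is the JDK equivalent of Guava's
// Preconditions.checkNotNull; both fail fast on a null argument.
public final class NullCheckExample {
  static String describe(Object action) {
    Objects.requireNonNull(action, "action == null");
    return action.toString();
  }

  public static void main(String[] args) {
    System.out.println(describe("retry"));   // prints "retry"
    // describe(null) would throw NullPointerException.
  }
}
```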
@@ -16,24 +16,23 @@
  * limitations under the License.
  */

-/**
- * This class maintains the map of the commitIndexes to be watched for
- * successful replication in the datanodes in a given pipeline. It also releases
- * the buffers associated with the user data back to {@Link BufferPool} once
- * minimum replication criteria is achieved during an ozone key write.
- */
 package org.apache.hadoop.hdds.scm.storage;

+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -49,13 +48,15 @@ public class StreamCommitWatcher {
       LoggerFactory.getLogger(StreamCommitWatcher.class);

   private Map<Long, List<StreamBuffer>> commitIndexMap;
-  private List<StreamBuffer> bufferList;
+  private final List<StreamBuffer> bufferList;

   // total data which has been successfully flushed and acknowledged
   // by all servers
   private long totalAckDataLength;
+  private final ConcurrentMap<Long, CompletableFuture<XceiverClientReply>>
+      replies = new ConcurrentHashMap<>();

-  private XceiverClientSpi xceiverClient;
+  private final XceiverClientSpi xceiverClient;

   public StreamCommitWatcher(XceiverClientSpi xceiverClient,
       List<StreamBuffer> bufferList) {
@@ -130,16 +131,24 @@ public XceiverClientReply streamWatchOnLastIndex()
    */
   public XceiverClientReply streamWatchForCommit(long commitIndex)
       throws IOException {
-    final long index;
+    final MemoizedSupplier<CompletableFuture<XceiverClientReply>> supplier
+        = JavaUtils.memoize(CompletableFuture::new);
+    final CompletableFuture<XceiverClientReply> f = replies.compute(commitIndex,
+        (key, value) -> value != null ? value : supplier.get());
+    if (!supplier.isInitialized()) {
+      // future already exists
+      return f.join();
+    }
+
     try {
       XceiverClientReply reply =
           xceiverClient.watchForCommit(commitIndex);
-      if (reply == null) {
-        index = 0;
-      } else {
-        index = reply.getLogIndex();
-      }
-      adjustBuffers(index);
+      f.complete(reply);
+      final CompletableFuture<XceiverClientReply> removed
+          = replies.remove(commitIndex);
+      Preconditions.checkState(removed == f);
+
+      adjustBuffers(reply.getLogIndex());
       return reply;
     } catch (InterruptedException e) {
       // Re-interrupt the thread while catching InterruptedException
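The change above de-duplicates concurrent watch calls on the same commit index: the first caller registers a CompletableFuture in the replies map (created lazily via JavaUtils.memoize) and issues the actual xceiverClient.watchForCommit, while any later caller for the same index finds the existing future and simply joins it. A rough standalone sketch of the same idea using only the JDK; the class and method names are hypothetical, and computeIfAbsent with a pre-created future stands in for the memoized compute used in the patch:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the de-duplication pattern, not the Ozone class.
public final class WatchDedupSketch {
  private final ConcurrentMap<Long, CompletableFuture<Long>> inflight =
      new ConcurrentHashMap<>();

  // Stand-in for the remote call (xceiverClient.watchForCommit in the patch).
  private long doWatch(long index) {
    return index;
  }

  public long watch(long index) {
    final CompletableFuture<Long> created = new CompletableFuture<>();
    final CompletableFuture<Long> f =
        inflight.computeIfAbsent(index, k -> created);
    if (f != created) {
      // Another caller already started a watch for this index; wait for it.
      return f.join();
    }
    try {
      final long acked = doWatch(index);  // only the first caller gets here
      f.complete(acked);
      return acked;
    } catch (RuntimeException e) {
      f.completeExceptionally(e);         // wake up joiners on failure too
      throw e;
    } finally {
      inflight.remove(index, f);          // allow later watches on this index
    }
  }
}
```

The patch uses JavaUtils.memoize plus ConcurrentMap.compute instead, which also avoids allocating a new future when a watch is already in flight; supplier.isInitialized() then tells the caller whether it was the one that registered the future and must perform the real watch.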
@@ -24,7 +24,8 @@
 import org.apache.hadoop.hdds.cli.HddsVersionProvider;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;

@@ -132,14 +133,13 @@ private void createKey(long counter) throws Exception {
   }

   private void createStreamKey(long counter) throws Exception {
-    final ReplicationConfig replicationConfig = ReplicationConfig
-        .fromProtoTypeAndFactor(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE);
+    final ReplicationConfig conf = ReplicationConfig.fromProtoTypeAndFactor(
+        ReplicationType.RATIS, ReplicationFactor.THREE);
     final String key = generateObjectName(counter);

     timer.time(() -> {
-      try (OzoneDataStreamOutput stream = bucket
-          .createStreamKey(key, keySize, replicationConfig, metadata)) {
+      try (OzoneDataStreamOutput stream = bucket.createStreamKey(
+          key, keySize, conf, metadata)) {
         contentGenerator.write(stream);
       }
       return null;
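This last change is cosmetic (directly imported nested types and a shorter local name), but for orientation, here is a hedged sketch of how a stream key is created through this API, using only the calls visible in the diff. The helper name, the empty metadata map, and the OzoneDataStreamOutput import path are assumptions on my part:

```java
import java.util.Collections;

import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;

// Hypothetical helper mirroring the benchmark's call shape; the bucket,
// key name, and size come from the caller.
public final class StreamKeyExample {
  static void createEmptyStreamKey(OzoneBucket bucket, String keyName,
      long size) throws Exception {
    final ReplicationConfig conf = ReplicationConfig.fromProtoTypeAndFactor(
        ReplicationType.RATIS, ReplicationFactor.THREE);
    try (OzoneDataStreamOutput stream = bucket.createStreamKey(
        keyName, size, conf, Collections.emptyMap())) {
      // Write the payload here; the benchmark delegates to
      // contentGenerator.write(stream).
    }
  }
}
```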