BlockDataStreamOutput.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
 import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
@@ -115,6 +116,9 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
   // Also, corresponding to the logIndex, the corresponding list of buffers will
   // be released from the buffer pool.
   private final StreamCommitWatcher commitWatcher;
+  private final AtomicReference<CompletableFuture<
+      ContainerCommandResponseProto>> putBlockFuture
+      = new AtomicReference<>(CompletableFuture.completedFuture(null));
 
   private final List<DatanodeDetails> failedServers;
   private final Checksum checksum;
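
Note: this new field is the core of the change. Instead of tracking every outstanding putBlock call in a map keyed by flush position, a single AtomicReference holds one future that is folded forward on each call, so the stored future always represents "every putBlock issued so far". A minimal, self-contained sketch of that pattern (toy String futures; the names here are illustrative, not Ozone code):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicReference;

    public class FutureChainSketch {
      private final AtomicReference<CompletableFuture<String>> last =
          new AtomicReference<>(CompletableFuture.completedFuture(null));

      // Fold a new operation into the chain: the stored future now completes
      // only after both the previous chain and this operation complete.
      void register(CompletableFuture<String> op) {
        last.updateAndGet(f -> f.thenCombine(op, (previous, current) -> current));
      }

      // Wait for everything registered so far; yields the latest result.
      String awaitAll() throws Exception {
        return last.get().get();
      }

      public static void main(String[] args) throws Exception {
        FutureChainSketch chain = new FutureChainSketch();
        chain.register(CompletableFuture.supplyAsync(() -> "first"));
        chain.register(CompletableFuture.supplyAsync(() -> "second"));
        System.out.println(chain.awaitAll()); // prints "second"
      }
    }

If any folded future fails, the failure propagates through thenCombine to the stored future, so a single wait also surfaces the first error.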
@@ -381,8 +385,7 @@ private void watchForCommit(boolean bufferFull) throws IOException {
    * @param force true if no data was written since most recent putBlock and
    *              stream is being closed
    */
-  private CompletableFuture<ContainerProtos.
-      ContainerCommandResponseProto> executePutBlock(boolean close,
+  private void executePutBlock(boolean close,
       boolean force) throws IOException {
     checkOpen();
     long flushPos = totalDataFlushedLength;
@@ -399,56 +402,54 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
       dataStreamCloseReply = out.closeAsync();
     }
 
-    CompletableFuture<ContainerProtos.
-        ContainerCommandResponseProto> flushFuture = null;
     try {
       BlockData blockData = containerBlockData.build();
       XceiverClientReply asyncReply =
           putBlockAsync(xceiverClient, blockData, close, token);
-      CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
-          asyncReply.getResponse();
-      flushFuture = future.thenApplyAsync(e -> {
-        try {
-          validateResponse(e);
-        } catch (IOException sce) {
-          throw new CompletionException(sce);
-        }
-        // if the ioException is not set, putBlock is successful
-        if (getIoException() == null && !force) {
-          BlockID responseBlockID = BlockID.getFromProtobuf(
-              e.getPutBlock().getCommittedBlockLength().getBlockID());
-          Preconditions.checkState(blockID.get().getContainerBlockID()
-              .equals(responseBlockID.getContainerBlockID()));
-          // updates the bcsId of the block
-          blockID.set(responseBlockID);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                "Adding index " + asyncReply.getLogIndex() + " commitMap size "
-                    + commitWatcher.getCommitInfoMapSize() + " flushLength "
-                    + flushPos + " blockID " + blockID);
-          }
-          // for standalone protocol, logIndex will always be 0.
-          commitWatcher
-              .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
-        }
-        return e;
-      }, responseExecutor).exceptionally(e -> {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("putBlock failed for blockID {} with exception {}",
-              blockID, e.getLocalizedMessage());
-        }
-        CompletionException ce = new CompletionException(e);
-        setIoException(ce);
-        throw ce;
-      });
+      final CompletableFuture<ContainerCommandResponseProto> flushFuture
+          = asyncReply.getResponse().thenApplyAsync(e -> {
+            try {
+              validateResponse(e);
+            } catch (IOException sce) {
+              throw new CompletionException(sce);
+            }
+            // if the ioException is not set, putBlock is successful
+            if (getIoException() == null && !force) {
+              BlockID responseBlockID = BlockID.getFromProtobuf(
+                  e.getPutBlock().getCommittedBlockLength().getBlockID());
+              Preconditions.checkState(blockID.get().getContainerBlockID()
+                  .equals(responseBlockID.getContainerBlockID()));
+              // updates the bcsId of the block
+              blockID.set(responseBlockID);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Adding index " + asyncReply.getLogIndex() +
+                    " commitMap size "
+                    + commitWatcher.getCommitInfoMapSize() + " flushLength "
+                    + flushPos + " blockID " + blockID);
+              }
+              // for standalone protocol, logIndex will always be 0.
+              commitWatcher
+                  .updateCommitInfoMap(asyncReply.getLogIndex(),
+                      byteBufferList);
+            }
+            return e;
+          }, responseExecutor).exceptionally(e -> {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("putBlock failed for blockID {} with exception {}",
+                  blockID, e.getLocalizedMessage());
+            }
+            CompletionException ce = new CompletionException(e);
+            setIoException(ce);
+            throw ce;
+          });
+      putBlockFuture.updateAndGet(f -> f.thenCombine(flushFuture,
+          (previous, current) -> current));
     } catch (IOException | ExecutionException e) {
       throw new IOException(EXCEPTION_MSG + e.toString(), e);
     } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
       handleInterruptedException(ex, false);
     }
-    commitWatcher.getFutureMap().put(flushPos, flushFuture);
-    return flushFuture;
   }
 
   @Override
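
Note: the rewritten body keeps the existing response pipeline (validate the reply on responseExecutor, update block/commit bookkeeping on success, wrap failures in a CompletionException); it is now nested under the chained call, folded into putBlockFuture, and no longer returned. A stripped-down sketch of that thenApplyAsync/exceptionally shape (String stands in for the protobuf response; the validate helper is illustrative):

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class ResponsePipelineSketch {
      static void validate(String response) throws IOException {
        if (response == null) {
          throw new IOException("null response"); // stands in for validateResponse
        }
      }

      public static void main(String[] args) throws Exception {
        ExecutorService responseExecutor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> reply = CompletableFuture.completedFuture("ok");
        CompletableFuture<String> flushFuture = reply.thenApplyAsync(e -> {
          try {
            validate(e);                          // checked exception ...
          } catch (IOException sce) {
            throw new CompletionException(sce);   // ... rewrapped for the chain
          }
          return e;                               // success: pass the reply on
        }, responseExecutor).exceptionally(e -> {
          // failure path: record and rethrow, as the patch does via setIoException
          throw new CompletionException(e);
        });
        System.out.println(flushFuture.get());    // prints "ok"
        responseExecutor.shutdown();
      }
    }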
@@ -484,7 +485,7 @@ private void handleFlush(boolean close)
       // data since latest flush - we need to send the "EOF" flag
       executePutBlock(true, true);
     }
-    waitOnFlushFutures();
+    putBlockFuture.get().get();
     watchForCommit(false);
     // just check again if the exception is hit while waiting for the
     // futures to ensure flush has indeed succeeded
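
Note the two get() calls: the outer one is AtomicReference.get(), which reads the current head of the chain; the inner one is CompletableFuture.get(), which blocks until every combined putBlock has completed, or rethrows the first failure as an ExecutionException. A tiny illustration:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicReference;

    class DoubleGetSketch {
      public static void main(String[] args) throws Exception {
        AtomicReference<CompletableFuture<String>> ref =
            new AtomicReference<>(CompletableFuture.completedFuture("done"));
        // outer get(): AtomicReference read; inner get(): block on the future
        System.out.println(ref.get().get());
      }
    }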
@@ -512,15 +513,6 @@ public void close() throws IOException {
     }
   }
 
-  private void waitOnFlushFutures()
-      throws InterruptedException, ExecutionException {
-    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
-        commitWatcher.getFutureMap().values().toArray(
-            new CompletableFuture[commitWatcher.getFutureMap().size()]));
-    // wait for all the transactions to complete
-    combinedFuture.get();
-  }
-
   private void validateResponse(
       ContainerProtos.ContainerCommandResponseProto responseProto)
       throws IOException {
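
Note: the deleted helper waited on a snapshot of the futureMap with CompletableFuture.allOf; the replacement blocks on the single folded future instead. A side-by-side sketch of the two waiting strategies (toy Integer futures, not Ozone types):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    class WaitStrategiesSketch {
      public static void main(String[] args) throws Exception {
        List<CompletableFuture<Integer>> pending = new ArrayList<>();
        CompletableFuture<Integer> chain = CompletableFuture.completedFuture(null);
        for (int i = 1; i <= 3; i++) {
          CompletableFuture<Integer> op = CompletableFuture.completedFuture(i);
          pending.add(op);                                   // old: collect them all
          chain = chain.thenCombine(op, (prev, cur) -> cur); // new: fold into one
        }
        // old style: build allOf over the collected futures, then block
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).get();
        // new style: one blocking call on the folded chain
        System.out.println(chain.get()); // prints 3
      }
    }

Both block until every operation finishes, but the folded chain needs no per-position map bookkeeping or cleanup.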
StreamCommitWatcher.java
@@ -25,7 +25,6 @@
 package org.apache.hadoop.hdds.scm.storage;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.slf4j.Logger;
@@ -35,9 +34,6 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -59,18 +55,12 @@ public class StreamCommitWatcher {
   // by all servers
   private long totalAckDataLength;
 
-  // future Map to hold up all putBlock futures
-  private ConcurrentHashMap<Long,
-      CompletableFuture<ContainerCommandResponseProto>>
-      futureMap;
-
   private XceiverClientSpi xceiverClient;
 
   public StreamCommitWatcher(XceiverClientSpi xceiverClient,
       List<StreamBuffer> bufferList) {
     this.xceiverClient = xceiverClient;
     commitIndexMap = new ConcurrentSkipListMap<>();
-    futureMap = new ConcurrentHashMap<>();
     this.bufferList = bufferList;
     totalAckDataLength = 0;
   }
@@ -180,15 +170,6 @@ private long releaseBuffers(List<Long> indexes) {
     final long length =
         buffers.stream().mapToLong(StreamBuffer::position).sum();
     totalAckDataLength += length;
-    // clear the future object from the future Map
-    final CompletableFuture<ContainerCommandResponseProto> remove =
-        futureMap.remove(totalAckDataLength);
-    if (remove == null) {
-      LOG.error("Couldn't find required future for " + totalAckDataLength);
-      for (Long key : futureMap.keySet()) {
-        LOG.error("Existing acknowledged data: " + key);
-      }
-    }
     for (StreamBuffer byteBuffer : buffers) {
       bufferList.remove(byteBuffer);
     }
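
Note: with the future cleanup gone (it previously removed the futureMap entry keyed by the updated totalAckDataLength and logged an error when none was found), releaseBuffers is reduced to accounting: the newly acknowledged byte count is the sum of the released buffers' positions. A toy version of that arithmetic (StreamBuffer reduced to a position field):

    import java.util.Arrays;
    import java.util.List;

    class ReleaseAccountingSketch {
      // Stand-in for Ozone's StreamBuffer; only position() matters here.
      static final class Buf {
        private final long position;
        Buf(long position) { this.position = position; }
        long position() { return position; }
      }

      public static void main(String[] args) {
        long totalAckDataLength = 0;
        List<Buf> acked = Arrays.asList(new Buf(4096), new Buf(1024));
        // same shape as releaseBuffers(): acked bytes = sum of buffer positions
        totalAckDataLength += acked.stream().mapToLong(Buf::position).sum();
        System.out.println(totalAckDataLength); // prints 5120
      }
    }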
@@ -209,19 +190,10 @@ private IOException getIOExceptionForWatchForCommit(long commitIndex,
     return ioException;
   }
 
-  public ConcurrentMap<Long,
-      CompletableFuture<
-          ContainerCommandResponseProto>> getFutureMap() {
-    return futureMap;
-  }
-
   public void cleanup() {
     if (commitIndexMap != null) {
       commitIndexMap.clear();
     }
-    if (futureMap != null) {
-      futureMap.clear();
-    }
     commitIndexMap = null;
   }
 }