BlockDataStreamOutput.java
@@ -92,6 +92,11 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {

   private int chunkIndex;
   private final AtomicLong chunkOffset = new AtomicLong();
+
+  // Similar to 'BufferPool' but this list maintains only references
+  // to the ByteBuffers.
+  private List<StreamBuffer> bufferList;
+
   // The IOException will be set by response handling thread in case there is an
   // exception received in the response. If the exception is set, the next
   // request will fail upfront.
@@ -133,7 +138,8 @@ public BlockDataStreamOutput(
       XceiverClientFactory xceiverClientManager,
       Pipeline pipeline,
       OzoneClientConfig config,
-      Token<? extends TokenIdentifier> token
+      Token<? extends TokenIdentifier> token,
+      List<StreamBuffer> bufferList
   ) throws IOException {
     this.xceiverClientFactory = xceiverClientManager;
     this.config = config;
@@ -148,7 +154,7 @@ public BlockDataStreamOutput(
     // Alternatively, stream setup can be delayed till the first chunk write.
     this.out = setupStream(pipeline);
     this.token = token;
-
+    this.bufferList = bufferList;
     flushPeriod = (int) (config.getStreamBufferFlushSize() / config
         .getStreamBufferSize());
 
@@ -159,7 +165,7 @@

     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
-    commitWatcher = new StreamCommitWatcher(xceiverClient);
+    commitWatcher = new StreamCommitWatcher(xceiverClient, bufferList);
     totalDataFlushedLength = 0;
     writtenDataLength = 0;
     failedServers = new ArrayList<>(0);
@@ -251,8 +257,11 @@ public void write(ByteBuffer b, int off, int len) throws IOException {
     if (len == 0) {
       return;
     }
-    writeChunkToContainer(
-        (ByteBuffer) b.asReadOnlyBuffer().position(off).limit(off + len));
+
+    final StreamBuffer buf = new StreamBuffer(b, off, len);
+    bufferList.add(buf);
+
+    writeChunkToContainer(buf.duplicate());
 
     writtenDataLength += len;
   }
@@ -261,15 +270,38 @@ private void updateFlushLength() {
     totalDataFlushedLength = writtenDataLength;
   }
 
   @VisibleForTesting
   public long getTotalDataFlushedLength() {
     return totalDataFlushedLength;
   }
+  /**
+   * Will be called on the retry path in case of a ClosedContainerException
+   * or a TimeoutException.
+   * @param len length of data to write
+   * @throws IOException if an error occurred
+   */
+
+  // TODO: We need to add a new retry policy that does not depend on bufferPool.
+  public void writeOnRetry(long len) throws IOException {
+    if (len == 0) {
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Retrying write length {} for blockID {}", len, blockID);
+    }
+    int count = 0;
+    while (len > 0) {
+      final StreamBuffer buf = bufferList.get(count);
+      final long writeLen = Math.min(buf.length(), len);
+      final ByteBuffer duplicated = buf.duplicate();
+      if (writeLen != buf.length()) {
+        duplicated.limit(Math.toIntExact(len));
+      }
+      writeChunkToContainer(duplicated);
+      len -= writeLen;
+      count++;
+      writtenDataLength += writeLen;
+    }
+  }

@@ -314,6 +346,14 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
       boolean force) throws IOException {
     checkOpen();
     long flushPos = totalDataFlushedLength;
+    final List<StreamBuffer> byteBufferList;
+    if (!force) {
+      Preconditions.checkNotNull(bufferList);
+      byteBufferList = bufferList;
+      Preconditions.checkNotNull(byteBufferList);
+    } else {
+      byteBufferList = null;
+    }
     flush();
     if (close) {
       dataStreamCloseReply = out.closeAsync();
@@ -344,12 +384,12 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
         if (LOG.isDebugEnabled()) {
           LOG.debug(
               "Adding index " + asyncReply.getLogIndex() + " commitMap size "
-                  + commitWatcher.getCommitInfoSetSize() + " flushLength "
+                  + commitWatcher.getCommitInfoMapSize() + " flushLength "
                   + flushPos + " blockID " + blockID);
         }
         // for standalone protocol, logIndex will always be 0.
-        commitWatcher.updateCommitInfoSet(
-            asyncReply.getLogIndex());
+        commitWatcher
+            .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
       }
       return e;
     }, responseExecutor).exceptionally(e -> {
@@ -589,4 +629,8 @@ private void handleExecutionException(Exception ex) throws IOException {
     setIoException(ex);
     throw getIoException();
   }
+
+  public long getTotalAckDataLength() {
+    return commitWatcher.getTotalAckDataLength();
+  }
 }
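Note: with this change, write() retains a read-only StreamBuffer reference for every chunk until the commit watcher acknowledges it, and writeOnRetry() replays those retained buffers, trimming the last one when only part of it needs to be resent. The slicing that makes a partial replay safe is plain java.nio; below is a minimal, self-contained sketch of that mechanism (class and variable names are illustrative, not from the patch):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public final class ReplayDemo {
      public static void main(String[] args) {
        List<ByteBuffer> retained = new ArrayList<>();
        ByteBuffer data =
            ByteBuffer.wrap("hello world".getBytes(StandardCharsets.UTF_8));
        // Retain a read-only view, as StreamBuffer's constructor does.
        retained.add(data.asReadOnlyBuffer());

        long len = 5; // suppose only the first 5 bytes must be replayed
        ByteBuffer buf = retained.get(0);
        ByteBuffer duplicated = buf.duplicate(); // independent position/limit
        if (len != buf.remaining()) {
          duplicated.limit(Math.toIntExact(len));
        }
        byte[] out = new byte[duplicated.remaining()];
        duplicated.get(out);
        System.out.println(new String(out, StandardCharsets.UTF_8)); // "hello"
      }
    }

Because duplicate() shares content but not position/limit, the replay never disturbs the retained buffer itself.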
StreamBuffer.java (new file)
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Used for streaming write.
+ */
+public class StreamBuffer {
+  private final ByteBuffer buffer;
+
+  public StreamBuffer(ByteBuffer buffer) {
+    this.buffer = buffer.asReadOnlyBuffer();
+  }
+
+  public StreamBuffer(ByteBuffer buffer, int offset, int length) {
+    this((ByteBuffer) buffer.asReadOnlyBuffer().position(offset)
+        .limit(offset + length));
+  }
+
+  public ByteBuffer duplicate() {
+    return buffer.duplicate();
+  }
+
+  public int length() {
+    return buffer.limit() - buffer.position();
+  }
+
+}
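Every view StreamBuffer hands out is read-only, and duplicate() returns a buffer with its own position and limit over the shared content, so one consumer cannot corrupt another's read progress or mutate the retained bytes. A hypothetical usage sketch, assuming it runs in the same package as StreamBuffer (not part of the patch):

    import java.nio.ByteBuffer;

    public final class StreamBufferDemo {
      public static void main(String[] args) {
        ByteBuffer src = ByteBuffer.allocate(16);
        src.putLong(42L);
        src.flip(); // 8 readable bytes: position 0, limit 8

        StreamBuffer sb = new StreamBuffer(src, 0, 8);

        ByteBuffer r1 = sb.duplicate();
        ByteBuffer r2 = sb.duplicate();
        r1.getLong();                       // advances only r1
        System.out.println(r1.remaining()); // 0
        System.out.println(r2.remaining()); // 8 -- r2 is unaffected
        System.out.println(sb.length());    // 8 -- the retained view is stable
      }
    }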
StreamCommitWatcher.java
@@ -24,20 +24,24 @@
  */
 package org.apache.hadoop.hdds.scm.storage;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Set;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;

 /**
  * This class executes watchForCommit on ratis pipeline and releases
@@ -48,7 +52,12 @@ public class StreamCommitWatcher {
   private static final Logger LOG =
       LoggerFactory.getLogger(StreamCommitWatcher.class);
 
-  private Set<Long> commitIndexSet;
+  private Map<Long, List<StreamBuffer>> commitIndexMap;
+  private List<StreamBuffer> bufferList;
+
+  // total data which has been successfully flushed and acknowledged
+  // by all servers
+  private long totalAckDataLength;
 
   // future Map to hold up all putBlock futures
   private ConcurrentHashMap<Long,
@@ -57,18 +66,22 @@ public class StreamCommitWatcher {

   private XceiverClientSpi xceiverClient;
 
-  public StreamCommitWatcher(XceiverClientSpi xceiverClient) {
+  public StreamCommitWatcher(XceiverClientSpi xceiverClient,
+      List<StreamBuffer> bufferList) {
     this.xceiverClient = xceiverClient;
-    commitIndexSet = new ConcurrentSkipListSet();
+    commitIndexMap = new ConcurrentSkipListMap<>();
     futureMap = new ConcurrentHashMap<>();
+    this.bufferList = bufferList;
+    totalAckDataLength = 0;
   }
 
-  public void updateCommitInfoSet(long index) {
-    commitIndexSet.add(index);
+  public void updateCommitInfoMap(long index, List<StreamBuffer> buffers) {
+    commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
+        .addAll(buffers);
   }
 
-  int getCommitInfoSetSize() {
-    return commitIndexSet.size();
+  int getCommitInfoMapSize() {
+    return commitIndexMap.size();
   }

   /**
@@ -78,12 +91,12 @@ int getCommitInfoSetSize() {
    * @throws IOException in case watchForCommit fails
    */
   public XceiverClientReply streamWatchOnFirstIndex() throws IOException {
-    if (!commitIndexSet.isEmpty()) {
+    if (!commitIndexMap.isEmpty()) {
       // wait for the first commit index in the commitIndex2flushedDataMap
       // to get committed to all or majority of nodes in case timeout
       // happens.
       long index =
-          commitIndexSet.stream().mapToLong(v -> v).min()
+          commitIndexMap.keySet().stream().mapToLong(v -> v).min()
               .getAsLong();
       if (LOG.isDebugEnabled()) {
         LOG.debug("waiting for first index {} to catch up", index);
@@ -102,12 +115,12 @@ public XceiverClientReply streamWatchOnFirstIndex() throws IOException {
    */
   public XceiverClientReply streamWatchOnLastIndex()
       throws IOException {
-    if (!commitIndexSet.isEmpty()) {
+    if (!commitIndexMap.isEmpty()) {
       // wait for the commit index in the commitIndex2flushedDataMap
       // to get committed to all or majority of nodes in case timeout
       // happens.
       long index =
-          commitIndexSet.stream().mapToLong(v -> v).max()
+          commitIndexMap.keySet().stream().mapToLong(v -> v).max()
               .getAsLong();
       if (LOG.isDebugEnabled()) {
         LOG.debug("waiting for last flush Index {} to catch up", index);
@@ -127,9 +140,16 @@ public XceiverClientReply streamWatchOnLastIndex()
    */
   public XceiverClientReply streamWatchForCommit(long commitIndex)
       throws IOException {
+    final long index;
     try {
       XceiverClientReply reply =
           xceiverClient.watchForCommit(commitIndex);
+      if (reply == null) {
+        index = 0;
+      } else {
+        index = reply.getLogIndex();
+      }
+      adjustBuffers(index);
       return reply;
     } catch (InterruptedException e) {
       // Re-interrupt the thread while catching InterruptedException
@@ -140,11 +160,52 @@ public XceiverClientReply streamWatchForCommit(long commitIndex)
     }
   }
 
+  void releaseBuffersOnException() {
+    adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
+  }
+
+  private void adjustBuffers(long commitIndex) {
+    List<Long> keyList = commitIndexMap.keySet().stream()
+        .filter(p -> p <= commitIndex).collect(Collectors.toList());
+    if (!keyList.isEmpty()) {
+      releaseBuffers(keyList);
+    }
+  }
+
+  private long releaseBuffers(List<Long> indexes) {
+    Preconditions.checkArgument(!commitIndexMap.isEmpty());
+    for (long index : indexes) {
+      Preconditions.checkState(commitIndexMap.containsKey(index));
+      final List<StreamBuffer> buffers = commitIndexMap.remove(index);
+      final long length =
+          buffers.stream().mapToLong(StreamBuffer::length).sum();
+      totalAckDataLength += length;
+      // clear the future object from the future Map
+      final CompletableFuture<ContainerCommandResponseProto> remove =
+          futureMap.remove(totalAckDataLength);
+      if (remove == null) {
+        LOG.error("Couldn't find required future for " + totalAckDataLength);
+        for (Long key : futureMap.keySet()) {
+          LOG.error("Existing acknowledged data: " + key);
+        }
+      }
+      for (StreamBuffer byteBuffer : buffers) {
+        bufferList.remove(byteBuffer);
+      }
+    }
+    return totalAckDataLength;
+  }
+
+  public long getTotalAckDataLength() {
+    return totalAckDataLength;
+  }
+
   private IOException getIOExceptionForWatchForCommit(long commitIndex,
       Exception e) {
     LOG.warn("watchForCommit failed for index {}", commitIndex, e);
     IOException ioException = new IOException(
         "Unexpected Storage Container Exception: " + e.toString(), e);
+    releaseBuffersOnException();
     return ioException;
   }
 

@@ -155,12 +216,12 @@ ContainerCommandResponseProto>> getFutureMap() {
   }
 
   public void cleanup() {
-    if (commitIndexSet != null) {
-      commitIndexSet.clear();
+    if (commitIndexMap != null) {
+      commitIndexMap.clear();
     }
     if (futureMap != null) {
       futureMap.clear();
     }
-    commitIndexSet = null;
+    commitIndexMap = null;
   }
 }
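The release path above frees every buffer whose Raft log index has been acknowledged up to the watched commit index, accumulating the freed bytes into totalAckDataLength. A standalone sketch of that bookkeeping, simplified for illustration (the real code uses a ConcurrentSkipListMap keyed to StreamBuffer lists; the names here are hypothetical):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeMap;

    public final class CommitTrackerDemo {
      public static void main(String[] args) {
        // commit index -> sizes of the buffers flushed under that index
        TreeMap<Long, List<Integer>> commitIndexMap = new TreeMap<>();
        commitIndexMap.put(3L, List.of(4096));
        commitIndexMap.put(7L, List.of(4096, 1024));
        commitIndexMap.put(9L, List.of(512));

        long totalAckDataLength = 0;
        long watchedIndex = 7; // e.g. the index returned by watchForCommit

        // Release every entry whose index <= watchedIndex, as adjustBuffers does.
        List<Long> acked =
            new ArrayList<>(commitIndexMap.headMap(watchedIndex, true).keySet());
        for (long index : acked) {
          for (int len : commitIndexMap.remove(index)) {
            totalAckDataLength += len;
          }
        }
        System.out.println(totalAckDataLength); // 9216; index 9 is still pending
      }
    }

One design wrinkle worth noting: futureMap is keyed by the running totalAckDataLength, so releaseBuffers can only evict a putBlock future when the accumulated acknowledged length lines up exactly with the flush position recorded at submission time; the error log in releaseBuffers flags the mismatch case.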