Merged
11 changes: 11 additions & 0 deletions hadoop-hdds/client/pom.xml
@@ -39,6 +39,17 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdds-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdds-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
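The two new test-scoped dependencies (the hadoop-hdds-common test-jar and mockito-core) let this module unit-test the client-side buffer bookkeeping without standing up a real pipeline. Below is a minimal sketch of the kind of test they enable; the test class, method name, and constructor arguments are illustrative assumptions, not code from this PR.

```java
// Hypothetical test sketch, not part of this PR. It only illustrates why
// mockito-core and the hdds-common test artifacts are now needed at test scope.
package org.apache.hadoop.hdds.scm.storage;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;

import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.Test;

public class TestBufferPoolSketch {

  @Test
  public void allocateTracksUsedBuffers() {
    // Three-argument constructor as shown in this diff; the ByteBuffer to
    // ByteString conversion is supplied as a plain method reference.
    BufferPool pool = new BufferPool(4096, 4, ByteString::copyFrom);
    pool.allocateBuffer(1024);
    assertEquals(1, pool.getNumberOfUsedBuffers());
    assertEquals(4, pool.getCapacity());

    // The datanode client can be mocked instead of wiring up XceiverClientManager.
    XceiverClientSpi client = mock(XceiverClientSpi.class);
    assertNotNull(new CommitWatcher(pool, client));
  }
}
```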
BlockOutputStream.java
@@ -17,46 +17,44 @@
*/

package org.apache.hadoop.hdds.scm.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.common.OzoneChecksumException;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.writeChunkAsync;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.client.BlockID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
.putBlockAsync;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
.writeChunkAsync;

/**
* An {@link OutputStream} used by the REST service in combination with the
* SCMClient to write the value of a key to a sequence
@@ -120,6 +118,13 @@ public class BlockOutputStream extends OutputStream {
private final List<DatanodeDetails> failedServers;
private final Checksum checksum;

//number of buffers used before doing a flush/putBlock.
private int flushPeriod;
//bytes remaining to write in the current buffer.
private int currentBufferRemaining;
//current buffer allocated to write
private ChunkBuffer currentBuffer;

/**
* Creates a new BlockOutputStream.
*
@@ -154,6 +159,14 @@ public BlockOutputStream(BlockID blockID,
this.bufferPool = bufferPool;
this.bytesPerChecksum = bytesPerChecksum;

//number of buffers used before doing a flush
refreshCurrentBuffer(bufferPool);
flushPeriod = (int) (streamBufferFlushSize / streamBufferSize);

Preconditions
.checkArgument(
(long) flushPeriod * streamBufferSize == streamBufferFlushSize);

// A single thread executor handles the responses of async requests
responseExecutor = Executors.newSingleThreadExecutor();
commitWatcher = new CommitWatcher(bufferPool, xceiverClient);
@@ -165,6 +178,11 @@ public BlockOutputStream(BlockID blockID,
checksum = new Checksum(checksumType, bytesPerChecksum);
}
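For context, flushPeriod is simply the number of stream buffers that make up one flush interval, and the precondition rejects configurations where the flush size is not an exact multiple of the buffer size. A worked example with assumed, not prescribed, sizes:

```java
// Illustrative values only: 4 MB buffers with a 16 MB flush size.
long streamBufferSize = 4L * 1024 * 1024;
long streamBufferFlushSize = 16L * 1024 * 1024;

int flushPeriod = (int) (streamBufferFlushSize / streamBufferSize);   // 4
// Every 4th filled buffer triggers updateFlushLength() + executePutBlock().

// Same invariant the constructor checks; e.g. 5 MB buffers with a 16 MB
// flush size would fail here because 3 * 5 MB != 16 MB.
assert (long) flushPeriod * streamBufferSize == streamBufferFlushSize;
```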

private void refreshCurrentBuffer(BufferPool pool) {
currentBuffer = pool.getCurrentBuffer();
currentBufferRemaining =
currentBuffer != null ? currentBuffer.remaining() : 0;
}

public BlockID getBlockID() {
return blockID.get();
@@ -209,9 +227,18 @@ public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
@Override
public void write(int b) throws IOException {
checkOpen();
byte[] buf = new byte[1];
buf[0] = (byte) b;
write(buf, 0, 1);
allocateNewBufferIfNeeded();
currentBuffer.put((byte) b);
currentBufferRemaining--;
writeChunkIfNeeded();
writtenDataLength++;
doFlushOrWatchIfNeeded();
}

private void writeChunkIfNeeded() throws IOException {
if (currentBufferRemaining == 0) {
writeChunk(currentBuffer);
}
}

@Override
@@ -229,32 +256,36 @@ public void write(byte[] b, int off, int len) throws IOException {
}

while (len > 0) {
// Allocate a buffer if needed. The buffer will be allocated only
// once as needed and will be reused again for multiple blockOutputStream
// entries.
final ChunkBuffer currentBuffer = bufferPool.allocateBufferIfNeeded(
bytesPerChecksum);
final int writeLen = Math.min(currentBuffer.remaining(), len);
allocateNewBufferIfNeeded();
final int writeLen = Math.min(currentBufferRemaining, len);
currentBuffer.put(b, off, writeLen);
if (!currentBuffer.hasRemaining()) {
writeChunk(currentBuffer);
}
currentBufferRemaining -= writeLen;
writeChunkIfNeeded();
off += writeLen;
len -= writeLen;
writtenDataLength += writeLen;
if (shouldFlush()) {
doFlushOrWatchIfNeeded();
}
}

private void doFlushOrWatchIfNeeded() throws IOException {
if (currentBufferRemaining == 0) {
if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
updateFlushLength();
executePutBlock(false, false);
}
// Data in the bufferPool can not exceed streamBufferMaxSize
if (isBufferPoolFull()) {
if (bufferPool.getNumberOfUsedBuffers() == bufferPool.getCapacity()) {
handleFullBuffer();
}
}
}

private boolean shouldFlush() {
return bufferPool.computeBufferData() % streamBufferFlushSize == 0;
private void allocateNewBufferIfNeeded() {
if (currentBufferRemaining == 0) {
currentBuffer = bufferPool.allocateBuffer(bytesPerChecksum);
currentBufferRemaining = currentBuffer.remaining();
}
}
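Taken together, allocateNewBufferIfNeeded, writeChunkIfNeeded and doFlushOrWatchIfNeeded mean: write a chunk whenever the current buffer fills up, issue a putBlock once every flushPeriod filled buffers, and block on watchForCommit only when the whole pool is in flight. A standalone trace with assumed settings, just to make the branches concrete:

```java
// Not from the patch: a plain simulation of doFlushOrWatchIfNeeded()'s branch
// conditions, assuming flushPeriod = 2 and a pool capacity of 4 buffers.
int flushPeriod = 2;
int capacity = 4;
for (int usedBuffers = 1; usedBuffers <= capacity; usedBuffers++) {
  boolean putBlock = usedBuffers % flushPeriod == 0;  // flush every 2nd buffer
  boolean waitOnCommit = usedBuffers == capacity;     // pool exhausted -> handleFullBuffer()
  System.out.println("buffers=" + usedBuffers
      + " putBlock=" + putBlock + " waitOnCommit=" + waitOnCommit);
}
// prints: 1:false/false, 2:true/false, 3:false/false, 4:true/true
```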

private void updateFlushLength() {
@@ -264,6 +295,7 @@ private void updateFlushLength() {
private boolean isBufferPoolFull() {
return bufferPool.computeBufferData() == streamBufferMaxSize;
}

/**
* Will be called on the retryPath in case closedContainerException/
* TimeoutException.
@@ -334,6 +366,7 @@ private void handleFullBuffer() throws IOException {
// only contain data which have not been sufficiently replicated
private void adjustBuffersOnException() {
commitWatcher.releaseBuffersOnException();
refreshCurrentBuffer(bufferPool);
}

/**
@@ -363,6 +396,8 @@ private void watchForCommit(boolean bufferFull) throws IOException {
setIoException(ioe);
throw getIoException();
}
refreshCurrentBuffer(bufferPool);

}

/**
@@ -481,7 +516,7 @@ private void handleFlush(boolean close)
checkOpen();
// flush the last chunk data residing on the currentBuffer
if (totalDataFlushedLength < writtenDataLength) {
final ChunkBuffer currentBuffer = bufferPool.getCurrentBuffer();
refreshCurrentBuffer(bufferPool);
Preconditions.checkArgument(currentBuffer.position() > 0);
if (currentBuffer.hasRemaining()) {
writeChunk(currentBuffer);
BufferPool.java
@@ -18,16 +18,17 @@

package org.apache.hadoop.hdds.scm.storage;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.ozone.common.ChunkBuffer;

import com.google.common.base.Preconditions;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

/**
* This class creates and manages pool of n buffers.
*/
@@ -53,7 +54,7 @@ public BufferPool(int bufferSize, int capacity,
this.byteStringConversion = byteStringConversion;
}

public Function<ByteBuffer, ByteString> byteStringConversion(){
public Function<ByteBuffer, ByteString> byteStringConversion() {
return byteStringConversion;
}

@@ -65,29 +66,22 @@ ChunkBuffer getCurrentBuffer() {
* If currentBufferIndex is less than the buffer list size - 1, the next
* buffer in the list has already been freed up for rewriting. Reuse that
* buffer in such cases.
*
* <p>
* If currentBufferIndex == bufferList.size() and the list is still smaller
* than the capacity to be allocated, just allocate a new buffer of chunk
* size.
*
*/
public ChunkBuffer allocateBufferIfNeeded(int increment) {
ChunkBuffer buffer = getCurrentBuffer();
if (buffer != null && buffer.hasRemaining()) {
return buffer;
}
if (currentBufferIndex < bufferList.size() - 1) {
buffer = getBuffer(currentBufferIndex + 1);
public ChunkBuffer allocateBuffer(int increment) {
currentBufferIndex++;
Preconditions.checkArgument(currentBufferIndex <= capacity - 1);
if (currentBufferIndex < bufferList.size()) {
return getBuffer(currentBufferIndex);
} else {
buffer = ChunkBuffer.allocate(bufferSize, increment);
bufferList.add(buffer);
final ChunkBuffer newBuffer = ChunkBuffer.allocate(bufferSize, increment);
bufferList.add(newBuffer);
Preconditions.checkArgument(bufferList.size() <= capacity);
return newBuffer;
}
Preconditions.checkArgument(bufferList.size() <= capacity);
currentBufferIndex++;
// TODO: Turn the below precondition check on when Standalone pipeline
// is removed in the write path in tests
// Preconditions.checkArgument(buffer.position() == 0);
return buffer;
}
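A short usage sketch of the reworked allocation below, assuming a two-buffer pool; the sizes and the release call only illustrate how BlockOutputStream and CommitWatcher are expected to drive the pool, they are not taken from this PR.

```java
// Illustration only, with assumed sizes (4 KB buffers, checksum increment 1 KB).
BufferPool pool = new BufferPool(4096, 2, ByteString::copyFrom);

ChunkBuffer first = pool.allocateBuffer(1024);   // index 0: freshly allocated
ChunkBuffer second = pool.allocateBuffer(1024);  // index 1: pool now at capacity
// pool.allocateBuffer(1024);                    // a third call would trip the
                                                 // currentBufferIndex <= capacity - 1 check

pool.releaseBuffer(first);                       // commit acknowledged: index steps back
ChunkBuffer reused = pool.allocateBuffer(1024);  // reuses an already-allocated buffer
                                                 // instead of growing bufferList
```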

void releaseBuffer(ChunkBuffer chunkBuffer) {
@@ -130,4 +124,11 @@ int getCurrentBufferIndex() {
return currentBufferIndex;
}

public int getNumberOfUsedBuffers() {
return currentBufferIndex + 1;
}

public int getCapacity() {
return capacity;
}
}
CommitWatcher.java
@@ -24,28 +24,29 @@
*/
package org.apache.hadoop.hdds.scm.storage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutionException;

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.ozone.common.ChunkBuffer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class executes watchForCommit on ratis pipeline and releases
* buffers once data successfully gets replicated.
@@ -96,7 +97,15 @@ private long releaseBuffers(List<Long> indexes) {
long length = buffers.stream().mapToLong(ChunkBuffer::position).sum();
totalAckDataLength += length;
// clear the future object from the future Map
Preconditions.checkNotNull(futureMap.remove(totalAckDataLength));
final CompletableFuture<ContainerCommandResponseProto> remove =
futureMap.remove(totalAckDataLength);
if (remove == null) {
LOG.error("Couldn't find required future for " + totalAckDataLength);
for (Long key : futureMap.keySet()) {
LOG.error("Existing acknowledged data: " + key);
}
}
Preconditions.checkNotNull(remove);
for (ChunkBuffer byteBuffer : buffers) {
bufferPool.releaseBuffer(byteBuffer);
}
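For reference, futureMap is keyed by the cumulative flushed length at the time each putBlock was issued, so a successful release has to land exactly on one of those keys; the new LOG.error output makes any mismatch visible before the precondition fires. A small illustration with assumed sizes:

```java
// Illustration only (hypothetical 4 MB buffers); shows how the cumulative-length
// keys are expected to line up with acknowledged data.
ConcurrentSkipListMap<Long, String> futureMap = new ConcurrentSkipListMap<>();
long fourMb = 4L * 1024 * 1024;
futureMap.put(fourMb, "putBlock after buffer #1");
futureMap.put(2 * fourMb, "putBlock after buffer #2");

long totalAckDataLength = fourMb;            // commit for buffer #1 acknowledged
String acked = futureMap.remove(totalAckDataLength);
// acked is non-null here; a null return is exactly the bookkeeping divergence
// that the new error logging reports before Preconditions.checkNotNull fails.
```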