byteStringConversion() {
return byteStringConversion;
}
@@ -65,29 +66,22 @@ ChunkBuffer getCurrentBuffer() {
* If the currentBufferIndex is less than the buffer size - 1,
* it means, the next buffer in the list has been freed up for
* rewriting. Reuse the next available buffer in such cases.
- *
+ *
* In case, the currentBufferIndex == buffer.size and buffer size is still
* less than the capacity to be allocated, just allocate a buffer of size
* chunk size.
- *
*/
- public ChunkBuffer allocateBufferIfNeeded(int increment) {
- ChunkBuffer buffer = getCurrentBuffer();
- if (buffer != null && buffer.hasRemaining()) {
- return buffer;
- }
- if (currentBufferIndex < bufferList.size() - 1) {
- buffer = getBuffer(currentBufferIndex + 1);
+ public ChunkBuffer allocateBuffer(int increment) {
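+ // Advance to the next buffer slot: reuse a previously released buffer if
+ // one is still available in the list, otherwise allocate a new chunk-sized
+ // buffer, up to the pool capacity.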
+ currentBufferIndex++;
+ Preconditions.checkArgument(currentBufferIndex <= capacity - 1);
+ if (currentBufferIndex < bufferList.size()) {
+ return getBuffer(currentBufferIndex);
} else {
- buffer = ChunkBuffer.allocate(bufferSize, increment);
- bufferList.add(buffer);
+ final ChunkBuffer newBuffer = ChunkBuffer.allocate(bufferSize, increment);
+ bufferList.add(newBuffer);
+ Preconditions.checkArgument(bufferList.size() <= capacity);
+ return newBuffer;
}
- Preconditions.checkArgument(bufferList.size() <= capacity);
- currentBufferIndex++;
- // TODO: Turn the below precondition check on when Standalone pipeline
- // is removed in the write path in tests
- // Preconditions.checkArgument(buffer.position() == 0);
- return buffer;
}
void releaseBuffer(ChunkBuffer chunkBuffer) {
@@ -130,4 +124,11 @@ int getCurrentBufferIndex() {
return currentBufferIndex;
}
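+ /** Number of buffers currently in use (currentBufferIndex is zero-based). */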
+ public int getNumberOfUsedBuffers() {
+ return currentBufferIndex + 1;
+ }
+
+ public int getCapacity() {
+ return capacity;
+ }
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
index 34d0d7c58a09..ab6e2526d5c9 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
@@ -24,28 +24,29 @@
*/
package org.apache.hadoop.hdds.scm.storage;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.XceiverClientReply;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.ozone.common.ChunkBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.ExecutionException;
-
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* This class executes watchForCommit on ratis pipeline and releases
* buffers once data successfully gets replicated.
@@ -96,7 +97,15 @@ private long releaseBuffers(List<Long> indexes) {
long length = buffers.stream().mapToLong(ChunkBuffer::position).sum();
totalAckDataLength += length;
// clear the future object from the future Map
- Preconditions.checkNotNull(futureMap.remove(totalAckDataLength));
+ final CompletableFuture<ContainerCommandResponseProto> remove =
+ futureMap.remove(totalAckDataLength);
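+ // A null here means no pending flush future matches the acknowledged
+ // length; log the outstanding entries before failing the precondition.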
+ if (remove == null) {
+ LOG.error("Couldn't find required future for " + totalAckDataLength);
+ for (Long key : futureMap.keySet()) {
+ LOG.error("Existing acknowledged data: " + key);
+ }
+ }
+ Preconditions.checkNotNull(remove);
for (ChunkBuffer byteBuffer : buffers) {
bufferPool.releaseBuffer(byteBuffer);
}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
new file mode 100644
index 000000000000..141a1d81e832
--- /dev/null
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetCommittedBlockLengthResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * UNIT test for BlockOutputStream.
+ *
+ * Compares bytes written to the stream and received in the ChunkWriteRequests.
+ */
+public class TestBlockOutputStreamCorrectness {
+
+ private static final long SEED = 18480315L;
+
+ private int writeUnitSize = 1;
+
+ @Test
+ public void test() throws IOException {
+
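+ // 4 MB buffers, pool capacity of 32 MB / 4 MB = 8 buffers.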
+ final BufferPool bufferPool = new BufferPool(4 * 1024 * 1024, 32 / 4);
+
+ for (int block = 0; block < 10; block++) {
+ BlockOutputStream outputStream =
+ createBlockOutputStream(bufferPool);
+
+ Random random = new Random(SEED);
+
+ int max = 256 * 1024 * 1024 / writeUnitSize;
+
+ byte[] writeBuffer = new byte[writeUnitSize];
+ for (int t = 0; t < max; t++) {
+ if (writeUnitSize > 1) {
+ for (int i = 0; i < writeBuffer.length; i++) {
+ writeBuffer[i] = (byte) random.nextInt();
+ }
+ outputStream.write(writeBuffer, 0, writeBuffer.length);
+ } else {
+ outputStream.write((byte) random.nextInt());
+ }
+ }
+ outputStream.close();
+ }
+ }
+
+ @NotNull
+ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
+ throws IOException {
+
+ final Pipeline pipeline = MockPipeline.createRatisPipeline();
+
+ final XceiverClientManager xcm = Mockito.mock(XceiverClientManager.class);
+ Mockito.when(xcm.acquireClient(Mockito.any()))
+ .thenReturn(new MockXceiverClientSpi(pipeline));
+
+ BlockOutputStream outputStream = new BlockOutputStream(
+ new BlockID(1L, 1L),
+ xcm,
+ pipeline,
+ 4 * 1024 * 1024,
+ 16 * 1024 * 1024,
+ true,
+ 32 * 1024 * 1024,
+ bufferPool,
+ ChecksumType.NONE,
+ 256 * 1024);
+ return outputStream;
+ }
+
+ /**
+ * XCeiverClient which simulates responses.
+ */
+ private class MockXceiverClientSpi extends XceiverClientSpi {
+
+ private final Pipeline pipeline;
+
+ private Random expectedRandomStream = new Random(SEED);
+
+ private AtomicInteger counter = new AtomicInteger();
+
+ MockXceiverClientSpi(Pipeline pipeline) {
+ super();
+ this.pipeline = pipeline;
+ }
+
+ @Override
+ public void connect() throws Exception {
+
+ }
+
+ @Override
+ public void connect(String encodedToken) throws Exception {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public Pipeline getPipeline() {
+ return pipeline;
+ }
+
+ @Override
+ public XceiverClientReply sendCommandAsync(
+ ContainerCommandRequestProto request
+ )
+ throws IOException, ExecutionException, InterruptedException {
+
+ final ContainerCommandResponseProto.Builder builder =
+ ContainerCommandResponseProto.newBuilder()
+ .setResult(Result.SUCCESS)
+ .setCmdType(request.getCmdType());
+
+ switch (request.getCmdType()) {
+ case PutBlock:
+ builder.setPutBlock(PutBlockResponseProto.newBuilder()
+ .setCommittedBlockLength(
+ GetCommittedBlockLengthResponseProto.newBuilder()
+ .setBlockID(
+ request.getPutBlock().getBlockData().getBlockID())
+ .setBlockLength(
+ request.getPutBlock().getBlockData().getSize())
+ .build())
+ .build());
+ break;
+ case WriteChunk:
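+ // Replay the same seeded random sequence the writer used and verify
+ // the received payload byte by byte.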
+ ByteString data = request.getWriteChunk().getData();
+ final byte[] writePayload = data.toByteArray();
+ for (int i = 0; i < writePayload.length; i++) {
+ byte expectedByte = (byte) expectedRandomStream.nextInt();
+ Assert.assertEquals(expectedByte,
+ writePayload[i]);
+ }
+ break;
+ default:
+ //no-op
+ }
+ final XceiverClientReply result = new XceiverClientReply(
+ CompletableFuture.completedFuture(builder.build()));
+ result.setLogIndex(counter.incrementAndGet());
+ return result;
+
+ }
+
+ @Override
+ public ReplicationType getPipelineType() {
+ return null;
+ }
+
+ @Override
+ public XceiverClientReply watchForCommit(long index)
+ throws InterruptedException, ExecutionException, TimeoutException,
+ IOException {
+ final ContainerCommandResponseProto.Builder builder =
+ ContainerCommandResponseProto.newBuilder()
+ .setCmdType(Type.WriteChunk)
+ .setResult(Result.SUCCESS);
+ final XceiverClientReply xceiverClientReply = new XceiverClientReply(
+ CompletableFuture.completedFuture(builder.build()));
+ xceiverClientReply.setLogIndex(index);
+ return xceiverClientReply;
+ }
+
+ @Override
+ public long getReplicatedMinCommitIndex() {
+ return 0;
+ }
+
+ @Override
+ public Map<DatanodeDetails, ContainerCommandResponseProto>
+ sendCommandOnAllNodes(ContainerCommandRequestProto request
+ ) throws IOException, InterruptedException {
+ return null;
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBufferPool.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBufferPool.java
new file mode 100644
index 000000000000..cd53d71e4767
--- /dev/null
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBufferPool.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for buffer pool.
+ */
+public class TestBufferPool {
+
+ @Test
+ public void releaseAndReallocate() {
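+ // Pool of up to 8 buffers, 1024 bytes each.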
+ BufferPool pool = new BufferPool(1024, 8);
+ ChunkBuffer cb1 = pool.allocateBuffer(0);
+ ChunkBuffer cb2 = pool.allocateBuffer(0);
+ ChunkBuffer cb3 = pool.allocateBuffer(0);
+
+ pool.releaseBuffer(cb1);
+
+ // pool is now [cb2, cb3 (current), cb1]; the next allocation reuses the released cb1
+ final ChunkBuffer allocated = pool.allocateBuffer(0);
+ Assert.assertEquals(3, pool.getSize());
+ Assert.assertEquals(cb1, allocated);
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index 1c7d1f6408d8..fd49529cbf15 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -31,9 +31,9 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction;
/**
* A Client for the storageContainer protocol.
@@ -43,7 +43,7 @@ public abstract class XceiverClientSpi implements Closeable {
private final AtomicInteger referenceCount;
private boolean isEvicted;
- XceiverClientSpi() {
+ public XceiverClientSpi() {
this.referenceCount = new AtomicInteger(0);
this.isEvicted = false;
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 11acf82ff32f..ea5fc825ca81 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -18,12 +18,41 @@
package org.apache.hadoop.hdds.scm.storage;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CloseContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetSmallFileRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetSmallFileResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
-import org.apache.hadoop.hdds.scm.container.common.helpers
- .BlockNotCommittedException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.BlockNotCommittedException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSelector;
import org.apache.hadoop.io.Text;
@@ -31,53 +60,8 @@
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.container.common.helpers
- .StorageContainerException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .CloseContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .DatanodeBlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .GetBlockRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .GetBlockResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .GetSmallFileRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .GetSmallFileResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .PutBlockRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .PutSmallFileRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ReadChunkRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ReadContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ReadContainerResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .WriteChunkRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
- PutSmallFileResponseProto;
-import org.apache.hadoop.hdds.client.BlockID;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
/**
* Implementation of all container protocol calls performed by Container
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
index b7ba6d6c5afd..65f8a895a4b6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
@@ -17,9 +17,6 @@
*/
package org.apache.hadoop.ozone.common;
-import org.apache.hadoop.hdds.scm.ByteStringConversion;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
@@ -27,6 +24,10 @@
import java.util.function.Function;
import java.util.function.Supplier;
+import org.apache.hadoop.hdds.scm.ByteStringConversion;
+
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
/** Buffer for a block chunk. */
public interface ChunkBuffer {
@@ -88,6 +89,13 @@ default ChunkBuffer put(byte[] b) {
return put(ByteBuffer.wrap(b));
}
+ /** Similar to {@link ByteBuffer#put(byte)}. */
+ default ChunkBuffer put(byte b) {
+ byte[] buf = new byte[1];
+ buf[0] = (byte) b;
+ return put(buf, 0, 1);
+ }
+
/** Similar to {@link ByteBuffer#put(byte[], int, int)}. */
default ChunkBuffer put(byte[] b, int offset, int length) {
return put(ByteBuffer.wrap(b, offset, length));
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
index 299cab87bef8..dd8163504bf5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.ozone.common;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
@@ -29,6 +27,8 @@
import java.util.Objects;
import java.util.function.Function;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
/** {@link ChunkBuffer} implementation using a single {@link ByteBuffer}. */
final class ChunkBufferImplWithByteBuffer implements ChunkBuffer {
private final ByteBuffer buffer;
@@ -104,6 +104,12 @@ public ChunkBuffer put(ByteBuffer b) {
return this;
}
+ @Override
+ public ChunkBuffer put(byte b) {
+ buffer.put(b);
+ return this;
+ }
+
@Override
public ChunkBuffer clear() {
buffer.clear();
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
index a8ddac102ec1..556c052b32d8 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
@@ -17,16 +17,19 @@
*/
package org.apache.hadoop.hdds.scm.pipeline;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+
+import com.google.common.base.Preconditions;
+
/**
* Provides {@link Pipeline} factory methods for testing.
*/
@@ -68,6 +71,22 @@ public static Pipeline createPipeline(Iterable<DatanodeDetails> ids) {
.build();
}
+ public static Pipeline createRatisPipeline() {
+
+ List<DatanodeDetails> nodes = new ArrayList<>();
+ nodes.add(MockDatanodeDetails.randomDatanodeDetails());
+ nodes.add(MockDatanodeDetails.randomDatanodeDetails());
+ nodes.add(MockDatanodeDetails.randomDatanodeDetails());
+
+ return Pipeline.newBuilder()
+ .setState(Pipeline.PipelineState.OPEN)
+ .setId(PipelineID.randomId())
+ .setType(ReplicationType.RATIS)
+ .setFactor(ReplicationFactor.THREE)
+ .setNodes(nodes)
+ .build();
+ }
+
private MockPipeline() {
throw new UnsupportedOperationException("no instances");
}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
index 9ca735cb789e..9b69fad79154 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
@@ -17,12 +17,6 @@
*/
package org.apache.hadoop.ozone.common;
-import org.apache.hadoop.hdds.utils.MockGatheringChannel;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.jupiter.api.Assertions;
-
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -33,6 +27,13 @@
import java.util.NoSuchElementException;
import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.hdds.utils.MockGatheringChannel;
+
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
/**
* Test {@link ChunkBuffer} implementations.
*/
@@ -71,7 +72,8 @@ public void testIncrementalChunkBuffer() {
private static void runTestIncrementalChunkBuffer(int increment, int n) {
final byte[] expected = new byte[n];
ThreadLocalRandom.current().nextBytes(expected);
- runTestImpl(expected, increment, ChunkBuffer.allocate(n, increment));
+ runTestImpl(expected, increment,
+ new IncrementalChunkBuffer(n, increment, false));
}
@Test(timeout = 1_000)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
index 0a2ff14706e4..63beeeea68a2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -17,6 +17,14 @@
package org.apache.hadoop.ozone.client.rpc;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
@@ -24,11 +32,11 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -43,6 +51,9 @@
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
+
+import static java.util.Collections.singletonList;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.NotReplicatedException;
import org.apache.ratis.protocol.RaftRetryFailureException;
@@ -50,21 +61,9 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
import org.junit.Rule;
+import org.junit.Test;
import org.junit.rules.Timeout;
-import static java.util.Collections.singletonList;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
- OZONE_SCM_STALENODE_INTERVAL;
/**
* Class to test CommitWatcher functionality.
@@ -187,7 +186,7 @@ public void testReleaseBuffers() throws Exception {
ContainerTestHelper
.getWriteChunkRequest(pipeline, blockID, chunkSize, null);
// add the data to the buffer pool
- final ChunkBuffer byteBuffer = bufferPool.allocateBufferIfNeeded(0);
+ final ChunkBuffer byteBuffer = bufferPool.allocateBuffer(0);
byteBuffer.put(writeChunkRequest.getWriteChunk().getData());
ratisClient.sendCommandAsync(writeChunkRequest);
ContainerProtos.ContainerCommandRequestProto putBlockRequest =
@@ -264,7 +263,7 @@ public void testReleaseBuffersOnException() throws Exception {
ContainerTestHelper
.getWriteChunkRequest(pipeline, blockID, chunkSize, null);
// add the data to the buffer pool
- final ChunkBuffer byteBuffer = bufferPool.allocateBufferIfNeeded(0);
+ final ChunkBuffer byteBuffer = bufferPool.allocateBuffer(0);
byteBuffer.put(writeChunkRequest.getWriteChunk().getData());
ratisClient.sendCommandAsync(writeChunkRequest);
ContainerProtos.ContainerCommandRequestProto putBlockRequest =