@@ -113,7 +113,7 @@ public class OzoneClientConfig {
type = ConfigType.SIZE,
description = "Checksum will be computed for every bytes per checksum "
+ "number of bytes and stored sequentially. The minimum value for "
+ "this config is 256KB.",
+ "this config is 16KB.",
tags = ConfigTag.CLIENT)
private int bytesPerChecksum = 1024 * 1024;

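The lowered minimum matters because chunk data is now read and buffered per checksum boundary, so a single read of len bytes yields roughly len / bytesPerChecksum buffers. A minimal sketch of that arithmetic (illustration only, not part of this patch):

// Illustration: number of read buffers one chunk read produces, mirroring
// the per-checksum-boundary loop added to DummyChunkInputStream.readChunk
// later in this diff.
static int bufferCount(int readLen, int bytesPerChecksum) {
  return (readLen + bytesPerChecksum - 1) / bytesPerChecksum; // ceiling division
}

// With the 1 MB default, a 4 MB chunk read yields 4 buffers; at the new
// 16 KB minimum the same read would yield 256 buffers.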
@@ -502,4 +502,9 @@ private void handleReadError(IOException cause) throws IOException {

refreshPipeline(cause);
}

@VisibleForTesting
public synchronized List<ChunkInputStream> getChunkStreams() {
return chunkStreams;
}
}

Large diffs are not rendered by default.

@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.storage;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

@@ -25,6 +26,7 @@
import org.apache.hadoop.hdds.scm.XceiverClientFactory;

import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

/**
@@ -48,12 +50,29 @@ public DummyChunkInputStream(ChunkInfo chunkInfo,
}

@Override
protected ByteString readChunk(ChunkInfo readChunkInfo) {
ByteString byteString = ByteString.copyFrom(chunkData,
(int) readChunkInfo.getOffset(),
(int) readChunkInfo.getLen());
getReadByteBuffers().add(byteString);
return byteString;
protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo) {
int offset = (int) readChunkInfo.getOffset();
int remainingToRead = (int) readChunkInfo.getLen();

int bufferCapacity = readChunkInfo.getChecksumData().getBytesPerChecksum();
int bufferLen;
readByteBuffers.clear();
while (remainingToRead > 0) {
if (remainingToRead < bufferCapacity) {
bufferLen = remainingToRead;
} else {
bufferLen = bufferCapacity;
}
ByteString byteString = ByteString.copyFrom(chunkData,
offset, bufferLen);

readByteBuffers.add(byteString);

offset += bufferLen;
remainingToRead -= bufferLen;
}

return BufferUtils.getReadOnlyByteBuffers(readByteBuffers);
}

@Override
@@ -19,6 +19,8 @@
package org.apache.hadoop.hdds.scm.storage;

import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;

@@ -94,6 +96,19 @@ private void matchWithInputData(byte[] readData, int inputDataStartIndex,
}
}

private void matchWithInputData(List<ByteString> byteStrings,
int inputDataStartIndex, int length) {
int offset = inputDataStartIndex;
int totalBufferLen = 0;
for (ByteString byteString : byteStrings) {
int bufferLen = byteString.size();
matchWithInputData(byteString.toByteArray(), offset, bufferLen);
offset += bufferLen;
totalBufferLen += bufferLen;
}
Assert.assertEquals(length, totalBufferLen);
}

/**
* Seek to a position and verify through getPos().
*/
@@ -123,10 +138,9 @@ public void testPartialChunkRead() throws Exception {
// To read chunk data from index 0 to 49 (len = 50), we need to read
// chunk from offset 0 to 60 as the checksum boundary is at every 20
// bytes. Verify that 60 bytes of chunk data are read and stored in the
// buffers.
matchWithInputData(chunkStream.getReadByteBuffers().get(0).toByteArray(),
0, 60);

// buffers. Since the checksum boundary is at every 20 bytes, the data
// should be returned in 60/20 = 3 buffers.
matchWithInputData(chunkStream.getReadByteBuffers(), 0, 60);
}

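The boundary alignment described in the comment in testPartialChunkRead can be captured in a small helper; the following is a sketch for illustration only, not part of the test class:

// Illustration (not part of the test): reading [startIndex, startIndex + len)
// requires reading from the preceding checksum boundary up to the next one,
// because checksums can only be verified over whole bytesPerChecksum-sized
// slices.
static int[] alignToChecksumBoundary(int startIndex, int len,
    int bytesPerChecksum) {
  int alignedStart = (startIndex / bytesPerChecksum) * bytesPerChecksum;
  int end = startIndex + len;
  int alignedEnd =
      ((end + bytesPerChecksum - 1) / bytesPerChecksum) * bytesPerChecksum;
  return new int[] {alignedStart, alignedEnd - alignedStart};
}

// For the case above: alignToChecksumBoundary(0, 50, 20) returns {0, 60},
// i.e. 60 bytes are read and stored as 60/20 = 3 buffers.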
@Test
@@ -152,8 +166,7 @@ public void testSeek() throws Exception {
byte[] b = new byte[30];
chunkStream.read(b, 0, 30);
matchWithInputData(b, 25, 30);
matchWithInputData(chunkStream.getReadByteBuffers().get(0).toByteArray(),
20, 40);
matchWithInputData(chunkStream.getReadByteBuffers(), 20, 40);

// After read, the position of the chunkStream is evaluated from the
// buffers and the chunkPosition should be reset to -1.
@@ -216,8 +229,8 @@ public void connectsToNewPipeline() throws Exception {
ChunkInputStream subject = new ChunkInputStream(chunkInfo, null,
clientFactory, pipelineRef::get, false, null) {
@Override
protected ByteString readChunk(ChunkInfo readChunkInfo) {
return ByteString.copyFrom(chunkData);
protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo) {
return ByteString.copyFrom(chunkData).asReadOnlyByteBufferList();
}
};

@@ -133,6 +133,11 @@ public final class ScmConfigKeys {
// 4 MB by default
public static final String OZONE_SCM_CHUNK_SIZE_DEFAULT = "4MB";

public static final String OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY =
"ozone.chunk.read.buffer.default.size";
public static final String OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT =
"64KB";

public static final String OZONE_SCM_CHUNK_LAYOUT_KEY =
"ozone.scm.chunk.layout";

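As a rough sketch (not part of this diff) of how a size-style key like this is typically resolved at runtime, assuming Hadoop's Configuration.getStorageSize helper is available and that this default applies when no better buffer size is known:

// Assumption: consuming code resolves the key roughly like this;
// getStorageSize parses suffixes such as "64KB" into the requested unit.
OzoneConfiguration conf = new OzoneConfiguration();
int readBufferSize = (int) conf.getStorageSize(
    ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY,
    ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT,
    StorageUnit.BYTES);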
@@ -18,12 +18,17 @@
package org.apache.hadoop.hdds.scm.protocolPB;

import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Function;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto.Builder;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DataBuffers;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetCommittedBlockLengthResponseProto;
@@ -34,8 +39,11 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

import static org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;

/**
* A set of helper functions to create responses to container commands.
*/
@@ -204,26 +212,46 @@ public static ContainerCommandResponseProto getPutFileResponseSuccess(

/**
* Gets a response to the read small file call.
* @param msg - Msg
* @param data - Data
* @param request - the GetSmallFile request message
* @param dataBuffers - chunk data to return, as a list of buffers
* @param info - Info
* @return Response.
*/
public static ContainerCommandResponseProto getGetSmallFileResponseSuccess(
ContainerCommandRequestProto msg, ByteString data, ChunkInfo info) {

Preconditions.checkNotNull(msg);

ReadChunkResponseProto.Builder readChunk =
ReadChunkResponseProto.newBuilder()
.setChunkData(info)
.setData((data))
.setBlockID(msg.getGetSmallFile().getBlock().getBlockID());
ContainerCommandRequestProto request, List<ByteString> dataBuffers,
ChunkInfo info) {

Preconditions.checkNotNull(request);

boolean isReadChunkV0 = getReadChunkVersion(request.getGetSmallFile())
.equals(ContainerProtos.ReadChunkVersion.V0);

ReadChunkResponseProto.Builder readChunk;

if (isReadChunkV0) {
// V0 has all response data in a single ByteBuffer
ByteString combinedData = ByteString.EMPTY;
for (ByteString buffer : dataBuffers) {
combinedData = combinedData.concat(buffer);
}
readChunk = ReadChunkResponseProto.newBuilder()
.setChunkData(info)
.setData(combinedData)
.setBlockID(request.getGetSmallFile().getBlock().getBlockID());
} else {
// V1 splits response data into a list of ByteBuffers
readChunk = ReadChunkResponseProto.newBuilder()
.setChunkData(info)
.setDataBuffers(DataBuffers.newBuilder()
.addAllBuffers(dataBuffers)
.build())
.setBlockID(request.getGetSmallFile().getBlock().getBlockID());
}

GetSmallFileResponseProto.Builder getSmallFile =
GetSmallFileResponseProto.newBuilder().setData(readChunk);

return getSuccessResponseBuilder(msg)
return getSuccessResponseBuilder(request)
.setCmdType(Type.GetSmallFile)
.setGetSmallFile(getSmallFile)
.build();
@@ -250,13 +278,29 @@ public static ContainerCommandResponseProto getReadContainerResponse(
}

public static ContainerCommandResponseProto getReadChunkResponse(
ContainerCommandRequestProto request, ByteString data) {

ReadChunkResponseProto.Builder response =
ReadChunkResponseProto.newBuilder()
.setChunkData(request.getReadChunk().getChunkData())
.setData(data)
.setBlockID(request.getReadChunk().getBlockID());
ContainerCommandRequestProto request, ChunkBuffer data,
Function<ByteBuffer, ByteString> byteBufferToByteString) {

boolean isReadChunkV0 = getReadChunkVersion(request.getReadChunk())
.equals(ContainerProtos.ReadChunkVersion.V0);

ReadChunkResponseProto.Builder response;

if (isReadChunkV0) {
// V0 has all response data in a single ByteBuffer
response = ReadChunkResponseProto.newBuilder()
.setChunkData(request.getReadChunk().getChunkData())
.setData(data.toByteString(byteBufferToByteString))
.setBlockID(request.getReadChunk().getBlockID());
} else {
// V1 splits response data into a list of ByteBuffers
response = ReadChunkResponseProto.newBuilder()
.setChunkData(request.getReadChunk().getChunkData())
.setDataBuffers(DataBuffers.newBuilder()
.addAllBuffers(data.toByteStringList(byteBufferToByteString))
.build())
.setBlockID(request.getReadChunk().getBlockID());
}

return getSuccessResponseBuilder(request)
.setReadChunk(response)
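For context, a hedged sketch of how a consumer of this response might normalize the two shapes; the accessors (hasDataBuffers, getBuffersList, getData) are assumed from the proto messages referenced in this diff, and java.util imports are omitted:

// Illustrative sketch only; accessor names assumed from the proto definitions.
ContainerProtos.ReadChunkResponseProto resp = reply.getReadChunk();
List<ByteString> buffers;
if (resp.hasDataBuffers()) {
  // V1: one ByteString per checksum boundary
  buffers = resp.getDataBuffers().getBuffersList();
} else {
  // V0: the whole chunk in a single ByteString
  buffers = Collections.singletonList(resp.getData());
}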
@@ -226,7 +226,8 @@ public static ContainerProtos.ReadChunkResponseProto readChunk(
ReadChunkRequestProto.Builder readChunkRequest =
ReadChunkRequestProto.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.setChunkData(chunk);
.setChunkData(chunk)
.setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1);
String id = xceiverClient.getPipeline().getClosestNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk)
@@ -489,6 +490,7 @@ public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
ContainerProtos.GetSmallFileRequestProto getSmallFileRequest =
GetSmallFileRequestProto
.newBuilder().setBlock(getBlock)
.setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1)
.build();
String id = client.getPipeline().getClosestNode().getUuidString();

@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.utils;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;

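/**
 * Utilities for interpreting client container commands, such as determining
 * which ReadChunk response version a request expects.
 */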
public final class ClientCommandsUtils {

/** Utility classes should not be constructed. **/
private ClientCommandsUtils() {

}

public static ContainerProtos.ReadChunkVersion getReadChunkVersion(
ContainerProtos.ReadChunkRequestProto readChunkRequest) {
if (readChunkRequest.hasReadChunkVersion()) {
return readChunkRequest.getReadChunkVersion();
} else {
return ContainerProtos.ReadChunkVersion.V0;
}
}

public static ContainerProtos.ReadChunkVersion getReadChunkVersion(
ContainerProtos.GetSmallFileRequestProto getSmallFileRequest) {
if (getSmallFileRequest.hasReadChunkVersion()) {
return getSmallFileRequest.getReadChunkVersion();
} else {
return ContainerProtos.ReadChunkVersion.V0;
}
}
}
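A short, illustrative use of these helpers on the datanode side, mirroring the version checks added elsewhere in this diff (handler wiring is assumed, not shown here):

// Illustrative only: requests from older clients carry no version field and
// default to V0, so they keep receiving the single-buffer response format.
ContainerProtos.ReadChunkVersion version =
    ClientCommandsUtils.getReadChunkVersion(request.getReadChunk());
if (version == ContainerProtos.ReadChunkVersion.V0) {
  // build the legacy single-ByteString response
} else {
  // build the V1 response with a DataBuffers list
}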
@@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.utils;

/**
* This package contains utility classes for the SCM and client protocols.
*/
@@ -333,7 +333,7 @@ public final class OzoneConfigKeys {
"hdds.datanode.replication.work.dir";


public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE = 256 * 1024;
public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE = 16 * 1024;

public static final String OZONE_CLIENT_READ_TIMEOUT
= "ozone.client.read.timeout";