@@ -424,9 +424,12 @@ public static String getHostName(ConfigurationSource conf)
* @param proto ContainerCommand Request proto
* @return true if it is read-only, false otherwise.
*/
public static boolean isReadOnly(
ContainerCommandRequestProtoOrBuilder proto) {
switch (proto.getCmdType()) {
public static boolean isReadOnly(ContainerCommandRequestProtoOrBuilder proto) {
return isReadOnly(proto.getCmdType());
}

public static boolean isReadOnly(ContainerProtos.Type type) {
switch (type) {
case ReadContainer:
case ReadChunk:
case ListBlock:
@@ -22,6 +22,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller;
import org.apache.ratis.thirdparty.com.google.protobuf.MessageLite;
import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;
@@ -107,9 +108,9 @@ public StreamObserver<ContainerCommandRequestProto> send(

@Override
public void onNext(ContainerCommandRequestProto request) {
final DispatcherContext context = DispatcherContext.getDispatcherContext(request.getCmdType());
try {
ContainerCommandResponseProto resp =
dispatcher.dispatch(request, null);
final ContainerCommandResponseProto resp = dispatcher.dispatch(request, context);
responseObserver.onNext(resp);
} catch (Throwable e) {
LOG.error("Got exception when processing"
@@ -121,6 +122,9 @@ public void onNext(ContainerCommandRequestProto request) {
if (popStream != null) {
IOUtils.close(LOG, popStream);
}
if (context != null) {
Contributor:
Similar context creation/close should be done for ContainerStateMachine#readStateMachineData?

Contributor Author (@szetszwo, Dec 17, 2023):
@duongkame, Good catch! We should, if we make DispatcherContext closable for releasing buffers.

On second thought, HDDS-7117 has already changed to using MappedByteBuffer instead of allocating buffers when the size is larger than ozone.chunk.read.mapped.buffer.threshold (default 32KB). So, it seems that we don't have to do anything for read. Could you test HDDS-9536 with the current code?

(Revised: fixed typo: "small" -> "larger")
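
To make the threshold behaviour concrete, here is a minimal sketch of the idea (not the actual ChunkUtils.readData code; the class name, method name, and simplified I/O are illustrative assumptions): reads larger than ozone.chunk.read.mapped.buffer.threshold are served from a MappedByteBuffer, while smaller reads use a regular allocated buffer.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/** Illustrative sketch only; the real read path is ChunkUtils.readData. */
final class MappedReadSketch {
  static ByteBuffer read(File file, long offset, int len, int mappedBufferThreshold)
      throws IOException {
    try (RandomAccessFile raf = new RandomAccessFile(file, "r");
         FileChannel channel = raf.getChannel()) {
      if (len > mappedBufferThreshold) {
        // Large read: map the file region instead of allocating a buffer.
        return channel.map(FileChannel.MapMode.READ_ONLY, offset, len);
      }
      // Small read: allocate a heap buffer and fill it from the channel.
      final ByteBuffer buffer = ByteBuffer.allocate(len);
      channel.position(offset);
      while (buffer.hasRemaining() && channel.read(buffer) >= 0) {
        // keep reading until the requested length is filled or EOF is hit
      }
      buffer.flip();
      return buffer;
    }
  }
}
```

This is why, per the comment above, reads above the threshold do not seem to need an explicit buffer release.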

Contributor:
That sounds nice. @szetszwo, will update.

Contributor:
> it seems that we don't have to do anything for read

@szetszwo Should we close this PR and resolve HDDS-9936 then?

Contributor Author:
Sure, let's close this.

context.close();
}
}
}

@@ -710,13 +710,19 @@ public CompletableFuture<Message> write(LogEntryProto entry, TransactionContext
}

@Override
public CompletableFuture<Message> query(Message request) {
public CompletableFuture<Message> query(Message message) {
try {
metrics.incNumQueryStateMachineOps();
final ContainerCommandRequestProto requestProto =
message2ContainerCommandRequestProto(request);
return CompletableFuture.completedFuture(
dispatchCommand(requestProto, null)::toByteString);
final ContainerCommandRequestProto request = message2ContainerCommandRequestProto(message);
final DispatcherContext context = DispatcherContext.getDispatcherContext(request.getCmdType());
try {
final ContainerCommandResponseProto response = dispatchCommand(request, context);
return CompletableFuture.completedFuture(response::toByteString);
} finally {
if (context != null) {
context.close();
}
}
} catch (IOException e) {
metrics.incNumQueryStateMachineFails();
return completeExceptionally(e);
@@ -19,43 +19,56 @@

import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.UncheckedAutoCloseable;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.ReadChunk;

/**
* DispatcherContext class holds transport protocol specific context info
* required for execution of container commands over the container dispatcher.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class DispatcherContext {
private static final DispatcherContext HANDLE_READ_CHUNK
= newBuilder(Op.HANDLE_READ_CHUNK).build();
public final class DispatcherContext implements UncheckedAutoCloseable {
private static final DispatcherContext HANDLE_WRITE_CHUNK
= newBuilder(Op.HANDLE_WRITE_CHUNK).build();
private static final DispatcherContext HANDLE_GET_SMALL_FILE
= newBuilder(Op.HANDLE_GET_SMALL_FILE).build();
private static final DispatcherContext HANDLE_PUT_SMALL_FILE
= newBuilder(Op.HANDLE_PUT_SMALL_FILE).build();

public static DispatcherContext getHandleReadChunk() {
return HANDLE_READ_CHUNK;
return newBuilder(Op.HANDLE_READ_CHUNK).build();
}

public static DispatcherContext getHandleWriteChunk() {
return HANDLE_WRITE_CHUNK;
}

public static DispatcherContext getHandleGetSmallFile() {
return HANDLE_GET_SMALL_FILE;
return newBuilder(Op.HANDLE_GET_SMALL_FILE).build();
}

public static DispatcherContext getHandlePutSmallFile() {
return HANDLE_PUT_SMALL_FILE;
}

public static DispatcherContext getDispatcherContext(ContainerProtos.Type type) {
switch (type) {
case ReadChunk:
return DispatcherContext.getHandleReadChunk();
case GetSmallFile:
return DispatcherContext.getHandleGetSmallFile();
default:
return null;
}
}

/**
* Determines which stage of writeChunk a write chunk request is for.
*/
@@ -108,6 +121,11 @@ public static Op op(DispatcherContext context) {
return context == null ? Op.NULL : context.getOp();
}

public static void assertOp(DispatcherContext context, Op expected) {
Objects.requireNonNull(context, "context == null");
Preconditions.assertSame(expected, context.getOp(), "op");
}

private final Op op;
// whether the chunk data needs to be written or committed or both
private final WriteChunkStage stage;
@@ -118,6 +136,8 @@ public static Op op(DispatcherContext context) {

private final Map<Long, Long> container2BCSIDMap;

private final AtomicReference<UncheckedAutoCloseable> resource = new AtomicReference<>();

private DispatcherContext(Builder b) {
this.op = Objects.requireNonNull(b.op, "op == null");
this.term = b.term;
@@ -147,6 +167,19 @@ public Map<Long, Long> getContainer2BCSIDMap() {
return container2BCSIDMap;
}

public void setResource(UncheckedAutoCloseable closeable) {
final UncheckedAutoCloseable previous = resource.getAndSet(closeable);
Preconditions.assertNull(previous, "Resource is already set");
}

@Override
public void close() {
final UncheckedAutoCloseable closeable = resource.getAndSet(null);
if (closeable != null) {
closeable.close();
}
}

@Override
public String toString() {
return op + "-" + stage + TermIndex.valueOf(term, logIndex);
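
As a usage note, the dispatch sites changed in this PR (GrpcXceiverService#onNext and ContainerStateMachine#query above) all follow the same create-dispatch-close pattern around the now-closeable DispatcherContext. A minimal sketch of that pattern follows; the helper name dispatchWithContext is made up for illustration:

```java
// Sketch of the dispatch-and-release pattern; mirrors the call sites above.
ContainerCommandResponseProto dispatchWithContext(
    ContainerDispatcher dispatcher, ContainerCommandRequestProto request) {
  // Only read-path commands (ReadChunk, GetSmallFile) get a per-request context.
  final DispatcherContext context =
      DispatcherContext.getDispatcherContext(request.getCmdType());
  try {
    return dispatcher.dispatch(request, context);
  } finally {
    if (context != null) {
      // Releases whatever resource was attached via setResource(...), if any.
      context.close();
    }
  }
}
```
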
@@ -39,10 +39,12 @@
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdfs.util.Canceler;
@@ -89,8 +91,7 @@
*/
public class KeyValueContainer implements Container<KeyValueContainerData> {

private static final Logger LOG =
LoggerFactory.getLogger(KeyValueContainer.class);
private static final Logger LOG = LoggerFactory.getLogger(KeyValueContainer.class);

// Use a non-fair RW lock for better throughput, we may revisit this decision
// if this causes fairness issues.
@@ -923,6 +924,17 @@ private ContainerReplicaProto.State getHddsState()
return state;
}

void warnIfUnhealthyForRead(BlockID blockID, Type cmd) {
Preconditions.checkArgument(HddsUtils.isReadOnly(cmd), "cmd %s is NOT readonly", cmd);

// readlock is NOT required since
// (1) containerData and containerID are final, and
// (2) getState() is synchronized.
if (containerData.getState() == ContainerDataProto.State.UNHEALTHY) {
LOG.warn("{} request {} for UNHEALTHY container {} replica", cmd, blockID, containerData.getContainerID());
}
}

/**
* Returns container DB file.
* @return
@@ -74,6 +74,7 @@
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.Op;
import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -268,10 +269,9 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler,
case CompactChunk:
return handler.handleUnsupportedOp(request);
case PutSmallFile:
return handler
.handlePutSmallFile(request, kvContainer, dispatcherContext);
return handler.handlePutSmallFile(request, kvContainer, dispatcherContext);
case GetSmallFile:
return handler.handleGetSmallFile(request, kvContainer);
return handler.handleGetSmallFile(request, kvContainer, dispatcherContext);
case GetCommittedBlockLength:
return handler.handleGetCommittedBlockLength(request, kvContainer);
default:
@@ -580,7 +580,7 @@ ContainerCommandResponseProto handleGetBlock(
try {
BlockID blockID = BlockID.getFromProtobuf(
request.getGetBlock().getBlockID());
checkContainerIsHealthy(kvContainer, blockID, Type.GetBlock);
kvContainer.warnIfUnhealthyForRead(blockID, Type.GetBlock);
responseData = blockManager.getBlock(kvContainer, blockID)
.getProtoBufMessage();
final long numBytes = responseData.getSerializedSize();
@@ -615,8 +615,7 @@ ContainerCommandResponseProto handleGetCommittedBlockLength(
try {
BlockID blockID = BlockID
.getFromProtobuf(request.getGetCommittedBlockLength().getBlockID());
checkContainerIsHealthy(kvContainer, blockID,
Type.GetCommittedBlockLength);
kvContainer.warnIfUnhealthyForRead(blockID, Type.GetCommittedBlockLength);
BlockUtils.verifyBCSId(kvContainer, blockID);
blockLength = blockManager.getCommittedBlockLength(kvContainer, blockID);
} catch (StorageContainerException ex) {
@@ -686,6 +685,7 @@ ContainerCommandResponseProto handleDeleteBlock(
ContainerCommandResponseProto handleReadChunk(
ContainerCommandRequestProto request, KeyValueContainer kvContainer,
DispatcherContext dispatcherContext) {
DispatcherContext.assertOp(dispatcherContext, Op.HANDLE_READ_CHUNK);

if (!request.hasReadChunk()) {
if (LOG.isDebugEnabled()) {
@@ -703,11 +703,8 @@ ContainerCommandResponseProto handleReadChunk(
.getChunkData());
Preconditions.checkNotNull(chunkInfo);

checkContainerIsHealthy(kvContainer, blockID, Type.ReadChunk);
kvContainer.warnIfUnhealthyForRead(blockID, Type.ReadChunk);
BlockUtils.verifyBCSId(kvContainer, blockID);
if (dispatcherContext == null) {
dispatcherContext = DispatcherContext.getHandleReadChunk();
}

boolean isReadChunkV0 = getReadChunkVersion(request.getReadChunk())
.equals(ContainerProtos.ReadChunkVersion.V0);
@@ -908,7 +905,9 @@ ContainerCommandResponseProto handlePutSmallFile(
* ChunkManager to process the request.
*/
ContainerCommandResponseProto handleGetSmallFile(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
ContainerCommandRequestProto request, KeyValueContainer kvContainer,
DispatcherContext dispatcherContext) {
DispatcherContext.assertOp(dispatcherContext, Op.HANDLE_GET_SMALL_FILE);

if (!request.hasGetSmallFile()) {
if (LOG.isDebugEnabled()) {
@@ -923,13 +922,11 @@ ContainerCommandResponseProto handleGetSmallFile(
try {
BlockID blockID = BlockID.getFromProtobuf(getSmallFileReq.getBlock()
.getBlockID());
checkContainerIsHealthy(kvContainer, blockID, Type.GetSmallFile);
kvContainer.warnIfUnhealthyForRead(blockID, Type.GetSmallFile);
BlockData responseData = blockManager.getBlock(kvContainer, blockID);

ContainerProtos.ChunkInfo chunkInfoProto = null;
List<ByteString> dataBuffers = new ArrayList<>();
final DispatcherContext dispatcherContext
= DispatcherContext.getHandleGetSmallFile();
for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) {
// if the block is committed, all chunks must have been committed.
// Tmp chunk files won't exist here.
@@ -187,7 +187,7 @@ private static long writeDataToChannel(FileChannel channel, ChunkBuffer data,
}
}

public static ChunkBuffer readData(long len, int bufferCapacity,
public static ChunkBuffer readData(int len, int bufferCapacity,
File file, long off, HddsVolume volume, int readMappedBufferThreshold)
throws StorageContainerException {
if (len > readMappedBufferThreshold) {
@@ -454,15 +454,15 @@ private static void checkSize(String of, long expected, long actual,
}
}

public static void limitReadSize(long len)
throws StorageContainerException {
if (len > OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE) {
String err = String.format(
"Oversize read. max: %d, actual: %d",
OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE, len);
LOG.error(err);
throw new StorageContainerException(err, UNSUPPORTED_REQUEST);
public static int limitReadSize(long len) throws StorageContainerException {
final int max = OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE;
if (len <= max) {
return Math.toIntExact(len);
}

final String err = String.format("Oversize read. max: %d, actual: %d", max, len);
LOG.error(err);
throw new StorageContainerException(err, UNSUPPORTED_REQUEST);
}

public static StorageContainerException wrapInStorageContainerException(
@@ -32,8 +32,6 @@
import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;

import java.nio.ByteBuffer;

import static org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils.limitReadSize;

/**
@@ -74,9 +72,8 @@ public ChunkBuffer readChunk(Container container, BlockID blockID,
ChunkInfo info, DispatcherContext dispatcherContext)
throws StorageContainerException {

limitReadSize(info.getLen());
// stats are handled in ChunkManagerImpl
return ChunkBuffer.wrap(ByteBuffer.allocate((int) info.getLen()));
return ChunkBuffer.allocate(limitReadSize(info.getLen()));
}

@Override
@@ -181,7 +181,7 @@ public ChunkBuffer readChunk(Container container, BlockID blockID,
return ChunkBuffer.wrap(ByteBuffer.wrap(new byte[0]));
}

limitReadSize(info.getLen());
final int len = limitReadSize(info.getLen());

KeyValueContainerData containerData = (KeyValueContainerData) container
.getContainerData();
@@ -190,7 +190,6 @@ public ChunkBuffer readChunk(Container container, BlockID blockID,

File chunkFile = getChunkFile(container, blockID, info);

final long len = info.getLen();
long offset = info.getOffset();
int bufferCapacity = ChunkManager.getBufferCapacityForChunkRead(info,
defaultReadBufferCapacity);
@@ -209,7 +209,7 @@ public ChunkBuffer readChunk(Container container, BlockID blockID,
throws StorageContainerException {

checkLayoutVersion(container);
limitReadSize(info.getLen());
final int len = limitReadSize(info.getLen());

KeyValueContainer kvContainer = (KeyValueContainer) container;
KeyValueContainerData containerData = kvContainer.getContainerData();
@@ -229,7 +229,6 @@ public ChunkBuffer readChunk(Container container, BlockID blockID,
possibleFiles.add(finalChunkFile);
}

final long len = info.getLen();
int bufferCapacity = ChunkManager.getBufferCapacityForChunkRead(info,
defaultReadBufferCapacity);
