Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ public static boolean isReadOnly(
case PutSmallFile:
case StreamInit:
case StreamWrite:
case FinalizeBlock:
default:
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,18 @@ public static ContainerCommandResponseProto getReadChunkResponse(
.build();
}

public static ContainerCommandResponseProto getFinalizeBlockResponse(
ContainerCommandRequestProto msg, BlockData data) {

ContainerProtos.FinalizeBlockResponseProto.Builder blockData =
ContainerProtos.FinalizeBlockResponseProto.newBuilder()
.setBlockData(data);

return getSuccessResponseBuilder(msg)
.setFinalizeBlock(blockData)
.build();
}

private ContainerCommandResponseBuilders() {
throw new UnsupportedOperationException("no instances");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,12 @@ public static ContainerCommandRequestProto getDummyCommandRequestProto(
.build())
.build());
break;
case FinalizeBlock:
builder
.setFinalizeBlock(ContainerProtos
.FinalizeBlockRequestProto.newBuilder()
.setBlockID(fakeBlockId).build());
break;

default:
Assert.fail("Unhandled request type " + cmdType + " in unit test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public enum DNAction implements AuditAction {
GET_SMALL_FILE,
CLOSE_CONTAINER,
GET_COMMITTED_BLOCK_LENGTH,
STREAM_INIT;
STREAM_INIT,
FINALIZE_BLOCK;

@Override
public String getAction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ private static DNAction getAuditAction(Type cmdType) {
case CloseContainer : return DNAction.CLOSE_CONTAINER;
case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH;
case StreamInit : return DNAction.STREAM_INIT;
case FinalizeBlock : return DNAction.FINALIZE_BLOCK;
default :
LOG.debug("Invalid command type - {}", cmdType);
return null;
Expand Down Expand Up @@ -897,6 +898,12 @@ private static Map<String, String> getAuditParams(
.toString());
return auditParams;

case FinalizeBlock:
auditParams.put("blockData",
BlockID.getFromProtobuf(msg.getFinalizeBlock().getBlockID())
.toString());
return auditParams;

default :
LOG.debug("Invalid command type - {}", cmdType);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ public abstract void deleteBlock(Container container, BlockData blockData)
public abstract void deleteUnreferenced(Container container, long localID)
throws IOException;

public abstract void addFinalizedBlock(Container container, long localID);

public abstract boolean isFinalizedBlockExist(Container container, long localID);

public void setClusterID(String clusterID) {
this.clusterId = clusterID;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.apache.ratis.util.TaskQueue;
import org.apache.ratis.util.function.CheckedSupplier;
import org.apache.ratis.util.JavaUtils;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -376,8 +377,20 @@ public TransactionContext startTransaction(RaftClientRequest request)
ctxt.setException(ioe);
return ctxt;
}
if (proto.getCmdType() == Type.WriteChunk) {
if (proto.getCmdType() == Type.PutBlock) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does PutBlock need change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to reject request from original client if there is any PUT block request after finalize block is called by recoverer client.

TransactionContext ctxt = rejectRequest(request,
proto.getContainerID(), proto.getPutBlock().getBlockData()
.getBlockID().getLocalID());
if (ctxt != null) {
return ctxt;
}
} else if (proto.getCmdType() == Type.WriteChunk) {
final WriteChunkRequestProto write = proto.getWriteChunk();
TransactionContext ctxt = rejectRequest(request,
proto.getContainerID(), write.getBlockID().getLocalID());
if (ctxt != null) {
return ctxt;
}
// create the log entry proto
final WriteChunkRequestProto commitWriteChunkProto =
WriteChunkRequestProto.newBuilder()
Expand All @@ -403,16 +416,32 @@ public TransactionContext startTransaction(RaftClientRequest request)
.setStateMachineData(write.getData())
.setLogData(commitContainerCommandProto.toByteString())
.build();
} else {
return TransactionContext.newBuilder()
} else if (proto.getCmdType() == Type.FinalizeBlock) {
containerController.addFinalizedBlock(proto.getContainerID(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot add the finalize block in startTransaction, otherwise there is chance that valid write chunk or put block in applyTransaction will be failed because of this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are rejecting request only in startTransaction and not in applyTransaction. So once the request has passed in startTransaction I think it should succeed in applyTransaction. In which case this can happen?

Copy link
Contributor

@ChenSammi ChenSammi Dec 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Currently finalized block is not checked in applyTransaction. Then it's fine.

proto.getFinalizeBlock().getBlockID().getLocalID());
}
return TransactionContext.newBuilder()
.setClientRequest(request)
.setStateMachine(this)
.setServerRole(RaftPeerRole.LEADER)
.setStateMachineContext(startTime)
.setLogData(proto.toByteString())
.build();
}

@Nullable
private TransactionContext rejectRequest(RaftClientRequest request,
long containerId, long localId) {
if (containerController.isFinalizedBlockExist(containerId, localId)) {
TransactionContext ctxt = TransactionContext.newBuilder()
.setClientRequest(request)
.setStateMachine(this)
.setServerRole(RaftPeerRole.LEADER)
.setStateMachineContext(startTime)
.setLogData(proto.toByteString())
.build();
ctxt.setException(new IOException("Block already finalized"));
return ctxt;
}

return null;
}

private ByteString getStateMachineData(StateMachineLogEntryProto entryProto) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.ERROR_IN_COMPACT_DB;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.ERROR_IN_DB_SYNC;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_CONTAINER_STATE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure;

Expand Down Expand Up @@ -433,6 +434,12 @@ public void quasiClose() throws StorageContainerException {

@Override
public void close() throws StorageContainerException {
try (DBHandle db = BlockUtils.getDB(containerData, config)) {
containerData.clearFinalizedBlock(db);
} catch (IOException ex) {
LOG.error("Error in deleting entry from Finalize Block table", ex);
throw new StorageContainerException(ex, IO_EXCEPTION);
}
closeAndFlushIfNeeded(containerData::closeContainer);
LOG.info("Container {} is closed with bcsId {}.",
containerData.getContainerID(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static java.lang.Math.max;
Expand Down Expand Up @@ -92,6 +94,8 @@ public class KeyValueContainerData extends ContainerData {

private long blockCommitSequenceId;

private final Set<Long> finalizedBlockSet;

static {
// Initialize YAML fields
KV_YAML_FIELDS = Lists.newArrayList();
Expand All @@ -114,6 +118,7 @@ public KeyValueContainerData(long id, ContainerLayoutVersion layoutVersion,
size, originPipelineId, originNodeId);
this.numPendingDeletionBlocks = new AtomicLong(0);
this.deleteTransactionId = 0;
finalizedBlockSet = ConcurrentHashMap.newKeySet();
}

public KeyValueContainerData(KeyValueContainerData source) {
Expand All @@ -123,6 +128,7 @@ public KeyValueContainerData(KeyValueContainerData source) {
this.numPendingDeletionBlocks = new AtomicLong(0);
this.deleteTransactionId = 0;
this.schemaVersion = source.getSchemaVersion();
finalizedBlockSet = ConcurrentHashMap.newKeySet();
}

/**
Expand Down Expand Up @@ -275,6 +281,34 @@ public long getDeleteTransactionId() {
return deleteTransactionId;
}

/**
* Add the given localID of a block to the finalizedBlockSet.
*/
public void addToFinalizedBlockSet(long localID) {
finalizedBlockSet.add(localID);
}

public Set<Long> getFinalizedBlockSet() {
return finalizedBlockSet;
}

public boolean isFinalizedBlockExist(long localID) {
return finalizedBlockSet.contains(localID);
}

public void clearFinalizedBlock(DBHandle db) throws IOException {
if (!finalizedBlockSet.isEmpty()) {
// delete from db and clear memory
// Should never fail.
Preconditions.checkNotNull(db, "DB cannot be null here");
try (BatchOperation batch = db.getStore().getBatchHandler().initBatchOperation()) {
db.getStore().getFinalizeBlocksTable().deleteBatchWithPrefix(batch, containerPrefix());
db.getStore().getBatchHandler().commitBatchOperation(batch);
}
finalizedBlockSet.clear();
}
}

/**
* Returns a ProtoBuf Message from ContainerData.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getFinalizeBlockResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getGetSmallFileResponseSuccess;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getListBlockResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getPutFileResponseSuccess;
Expand Down Expand Up @@ -274,6 +275,8 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler,
return handler.handleGetSmallFile(request, kvContainer);
case GetCommittedBlockLength:
return handler.handleGetCommittedBlockLength(request, kvContainer);
case FinalizeBlock:
return handler.handleFinalizeBlock(request, kvContainer);
default:
return null;
}
Expand Down Expand Up @@ -562,6 +565,46 @@ ContainerCommandResponseProto handlePutBlock(
return putBlockResponseSuccess(request, blockDataProto);
}

ContainerCommandResponseProto handleFinalizeBlock(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {

if (!request.hasFinalizeBlock()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Malformed Finalize block request. trace ID: {}",
request.getTraceID());
}
return malformedRequest(request);
}
ContainerProtos.BlockData responseData;

try {
checkContainerOpen(kvContainer);
BlockID blockID = BlockID.getFromProtobuf(
request.getFinalizeBlock().getBlockID());
Preconditions.checkNotNull(blockID);

LOG.info("Finalized Block request received {} ", blockID);

responseData = blockManager.getBlock(kvContainer, blockID)
.getProtoBufMessage();

chunkManager.finalizeWriteChunk(kvContainer, blockID);
blockManager.finalizeBlock(kvContainer, blockID);
kvContainer.getContainerData()
.addToFinalizedBlockSet(blockID.getLocalID());

LOG.info("Block has been finalized {} ", blockID);

} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
} catch (IOException ex) {
return ContainerUtils.logAndReturnError(LOG,
new StorageContainerException(
"Finalize Block failed", ex, IO_EXCEPTION), request);
}
return getFinalizeBlockResponse(request, responseData);
}

/**
* Handle Get Block operation. Calls BlockManager to process the request.
*/
Expand Down Expand Up @@ -1233,6 +1276,16 @@ public void deleteUnreferenced(Container container, long localID)
}
}

public void addFinalizedBlock(Container container, long localID) {
KeyValueContainer keyValueContainer = (KeyValueContainer)container;
keyValueContainer.getContainerData().addToFinalizedBlockSet(localID);
}

public boolean isFinalizedBlockExist(Container container, long localID) {
KeyValueContainer keyValueContainer = (KeyValueContainer)container;
return keyValueContainer.getContainerData().isFinalizedBlockExist(localID);
}

private String[] getFilesWithPrefix(String prefix, File chunkDir) {
FilenameFilter filter = (dir, name) -> name.startsWith(prefix);
return chunkDir.list(filter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,29 @@ private static void populateContainerMetadata(
// startup. If this method is called but not as a part of startup,
// The inspectors will be unloaded and this will be a no-op.
ContainerInspectorUtil.process(kvContainerData, store);

// Load finalizeBlockLocalIds for container in memory.
populateContainerFinalizeBlock(kvContainerData, store);
}

/**
* Loads finalizeBlockLocalIds for container in memory.
* @param kvContainerData - KeyValueContainerData
* @param store - DatanodeStore
* @throws IOException
*/
private static void populateContainerFinalizeBlock(
KeyValueContainerData kvContainerData, DatanodeStore store)
throws IOException {
if (store.getFinalizeBlocksTable() != null) {
try (BlockIterator<Long> iter =
store.getFinalizeBlockIterator(kvContainerData.getContainerID(),
kvContainerData.getUnprefixedKeyFilter())) {
while (iter.hasNext()) {
kvContainerData.addToFinalizedBlockSet(iter.nextBlock());
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -214,6 +215,34 @@ public static long persistPutBlock(KeyValueContainer container,
}
}

@Override
public void finalizeBlock(Container container, BlockID blockId)
throws IOException {
Preconditions.checkNotNull(blockId, "blockId cannot " +
"be null for finalizeBlock operation.");
Preconditions.checkState(blockId.getContainerID() >= 0,
"Container Id cannot be negative");

KeyValueContainer kvContainer = (KeyValueContainer)container;
long localID = blockId.getLocalID();

kvContainer.removeFromPendingPutBlockCache(localID);

try (DBHandle db = BlockUtils.getDB(kvContainer.getContainerData(),
config)) {
// Should never fail.
Preconditions.checkNotNull(db, DB_NULL_ERR_MSG);

// persist finalizeBlock
try (BatchOperation batch = db.getStore().getBatchHandler()
.initBatchOperation()) {
db.getStore().getFinalizeBlocksTable().putWithBatch(batch,
kvContainer.getContainerData().getBlockKey(localID), localID);
db.getStore().getBatchHandler().commitBatchOperation(batch);
}
}
}

@Override
public BlockData getBlock(Container container, BlockID blockID)
throws IOException {
Expand Down
Loading