diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java index 985b1b80ee93..857cbfb6eef9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java @@ -448,6 +448,7 @@ public static boolean isReadOnly( case PutSmallFile: case StreamInit: case StreamWrite: + case FinalizeBlock: default: return false; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java index 7ef2196c99d3..e0458f034728 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java @@ -318,6 +318,18 @@ public static ContainerCommandResponseProto getReadChunkResponse( .build(); } + public static ContainerCommandResponseProto getFinalizeBlockResponse( + ContainerCommandRequestProto msg, BlockData data) { + + ContainerProtos.FinalizeBlockResponseProto.Builder blockData = + ContainerProtos.FinalizeBlockResponseProto.newBuilder() + .setBlockData(data); + + return getSuccessResponseBuilder(msg) + .setFinalizeBlock(blockData) + .build(); + } + private ContainerCommandResponseBuilders() { throw new UnsupportedOperationException("no instances"); } diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index d992fe9f9d08..ee742d5f9a0c 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -624,6 +624,12 @@ public static ContainerCommandRequestProto getDummyCommandRequestProto( .build()) .build()); break; + case FinalizeBlock: + builder + .setFinalizeBlock(ContainerProtos + .FinalizeBlockRequestProto.newBuilder() + .setBlockID(fakeBlockId).build()); + break; default: Assert.fail("Unhandled request type " + cmdType + " in unit test"); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java index 73aff9ac830c..d271e7d5d48f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java @@ -39,7 +39,8 @@ public enum DNAction implements AuditAction { GET_SMALL_FILE, CLOSE_CONTAINER, GET_COMMITTED_BLOCK_LENGTH, - STREAM_INIT; + STREAM_INIT, + FINALIZE_BLOCK; @Override public String getAction() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index 1b0ef29c770c..4f4cf0a41d33 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -771,6 +771,7 @@ private static DNAction getAuditAction(Type cmdType) { case CloseContainer : return DNAction.CLOSE_CONTAINER; case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH; case StreamInit : return DNAction.STREAM_INIT; + case FinalizeBlock : return DNAction.FINALIZE_BLOCK; default : LOG.debug("Invalid command type - {}", cmdType); return null; @@ -897,6 +898,12 @@ private static Map getAuditParams( .toString()); return auditParams; + case FinalizeBlock: + auditParams.put("blockData", + BlockID.getFromProtobuf(msg.getFinalizeBlock().getBlockID()) + .toString()); + return auditParams; + default : LOG.debug("Invalid command type - {}", cmdType); return null; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java index 2ffb9d30d1f4..bfdff69be46f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java @@ -213,6 +213,10 @@ public abstract void deleteBlock(Container container, BlockData blockData) public abstract void deleteUnreferenced(Container container, long localID) throws IOException; + public abstract void addFinalizedBlock(Container container, long localID); + + public abstract boolean isFinalizedBlockExist(Container container, long localID); + public void setClusterID(String clusterID) { this.clusterId = clusterID; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 626b548a5a2a..348ded49f2f1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -94,6 +94,7 @@ import org.apache.ratis.util.TaskQueue; import org.apache.ratis.util.function.CheckedSupplier; import org.apache.ratis.util.JavaUtils; +import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -376,8 +377,20 @@ public TransactionContext startTransaction(RaftClientRequest request) ctxt.setException(ioe); return ctxt; } - if (proto.getCmdType() == Type.WriteChunk) { + if (proto.getCmdType() == Type.PutBlock) { + TransactionContext ctxt = rejectRequest(request, + proto.getContainerID(), proto.getPutBlock().getBlockData() + .getBlockID().getLocalID()); + if (ctxt != null) { + return ctxt; + } + } else if (proto.getCmdType() == Type.WriteChunk) { final WriteChunkRequestProto write = proto.getWriteChunk(); + TransactionContext ctxt = rejectRequest(request, + proto.getContainerID(), write.getBlockID().getLocalID()); + if (ctxt != null) { + return ctxt; + } // create the log entry proto final WriteChunkRequestProto commitWriteChunkProto = WriteChunkRequestProto.newBuilder() @@ -403,16 +416,32 @@ public TransactionContext startTransaction(RaftClientRequest request) .setStateMachineData(write.getData()) .setLogData(commitContainerCommandProto.toByteString()) .build(); - } else { - return TransactionContext.newBuilder() + } else if (proto.getCmdType() == Type.FinalizeBlock) { + containerController.addFinalizedBlock(proto.getContainerID(), + proto.getFinalizeBlock().getBlockID().getLocalID()); + } + return TransactionContext.newBuilder() + .setClientRequest(request) + .setStateMachine(this) + .setServerRole(RaftPeerRole.LEADER) + .setStateMachineContext(startTime) + .setLogData(proto.toByteString()) + .build(); + } + + @Nullable + private TransactionContext rejectRequest(RaftClientRequest request, + long containerId, long localId) { + if (containerController.isFinalizedBlockExist(containerId, localId)) { + TransactionContext ctxt = TransactionContext.newBuilder() .setClientRequest(request) .setStateMachine(this) .setServerRole(RaftPeerRole.LEADER) - .setStateMachineContext(startTime) - .setLogData(proto.toByteString()) .build(); + ctxt.setException(new IOException("Block already finalized")); + return ctxt; } - + return null; } private ByteString getStateMachineData(StateMachineLogEntryProto entryProto) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index 838818266757..98d81c15d0ad 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -76,6 +76,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.ERROR_IN_COMPACT_DB; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.ERROR_IN_DB_SYNC; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_CONTAINER_STATE; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST; import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure; @@ -433,6 +434,12 @@ public void quasiClose() throws StorageContainerException { @Override public void close() throws StorageContainerException { + try (DBHandle db = BlockUtils.getDB(containerData, config)) { + containerData.clearFinalizedBlock(db); + } catch (IOException ex) { + LOG.error("Error in deleting entry from Finalize Block table", ex); + throw new StorageContainerException(ex, IO_EXCEPTION); + } closeAndFlushIfNeeded(containerData::closeContainer); LOG.info("Container {} is closed with bcsId {}.", containerData.getContainerID(), diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java index 7fce70f8e18e..47d4f3f9e70a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java @@ -41,6 +41,8 @@ import java.io.File; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import static java.lang.Math.max; @@ -92,6 +94,8 @@ public class KeyValueContainerData extends ContainerData { private long blockCommitSequenceId; + private final Set finalizedBlockSet; + static { // Initialize YAML fields KV_YAML_FIELDS = Lists.newArrayList(); @@ -114,6 +118,7 @@ public KeyValueContainerData(long id, ContainerLayoutVersion layoutVersion, size, originPipelineId, originNodeId); this.numPendingDeletionBlocks = new AtomicLong(0); this.deleteTransactionId = 0; + finalizedBlockSet = ConcurrentHashMap.newKeySet(); } public KeyValueContainerData(KeyValueContainerData source) { @@ -123,6 +128,7 @@ public KeyValueContainerData(KeyValueContainerData source) { this.numPendingDeletionBlocks = new AtomicLong(0); this.deleteTransactionId = 0; this.schemaVersion = source.getSchemaVersion(); + finalizedBlockSet = ConcurrentHashMap.newKeySet(); } /** @@ -275,6 +281,34 @@ public long getDeleteTransactionId() { return deleteTransactionId; } + /** + * Add the given localID of a block to the finalizedBlockSet. + */ + public void addToFinalizedBlockSet(long localID) { + finalizedBlockSet.add(localID); + } + + public Set getFinalizedBlockSet() { + return finalizedBlockSet; + } + + public boolean isFinalizedBlockExist(long localID) { + return finalizedBlockSet.contains(localID); + } + + public void clearFinalizedBlock(DBHandle db) throws IOException { + if (!finalizedBlockSet.isEmpty()) { + // delete from db and clear memory + // Should never fail. + Preconditions.checkNotNull(db, "DB cannot be null here"); + try (BatchOperation batch = db.getStore().getBatchHandler().initBatchOperation()) { + db.getStore().getFinalizeBlocksTable().deleteBatchWithPrefix(batch, containerPrefix()); + db.getStore().getBatchHandler().commitBatchOperation(batch); + } + finalizedBlockSet.clear(); + } + } + /** * Returns a ProtoBuf Message from ContainerData. * diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 362c08c6a94b..8b6ecb43f4b9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -102,6 +102,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse; +import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getFinalizeBlockResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getGetSmallFileResponseSuccess; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getListBlockResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getPutFileResponseSuccess; @@ -274,6 +275,8 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler, return handler.handleGetSmallFile(request, kvContainer); case GetCommittedBlockLength: return handler.handleGetCommittedBlockLength(request, kvContainer); + case FinalizeBlock: + return handler.handleFinalizeBlock(request, kvContainer); default: return null; } @@ -562,6 +565,46 @@ ContainerCommandResponseProto handlePutBlock( return putBlockResponseSuccess(request, blockDataProto); } + ContainerCommandResponseProto handleFinalizeBlock( + ContainerCommandRequestProto request, KeyValueContainer kvContainer) { + + if (!request.hasFinalizeBlock()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Malformed Finalize block request. trace ID: {}", + request.getTraceID()); + } + return malformedRequest(request); + } + ContainerProtos.BlockData responseData; + + try { + checkContainerOpen(kvContainer); + BlockID blockID = BlockID.getFromProtobuf( + request.getFinalizeBlock().getBlockID()); + Preconditions.checkNotNull(blockID); + + LOG.info("Finalized Block request received {} ", blockID); + + responseData = blockManager.getBlock(kvContainer, blockID) + .getProtoBufMessage(); + + chunkManager.finalizeWriteChunk(kvContainer, blockID); + blockManager.finalizeBlock(kvContainer, blockID); + kvContainer.getContainerData() + .addToFinalizedBlockSet(blockID.getLocalID()); + + LOG.info("Block has been finalized {} ", blockID); + + } catch (StorageContainerException ex) { + return ContainerUtils.logAndReturnError(LOG, ex, request); + } catch (IOException ex) { + return ContainerUtils.logAndReturnError(LOG, + new StorageContainerException( + "Finalize Block failed", ex, IO_EXCEPTION), request); + } + return getFinalizeBlockResponse(request, responseData); + } + /** * Handle Get Block operation. Calls BlockManager to process the request. */ @@ -1233,6 +1276,16 @@ public void deleteUnreferenced(Container container, long localID) } } + public void addFinalizedBlock(Container container, long localID) { + KeyValueContainer keyValueContainer = (KeyValueContainer)container; + keyValueContainer.getContainerData().addToFinalizedBlockSet(localID); + } + + public boolean isFinalizedBlockExist(Container container, long localID) { + KeyValueContainer keyValueContainer = (KeyValueContainer)container; + return keyValueContainer.getContainerData().isFinalizedBlockExist(localID); + } + private String[] getFilesWithPrefix(String prefix, File chunkDir) { FilenameFilter filter = (dir, name) -> name.startsWith(prefix); return chunkDir.list(filter); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java index f47d17d73883..13b380a89f2f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java @@ -363,6 +363,29 @@ private static void populateContainerMetadata( // startup. If this method is called but not as a part of startup, // The inspectors will be unloaded and this will be a no-op. ContainerInspectorUtil.process(kvContainerData, store); + + // Load finalizeBlockLocalIds for container in memory. + populateContainerFinalizeBlock(kvContainerData, store); + } + + /** + * Loads finalizeBlockLocalIds for container in memory. + * @param kvContainerData - KeyValueContainerData + * @param store - DatanodeStore + * @throws IOException + */ + private static void populateContainerFinalizeBlock( + KeyValueContainerData kvContainerData, DatanodeStore store) + throws IOException { + if (store.getFinalizeBlocksTable() != null) { + try (BlockIterator iter = + store.getFinalizeBlockIterator(kvContainerData.getContainerID(), + kvContainerData.getUnprefixedKeyFilter())) { + while (iter.hasNext()) { + kvContainerData.addToFinalizedBlockSet(iter.nextBlock()); + } + } + } } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java index 449fe46ae07d..38ca691ec121 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java @@ -41,6 +41,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -214,6 +215,34 @@ public static long persistPutBlock(KeyValueContainer container, } } + @Override + public void finalizeBlock(Container container, BlockID blockId) + throws IOException { + Preconditions.checkNotNull(blockId, "blockId cannot " + + "be null for finalizeBlock operation."); + Preconditions.checkState(blockId.getContainerID() >= 0, + "Container Id cannot be negative"); + + KeyValueContainer kvContainer = (KeyValueContainer)container; + long localID = blockId.getLocalID(); + + kvContainer.removeFromPendingPutBlockCache(localID); + + try (DBHandle db = BlockUtils.getDB(kvContainer.getContainerData(), + config)) { + // Should never fail. + Preconditions.checkNotNull(db, DB_NULL_ERR_MSG); + + // persist finalizeBlock + try (BatchOperation batch = db.getStore().getBatchHandler() + .initBatchOperation()) { + db.getStore().getFinalizeBlocksTable().putWithBatch(batch, + kvContainer.getContainerData().getBlockKey(localID), localID); + db.getStore().getBatchHandler().commitBatchOperation(batch); + } + } + } + @Override public BlockData getBlock(Container container, BlockID blockID) throws IOException { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java index 92f6327447ab..49d54b78c907 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java @@ -97,6 +97,12 @@ public void finishWriteChunks(KeyValueContainer kvContainer, .finishWriteChunks(kvContainer, blockData); } + @Override + public void finalizeWriteChunk(KeyValueContainer kvContainer, + BlockID blockId) throws IOException { + selectHandler(kvContainer).finalizeWriteChunk(kvContainer, blockId); + } + @Override public ChunkBuffer readChunk(Container container, BlockID blockID, ChunkInfo info, DispatcherContext dispatcherContext) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java index ccab7f35e81f..cae16e98708f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java @@ -221,6 +221,23 @@ public void finishWriteChunks(KeyValueContainer container, } } + @Override + public void finalizeWriteChunk(KeyValueContainer container, + BlockID blockId) throws IOException { + synchronized (container) { + File chunkFile = getChunkFile(container, blockId, null); + try { + if (files.isOpen(chunkFile)) { + files.close(chunkFile); + } + verifyChunkFileExists(chunkFile); + } catch (IOException e) { + onFailure(container.getContainerData().getVolume()); + throw e; + } + } + } + private void deleteChunk(Container container, BlockID blockID, ChunkInfo info, boolean verifyLength) throws StorageContainerException { @@ -304,6 +321,11 @@ public void close(File file) { } } + public boolean isOpen(File file) { + return file != null && + files.getIfPresent(file.getPath()) != null; + } + private static void close(String filename, OpenFile openFile) { if (openFile != null) { if (LOG.isDebugEnabled()) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java index aa7285a232cc..a815c668d829 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java @@ -91,6 +91,9 @@ List listBlock(Container container, long startLocalID, int count) long getCommittedBlockLength(Container container, BlockID blockID) throws IOException; + void finalizeBlock(Container container, BlockID blockId) + throws IOException; + int getDefaultReadBufferCapacity(); /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java index 151c15f35676..ed3142c8570d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java @@ -106,6 +106,11 @@ default void finishWriteChunks(KeyValueContainer kvContainer, // no-op } + default void finalizeWriteChunk(KeyValueContainer container, + BlockID blockId) throws IOException { + // no-op + } + default String streamInit(Container container, BlockID blockID) throws StorageContainerException { return null; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java index 2c1c3c214d56..e1b10e6df571 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java @@ -73,4 +73,8 @@ public ConfigurationSource getConfig() { public abstract DBColumnFamilyDefinition getDeletedBlocksColumnFamily(); + + public DBColumnFamilyDefinition getFinalizeBlocksColumnFamily() { + return null; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java index 50303bd99ba6..b949a191453a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java @@ -67,6 +67,10 @@ public abstract class AbstractDatanodeStore implements DatanodeStore { private Table deletedBlocksTable; + private Table finalizeBlocksTable; + + private Table finalizeBlocksTableWithIterator; + static final Logger LOG = LoggerFactory.getLogger(AbstractDatanodeStore.class); private volatile DBStore store; @@ -173,6 +177,15 @@ public void start(ConfigurationSource config) deletedBlocksTable = new DatanodeTable<>( dbDef.getDeletedBlocksColumnFamily().getTable(this.store)); checkTableStatus(deletedBlocksTable, deletedBlocksTable.getName()); + + if (dbDef.getFinalizeBlocksColumnFamily() != null) { + finalizeBlocksTableWithIterator = + dbDef.getFinalizeBlocksColumnFamily().getTable(this.store); + + finalizeBlocksTable = new DatanodeTable<>( + finalizeBlocksTableWithIterator); + checkTableStatus(finalizeBlocksTable, finalizeBlocksTable.getName()); + } } } @@ -209,18 +222,30 @@ public Table getDeletedBlocksTable() { return deletedBlocksTable; } + @Override + public Table getFinalizeBlocksTable() { + return finalizeBlocksTable; + } + @Override public BlockIterator getBlockIterator(long containerID) throws IOException { return new KeyValueBlockIterator(containerID, - blockDataTableWithIterator.iterator()); + blockDataTableWithIterator.iterator()); } @Override public BlockIterator getBlockIterator(long containerID, KeyPrefixFilter filter) throws IOException { return new KeyValueBlockIterator(containerID, - blockDataTableWithIterator.iterator(), filter); + blockDataTableWithIterator.iterator(), filter); + } + + @Override + public BlockIterator getFinalizeBlockIterator(long containerID, + KeyPrefixFilter filter) throws IOException { + return new KeyValueBlockLocalIdIterator(containerID, + finalizeBlocksTableWithIterator.iterator(), filter); } @Override @@ -265,6 +290,10 @@ protected Table getBlockDataTableWithIterator() { return this.blockDataTableWithIterator; } + protected Table getFinalizeBlocksTableWithIterator() { + return this.finalizeBlocksTableWithIterator; + } + private static void checkTableStatus(Table table, String name) throws IOException { String logMessage = "Unable to get a reference to %s table. Cannot " + @@ -380,4 +409,94 @@ public void close() throws IOException { blockIterator.close(); } } + + /** + * Block localId Iterator for KeyValue Container. + * This Block localId iterator returns localIds + * which match with the {@link MetadataKeyFilters.KeyPrefixFilter}. If no + * filter is specified, then default filter used is + * {@link MetadataKeyFilters#getUnprefixedKeyFilter()} + */ + @InterfaceAudience.Public + public static class KeyValueBlockLocalIdIterator implements + BlockIterator, Closeable { + + private static final Logger LOG = LoggerFactory.getLogger( + KeyValueBlockLocalIdIterator.class); + + private final TableIterator> blockLocalIdIterator; + private final KeyPrefixFilter localIdFilter; + private Long nextLocalId; + private final long containerID; + + /** + * KeyValueBlockLocalIdIterator to iterate block localIds in a container. + * @param iterator - The iterator to apply the blockLocalId filter to. + * @param filter - BlockLocalId filter to be applied for block localIds. + */ + KeyValueBlockLocalIdIterator(long containerID, + TableIterator> + iterator, KeyPrefixFilter filter) { + this.containerID = containerID; + this.blockLocalIdIterator = iterator; + this.localIdFilter = filter; + } + + /** + * This method returns blocks matching with the filter. + * @return next block local Id or null if no more block localIds + * @throws IOException + */ + @Override + public Long nextBlock() throws IOException, NoSuchElementException { + if (nextLocalId != null) { + Long currentLocalId = nextLocalId; + nextLocalId = null; + return currentLocalId; + } + if (hasNext()) { + return nextBlock(); + } + throw new NoSuchElementException("Block Local ID Iterator " + + "reached end for ContainerID " + containerID); + } + + @Override + public boolean hasNext() throws IOException { + if (nextLocalId != null) { + return true; + } + while (blockLocalIdIterator.hasNext()) { + Table.KeyValue keyValue = blockLocalIdIterator.next(); + byte[] keyBytes = StringUtils.string2Bytes(keyValue.getKey()); + if (localIdFilter.filterKey(null, keyBytes, null)) { + nextLocalId = keyValue.getValue(); + if (LOG.isTraceEnabled()) { + LOG.trace("Block matching with filter found: LocalID is : " + + "{} for containerID {}", nextLocalId, containerID); + } + return true; + } + } + return false; + } + + @Override + public void seekToFirst() { + nextLocalId = null; + blockLocalIdIterator.seekToFirst(); + } + + @Override + public void seekToLast() { + nextLocalId = null; + blockLocalIdIterator.seekToLast(); + } + + @Override + public void close() throws IOException { + blockLocalIdIterator.close(); + } + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java index 745e1153daca..87a283e45836 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java @@ -92,6 +92,15 @@ public class DatanodeSchemaThreeDBDefinition DeletedBlocksTransaction.class, Proto2Codec.get(DeletedBlocksTransaction.class)); + public static final DBColumnFamilyDefinition + FINALIZE_BLOCKS = + new DBColumnFamilyDefinition<>( + "finalize_blocks", + String.class, + FixedLengthStringCodec.get(), + Long.class, + LongCodec.get()); + private static String separator = ""; private static final Map> @@ -99,7 +108,9 @@ public class DatanodeSchemaThreeDBDefinition BLOCK_DATA, METADATA, DELETED_BLOCKS, - DELETE_TRANSACTION); + DELETE_TRANSACTION, + FINALIZE_BLOCKS); + public DatanodeSchemaThreeDBDefinition(String dbPath, ConfigurationSource config) { @@ -122,6 +133,7 @@ public DatanodeSchemaThreeDBDefinition(String dbPath, METADATA.setCfOptions(cfOptions); DELETED_BLOCKS.setCfOptions(cfOptions); DELETE_TRANSACTION.setCfOptions(cfOptions); + FINALIZE_BLOCKS.setCfOptions(cfOptions); } @Override @@ -151,6 +163,12 @@ public DBColumnFamilyDefinition getMetadataColumnFamily() { return DELETE_TRANSACTION; } + @Override + public DBColumnFamilyDefinition + getFinalizeBlocksColumnFamily() { + return FINALIZE_BLOCKS; + } + public static int getContainerKeyPrefixLength() { return FixedLengthStringCodec.string2Bytes( getContainerKeyPrefix(0L)).length; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java index 50e181147e55..e0e491a9ea65 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition; import org.apache.hadoop.hdds.utils.db.DBDefinition; +import org.apache.hadoop.hdds.utils.db.FixedLengthStringCodec; import org.apache.hadoop.hdds.utils.db.LongCodec; import org.apache.hadoop.hdds.utils.db.Proto2Codec; import org.apache.hadoop.hdds.utils.db.StringCodec; @@ -76,6 +77,15 @@ public class DatanodeSchemaTwoDBDefinition StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction.class, Proto2Codec.get(DeletedBlocksTransaction.class)); + public static final DBColumnFamilyDefinition + FINALIZE_BLOCKS = + new DBColumnFamilyDefinition<>( + "finalize_blocks", + String.class, + FixedLengthStringCodec.get(), + Long.class, + LongCodec.get()); + public DatanodeSchemaTwoDBDefinition(String dbPath, ConfigurationSource config) { super(dbPath, config); @@ -86,7 +96,8 @@ public DatanodeSchemaTwoDBDefinition(String dbPath, BLOCK_DATA, METADATA, DELETED_BLOCKS, - DELETE_TRANSACTION); + DELETE_TRANSACTION, + FINALIZE_BLOCKS); @Override public Map> getMap() { @@ -114,4 +125,8 @@ public DBColumnFamilyDefinition getMetadataColumnFamily() { getDeleteTransactionsColumnFamily() { return DELETE_TRANSACTION; } + + public DBColumnFamilyDefinition getFinalizeBlocksColumnFamily() { + return FINALIZE_BLOCKS; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java index dd2aa5234b92..4ca81a03722e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java @@ -77,6 +77,13 @@ public interface DatanodeStore extends Closeable { */ Table getDeletedBlocksTable(); + /** + * A Table that keeps finalize blocks requested from client. + * + * @return Table + */ + Table getFinalizeBlocksTable(); + /** * Helper to create and write batch transactions. */ @@ -94,6 +101,9 @@ BlockIterator getBlockIterator(long containerID) BlockIterator getBlockIterator(long containerID, KeyPrefixFilter filter) throws IOException; + BlockIterator getFinalizeBlockIterator(long containerID, + KeyPrefixFilter filter) throws IOException; + /** * Returns if the underlying DB is closed. This call is thread safe. * @return true if the DB is closed. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java index ee8580defa0c..7f37c9ae51ca 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java @@ -94,6 +94,13 @@ public BlockIterator getBlockIterator(long containerID, .iterator(getContainerKeyPrefix(containerID)), filter); } + @Override + public BlockIterator getFinalizeBlockIterator(long containerID, + MetadataKeyFilters.KeyPrefixFilter filter) throws IOException { + return new KeyValueBlockLocalIdIterator(containerID, + getFinalizeBlocksTableWithIterator().iterator(getContainerKeyPrefix(containerID)), filter); + } + public void removeKVContainerData(long containerID) throws IOException { String prefix = getContainerKeyPrefix(containerID); try (BatchOperation batch = getBatchHandler().initBatchOperation()) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java index feb580538747..ecc4e80b4bff 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java @@ -159,6 +159,29 @@ public void closeContainer(final long containerId) throws IOException { getHandler(container).closeContainer(container); } + /** + * Returns the Container given a container id. + * + * @param containerId ID of the container + * @return Container + */ + public void addFinalizedBlock(final long containerId, + final long localId) { + Container container = containerSet.getContainer(containerId); + if (container != null) { + getHandler(container).addFinalizedBlock(container, localId); + } + } + + public boolean isFinalizedBlockExist(final long containerId, + final long localId) { + Container container = containerSet.getContainer(containerId); + if (container != null) { + return getHandler(container).isFinalizedBlockExist(container, localId); + } + return false; + } + public Container importContainer( final ContainerData containerData, final InputStream rawContainerStream, diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index bfaf7b1fca87..9e83ed6aa643 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -258,6 +258,14 @@ public void testHandlerCommandHandling() throws Exception { .dispatchRequest(handler, getSmallFileRequest, container, null); Mockito.verify(handler, times(1)).handleGetSmallFile( any(ContainerCommandRequestProto.class), any()); + + // Test Finalize Block Request handling + ContainerCommandRequestProto finalizeBlock = + getDummyCommandRequestProto(ContainerProtos.Type.FinalizeBlock); + KeyValueHandler + .dispatchRequest(handler, finalizeBlock, container, null); + Mockito.verify(handler, times(1)).handleFinalizeBlock( + any(ContainerCommandRequestProto.class), any()); } @Test diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java index 8fd8d08f24fd..d3e337055a9d 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java @@ -42,6 +42,7 @@ import java.util.UUID; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_UNHEALTHY; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.SUCCESS; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID; import static org.apache.hadoop.ozone.container.ContainerTestHelper.DATANODE_UUID; @@ -118,6 +119,19 @@ public void testReadChunk() { assertEquals(UNKNOWN_BCSID, response.getResult()); } + @Test + public void testFinalizeBlock() { + KeyValueContainer container = getMockUnhealthyContainer(); + KeyValueHandler handler = getDummyHandler(); + + ContainerProtos.ContainerCommandResponseProto response = + handler.handleFinalizeBlock( + getDummyCommandRequestProto( + ContainerProtos.Type.FinalizeBlock), + container); + assertEquals(CONTAINER_UNHEALTHY, response.getResult()); + } + @Test public void testGetSmallFile() { KeyValueContainer container = getMockUnhealthyContainer(); diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto index 718e2a108c77..367238a28552 100644 --- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto +++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto @@ -75,6 +75,8 @@ package hadoop.hdds.datanode; * 17. CloseContainer - Closes an open container and makes it immutable. * * 18. CopyContainer - Copies a container from a remote machine. + * + * 19. FinalizeBlock - Finalize block request from client. */ enum Type { @@ -103,6 +105,8 @@ enum Type { StreamInit = 19; StreamWrite = 20; + + FinalizeBlock = 21; } @@ -208,6 +212,8 @@ message ContainerCommandRequestProto { optional string encodedToken = 23; optional uint32 version = 24; + + optional FinalizeBlockRequestProto finalizeBlock = 25; } message ContainerCommandResponseProto { @@ -237,7 +243,9 @@ message ContainerCommandResponseProto { optional PutSmallFileResponseProto putSmallFile = 19; optional GetSmallFileResponseProto getSmallFile = 20; - optional GetCommittedBlockLengthResponseProto getCommittedBlockLength = 21; + optional GetCommittedBlockLengthResponseProto getCommittedBlockLength = 21; + + optional FinalizeBlockResponseProto finalizeBlock = 22; } message ContainerDataProto { @@ -338,6 +346,14 @@ message PutBlockResponseProto { required GetCommittedBlockLengthResponseProto committedBlockLength = 1; } +message FinalizeBlockRequestProto { + required DatanodeBlockID blockID = 1; +} + +message FinalizeBlockResponseProto { + required BlockData blockData = 1; +} + message GetBlockRequestProto { required DatanodeBlockID blockID = 1; } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java new file mode 100644 index 000000000000..2a0376ba3d14 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java @@ -0,0 +1,336 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.ScmConfig; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.utils.IOUtils; +import org.apache.hadoop.ozone.HddsDatanodeService; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneTestUtils; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; +import org.apache.ozone.test.GenericTestUtils; +import org.apache.ozone.test.JUnit5AwareTimeout; +import org.jetbrains.annotations.NotNull; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK; +import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_CHUNK; +import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * Tests FinalizeBlock. + */ +@RunWith(Parameterized.class) +public class TestFinalizeBlock { + + private OzoneClient client; + /** + * Set a timeout for each test. + */ + @Rule + public TestRule timeout = new JUnit5AwareTimeout(Timeout.seconds(300)); + private MiniOzoneCluster cluster; + private OzoneConfiguration conf; + private ObjectStore objectStore; + private static String volumeName = UUID.randomUUID().toString(); + private static String bucketName = UUID.randomUUID().toString(); + private boolean schemaV3; + private ContainerLayoutVersion layoutVersion; + + public TestFinalizeBlock(boolean enableSchemaV3, ContainerLayoutVersion version) { + this.schemaV3 = enableSchemaV3; + this.layoutVersion = version; + } + + @Parameterized.Parameters + public static Iterable parameters() { + return Arrays.asList(new Object[][]{ + {false, FILE_PER_CHUNK}, + {true, FILE_PER_CHUNK}, + {false, FILE_PER_BLOCK}, + {true, FILE_PER_BLOCK}, + }); + } + + @Before + public void setup() throws Exception { + conf = new OzoneConfiguration(); + conf.set(OZONE_SCM_CONTAINER_SIZE, "1GB"); + conf.setStorageSize(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN, + 0, StorageUnit.MB); + conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, TimeUnit.MILLISECONDS); + conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + conf.setBoolean(CONTAINER_SCHEMA_V3_ENABLED, schemaV3); + conf.setEnum(ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY, layoutVersion); + + DatanodeConfiguration datanodeConfiguration = conf.getObject( + DatanodeConfiguration.class); + datanodeConfiguration.setBlockDeletionInterval(Duration.ofMillis(100)); + conf.setFromObject(datanodeConfiguration); + ScmConfig scmConfig = conf.getObject(ScmConfig.class); + scmConfig.setBlockDeletionInterval(Duration.ofMillis(100)); + conf.setFromObject(scmConfig); + + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(1).build(); + cluster.waitForClusterToBeReady(); + cluster.waitForPipelineTobeReady(ONE, 30000); + + client = OzoneClientFactory.getRpcClient(conf); + objectStore = client.getObjectStore(); + objectStore.createVolume(volumeName); + objectStore.getVolume(volumeName).createBucket(bucketName); + } + + @After + public void shutdown() { + IOUtils.closeQuietly(client); + if (cluster != null) { + try { + cluster.shutdown(); + } catch (Exception e) { + // do nothing. + } + } + } + + @Test + public void testFinalizeBlock() throws IOException, InterruptedException, TimeoutException { + String keyName = UUID.randomUUID().toString(); + // create key + createKey(keyName); + + ContainerID containerId = cluster.getStorageContainerManager() + .getContainerManager().getContainers().get(0).containerID(); + + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) + .setBucketName(bucketName).setKeyName(keyName).setDataSize(0) + .build(); + List omKeyLocationInfoGroupList = + cluster.getOzoneManager().lookupKey(keyArgs).getKeyLocationVersions(); + + ContainerInfo container = cluster.getStorageContainerManager() + .getContainerManager().getContainer(containerId); + Pipeline pipeline = cluster.getStorageContainerManager() + .getPipelineManager().getPipeline(container.getPipelineID()); + + XceiverClientManager xceiverClientManager = new XceiverClientManager(conf); + XceiverClientSpi xceiverClient = + xceiverClientManager.acquireClient(pipeline); + + // Before finalize block WRITE chunk on the same block should pass through + ContainerProtos.ContainerCommandRequestProto request = + ContainerTestHelper.getWriteChunkRequest(pipeline, ( + new BlockID(containerId.getId(), omKeyLocationInfoGroupList.get(0) + .getLocationList().get(0).getLocalID())), 100); + xceiverClient.sendCommand(request); + + // Before finalize block PUT block on the same block should pass through + request = ContainerTestHelper.getPutBlockRequest(request); + xceiverClient.sendCommand(request); + + // Now Finalize Block + request = getFinalizeBlockRequest(omKeyLocationInfoGroupList, container); + ContainerProtos.ContainerCommandResponseProto response = + xceiverClient.sendCommand(request); + + Assert.assertTrue(response.getFinalizeBlock() + .getBlockData().getBlockID().getLocalID() + == omKeyLocationInfoGroupList.get(0) + .getLocationList().get(0).getLocalID()); + + Assert.assertTrue(((KeyValueContainerData)getContainerfromDN( + cluster.getHddsDatanodes().get(0), + containerId.getId()).getContainerData()) + .getFinalizedBlockSet().size() == 1); + + testRejectPutAndWriteChunkAfterFinalizeBlock(containerId, pipeline, xceiverClient, omKeyLocationInfoGroupList); + testFinalizeBlockReloadAfterDNRestart(containerId); + testFinalizeBlockClearAfterCloseContainer(containerId); + } + + private void testFinalizeBlockReloadAfterDNRestart(ContainerID containerId) { + try { + cluster.restartHddsDatanode(0, true); + } catch (Exception e) { + fail("Fail to restart Datanode"); + } + + // After restart DN, finalizeBlock should be loaded into memory + Assert.assertTrue(((KeyValueContainerData) + getContainerfromDN(cluster.getHddsDatanodes().get(0), + containerId.getId()).getContainerData()) + .getFinalizedBlockSet().size() == 1); + } + + private void testFinalizeBlockClearAfterCloseContainer(ContainerID containerId) + throws InterruptedException, TimeoutException { + OzoneTestUtils.closeAllContainers(cluster.getStorageContainerManager().getEventQueue(), + cluster.getStorageContainerManager()); + + // Finalize Block should be cleared from container data. + GenericTestUtils.waitFor(() -> ( + (KeyValueContainerData) getContainerfromDN(cluster.getHddsDatanodes().get(0), + containerId.getId()).getContainerData()).getFinalizedBlockSet().size() == 0, + 100, 10 * 1000); + try { + // Restart DataNode + cluster.restartHddsDatanode(0, true); + } catch (Exception e) { + fail("Fail to restart Datanode"); + } + + // After DN restart also there should not be any finalizeBlock + Assert.assertTrue(((KeyValueContainerData)getContainerfromDN( + cluster.getHddsDatanodes().get(0), + containerId.getId()).getContainerData()) + .getFinalizedBlockSet().size() == 0); + } + + private void testRejectPutAndWriteChunkAfterFinalizeBlock(ContainerID containerId, Pipeline pipeline, + XceiverClientSpi xceiverClient, List omKeyLocationInfoGroupList) + throws IOException { + // Try doing WRITE chunk on the already finalized block + ContainerProtos.ContainerCommandRequestProto request = + ContainerTestHelper.getWriteChunkRequest(pipeline, + (new BlockID(containerId.getId(), omKeyLocationInfoGroupList.get(0) + .getLocationList().get(0).getLocalID())), 100); + + try { + xceiverClient.sendCommand(request); + fail("Write chunk should fail."); + } catch (IOException e) { + assertTrue(e.getCause().getMessage() + .contains("Block already finalized")); + } + + // Try doing PUT block on the already finalized block + request = ContainerTestHelper.getPutBlockRequest(request); + try { + xceiverClient.sendCommand(request); + fail("Put block should fail."); + } catch (IOException e) { + assertTrue(e.getCause().getMessage() + .contains("Block already finalized")); + } + } + + @NotNull + private ContainerProtos.ContainerCommandRequestProto getFinalizeBlockRequest( + List omKeyLocationInfoGroupList, + ContainerInfo container) { + final ContainerProtos.ContainerCommandRequestProto.Builder builder = + ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.FinalizeBlock) + .setContainerID(container.getContainerID()) + .setDatanodeUuid(cluster.getHddsDatanodes() + .get(0).getDatanodeDetails().getUuidString()); + + final ContainerProtos.DatanodeBlockID blockId = + ContainerProtos.DatanodeBlockID.newBuilder() + .setContainerID(container.getContainerID()).setLocalID( + omKeyLocationInfoGroupList.get(0) + .getLocationList().get(0).getLocalID()) + .setBlockCommitSequenceId(0).build(); + + builder.setFinalizeBlock(ContainerProtos.FinalizeBlockRequestProto + .newBuilder().setBlockID(blockId).build()); + return builder.build(); + } + + /** + * create a key with specified name. + * @param keyName + * @throws IOException + */ + private void createKey(String keyName) throws IOException { + OzoneOutputStream key = objectStore.getVolume(volumeName) + .getBucket(bucketName) + .createKey(keyName, 1024, ReplicationType.RATIS, + ReplicationFactor.ONE, new HashMap<>()); + key.write("test".getBytes(UTF_8)); + key.close(); + } + + /** + * Return the container for the given containerID from the given DN. + */ + private Container getContainerfromDN(HddsDatanodeService hddsDatanodeService, + long containerID) { + return hddsDatanodeService.getDatanodeStateMachine().getContainer() + .getContainerSet().getContainer(containerID); + } +}