From 7d9a5385960082db6c9db6478562ed331a474040 Mon Sep 17 00:00:00 2001 From: ashishk Date: Wed, 13 Dec 2023 11:24:10 +0530 Subject: [PATCH 01/10] Finalize Block change --- .../org/apache/hadoop/hdds/HddsUtils.java | 1 + .../ContainerCommandResponseBuilders.java | 12 + .../common/helpers/FinalizeBlockList.java | 71 ++++ .../ozone/container/ContainerTestHelper.java | 5 + .../apache/hadoop/ozone/audit/DNAction.java | 3 +- .../container/common/impl/HddsDispatcher.java | 7 + .../container/common/interfaces/Handler.java | 5 + .../server/ratis/ContainerStateMachine.java | 41 ++- .../keyvalue/KeyValueContainerData.java | 33 ++ .../container/keyvalue/KeyValueHandler.java | 64 ++++ .../helpers/KeyValueContainerUtil.java | 16 + .../keyvalue/impl/BlockManagerImpl.java | 55 +++ .../keyvalue/impl/ChunkManagerDispatcher.java | 7 + .../keyvalue/impl/FilePerBlockStrategy.java | 22 ++ .../keyvalue/interfaces/BlockManager.java | 6 + .../keyvalue/interfaces/ChunkManager.java | 5 + .../AbstractDatanodeDBDefinition.java | 4 + .../metadata/AbstractDatanodeStore.java | 14 + .../DatanodeSchemaOneDBDefinition.java | 7 + .../DatanodeSchemaThreeDBDefinition.java | 21 +- .../DatanodeSchemaTwoDBDefinition.java | 7 + .../container/metadata/DatanodeStore.java | 3 + .../ozoneimpl/ContainerController.java | 23 ++ .../keyvalue/TestKeyValueHandler.java | 8 + ...KeyValueHandlerWithUnhealthyContainer.java | 14 + .../dev-support/checkstyle/checkstyle.xml | 2 + .../main/proto/DatanodeClientProtocol.proto | 22 +- .../dev-support/intellij/ozone-style.xml | 2 +- .../commandhandler/TestFinalizeBlock.java | 324 ++++++++++++++++++ 29 files changed, 794 insertions(+), 10 deletions(-) create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FinalizeBlockList.java create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java index 985b1b80ee93..857cbfb6eef9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java @@ -448,6 +448,7 @@ public static boolean isReadOnly( case PutSmallFile: case StreamInit: case StreamWrite: + case FinalizeBlock: default: return false; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java index 7ef2196c99d3..e0458f034728 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java @@ -318,6 +318,18 @@ public static ContainerCommandResponseProto getReadChunkResponse( .build(); } + public static ContainerCommandResponseProto getFinalizeBlockResponse( + ContainerCommandRequestProto msg, BlockData data) { + + ContainerProtos.FinalizeBlockResponseProto.Builder blockData = + ContainerProtos.FinalizeBlockResponseProto.newBuilder() + .setBlockData(data); + + return getSuccessResponseBuilder(msg) + .setFinalizeBlock(blockData) + .build(); + } + private ContainerCommandResponseBuilders() { throw new UnsupportedOperationException("no instances"); } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FinalizeBlockList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FinalizeBlockList.java new file mode 100644 index 000000000000..dd012f56fd7d --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FinalizeBlockList.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.helpers; + +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.utils.db.Codec; +import org.apache.hadoop.hdds.utils.db.DelegatedCodec; +import org.apache.hadoop.hdds.utils.db.Proto3Codec; + +import java.util.Collections; +import java.util.List; + +/** + * Helper class to convert between protobuf lists and Java lists of + * {@link ContainerProtos.FinalizeBlockList} objects. + *

+ * This class is immutable. + */ +public class FinalizeBlockList { + private static final Codec CODEC = new DelegatedCodec<>( + Proto3Codec.get(ContainerProtos.FinalizeBlockList.class), + FinalizeBlockList::getFromProtoBuf, + FinalizeBlockList::getProtoBufMessage, + DelegatedCodec.CopyType.SHALLOW); + + public static Codec getCodec() { + return CODEC; + } + + private final List blockList; + + public FinalizeBlockList(List blockList) { + this.blockList = Collections.unmodifiableList(blockList); + } + + public List asList() { + return blockList; + } + + /** + * @return A new {@link FinalizeBlockList} created from protobuf data. + */ + public static FinalizeBlockList getFromProtoBuf( + ContainerProtos.FinalizeBlockList finalizeBlockProto) { + return new FinalizeBlockList(finalizeBlockProto.getLocalIDList()); + } + + /** + * @return A protobuf message of this object. + */ + public ContainerProtos.FinalizeBlockList getProtoBufMessage() { + return ContainerProtos.FinalizeBlockList.newBuilder() + .addAllLocalID(blockList) + .build(); + } +} diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index d992fe9f9d08..2e788e9d4cb0 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -624,6 +624,11 @@ public static ContainerCommandRequestProto getDummyCommandRequestProto( .build()) .build()); break; + case FinalizeBlock: + builder + .setFinalizeBlock(ContainerProtos.FinalizeBlockRequestProto.newBuilder() + .setBlockID(fakeBlockId).build()); + break; default: Assert.fail("Unhandled request type " + cmdType + " in unit test"); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java index 73aff9ac830c..d271e7d5d48f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java @@ -39,7 +39,8 @@ public enum DNAction implements AuditAction { GET_SMALL_FILE, CLOSE_CONTAINER, GET_COMMITTED_BLOCK_LENGTH, - STREAM_INIT; + STREAM_INIT, + FINALIZE_BLOCK; @Override public String getAction() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index 1b0ef29c770c..4f4cf0a41d33 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -771,6 +771,7 @@ private static DNAction getAuditAction(Type cmdType) { case CloseContainer : return DNAction.CLOSE_CONTAINER; case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH; case StreamInit : return DNAction.STREAM_INIT; + case FinalizeBlock : return DNAction.FINALIZE_BLOCK; default : LOG.debug("Invalid command type - {}", cmdType); return null; @@ -897,6 +898,12 @@ private static Map getAuditParams( .toString()); return auditParams; + case FinalizeBlock: + auditParams.put("blockData", + BlockID.getFromProtobuf(msg.getFinalizeBlock().getBlockID()) + .toString()); + return auditParams; + default : LOG.debug("Invalid command type - {}", cmdType); return null; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java index 2ffb9d30d1f4..9df2948eaa26 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java @@ -213,6 +213,11 @@ public abstract void deleteBlock(Container container, BlockData blockData) public abstract void deleteUnreferenced(Container container, long localID) throws IOException; + public abstract void addFinalizedBlock(Container container, long localID); + + public abstract boolean isFinalizedBlockExist(Container container, + long localID); + public void setClusterID(String clusterID) { this.clusterId = clusterID; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 626b548a5a2a..d14b80bc5677 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -94,6 +94,7 @@ import org.apache.ratis.util.TaskQueue; import org.apache.ratis.util.function.CheckedSupplier; import org.apache.ratis.util.JavaUtils; +import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -376,8 +377,20 @@ public TransactionContext startTransaction(RaftClientRequest request) ctxt.setException(ioe); return ctxt; } - if (proto.getCmdType() == Type.WriteChunk) { + if (proto.getCmdType() == Type.PutBlock) { + TransactionContext ctxt = discardRequest(request, + proto.getContainerID(), proto.getPutBlock().getBlockData() + .getBlockID().getLocalID()); + if (ctxt != null) { + return ctxt; + } + } else if (proto.getCmdType() == Type.WriteChunk) { final WriteChunkRequestProto write = proto.getWriteChunk(); + TransactionContext ctxt = discardRequest(request, + proto.getContainerID(), write.getBlockID().getLocalID()); + if (ctxt != null) { + return ctxt; + } // create the log entry proto final WriteChunkRequestProto commitWriteChunkProto = WriteChunkRequestProto.newBuilder() @@ -403,16 +416,32 @@ public TransactionContext startTransaction(RaftClientRequest request) .setStateMachineData(write.getData()) .setLogData(commitContainerCommandProto.toByteString()) .build(); - } else { - return TransactionContext.newBuilder() + } else if (proto.getCmdType() == Type.FinalizeBlock) { + containerController.addFinalizedBlock(proto.getContainerID(), + proto.getFinalizeBlock().getBlockID().getLocalID()); + } + return TransactionContext.newBuilder() + .setClientRequest(request) + .setStateMachine(this) + .setServerRole(RaftPeerRole.LEADER) + .setStateMachineContext(startTime) + .setLogData(proto.toByteString()) + .build(); + } + + @Nullable + private TransactionContext discardRequest(RaftClientRequest request, + long containerId, long localId) { + if (containerController.isFinalizedBlockExist(containerId, localId)) { + TransactionContext ctxt = TransactionContext.newBuilder() .setClientRequest(request) .setStateMachine(this) .setServerRole(RaftPeerRole.LEADER) - .setStateMachineContext(startTime) - .setLogData(proto.toByteString()) .build(); + ctxt.setException(new IOException("Block already finalized")); + return ctxt; } - + return null; } private ByteString getStateMachineData(StateMachineLogEntryProto entryProto) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java index 7fce70f8e18e..b026235c19d4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java @@ -41,6 +41,8 @@ import java.io.File; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import static java.lang.Math.max; @@ -92,6 +94,8 @@ public class KeyValueContainerData extends ContainerData { private long blockCommitSequenceId; + private final Set finalizedBlockSet; + static { // Initialize YAML fields KV_YAML_FIELDS = Lists.newArrayList(); @@ -114,6 +118,7 @@ public KeyValueContainerData(long id, ContainerLayoutVersion layoutVersion, size, originPipelineId, originNodeId); this.numPendingDeletionBlocks = new AtomicLong(0); this.deleteTransactionId = 0; + finalizedBlockSet = ConcurrentHashMap.newKeySet(); } public KeyValueContainerData(KeyValueContainerData source) { @@ -123,6 +128,7 @@ public KeyValueContainerData(KeyValueContainerData source) { this.numPendingDeletionBlocks = new AtomicLong(0); this.deleteTransactionId = 0; this.schemaVersion = source.getSchemaVersion(); + finalizedBlockSet = ConcurrentHashMap.newKeySet(); } /** @@ -275,6 +281,29 @@ public long getDeleteTransactionId() { return deleteTransactionId; } + /** + * Add the given localID of a block to the pendingPutBlockCache. + */ + public void addToFinalizedBlockSet(long localID) { + finalizedBlockSet.add(localID); + } + + public void addAllToFinalizedBlockSet(List lstBlocks) { + finalizedBlockSet.addAll(lstBlocks); + } + + public Set getFinalizedBlockSet() { + return finalizedBlockSet; + } + + public boolean isFinalizedBlockExist(long localID) { + return finalizedBlockSet.contains(localID); + } + + public void clearFinalizedBlock() { + finalizedBlockSet.clear(); + } + /** * Returns a ProtoBuf Message from ContainerData. * @@ -384,6 +413,10 @@ public String getDeletingBlockKeyPrefix() { return formatKey(DELETING_KEY_PREFIX); } + public String getFinalizeBlockKey() { + return formatKey(""); + } + public KeyPrefixFilter getUnprefixedKeyFilter() { String schemaPrefix = containerPrefix(); return new KeyPrefixFilter().addFilter(schemaPrefix + "#", true); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 362c08c6a94b..0e13f8fb9805 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -102,6 +102,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse; +import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getFinalizeBlockResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getGetSmallFileResponseSuccess; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getListBlockResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getPutFileResponseSuccess; @@ -274,6 +275,8 @@ static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler, return handler.handleGetSmallFile(request, kvContainer); case GetCommittedBlockLength: return handler.handleGetCommittedBlockLength(request, kvContainer); + case FinalizeBlock: + return handler.handleFinalizeBlock(request, kvContainer); default: return null; } @@ -562,6 +565,48 @@ ContainerCommandResponseProto handlePutBlock( return putBlockResponseSuccess(request, blockDataProto); } + ContainerCommandResponseProto handleFinalizeBlock( + ContainerCommandRequestProto request, KeyValueContainer kvContainer) { + + if (!request.hasFinalizeBlock()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Malformed Finalize block request. trace ID: {}", + request.getTraceID()); + } + return malformedRequest(request); + } + ContainerProtos.BlockData responseData; + + try { + checkContainerOpen(kvContainer); + BlockID blockID = BlockID.getFromProtobuf( + request.getFinalizeBlock().getBlockID()); + Preconditions.checkNotNull(blockID); + + LOG.info("Finalized Block request received {} ", blockID); + + responseData = blockManager.getBlock(kvContainer, blockID) + .getProtoBufMessage(); + + BlockData blockData = BlockData.getFromProtoBuf(responseData); + + chunkManager.finalizeWriteChunk(kvContainer, blockData); + kvContainer.getContainerData() + .addToFinalizedBlockSet(blockData.getLocalID()); + blockManager.finalizeBlock(kvContainer, blockData); + + LOG.info("Block has been finalized {} ", blockID); + + } catch (StorageContainerException ex) { + return ContainerUtils.logAndReturnError(LOG, ex, request); + } catch (IOException ex) { + return ContainerUtils.logAndReturnError(LOG, + new StorageContainerException( + "Finalize Block failed", ex, IO_EXCEPTION), request); + } + return getFinalizeBlockResponse(request, responseData); + } + /** * Handle Get Block operation. Calls BlockManager to process the request. */ @@ -1160,6 +1205,15 @@ public void closeContainer(Container container) .getContainerID() + " while in " + state + " state.", error); } container.close(); + + KeyValueContainerData containerData = + (KeyValueContainerData)container.getContainerData(); + if (!containerData.getFinalizedBlockSet().isEmpty()) { + // delete from db and clear memory + containerData.clearFinalizedBlock(); + blockManager.clearFinalizeBlock(container); + } + ContainerLogger.logClosed(container.getContainerData()); sendICR(container); } finally { @@ -1233,6 +1287,16 @@ public void deleteUnreferenced(Container container, long localID) } } + public void addFinalizedBlock(Container container, long localID) { + KeyValueContainer keyValueContainer = (KeyValueContainer)container; + keyValueContainer.getContainerData().addToFinalizedBlockSet(localID); + } + + public boolean isFinalizedBlockExist(Container container, long localID) { + KeyValueContainer keyValueContainer = (KeyValueContainer)container; + return keyValueContainer.getContainerData().isFinalizedBlockExist(localID); + } + private String[] getFilesWithPrefix(String prefix, File chunkDir) { FilenameFilter filter = (dir, name) -> name.startsWith(prefix); return chunkDir.list(filter); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java index f47d17d73883..0989d3ec75b9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java @@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.helpers.FinalizeBlockList; import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.utils.ContainerInspectorUtil; @@ -244,6 +245,7 @@ public static void parseKVContainerData(KeyValueContainerData kvContainerData, try (DBHandle db = BlockUtils.getDB(kvContainerData, config)) { populateContainerMetadata(kvContainerData, db.getStore(), bCheckChunksFilePath); + populateContainerFinalizeBlock(kvContainerData, db.getStore()); } return; } @@ -365,6 +367,20 @@ private static void populateContainerMetadata( ContainerInspectorUtil.process(kvContainerData, store); } + private static void populateContainerFinalizeBlock( + KeyValueContainerData kvContainerData, DatanodeStore store) + throws IOException { + Table finalizeBlocksTable = + store.getFinalizeBlocksTable(); + + FinalizeBlockList finalizeBlockList = + finalizeBlocksTable.get(kvContainerData.getFinalizeBlockKey()); + + if (finalizeBlockList != null) { + kvContainerData.addAllToFinalizedBlockSet(finalizeBlockList.asList()); + } + } + /** * Initialize bytes used and block count. * @param kvData diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java index 449fe46ae07d..87ec44373fb5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.container.common.helpers.BlockData; +import org.apache.hadoop.ozone.container.common.helpers.FinalizeBlockList; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.DBHandle; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; @@ -214,6 +215,60 @@ public static long persistPutBlock(KeyValueContainer container, } } + @Override + public void finalizeBlock(Container container, BlockData data) + throws IOException { + Preconditions.checkNotNull(data, "BlockData cannot " + + "be null for finalizeBlock operation."); + Preconditions.checkState(data.getContainerID() >= 0, + "Container Id cannot be negative"); + + KeyValueContainer kvContainer = (KeyValueContainer)container; + long localID = data.getLocalID(); + + kvContainer.removeFromPendingPutBlockCache(localID); + + try (DBHandle db = BlockUtils.getDB(kvContainer.getContainerData(), + config)) { + // Should never fail. + Preconditions.checkNotNull(db, DB_NULL_ERR_MSG); + + // persist finalizeBlock + try (BatchOperation batch = db.getStore().getBatchHandler() + .initBatchOperation()) { + + FinalizeBlockList finalBlockList = new FinalizeBlockList( + new ArrayList<>( + kvContainer.getContainerData().getFinalizedBlockSet())); + db.getStore().getFinalizeBlocksTable().putWithBatch(batch, + kvContainer.getContainerData().getFinalizeBlockKey(), + finalBlockList); + + db.getStore().getBatchHandler().commitBatchOperation(batch); + } + } + } + + @Override + public void clearFinalizeBlock(Container container) + throws IOException { + KeyValueContainer kvContainer = (KeyValueContainer)container; + try (DBHandle db = BlockUtils.getDB( + kvContainer.getContainerData(), config)) { + // Should never fail. + Preconditions.checkNotNull(db, DB_NULL_ERR_MSG); + + // delete finalizeBlock + try (BatchOperation batch = db.getStore().getBatchHandler() + .initBatchOperation()) { + + db.getStore().getFinalizeBlocksTable().deleteWithBatch(batch, + kvContainer.getContainerData().getFinalizeBlockKey()); + db.getStore().getBatchHandler().commitBatchOperation(batch); + } + } + } + @Override public BlockData getBlock(Container container, BlockID blockID) throws IOException { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java index 92f6327447ab..4fdac17793aa 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java @@ -97,6 +97,13 @@ public void finishWriteChunks(KeyValueContainer kvContainer, .finishWriteChunks(kvContainer, blockData); } + @Override + public void finalizeWriteChunk(KeyValueContainer kvContainer, + BlockData blockData) throws IOException { + selectHandler(kvContainer) + .finalizeWriteChunk(kvContainer, blockData); + } + @Override public ChunkBuffer readChunk(Container container, BlockID blockID, ChunkInfo info, DispatcherContext dispatcherContext) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java index ccab7f35e81f..fc40e0ad74cf 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java @@ -221,6 +221,23 @@ public void finishWriteChunks(KeyValueContainer container, } } + @Override + public void finalizeWriteChunk(KeyValueContainer container, + BlockData blockData) throws IOException { + synchronized (container) { + File chunkFile = getChunkFile(container, blockData.getBlockID(), null); + try { + if (files.isOpen(chunkFile)) { + files.close(chunkFile); + } + verifyChunkFileExists(chunkFile); + } catch (IOException e) { + onFailure(container.getContainerData().getVolume()); + throw e; + } + } + } + private void deleteChunk(Container container, BlockID blockID, ChunkInfo info, boolean verifyLength) throws StorageContainerException { @@ -304,6 +321,11 @@ public void close(File file) { } } + public boolean isOpen(File file) { + return file != null && + files.getIfPresent(file.getPath()) != null; + } + private static void close(String filename, OpenFile openFile) { if (openFile != null) { if (LOG.isDebugEnabled()) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java index aa7285a232cc..62d8dfe27c0a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java @@ -91,6 +91,12 @@ List listBlock(Container container, long startLocalID, int count) long getCommittedBlockLength(Container container, BlockID blockID) throws IOException; + void finalizeBlock(Container container, BlockData data) + throws IOException; + + void clearFinalizeBlock(Container container) + throws IOException; + int getDefaultReadBufferCapacity(); /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java index 151c15f35676..6f148c823b15 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java @@ -106,6 +106,11 @@ default void finishWriteChunks(KeyValueContainer kvContainer, // no-op } + default void finalizeWriteChunk(KeyValueContainer container, + BlockData blockData) throws IOException { + // no-op + } + default String streamInit(Container container, BlockID blockID) throws StorageContainerException { return null; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java index 2c1c3c214d56..3c4e800a4a70 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.utils.db.DBDefinition; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList; +import org.apache.hadoop.ozone.container.common.helpers.FinalizeBlockList; import java.io.File; @@ -73,4 +74,7 @@ public ConfigurationSource getConfig() { public abstract DBColumnFamilyDefinition getDeletedBlocksColumnFamily(); + + public abstract DBColumnFamilyDefinition + getFinalizeBlocksColumnFamily(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java index 50303bd99ba6..0964978bc36e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdds.utils.db.managed.ManagedStatistics; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList; +import org.apache.hadoop.ozone.container.common.helpers.FinalizeBlockList; import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.utils.db.DatanodeDBProfile; @@ -67,6 +68,8 @@ public abstract class AbstractDatanodeStore implements DatanodeStore { private Table deletedBlocksTable; + private Table finalizeBlocksTable; + static final Logger LOG = LoggerFactory.getLogger(AbstractDatanodeStore.class); private volatile DBStore store; @@ -173,6 +176,12 @@ public void start(ConfigurationSource config) deletedBlocksTable = new DatanodeTable<>( dbDef.getDeletedBlocksColumnFamily().getTable(this.store)); checkTableStatus(deletedBlocksTable, deletedBlocksTable.getName()); + + if (dbDef.getFinalizeBlocksColumnFamily() != null) { + finalizeBlocksTable = new DatanodeTable<>( + dbDef.getFinalizeBlocksColumnFamily().getTable(this.store)); + checkTableStatus(finalizeBlocksTable, finalizeBlocksTable.getName()); + } } } @@ -209,6 +218,11 @@ public Table getDeletedBlocksTable() { return deletedBlocksTable; } + @Override + public Table getFinalizeBlocksTable() { + return finalizeBlocksTable; + } + @Override public BlockIterator getBlockIterator(long containerID) throws IOException { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java index a002eef3f72a..0354b8c0b43d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.utils.CollectionUtils; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList; +import org.apache.hadoop.ozone.container.common.helpers.FinalizeBlockList; import java.util.List; import java.util.Map; @@ -111,4 +112,10 @@ public DBColumnFamilyDefinition getMetadataColumnFamily() { public Iterable> getColumnFamilies() { return () -> CollectionUtils.newIterator(COLUMN_FAMILIES.values()); } + + @Override + public DBColumnFamilyDefinition + getFinalizeBlocksColumnFamily() { + return null; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java index 745e1153daca..08b525c20d56 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList; +import org.apache.hadoop.ozone.container.common.helpers.FinalizeBlockList; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.utils.db.DatanodeDBProfile; @@ -92,6 +93,15 @@ public class DatanodeSchemaThreeDBDefinition DeletedBlocksTransaction.class, Proto2Codec.get(DeletedBlocksTransaction.class)); + public static final DBColumnFamilyDefinition + FINALIZE_BLOCKS = + new DBColumnFamilyDefinition<>( + "finalize_blocks", + String.class, + FixedLengthStringCodec.get(), + FinalizeBlockList.class, + FinalizeBlockList.getCodec()); + private static String separator = ""; private static final Map> @@ -99,7 +109,9 @@ public class DatanodeSchemaThreeDBDefinition BLOCK_DATA, METADATA, DELETED_BLOCKS, - DELETE_TRANSACTION); + DELETE_TRANSACTION, + FINALIZE_BLOCKS); + public DatanodeSchemaThreeDBDefinition(String dbPath, ConfigurationSource config) { @@ -122,6 +134,7 @@ public DatanodeSchemaThreeDBDefinition(String dbPath, METADATA.setCfOptions(cfOptions); DELETED_BLOCKS.setCfOptions(cfOptions); DELETE_TRANSACTION.setCfOptions(cfOptions); + FINALIZE_BLOCKS.setCfOptions(cfOptions); } @Override @@ -151,6 +164,12 @@ public DBColumnFamilyDefinition getMetadataColumnFamily() { return DELETE_TRANSACTION; } + @Override + public DBColumnFamilyDefinition + getFinalizeBlocksColumnFamily() { + return FINALIZE_BLOCKS; + } + public static int getContainerKeyPrefixLength() { return FixedLengthStringCodec.string2Bytes( getContainerKeyPrefix(0L)).length; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java index 50e181147e55..9836c133d9c4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java @@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.ozone.container.common.helpers.FinalizeBlockList; import java.util.Map; @@ -114,4 +115,10 @@ public DBColumnFamilyDefinition getMetadataColumnFamily() { getDeleteTransactionsColumnFamily() { return DELETE_TRANSACTION; } + + @Override + public DBColumnFamilyDefinition + getFinalizeBlocksColumnFamily() { + return null; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java index dd2aa5234b92..c3b5d1580202 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList; +import org.apache.hadoop.ozone.container.common.helpers.FinalizeBlockList; import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator; import java.io.Closeable; @@ -77,6 +78,8 @@ public interface DatanodeStore extends Closeable { */ Table getDeletedBlocksTable(); + Table getFinalizeBlocksTable(); + /** * Helper to create and write batch transactions. */ diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java index feb580538747..198371e24c3e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java @@ -159,6 +159,29 @@ public void closeContainer(final long containerId) throws IOException { getHandler(container).closeContainer(container); } + /** + * Returns the Container given a container id. + * + * @param containerId ID of the container + * @return Container + */ + public void addFinalizedBlock(final long containerId, + final long localId) { + Container container = containerSet.getContainer(containerId); + if (container != null) { + getHandler(container).addFinalizedBlock(container, localId); + } + } + + public boolean isFinalizedBlockExist(final long containerId, + final long localId) { + Container container = containerSet.getContainer(containerId); + if (container != null) { + return getHandler(container).isFinalizedBlockExist(container, localId); + } + return false; + } + public Container importContainer( final ContainerData containerData, final InputStream rawContainerStream, diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index bfaf7b1fca87..9e83ed6aa643 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -258,6 +258,14 @@ public void testHandlerCommandHandling() throws Exception { .dispatchRequest(handler, getSmallFileRequest, container, null); Mockito.verify(handler, times(1)).handleGetSmallFile( any(ContainerCommandRequestProto.class), any()); + + // Test Finalize Block Request handling + ContainerCommandRequestProto finalizeBlock = + getDummyCommandRequestProto(ContainerProtos.Type.FinalizeBlock); + KeyValueHandler + .dispatchRequest(handler, finalizeBlock, container, null); + Mockito.verify(handler, times(1)).handleFinalizeBlock( + any(ContainerCommandRequestProto.class), any()); } @Test diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java index 8fd8d08f24fd..d3e337055a9d 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java @@ -42,6 +42,7 @@ import java.util.UUID; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_UNHEALTHY; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.SUCCESS; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID; import static org.apache.hadoop.ozone.container.ContainerTestHelper.DATANODE_UUID; @@ -118,6 +119,19 @@ public void testReadChunk() { assertEquals(UNKNOWN_BCSID, response.getResult()); } + @Test + public void testFinalizeBlock() { + KeyValueContainer container = getMockUnhealthyContainer(); + KeyValueHandler handler = getDummyHandler(); + + ContainerProtos.ContainerCommandResponseProto response = + handler.handleFinalizeBlock( + getDummyCommandRequestProto( + ContainerProtos.Type.FinalizeBlock), + container); + assertEquals(CONTAINER_UNHEALTHY, response.getResult()); + } + @Test public void testGetSmallFile() { KeyValueContainer container = getMockUnhealthyContainer(); diff --git a/hadoop-hdds/dev-support/checkstyle/checkstyle.xml b/hadoop-hdds/dev-support/checkstyle/checkstyle.xml index 41e69632bf14..baa9fa4e67d2 100644 --- a/hadoop-hdds/dev-support/checkstyle/checkstyle.xml +++ b/hadoop-hdds/dev-support/checkstyle/checkstyle.xml @@ -73,6 +73,8 @@ + + diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto index 718e2a108c77..c7cafa2a46ab 100644 --- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto +++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto @@ -75,6 +75,8 @@ package hadoop.hdds.datanode; * 17. CloseContainer - Closes an open container and makes it immutable. * * 18. CopyContainer - Copies a container from a remote machine. + * + * 19. FinalizeBlock - Finalize recovery block request from client. */ enum Type { @@ -103,6 +105,8 @@ enum Type { StreamInit = 19; StreamWrite = 20; + + FinalizeBlock = 21; } @@ -208,6 +212,8 @@ message ContainerCommandRequestProto { optional string encodedToken = 23; optional uint32 version = 24; + + optional FinalizeBlockRequestProto finalizeBlock = 25; } message ContainerCommandResponseProto { @@ -237,7 +243,9 @@ message ContainerCommandResponseProto { optional PutSmallFileResponseProto putSmallFile = 19; optional GetSmallFileResponseProto getSmallFile = 20; - optional GetCommittedBlockLengthResponseProto getCommittedBlockLength = 21; + optional GetCommittedBlockLengthResponseProto getCommittedBlockLength = 21; + + optional FinalizeBlockResponseProto finalizeBlock = 22; } message ContainerDataProto { @@ -338,6 +346,14 @@ message PutBlockResponseProto { required GetCommittedBlockLengthResponseProto committedBlockLength = 1; } +message FinalizeBlockRequestProto { + required DatanodeBlockID blockID = 1; +} + +message FinalizeBlockResponseProto { + required BlockData blockData = 1; +} + message GetBlockRequestProto { required DatanodeBlockID blockID = 1; } @@ -373,6 +389,10 @@ message ListBlockResponseProto { repeated BlockData blockData = 1; } +message FinalizeBlockList { + repeated int64 localID = 1; +} + // Chunk Operations message ChunkInfo { diff --git a/hadoop-ozone/dev-support/intellij/ozone-style.xml b/hadoop-ozone/dev-support/intellij/ozone-style.xml index c1d1dad140bf..7ad620607249 100644 --- a/hadoop-ozone/dev-support/intellij/ozone-style.xml +++ b/hadoop-ozone/dev-support/intellij/ozone-style.xml @@ -18,7 +18,7 @@ - -

- * This class is immutable. - */ -public class FinalizeBlockList { - private static final Codec CODEC = new DelegatedCodec<>( - Proto3Codec.get(ContainerProtos.FinalizeBlockList.class), - FinalizeBlockList::getFromProtoBuf, - FinalizeBlockList::getProtoBufMessage, - DelegatedCodec.CopyType.SHALLOW); - - public static Codec getCodec() { - return CODEC; - } - - private final List blockList; - - public FinalizeBlockList(List blockList) { - this.blockList = Collections.unmodifiableList(blockList); - } - - public List asList() { - return blockList; - } - - /** - * @return A new {@link FinalizeBlockList} created from protobuf data. - */ - public static FinalizeBlockList getFromProtoBuf( - ContainerProtos.FinalizeBlockList finalizeBlockProto) { - return new FinalizeBlockList(finalizeBlockProto.getLocalIDList()); - } - - /** - * @return A protobuf message of this object. - */ - public ContainerProtos.FinalizeBlockList getProtoBufMessage() { - return ContainerProtos.FinalizeBlockList.newBuilder() - .addAllLocalID(blockList) - .build(); - } -} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index 8319911e3af6..51bc1c4bbe6a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -80,6 +80,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST; import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure; +import static org.apache.hadoop.ozone.container.metadata.DatanodeSchemaThreeDBDefinition.getContainerKeyPrefix; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -452,11 +453,12 @@ private void clearFinalizeBlock() throws StorageContainerException { containerData, config)) { // Should never fail. Preconditions.checkNotNull(db, DB_NULL_ERR_MSG); - // delete finalizeBlock + + String prefix = getContainerKeyPrefix(containerData.getContainerID()); try (BatchOperation batch = db.getStore().getBatchHandler() .initBatchOperation()) { - db.getStore().getFinalizeBlocksTable().deleteWithBatch(batch, - containerData.getFinalizeBlockKey()); + db.getStore().getFinalizeBlocksTable() + .deleteBatchWithPrefix(batch, prefix); db.getStore().getBatchHandler().commitBatchOperation(batch); } } catch (IOException ex) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java index b026235c19d4..1e414e4b1c27 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java @@ -413,15 +413,16 @@ public String getDeletingBlockKeyPrefix() { return formatKey(DELETING_KEY_PREFIX); } - public String getFinalizeBlockKey() { - return formatKey(""); - } - public KeyPrefixFilter getUnprefixedKeyFilter() { String schemaPrefix = containerPrefix(); return new KeyPrefixFilter().addFilter(schemaPrefix + "#", true); } + public KeyPrefixFilter getContainerPrefixFilter() { + String schemaPrefix = containerPrefix(); + return new KeyPrefixFilter().addFilter(schemaPrefix, false); + } + public KeyPrefixFilter getDeletingBlockKeyFilter() { return new KeyPrefixFilter().addFilter(getDeletingBlockKeyPrefix()); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 53092e90adfb..94cd2bf0ce50 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -591,7 +591,8 @@ ContainerCommandResponseProto handleFinalizeBlock( chunkManager.finalizeWriteChunk(kvContainer, blockID); kvContainer.getContainerData() .addToFinalizedBlockSet(blockID.getLocalID()); - blockManager.finalizeBlock(kvContainer, blockID); + blockManager.finalizeBlock(kvContainer, + BlockData.getFromProtoBuf(responseData)); LOG.info("Block has been finalized {} ", blockID); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java index 0989d3ec75b9..26ecf3790d82 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java @@ -23,6 +23,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; @@ -34,7 +35,6 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import org.apache.hadoop.ozone.container.common.helpers.FinalizeBlockList; import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.utils.ContainerInspectorUtil; @@ -370,14 +370,17 @@ private static void populateContainerMetadata( private static void populateContainerFinalizeBlock( KeyValueContainerData kvContainerData, DatanodeStore store) throws IOException { - Table finalizeBlocksTable = - store.getFinalizeBlocksTable(); + List lstLocalId = new ArrayList<>(); - FinalizeBlockList finalizeBlockList = - finalizeBlocksTable.get(kvContainerData.getFinalizeBlockKey()); - - if (finalizeBlockList != null) { - kvContainerData.addAllToFinalizedBlockSet(finalizeBlockList.asList()); + try (BlockIterator iter = + store.getFinalizeBlockIterator(kvContainerData.getContainerID(), + kvContainerData.getContainerPrefixFilter())) { + while (iter.hasNext()) { + lstLocalId.add(iter.nextBlock().getLocalID()); + } + } + if (!lstLocalId.isEmpty()) { + kvContainerData.addAllToFinalizedBlockSet(lstLocalId); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java index 627fc294444a..bf6660abac38 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.container.common.helpers.BlockData; -import org.apache.hadoop.ozone.container.common.helpers.FinalizeBlockList; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.DBHandle; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; @@ -42,6 +41,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -216,15 +216,15 @@ public static long persistPutBlock(KeyValueContainer container, } @Override - public void finalizeBlock(Container container, BlockID blockID) + public void finalizeBlock(Container container, BlockData blockData) throws IOException { - Preconditions.checkNotNull(blockID, "BlockID cannot " + + Preconditions.checkNotNull(blockData, "blockData cannot " + "be null for finalizeBlock operation."); - Preconditions.checkState(blockID.getContainerID() >= 0, + Preconditions.checkState(blockData.getContainerID() >= 0, "Container Id cannot be negative"); KeyValueContainer kvContainer = (KeyValueContainer)container; - long localID = blockID.getLocalID(); + long localID = blockData.getLocalID(); kvContainer.removeFromPendingPutBlockCache(localID); @@ -236,14 +236,8 @@ public void finalizeBlock(Container container, BlockID blockID) // persist finalizeBlock try (BatchOperation batch = db.getStore().getBatchHandler() .initBatchOperation()) { - - FinalizeBlockList finalBlockList = new FinalizeBlockList( - new ArrayList<>( - kvContainer.getContainerData().getFinalizedBlockSet())); db.getStore().getFinalizeBlocksTable().putWithBatch(batch, - kvContainer.getContainerData().getFinalizeBlockKey(), - finalBlockList); - + kvContainer.getContainerData().getBlockKey(localID), blockData); db.getStore().getBatchHandler().commitBatchOperation(batch); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java index 52e942246bd2..93b01d4da24a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java @@ -91,7 +91,7 @@ List listBlock(Container container, long startLocalID, int count) long getCommittedBlockLength(Container container, BlockID blockID) throws IOException; - void finalizeBlock(Container container, BlockID blockID) + void finalizeBlock(Container container, BlockData blockData) throws IOException; int getDefaultReadBufferCapacity(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java index 3c4e800a4a70..fd6740af9d4c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.utils.db.DBDefinition; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList; -import org.apache.hadoop.ozone.container.common.helpers.FinalizeBlockList; import java.io.File; @@ -75,6 +74,6 @@ public ConfigurationSource getConfig() { public abstract DBColumnFamilyDefinition getDeletedBlocksColumnFamily(); - public abstract DBColumnFamilyDefinition + public abstract DBColumnFamilyDefinition getFinalizeBlocksColumnFamily(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java index 0964978bc36e..4c8c452464c3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hdds.utils.db.managed.ManagedStatistics; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList; -import org.apache.hadoop.ozone.container.common.helpers.FinalizeBlockList; import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.utils.db.DatanodeDBProfile; @@ -68,7 +67,9 @@ public abstract class AbstractDatanodeStore implements DatanodeStore { private Table deletedBlocksTable; - private Table finalizeBlocksTable; + private Table finalizeBlocksTable; + + private Table finalizeBlockDataTableWithIterator; static final Logger LOG = LoggerFactory.getLogger(AbstractDatanodeStore.class); @@ -178,8 +179,11 @@ public void start(ConfigurationSource config) checkTableStatus(deletedBlocksTable, deletedBlocksTable.getName()); if (dbDef.getFinalizeBlocksColumnFamily() != null) { + finalizeBlockDataTableWithIterator = + dbDef.getFinalizeBlocksColumnFamily().getTable(this.store); + finalizeBlocksTable = new DatanodeTable<>( - dbDef.getFinalizeBlocksColumnFamily().getTable(this.store)); + finalizeBlockDataTableWithIterator); checkTableStatus(finalizeBlocksTable, finalizeBlocksTable.getName()); } } @@ -219,7 +223,7 @@ public Table getDeletedBlocksTable() { } @Override - public Table getFinalizeBlocksTable() { + public Table getFinalizeBlocksTable() { return finalizeBlocksTable; } @@ -237,6 +241,13 @@ public BlockIterator getBlockIterator(long containerID, blockDataTableWithIterator.iterator(), filter); } + @Override + public BlockIterator getFinalizeBlockIterator(long containerID, + KeyPrefixFilter filter) throws IOException { + return new KeyValueBlockIterator(containerID, + finalizeBlockDataTableWithIterator.iterator(), filter); + } + @Override public synchronized boolean isClosed() { if (this.store == null) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java index 0354b8c0b43d..a40fb24074b5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.utils.CollectionUtils; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList; -import org.apache.hadoop.ozone.container.common.helpers.FinalizeBlockList; import java.util.List; import java.util.Map; @@ -114,7 +113,7 @@ public DBColumnFamilyDefinition getMetadataColumnFamily() { } @Override - public DBColumnFamilyDefinition + public DBColumnFamilyDefinition getFinalizeBlocksColumnFamily() { return null; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java index 08b525c20d56..c02bf409c626 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList; -import org.apache.hadoop.ozone.container.common.helpers.FinalizeBlockList; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.utils.db.DatanodeDBProfile; @@ -93,14 +92,14 @@ public class DatanodeSchemaThreeDBDefinition DeletedBlocksTransaction.class, Proto2Codec.get(DeletedBlocksTransaction.class)); - public static final DBColumnFamilyDefinition + public static final DBColumnFamilyDefinition FINALIZE_BLOCKS = new DBColumnFamilyDefinition<>( "finalize_blocks", String.class, FixedLengthStringCodec.get(), - FinalizeBlockList.class, - FinalizeBlockList.getCodec()); + BlockData.class, + BlockData.getCodec()); private static String separator = ""; @@ -165,7 +164,7 @@ public DBColumnFamilyDefinition getMetadataColumnFamily() { } @Override - public DBColumnFamilyDefinition + public DBColumnFamilyDefinition getFinalizeBlocksColumnFamily() { return FINALIZE_BLOCKS; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java index 9836c133d9c4..49d222782ab4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java @@ -28,7 +28,6 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; -import org.apache.hadoop.ozone.container.common.helpers.FinalizeBlockList; import java.util.Map; @@ -117,7 +116,7 @@ public DBColumnFamilyDefinition getMetadataColumnFamily() { } @Override - public DBColumnFamilyDefinition + public DBColumnFamilyDefinition getFinalizeBlocksColumnFamily() { return null; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java index c3b5d1580202..85a76d61d16e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList; -import org.apache.hadoop.ozone.container.common.helpers.FinalizeBlockList; import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator; import java.io.Closeable; @@ -78,7 +77,12 @@ public interface DatanodeStore extends Closeable { */ Table getDeletedBlocksTable(); - Table getFinalizeBlocksTable(); + /** + * A Table that keeps finalize blocks requested from client. + * + * @return Table + */ + Table getFinalizeBlocksTable(); /** * Helper to create and write batch transactions. @@ -97,6 +101,9 @@ BlockIterator getBlockIterator(long containerID) BlockIterator getBlockIterator(long containerID, KeyPrefixFilter filter) throws IOException; + BlockIterator getFinalizeBlockIterator(long containerID, + KeyPrefixFilter filter) throws IOException; + /** * Returns if the underlying DB is closed. This call is thread safe. * @return true if the DB is closed. From d073ba3e986a3e823bd78cf5d7ead448b4992b19 Mon Sep 17 00:00:00 2001 From: ashishk Date: Fri, 22 Dec 2023 10:43:24 +0530 Subject: [PATCH 07/10] Fix review comments, including update store change value type to Long --- .../keyvalue/KeyValueContainerData.java | 11 +- .../container/keyvalue/KeyValueHandler.java | 3 +- .../helpers/KeyValueContainerUtil.java | 18 ++-- .../keyvalue/impl/BlockManagerImpl.java | 10 +- .../keyvalue/interfaces/BlockManager.java | 2 +- .../AbstractDatanodeDBDefinition.java | 2 +- .../metadata/AbstractDatanodeStore.java | 100 +++++++++++++++++- .../DatanodeSchemaOneDBDefinition.java | 2 +- .../DatanodeSchemaThreeDBDefinition.java | 8 +- .../DatanodeSchemaTwoDBDefinition.java | 2 +- .../container/metadata/DatanodeStore.java | 4 +- .../main/proto/DatanodeClientProtocol.proto | 4 - 12 files changed, 121 insertions(+), 45 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java index 1e414e4b1c27..506e24a23afd 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java @@ -282,16 +282,12 @@ public long getDeleteTransactionId() { } /** - * Add the given localID of a block to the pendingPutBlockCache. + * Add the given localID of a block to the finalizedBlockSet. */ public void addToFinalizedBlockSet(long localID) { finalizedBlockSet.add(localID); } - public void addAllToFinalizedBlockSet(List lstBlocks) { - finalizedBlockSet.addAll(lstBlocks); - } - public Set getFinalizedBlockSet() { return finalizedBlockSet; } @@ -418,11 +414,6 @@ public KeyPrefixFilter getUnprefixedKeyFilter() { return new KeyPrefixFilter().addFilter(schemaPrefix + "#", true); } - public KeyPrefixFilter getContainerPrefixFilter() { - String schemaPrefix = containerPrefix(); - return new KeyPrefixFilter().addFilter(schemaPrefix, false); - } - public KeyPrefixFilter getDeletingBlockKeyFilter() { return new KeyPrefixFilter().addFilter(getDeletingBlockKeyPrefix()); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 94cd2bf0ce50..8b6ecb43f4b9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -589,10 +589,9 @@ ContainerCommandResponseProto handleFinalizeBlock( .getProtoBufMessage(); chunkManager.finalizeWriteChunk(kvContainer, blockID); + blockManager.finalizeBlock(kvContainer, blockID); kvContainer.getContainerData() .addToFinalizedBlockSet(blockID.getLocalID()); - blockManager.finalizeBlock(kvContainer, - BlockData.getFromProtoBuf(responseData)); LOG.info("Block has been finalized {} ", blockID); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java index 26ecf3790d82..70e8895153a2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java @@ -23,7 +23,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; @@ -367,21 +366,22 @@ private static void populateContainerMetadata( ContainerInspectorUtil.process(kvContainerData, store); } + /** + * Loads finalizeBlockLocalIds for container in memory. + * @param kvContainerData - KeyValueContainerData + * @param store - DatanodeStore + * @throws IOException + */ private static void populateContainerFinalizeBlock( KeyValueContainerData kvContainerData, DatanodeStore store) throws IOException { - List lstLocalId = new ArrayList<>(); - - try (BlockIterator iter = + try (BlockIterator iter = store.getFinalizeBlockIterator(kvContainerData.getContainerID(), - kvContainerData.getContainerPrefixFilter())) { + kvContainerData.getUnprefixedKeyFilter())) { while (iter.hasNext()) { - lstLocalId.add(iter.nextBlock().getLocalID()); + kvContainerData.addToFinalizedBlockSet(iter.nextBlock()); } } - if (!lstLocalId.isEmpty()) { - kvContainerData.addAllToFinalizedBlockSet(lstLocalId); - } } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java index bf6660abac38..38ca691ec121 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java @@ -216,15 +216,15 @@ public static long persistPutBlock(KeyValueContainer container, } @Override - public void finalizeBlock(Container container, BlockData blockData) + public void finalizeBlock(Container container, BlockID blockId) throws IOException { - Preconditions.checkNotNull(blockData, "blockData cannot " + + Preconditions.checkNotNull(blockId, "blockId cannot " + "be null for finalizeBlock operation."); - Preconditions.checkState(blockData.getContainerID() >= 0, + Preconditions.checkState(blockId.getContainerID() >= 0, "Container Id cannot be negative"); KeyValueContainer kvContainer = (KeyValueContainer)container; - long localID = blockData.getLocalID(); + long localID = blockId.getLocalID(); kvContainer.removeFromPendingPutBlockCache(localID); @@ -237,7 +237,7 @@ public void finalizeBlock(Container container, BlockData blockData) try (BatchOperation batch = db.getStore().getBatchHandler() .initBatchOperation()) { db.getStore().getFinalizeBlocksTable().putWithBatch(batch, - kvContainer.getContainerData().getBlockKey(localID), blockData); + kvContainer.getContainerData().getBlockKey(localID), localID); db.getStore().getBatchHandler().commitBatchOperation(batch); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java index 93b01d4da24a..a815c668d829 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java @@ -91,7 +91,7 @@ List listBlock(Container container, long startLocalID, int count) long getCommittedBlockLength(Container container, BlockID blockID) throws IOException; - void finalizeBlock(Container container, BlockData blockData) + void finalizeBlock(Container container, BlockID blockId) throws IOException; int getDefaultReadBufferCapacity(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java index fd6740af9d4c..4564dc483cef 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java @@ -74,6 +74,6 @@ public ConfigurationSource getConfig() { public abstract DBColumnFamilyDefinition getDeletedBlocksColumnFamily(); - public abstract DBColumnFamilyDefinition + public abstract DBColumnFamilyDefinition getFinalizeBlocksColumnFamily(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java index 4c8c452464c3..01427652bd4d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java @@ -67,9 +67,9 @@ public abstract class AbstractDatanodeStore implements DatanodeStore { private Table deletedBlocksTable; - private Table finalizeBlocksTable; + private Table finalizeBlocksTable; - private Table finalizeBlockDataTableWithIterator; + private Table finalizeBlockDataTableWithIterator; static final Logger LOG = LoggerFactory.getLogger(AbstractDatanodeStore.class); @@ -223,7 +223,7 @@ public Table getDeletedBlocksTable() { } @Override - public Table getFinalizeBlocksTable() { + public Table getFinalizeBlocksTable() { return finalizeBlocksTable; } @@ -242,9 +242,9 @@ public BlockIterator getBlockIterator(long containerID, } @Override - public BlockIterator getFinalizeBlockIterator(long containerID, + public BlockIterator getFinalizeBlockIterator(long containerID, KeyPrefixFilter filter) throws IOException { - return new KeyValueBlockIterator(containerID, + return new KeyValueBlockLocalIdIterator(containerID, finalizeBlockDataTableWithIterator.iterator(), filter); } @@ -405,4 +405,94 @@ public void close() throws IOException { blockIterator.close(); } } + + /** + * Block localId Iterator for KeyValue Container. + * This Block localId iterator returns localIds + * which match with the {@link MetadataKeyFilters.KeyPrefixFilter}. If no + * filter is specified, then default filter used is + * {@link MetadataKeyFilters#getUnprefixedKeyFilter()} + */ + @InterfaceAudience.Public + public static class KeyValueBlockLocalIdIterator implements + BlockIterator, Closeable { + + private static final Logger LOG = LoggerFactory.getLogger( + KeyValueBlockLocalIdIterator.class); + + private final TableIterator> blockLocalIdIterator; + private final KeyPrefixFilter localIdFilter; + private Long nextLocalId; + private final long containerID; + + /** + * KeyValueBlockLocalIdIterator to iterate block localIds in a container. + * @param iterator - The iterator to apply the blockLocalId filter to. + * @param filter - BlockLocalId filter to be applied for block localIds. + */ + KeyValueBlockLocalIdIterator(long containerID, + TableIterator> + iterator, KeyPrefixFilter filter) { + this.containerID = containerID; + this.blockLocalIdIterator = iterator; + this.localIdFilter = filter; + } + + /** + * This method returns blocks matching with the filter. + * @return next block local Id or null if no more block localIds + * @throws IOException + */ + @Override + public Long nextBlock() throws IOException, NoSuchElementException { + if (nextLocalId != null) { + Long currentLocalId = nextLocalId; + nextLocalId = null; + return currentLocalId; + } + if (hasNext()) { + return nextBlock(); + } + throw new NoSuchElementException("Block Local ID Iterator " + + "reached end for ContainerID " + containerID); + } + + @Override + public boolean hasNext() throws IOException { + if (nextLocalId != null) { + return true; + } + while (blockLocalIdIterator.hasNext()) { + Table.KeyValue keyValue = blockLocalIdIterator.next(); + byte[] keyBytes = StringUtils.string2Bytes(keyValue.getKey()); + if (localIdFilter.filterKey(null, keyBytes, null)) { + nextLocalId = keyValue.getValue(); + if (LOG.isTraceEnabled()) { + LOG.trace("Block matching with filter found: LocalID is : " + + "{} for containerID {}", nextLocalId, containerID); + } + return true; + } + } + return false; + } + + @Override + public void seekToFirst() { + nextLocalId = null; + blockLocalIdIterator.seekToFirst(); + } + + @Override + public void seekToLast() { + nextLocalId = null; + blockLocalIdIterator.seekToLast(); + } + + @Override + public void close() throws IOException { + blockLocalIdIterator.close(); + } + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java index a40fb24074b5..eccb549ea402 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java @@ -113,7 +113,7 @@ public DBColumnFamilyDefinition getMetadataColumnFamily() { } @Override - public DBColumnFamilyDefinition + public DBColumnFamilyDefinition getFinalizeBlocksColumnFamily() { return null; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java index c02bf409c626..87a283e45836 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java @@ -92,14 +92,14 @@ public class DatanodeSchemaThreeDBDefinition DeletedBlocksTransaction.class, Proto2Codec.get(DeletedBlocksTransaction.class)); - public static final DBColumnFamilyDefinition + public static final DBColumnFamilyDefinition FINALIZE_BLOCKS = new DBColumnFamilyDefinition<>( "finalize_blocks", String.class, FixedLengthStringCodec.get(), - BlockData.class, - BlockData.getCodec()); + Long.class, + LongCodec.get()); private static String separator = ""; @@ -164,7 +164,7 @@ public DBColumnFamilyDefinition getMetadataColumnFamily() { } @Override - public DBColumnFamilyDefinition + public DBColumnFamilyDefinition getFinalizeBlocksColumnFamily() { return FINALIZE_BLOCKS; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java index 49d222782ab4..1533b678a585 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java @@ -116,7 +116,7 @@ public DBColumnFamilyDefinition getMetadataColumnFamily() { } @Override - public DBColumnFamilyDefinition + public DBColumnFamilyDefinition getFinalizeBlocksColumnFamily() { return null; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java index 85a76d61d16e..4ca81a03722e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java @@ -82,7 +82,7 @@ public interface DatanodeStore extends Closeable { * * @return Table */ - Table getFinalizeBlocksTable(); + Table getFinalizeBlocksTable(); /** * Helper to create and write batch transactions. @@ -101,7 +101,7 @@ BlockIterator getBlockIterator(long containerID) BlockIterator getBlockIterator(long containerID, KeyPrefixFilter filter) throws IOException; - BlockIterator getFinalizeBlockIterator(long containerID, + BlockIterator getFinalizeBlockIterator(long containerID, KeyPrefixFilter filter) throws IOException; /** diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto index 561e18882185..367238a28552 100644 --- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto +++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto @@ -389,10 +389,6 @@ message ListBlockResponseProto { repeated BlockData blockData = 1; } -message FinalizeBlockList { - repeated int64 localID = 1; -} - // Chunk Operations message ChunkInfo { From 5a91a61ab729fe326cb0ac8e22f11348508de025 Mon Sep 17 00:00:00 2001 From: ashishk Date: Fri, 22 Dec 2023 11:28:26 +0530 Subject: [PATCH 08/10] Extract finalizeBlock request method --- .../commandhandler/TestFinalizeBlock.java | 85 +++++++------------ 1 file changed, 30 insertions(+), 55 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java index b17e3a8ca65a..60cc6b014db5 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java @@ -44,6 +44,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.JUnit5AwareTimeout; +import org.jetbrains.annotations.NotNull; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -144,29 +145,12 @@ public void testFinalizeBlock() Pipeline pipeline = cluster.getStorageContainerManager() .getPipelineManager().getPipeline(container.getPipelineID()); - - final ContainerProtos.ContainerCommandRequestProto.Builder builder = - ContainerProtos.ContainerCommandRequestProto.newBuilder() - .setCmdType(ContainerProtos.Type.FinalizeBlock) - .setContainerID(container.getContainerID()) - .setDatanodeUuid(cluster.getHddsDatanodes() - .get(0).getDatanodeDetails().getUuidString()); - - final ContainerProtos.DatanodeBlockID blockId = - ContainerProtos.DatanodeBlockID.newBuilder() - .setContainerID(container.getContainerID()).setLocalID( - omKeyLocationInfoGroupList.get(0) - .getLocationList().get(0).getLocalID()) - .setBlockCommitSequenceId(0).build(); - - builder.setFinalizeBlock(ContainerProtos.FinalizeBlockRequestProto - .newBuilder().setBlockID(blockId).build()); + final ContainerProtos.ContainerCommandRequestProto request = + getFinalizeBlockRequest(omKeyLocationInfoGroupList, container); XceiverClientManager xceiverClientManager = new XceiverClientManager(conf); XceiverClientSpi xceiverClient = xceiverClientManager.acquireClient(pipeline); - - ContainerProtos.ContainerCommandRequestProto request = builder.build(); ContainerProtos.ContainerCommandResponseProto response = xceiverClient.sendCommand(request); @@ -202,28 +186,12 @@ public void testFinalizeBlockReloadAfterDNRestart() Pipeline pipeline = cluster.getStorageContainerManager() .getPipelineManager().getPipeline(container.getPipelineID()); - final ContainerProtos.ContainerCommandRequestProto.Builder builder = - ContainerProtos.ContainerCommandRequestProto.newBuilder() - .setCmdType(ContainerProtos.Type.FinalizeBlock) - .setContainerID(container.getContainerID()) - .setDatanodeUuid(cluster.getHddsDatanodes() - .get(0).getDatanodeDetails().getUuidString()); - - final ContainerProtos.DatanodeBlockID blockId = - ContainerProtos.DatanodeBlockID.newBuilder() - .setContainerID(container.getContainerID()).setLocalID( - omKeyLocationInfoGroupList.get(0) - .getLocationList().get(0).getLocalID()) - .setBlockCommitSequenceId(0).build(); - - builder.setFinalizeBlock(ContainerProtos.FinalizeBlockRequestProto - .newBuilder().setBlockID(blockId).build()); + final ContainerProtos.ContainerCommandRequestProto request = + getFinalizeBlockRequest(omKeyLocationInfoGroupList, container); XceiverClientManager xceiverClientManager = new XceiverClientManager(conf); XceiverClientSpi xceiverClient = xceiverClientManager.acquireClient(pipeline); - - ContainerProtos.ContainerCommandRequestProto request = builder.build(); ContainerProtos.ContainerCommandResponseProto response = xceiverClient.sendCommand(request); @@ -270,28 +238,12 @@ public void testFinalizeBlockClearAfterCloseContainer() Pipeline pipeline = cluster.getStorageContainerManager() .getPipelineManager().getPipeline(container.getPipelineID()); - - final ContainerProtos.ContainerCommandRequestProto.Builder builder = - ContainerProtos.ContainerCommandRequestProto.newBuilder() - .setCmdType(ContainerProtos.Type.FinalizeBlock) - .setContainerID(container.getContainerID()) - .setDatanodeUuid(cluster.getHddsDatanodes() - .get(0).getDatanodeDetails().getUuidString()); - - final ContainerProtos.DatanodeBlockID blockId = - ContainerProtos.DatanodeBlockID.newBuilder() - .setContainerID(container.getContainerID()).setLocalID( - omKeyLocationInfoGroupList.get(0) - .getLocationList().get(0).getLocalID()) - .setBlockCommitSequenceId(0).build(); - - builder.setFinalizeBlock(ContainerProtos.FinalizeBlockRequestProto - .newBuilder().setBlockID(blockId).build()); + final ContainerProtos.ContainerCommandRequestProto request = + getFinalizeBlockRequest(omKeyLocationInfoGroupList, container); XceiverClientManager xceiverClientManager = new XceiverClientManager(conf); XceiverClientSpi xceiverClient = xceiverClientManager.acquireClient(pipeline); - ContainerProtos.ContainerCommandRequestProto request = builder.build(); ContainerProtos.ContainerCommandResponseProto response = xceiverClient.sendCommand(request); @@ -330,6 +282,29 @@ public void testFinalizeBlockClearAfterCloseContainer() .getFinalizedBlockSet().size() == 0); } + @NotNull + private ContainerProtos.ContainerCommandRequestProto getFinalizeBlockRequest( + List omKeyLocationInfoGroupList, + ContainerInfo container) { + final ContainerProtos.ContainerCommandRequestProto.Builder builder = + ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.FinalizeBlock) + .setContainerID(container.getContainerID()) + .setDatanodeUuid(cluster.getHddsDatanodes() + .get(0).getDatanodeDetails().getUuidString()); + + final ContainerProtos.DatanodeBlockID blockId = + ContainerProtos.DatanodeBlockID.newBuilder() + .setContainerID(container.getContainerID()).setLocalID( + omKeyLocationInfoGroupList.get(0) + .getLocationList().get(0).getLocalID()) + .setBlockCommitSequenceId(0).build(); + + builder.setFinalizeBlock(ContainerProtos.FinalizeBlockRequestProto + .newBuilder().setBlockID(blockId).build()); + return builder.build(); + } + /** * create a key with specified name. * @param keyName From 7a35a03bd4f169d4a6953b2c22675c318cb364ff Mon Sep 17 00:00:00 2001 From: ashishk Date: Fri, 22 Dec 2023 12:20:51 +0530 Subject: [PATCH 09/10] Add test case for PUT/WRITE chunk rejection after finalize block --- .../commandhandler/TestFinalizeBlock.java | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java index 60cc6b014db5..fd1a203c7bac 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -37,6 +38,7 @@ import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; @@ -65,6 +67,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Tests FinalizeBlock. @@ -282,6 +285,68 @@ public void testFinalizeBlockClearAfterCloseContainer() .getFinalizedBlockSet().size() == 0); } + @Test + public void testRejectPutAndWriteChunkAfterFinalizeBlock() + throws IOException { + String keyName = UUID.randomUUID().toString(); + // create key + createKey(keyName); + + ContainerID containerId = cluster.getStorageContainerManager() + .getContainerManager().getContainers().get(0).containerID(); + + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) + .setBucketName(bucketName).setKeyName(keyName).setDataSize(0) + .build(); + List omKeyLocationInfoGroupList = + cluster.getOzoneManager().lookupKey(keyArgs).getKeyLocationVersions(); + + ContainerInfo container = cluster.getStorageContainerManager() + .getContainerManager().getContainer(containerId); + Pipeline pipeline = cluster.getStorageContainerManager() + .getPipelineManager().getPipeline(container.getPipelineID()); + + XceiverClientManager xceiverClientManager = new XceiverClientManager(conf); + XceiverClientSpi xceiverClient = + xceiverClientManager.acquireClient(pipeline); + + // Before finalize block WRITE chunk on the same block should pass through + ContainerProtos.ContainerCommandRequestProto request = + ContainerTestHelper.getWriteChunkRequest(pipeline, ( + new BlockID(containerId.getId(), omKeyLocationInfoGroupList.get(0) + .getLocationList().get(0).getLocalID())), 100); + xceiverClient.sendCommand(request); + + // Before finalize block PUT block on the same block should pass through + request = ContainerTestHelper.getPutBlockRequest(request); + xceiverClient.sendCommand(request); + + // Now Finalize Block + request = getFinalizeBlockRequest(omKeyLocationInfoGroupList, container); + xceiverClient.sendCommand(request); + + // Try doing WRITE chunk on the already finalized block + request = ContainerTestHelper.getWriteChunkRequest(pipeline, ( + new BlockID(containerId.getId(), omKeyLocationInfoGroupList.get(0) + .getLocationList().get(0).getLocalID())), 100); + + try { + xceiverClient.sendCommand(request); + } catch (IOException e) { + assertTrue(e.getCause().getMessage() + .contains("Block already finalized")); + } + + // Try doing PUT block on the already finalized block + request = ContainerTestHelper.getPutBlockRequest(request); + try { + xceiverClient.sendCommand(request); + } catch (IOException e) { + assertTrue(e.getCause().getMessage() + .contains("Block already finalized")); + } + } + @NotNull private ContainerProtos.ContainerCommandRequestProto getFinalizeBlockRequest( List omKeyLocationInfoGroupList, From 89a02acdf4fef3e0be8fb42b653bbca006ab42eb Mon Sep 17 00:00:00 2001 From: Sammi Chen Date: Tue, 26 Dec 2023 17:04:38 +0800 Subject: [PATCH 10/10] refactor TestFinalizeBlock to reduce execution time --- .../container/common/interfaces/Handler.java | 3 +- .../container/keyvalue/KeyValueContainer.java | 34 +-- .../keyvalue/KeyValueContainerData.java | 13 +- .../helpers/KeyValueContainerUtil.java | 16 +- .../keyvalue/interfaces/ChunkManager.java | 2 +- .../AbstractDatanodeDBDefinition.java | 5 +- .../metadata/AbstractDatanodeStore.java | 18 +- .../DatanodeSchemaOneDBDefinition.java | 6 - .../DatanodeSchemaTwoDBDefinition.java | 19 +- .../DatanodeStoreSchemaThreeImpl.java | 7 + .../commandhandler/TestFinalizeBlock.java | 223 +++++++----------- 11 files changed, 146 insertions(+), 200 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java index 9df2948eaa26..bfdff69be46f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java @@ -215,8 +215,7 @@ public abstract void deleteUnreferenced(Container container, long localID) public abstract void addFinalizedBlock(Container container, long localID); - public abstract boolean isFinalizedBlockExist(Container container, - long localID); + public abstract boolean isFinalizedBlockExist(Container container, long localID); public void setClusterID(String clusterID) { this.clusterId = clusterID; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java index 51bc1c4bbe6a..98d81c15d0ad 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; -import org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.hdfs.util.Canceler; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.nativeio.NativeIO; @@ -80,7 +79,6 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST; import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure; -import static org.apache.hadoop.ozone.container.metadata.DatanodeSchemaThreeDBDefinition.getContainerKeyPrefix; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,8 +115,6 @@ public class KeyValueContainer implements Container { private boolean bCheckChunksFilePath; - private static final String DB_NULL_ERR_MSG = "DB cannot be null here"; - public KeyValueContainer(KeyValueContainerData containerData, ConfigurationSource ozoneConfig) { Preconditions.checkNotNull(containerData, @@ -438,36 +434,18 @@ public void quasiClose() throws StorageContainerException { @Override public void close() throws StorageContainerException { - clearFinalizeBlock(); + try (DBHandle db = BlockUtils.getDB(containerData, config)) { + containerData.clearFinalizedBlock(db); + } catch (IOException ex) { + LOG.error("Error in deleting entry from Finalize Block table", ex); + throw new StorageContainerException(ex, IO_EXCEPTION); + } closeAndFlushIfNeeded(containerData::closeContainer); LOG.info("Container {} is closed with bcsId {}.", containerData.getContainerID(), containerData.getBlockCommitSequenceId()); } - private void clearFinalizeBlock() throws StorageContainerException { - if (!containerData.getFinalizedBlockSet().isEmpty()) { - // delete from db and clear memory - containerData.clearFinalizedBlock(); - try (DBHandle db = BlockUtils.getDB( - containerData, config)) { - // Should never fail. - Preconditions.checkNotNull(db, DB_NULL_ERR_MSG); - - String prefix = getContainerKeyPrefix(containerData.getContainerID()); - try (BatchOperation batch = db.getStore().getBatchHandler() - .initBatchOperation()) { - db.getStore().getFinalizeBlocksTable() - .deleteBatchWithPrefix(batch, prefix); - db.getStore().getBatchHandler().commitBatchOperation(batch); - } - } catch (IOException ex) { - LOG.error("Error in deleting entry from Finalize Block table", ex); - throw new StorageContainerException(ex, IO_EXCEPTION); - } - } - } - @Override public void updateDataScanTimestamp(Instant time) throws StorageContainerException { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java index 506e24a23afd..47d4f3f9e70a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java @@ -296,8 +296,17 @@ public boolean isFinalizedBlockExist(long localID) { return finalizedBlockSet.contains(localID); } - public void clearFinalizedBlock() { - finalizedBlockSet.clear(); + public void clearFinalizedBlock(DBHandle db) throws IOException { + if (!finalizedBlockSet.isEmpty()) { + // delete from db and clear memory + // Should never fail. + Preconditions.checkNotNull(db, "DB cannot be null here"); + try (BatchOperation batch = db.getStore().getBatchHandler().initBatchOperation()) { + db.getStore().getFinalizeBlocksTable().deleteBatchWithPrefix(batch, containerPrefix()); + db.getStore().getBatchHandler().commitBatchOperation(batch); + } + finalizedBlockSet.clear(); + } } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java index 70e8895153a2..13b380a89f2f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java @@ -244,7 +244,6 @@ public static void parseKVContainerData(KeyValueContainerData kvContainerData, try (DBHandle db = BlockUtils.getDB(kvContainerData, config)) { populateContainerMetadata(kvContainerData, db.getStore(), bCheckChunksFilePath); - populateContainerFinalizeBlock(kvContainerData, db.getStore()); } return; } @@ -364,6 +363,9 @@ private static void populateContainerMetadata( // startup. If this method is called but not as a part of startup, // The inspectors will be unloaded and this will be a no-op. ContainerInspectorUtil.process(kvContainerData, store); + + // Load finalizeBlockLocalIds for container in memory. + populateContainerFinalizeBlock(kvContainerData, store); } /** @@ -375,11 +377,13 @@ private static void populateContainerMetadata( private static void populateContainerFinalizeBlock( KeyValueContainerData kvContainerData, DatanodeStore store) throws IOException { - try (BlockIterator iter = - store.getFinalizeBlockIterator(kvContainerData.getContainerID(), - kvContainerData.getUnprefixedKeyFilter())) { - while (iter.hasNext()) { - kvContainerData.addToFinalizedBlockSet(iter.nextBlock()); + if (store.getFinalizeBlocksTable() != null) { + try (BlockIterator iter = + store.getFinalizeBlockIterator(kvContainerData.getContainerID(), + kvContainerData.getUnprefixedKeyFilter())) { + while (iter.hasNext()) { + kvContainerData.addToFinalizedBlockSet(iter.nextBlock()); + } } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java index b3820b4636ba..ed3142c8570d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java @@ -108,7 +108,7 @@ default void finishWriteChunks(KeyValueContainer kvContainer, default void finalizeWriteChunk(KeyValueContainer container, BlockID blockId) throws IOException { - throw new IOException("finalizeWriteChunk is not supported"); + // no-op } default String streamInit(Container container, BlockID blockID) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java index 4564dc483cef..e1b10e6df571 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java @@ -74,6 +74,7 @@ public ConfigurationSource getConfig() { public abstract DBColumnFamilyDefinition getDeletedBlocksColumnFamily(); - public abstract DBColumnFamilyDefinition - getFinalizeBlocksColumnFamily(); + public DBColumnFamilyDefinition getFinalizeBlocksColumnFamily() { + return null; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java index 01427652bd4d..b949a191453a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java @@ -67,9 +67,9 @@ public abstract class AbstractDatanodeStore implements DatanodeStore { private Table deletedBlocksTable; - private Table finalizeBlocksTable; + private Table finalizeBlocksTable; - private Table finalizeBlockDataTableWithIterator; + private Table finalizeBlocksTableWithIterator; static final Logger LOG = LoggerFactory.getLogger(AbstractDatanodeStore.class); @@ -179,11 +179,11 @@ public void start(ConfigurationSource config) checkTableStatus(deletedBlocksTable, deletedBlocksTable.getName()); if (dbDef.getFinalizeBlocksColumnFamily() != null) { - finalizeBlockDataTableWithIterator = + finalizeBlocksTableWithIterator = dbDef.getFinalizeBlocksColumnFamily().getTable(this.store); finalizeBlocksTable = new DatanodeTable<>( - finalizeBlockDataTableWithIterator); + finalizeBlocksTableWithIterator); checkTableStatus(finalizeBlocksTable, finalizeBlocksTable.getName()); } } @@ -231,21 +231,21 @@ public Table getFinalizeBlocksTable() { public BlockIterator getBlockIterator(long containerID) throws IOException { return new KeyValueBlockIterator(containerID, - blockDataTableWithIterator.iterator()); + blockDataTableWithIterator.iterator()); } @Override public BlockIterator getBlockIterator(long containerID, KeyPrefixFilter filter) throws IOException { return new KeyValueBlockIterator(containerID, - blockDataTableWithIterator.iterator(), filter); + blockDataTableWithIterator.iterator(), filter); } @Override public BlockIterator getFinalizeBlockIterator(long containerID, KeyPrefixFilter filter) throws IOException { return new KeyValueBlockLocalIdIterator(containerID, - finalizeBlockDataTableWithIterator.iterator(), filter); + finalizeBlocksTableWithIterator.iterator(), filter); } @Override @@ -290,6 +290,10 @@ protected Table getBlockDataTableWithIterator() { return this.blockDataTableWithIterator; } + protected Table getFinalizeBlocksTableWithIterator() { + return this.finalizeBlocksTableWithIterator; + } + private static void checkTableStatus(Table table, String name) throws IOException { String logMessage = "Unable to get a reference to %s table. Cannot " + diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java index eccb549ea402..a002eef3f72a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java @@ -111,10 +111,4 @@ public DBColumnFamilyDefinition getMetadataColumnFamily() { public Iterable> getColumnFamilies() { return () -> CollectionUtils.newIterator(COLUMN_FAMILIES.values()); } - - @Override - public DBColumnFamilyDefinition - getFinalizeBlocksColumnFamily() { - return null; - } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java index 1533b678a585..e0e491a9ea65 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition; import org.apache.hadoop.hdds.utils.db.DBDefinition; +import org.apache.hadoop.hdds.utils.db.FixedLengthStringCodec; import org.apache.hadoop.hdds.utils.db.LongCodec; import org.apache.hadoop.hdds.utils.db.Proto2Codec; import org.apache.hadoop.hdds.utils.db.StringCodec; @@ -76,6 +77,15 @@ public class DatanodeSchemaTwoDBDefinition StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction.class, Proto2Codec.get(DeletedBlocksTransaction.class)); + public static final DBColumnFamilyDefinition + FINALIZE_BLOCKS = + new DBColumnFamilyDefinition<>( + "finalize_blocks", + String.class, + FixedLengthStringCodec.get(), + Long.class, + LongCodec.get()); + public DatanodeSchemaTwoDBDefinition(String dbPath, ConfigurationSource config) { super(dbPath, config); @@ -86,7 +96,8 @@ public DatanodeSchemaTwoDBDefinition(String dbPath, BLOCK_DATA, METADATA, DELETED_BLOCKS, - DELETE_TRANSACTION); + DELETE_TRANSACTION, + FINALIZE_BLOCKS); @Override public Map> getMap() { @@ -115,9 +126,7 @@ public DBColumnFamilyDefinition getMetadataColumnFamily() { return DELETE_TRANSACTION; } - @Override - public DBColumnFamilyDefinition - getFinalizeBlocksColumnFamily() { - return null; + public DBColumnFamilyDefinition getFinalizeBlocksColumnFamily() { + return FINALIZE_BLOCKS; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java index ee8580defa0c..7f37c9ae51ca 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java @@ -94,6 +94,13 @@ public BlockIterator getBlockIterator(long containerID, .iterator(getContainerKeyPrefix(containerID)), filter); } + @Override + public BlockIterator getFinalizeBlockIterator(long containerID, + MetadataKeyFilters.KeyPrefixFilter filter) throws IOException { + return new KeyValueBlockLocalIdIterator(containerID, + getFinalizeBlocksTableWithIterator().iterator(getContainerKeyPrefix(containerID)), filter); + } + public void removeKVContainerData(long containerID) throws IOException { String prefix = getContainerKeyPrefix(containerID); try (BatchOperation batch = getBatchHandler().initBatchOperation()) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java index fd1a203c7bac..2a0376ba3d14 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.ScmConfig; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -39,6 +40,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; @@ -54,24 +56,39 @@ import org.junit.Test; import org.junit.rules.TestRule; import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; import java.time.Duration; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK; +import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_CHUNK; +import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; /** * Tests FinalizeBlock. */ +@RunWith(Parameterized.class) public class TestFinalizeBlock { private OzoneClient client; @@ -85,6 +102,23 @@ public class TestFinalizeBlock { private ObjectStore objectStore; private static String volumeName = UUID.randomUUID().toString(); private static String bucketName = UUID.randomUUID().toString(); + private boolean schemaV3; + private ContainerLayoutVersion layoutVersion; + + public TestFinalizeBlock(boolean enableSchemaV3, ContainerLayoutVersion version) { + this.schemaV3 = enableSchemaV3; + this.layoutVersion = version; + } + + @Parameterized.Parameters + public static Iterable parameters() { + return Arrays.asList(new Object[][]{ + {false, FILE_PER_CHUNK}, + {true, FILE_PER_CHUNK}, + {false, FILE_PER_BLOCK}, + {true, FILE_PER_BLOCK}, + }); + } @Before public void setup() throws Exception { @@ -93,8 +127,14 @@ public void setup() throws Exception { conf.setStorageSize(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN, 0, StorageUnit.MB); conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false); - conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, - 3, TimeUnit.SECONDS); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, TimeUnit.MILLISECONDS); + conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + conf.setBoolean(CONTAINER_SCHEMA_V3_ENABLED, schemaV3); + conf.setEnum(ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY, layoutVersion); DatanodeConfiguration datanodeConfiguration = conf.getObject( DatanodeConfiguration.class); @@ -128,8 +168,7 @@ public void shutdown() { } @Test - public void testFinalizeBlock() - throws IOException { + public void testFinalizeBlock() throws IOException, InterruptedException, TimeoutException { String keyName = UUID.randomUUID().toString(); // create key createKey(keyName); @@ -148,12 +187,23 @@ public void testFinalizeBlock() Pipeline pipeline = cluster.getStorageContainerManager() .getPipelineManager().getPipeline(container.getPipelineID()); - final ContainerProtos.ContainerCommandRequestProto request = - getFinalizeBlockRequest(omKeyLocationInfoGroupList, container); - XceiverClientManager xceiverClientManager = new XceiverClientManager(conf); XceiverClientSpi xceiverClient = xceiverClientManager.acquireClient(pipeline); + + // Before finalize block WRITE chunk on the same block should pass through + ContainerProtos.ContainerCommandRequestProto request = + ContainerTestHelper.getWriteChunkRequest(pipeline, ( + new BlockID(containerId.getId(), omKeyLocationInfoGroupList.get(0) + .getLocationList().get(0).getLocalID())), 100); + xceiverClient.sendCommand(request); + + // Before finalize block PUT block on the same block should pass through + request = ContainerTestHelper.getPutBlockRequest(request); + xceiverClient.sendCommand(request); + + // Now Finalize Block + request = getFinalizeBlockRequest(omKeyLocationInfoGroupList, container); ContainerProtos.ContainerCommandResponseProto response = xceiverClient.sendCommand(request); @@ -166,116 +216,41 @@ public void testFinalizeBlock() cluster.getHddsDatanodes().get(0), containerId.getId()).getContainerData()) .getFinalizedBlockSet().size() == 1); - } - - @Test - public void testFinalizeBlockReloadAfterDNRestart() - throws IOException { - String keyName = UUID.randomUUID().toString(); - // create key - createKey(keyName); - - ContainerID containerId = cluster.getStorageContainerManager() - .getContainerManager().getContainers().get(0).containerID(); - - OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) - .setBucketName(bucketName).setKeyName(keyName).setDataSize(0) - .build(); - List omKeyLocationInfoGroupList = - cluster.getOzoneManager().lookupKey(keyArgs).getKeyLocationVersions(); - ContainerInfo container = cluster.getStorageContainerManager() - .getContainerManager().getContainer(containerId); - Pipeline pipeline = cluster.getStorageContainerManager() - .getPipelineManager().getPipeline(container.getPipelineID()); - - final ContainerProtos.ContainerCommandRequestProto request = - getFinalizeBlockRequest(omKeyLocationInfoGroupList, container); - - XceiverClientManager xceiverClientManager = new XceiverClientManager(conf); - XceiverClientSpi xceiverClient = - xceiverClientManager.acquireClient(pipeline); - ContainerProtos.ContainerCommandResponseProto response = - xceiverClient.sendCommand(request); - - Assert.assertTrue(response.getFinalizeBlock() - .getBlockData().getBlockID().getLocalID() - == omKeyLocationInfoGroupList.get(0) - .getLocationList().get(0).getLocalID()); - Assert.assertTrue(((KeyValueContainerData) - getContainerfromDN(cluster.getHddsDatanodes().get(0), - containerId.getId()).getContainerData()) - .getFinalizedBlockSet().size() == 1); + testRejectPutAndWriteChunkAfterFinalizeBlock(containerId, pipeline, xceiverClient, omKeyLocationInfoGroupList); + testFinalizeBlockReloadAfterDNRestart(containerId); + testFinalizeBlockClearAfterCloseContainer(containerId); + } + private void testFinalizeBlockReloadAfterDNRestart(ContainerID containerId) { try { - cluster.restartHddsDatanode(0, false); + cluster.restartHddsDatanode(0, true); } catch (Exception e) { + fail("Fail to restart Datanode"); } // After restart DN, finalizeBlock should be loaded into memory Assert.assertTrue(((KeyValueContainerData) getContainerfromDN(cluster.getHddsDatanodes().get(0), - containerId.getId()).getContainerData()) + containerId.getId()).getContainerData()) .getFinalizedBlockSet().size() == 1); } - @Test - public void testFinalizeBlockClearAfterCloseContainer() - throws IOException { - String keyName = UUID.randomUUID().toString(); - // create key - createKey(keyName); - - ContainerID containerId = cluster.getStorageContainerManager() - .getContainerManager().getContainers().get(0).containerID(); - - OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) - .setBucketName(bucketName).setKeyName(keyName).setDataSize(0) - .build(); - List omKeyLocationInfoGroupList = - cluster.getOzoneManager().lookupKey(keyArgs).getKeyLocationVersions(); - - - ContainerInfo container = cluster.getStorageContainerManager() - .getContainerManager().getContainer(containerId); - Pipeline pipeline = cluster.getStorageContainerManager() - .getPipelineManager().getPipeline(container.getPipelineID()); - - final ContainerProtos.ContainerCommandRequestProto request = - getFinalizeBlockRequest(omKeyLocationInfoGroupList, container); - - XceiverClientManager xceiverClientManager = new XceiverClientManager(conf); - XceiverClientSpi xceiverClient = - xceiverClientManager.acquireClient(pipeline); - - ContainerProtos.ContainerCommandResponseProto response = - xceiverClient.sendCommand(request); - Assert.assertTrue(response.getFinalizeBlock() - .getBlockData().getBlockID().getLocalID() - == omKeyLocationInfoGroupList.get(0) - .getLocationList().get(0).getLocalID()); - - Assert.assertTrue(((KeyValueContainerData) - getContainerfromDN(cluster.getHddsDatanodes().get(0), - containerId.getId()).getContainerData()) - .getFinalizedBlockSet().size() == 1); - - OzoneTestUtils.closeAllContainers(cluster - .getStorageContainerManager().getEventQueue(), + private void testFinalizeBlockClearAfterCloseContainer(ContainerID containerId) + throws InterruptedException, TimeoutException { + OzoneTestUtils.closeAllContainers(cluster.getStorageContainerManager().getEventQueue(), cluster.getStorageContainerManager()); + // Finalize Block should be cleared from container data. + GenericTestUtils.waitFor(() -> ( + (KeyValueContainerData) getContainerfromDN(cluster.getHddsDatanodes().get(0), + containerId.getId()).getContainerData()).getFinalizedBlockSet().size() == 0, + 100, 10 * 1000); try { - // Finalize Block should be cleared from container data. - GenericTestUtils.waitFor(() -> - ((KeyValueContainerData) - getContainerfromDN(cluster.getHddsDatanodes().get(0), - containerId.getId()).getContainerData()) - .getFinalizedBlockSet().size() == 0, - 500, 5 * 1000); - // Restart DataNode cluster.restartHddsDatanode(0, true); } catch (Exception e) { + fail("Fail to restart Datanode"); } // After DN restart also there should not be any finalizeBlock @@ -285,53 +260,18 @@ public void testFinalizeBlockClearAfterCloseContainer() .getFinalizedBlockSet().size() == 0); } - @Test - public void testRejectPutAndWriteChunkAfterFinalizeBlock() + private void testRejectPutAndWriteChunkAfterFinalizeBlock(ContainerID containerId, Pipeline pipeline, + XceiverClientSpi xceiverClient, List omKeyLocationInfoGroupList) throws IOException { - String keyName = UUID.randomUUID().toString(); - // create key - createKey(keyName); - - ContainerID containerId = cluster.getStorageContainerManager() - .getContainerManager().getContainers().get(0).containerID(); - - OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) - .setBucketName(bucketName).setKeyName(keyName).setDataSize(0) - .build(); - List omKeyLocationInfoGroupList = - cluster.getOzoneManager().lookupKey(keyArgs).getKeyLocationVersions(); - - ContainerInfo container = cluster.getStorageContainerManager() - .getContainerManager().getContainer(containerId); - Pipeline pipeline = cluster.getStorageContainerManager() - .getPipelineManager().getPipeline(container.getPipelineID()); - - XceiverClientManager xceiverClientManager = new XceiverClientManager(conf); - XceiverClientSpi xceiverClient = - xceiverClientManager.acquireClient(pipeline); - - // Before finalize block WRITE chunk on the same block should pass through - ContainerProtos.ContainerCommandRequestProto request = - ContainerTestHelper.getWriteChunkRequest(pipeline, ( - new BlockID(containerId.getId(), omKeyLocationInfoGroupList.get(0) - .getLocationList().get(0).getLocalID())), 100); - xceiverClient.sendCommand(request); - - // Before finalize block PUT block on the same block should pass through - request = ContainerTestHelper.getPutBlockRequest(request); - xceiverClient.sendCommand(request); - - // Now Finalize Block - request = getFinalizeBlockRequest(omKeyLocationInfoGroupList, container); - xceiverClient.sendCommand(request); - // Try doing WRITE chunk on the already finalized block - request = ContainerTestHelper.getWriteChunkRequest(pipeline, ( - new BlockID(containerId.getId(), omKeyLocationInfoGroupList.get(0) - .getLocationList().get(0).getLocalID())), 100); + ContainerProtos.ContainerCommandRequestProto request = + ContainerTestHelper.getWriteChunkRequest(pipeline, + (new BlockID(containerId.getId(), omKeyLocationInfoGroupList.get(0) + .getLocationList().get(0).getLocalID())), 100); try { xceiverClient.sendCommand(request); + fail("Write chunk should fail."); } catch (IOException e) { assertTrue(e.getCause().getMessage() .contains("Block already finalized")); @@ -341,6 +281,7 @@ public void testRejectPutAndWriteChunkAfterFinalizeBlock() request = ContainerTestHelper.getPutBlockRequest(request); try { xceiverClient.sendCommand(request); + fail("Put block should fail."); } catch (IOException e) { assertTrue(e.getCause().getMessage() .contains("Block already finalized"));