diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java index 857cbfb6eef9..06885ed3dc64 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java @@ -487,6 +487,7 @@ public static boolean requireBlockToken( case PutSmallFile: case ReadChunk: case WriteChunk: + case FinalizeBlock: return true; default: return false; @@ -566,6 +567,11 @@ public static BlockID getBlockID(ContainerCommandRequestProtoOrBuilder msg) { blockID = msg.getWriteChunk().getBlockID(); } break; + case FinalizeBlock: + if (msg.hasFinalizeBlock()) { + blockID = msg.getFinalizeBlock().getBlockID(); + } + break; default: break; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index d20b5e8f7aac..c85405566ca5 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.FinalizeBlockRequestProto; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator; @@ -263,6 +264,36 @@ public static XceiverClientReply putBlockAsync(XceiverClientSpi xceiverClient, return xceiverClient.sendCommandAsync(request); } + /** + * Calls the container protocol to finalize a container block. 
+ * + * @param xceiverClient client to perform call + * @param blockID block ID to identify block + * @param token a token for this block (may be null) + * @return FinalizeBlockResponseProto + * @throws IOException if there is an I/O error while performing the call + */ + public static ContainerProtos.FinalizeBlockResponseProto finalizeBlock( + XceiverClientSpi xceiverClient, DatanodeBlockID blockID, + Token<OzoneBlockTokenIdentifier> token) + throws IOException { + FinalizeBlockRequestProto.Builder finalizeBlockRequest = + FinalizeBlockRequestProto.newBuilder().setBlockID(blockID); + String id = xceiverClient.getPipeline().getFirstNode().getUuidString(); + ContainerCommandRequestProto.Builder builder = + ContainerCommandRequestProto.newBuilder().setCmdType(Type.FinalizeBlock) + .setContainerID(blockID.getContainerID()) + .setDatanodeUuid(id) + .setFinalizeBlock(finalizeBlockRequest); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); + } + ContainerCommandRequestProto request = builder.build(); + ContainerCommandResponseProto response = + xceiverClient.sendCommand(request, getValidatorList()); + return response.getFinalizeBlock(); + } + public static ContainerCommandRequestProto getPutBlockRequest( Pipeline pipeline, BlockData containerBlockData, boolean eof, String tokenString) throws IOException { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index 6da77e4602a9..37741f8cff9e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -398,6 +398,7 @@ private OzoneConsts() { /** Metadata stored in OmKeyInfo. */ public static final String HSYNC_CLIENT_ID = "hsyncClientId"; public static final String LEASE_RECOVERY = "leaseRecovery"; + public static final String FORCE_LEASE_RECOVERY_ENV = "OZONE.CLIENT.RECOVER.LEASE.FORCE"; //GDPR public static final String GDPR_FLAG = "gdprEnabled"; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index c36b00791002..30496ce51a02 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -97,7 +97,6 @@ import org.apache.ratis.util.TaskQueue; import org.apache.ratis.util.function.CheckedSupplier; import org.apache.ratis.util.JavaUtils; -import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -423,68 +422,60 @@ public TransactionContext startTransaction(RaftClientRequest request) } return builder.build().setException(ioe); } + + boolean blockAlreadyFinalized = false; if (proto.getCmdType() == Type.PutBlock) { - TransactionContext ctxt = rejectRequest(request, - proto.getContainerID(), proto.getPutBlock().getBlockData() - .getBlockID().getLocalID()); - if (ctxt != null) { - return ctxt; - } + blockAlreadyFinalized = shouldRejectRequest(proto.getPutBlock().getBlockData().getBlockID()); } else if (proto.getCmdType() == Type.WriteChunk) { final WriteChunkRequestProto write = proto.getWriteChunk(); - TransactionContext ctxt =
rejectRequest(request, - proto.getContainerID(), write.getBlockID().getLocalID()); - if (ctxt != null) { - return ctxt; + blockAlreadyFinalized = shouldRejectRequest(write.getBlockID()); + if (!blockAlreadyFinalized) { + // create the log entry proto + final WriteChunkRequestProto commitWriteChunkProto = + WriteChunkRequestProto.newBuilder() + .setBlockID(write.getBlockID()) + .setChunkData(write.getChunkData()) + // skipping the data field as it is + // already set in statemachine data proto + .build(); + ContainerCommandRequestProto commitContainerCommandProto = + ContainerCommandRequestProto + .newBuilder(proto) + .setPipelineID(gid.getUuid().toString()) + .setWriteChunk(commitWriteChunkProto) + .setTraceID(proto.getTraceID()) + .build(); + Preconditions.checkArgument(write.hasData()); + Preconditions.checkArgument(!write.getData().isEmpty()); + + final Context context = new Context(proto, commitContainerCommandProto); + return builder + .setStateMachineContext(context) + .setStateMachineData(write.getData()) + .setLogData(commitContainerCommandProto.toByteString()) + .build(); } - // create the log entry proto - final WriteChunkRequestProto commitWriteChunkProto = - WriteChunkRequestProto.newBuilder() - .setBlockID(write.getBlockID()) - .setChunkData(write.getChunkData()) - // skipping the data field as it is - // already set in statemachine data proto - .build(); - ContainerCommandRequestProto commitContainerCommandProto = - ContainerCommandRequestProto - .newBuilder(proto) - .setPipelineID(gid.getUuid().toString()) - .setWriteChunk(commitWriteChunkProto) - .setTraceID(proto.getTraceID()) - .build(); - Preconditions.checkArgument(write.hasData()); - Preconditions.checkArgument(!write.getData().isEmpty()); - - final Context context = new Context(proto, commitContainerCommandProto); - return builder - .setStateMachineContext(context) - .setStateMachineData(write.getData()) - .setLogData(commitContainerCommandProto.toByteString()) - .build(); } else if (proto.getCmdType() == Type.FinalizeBlock) { containerController.addFinalizedBlock(proto.getContainerID(), proto.getFinalizeBlock().getBlockID().getLocalID()); } - final Context context = new Context(proto, proto); - return builder - .setStateMachineContext(context) - .setLogData(proto.toByteString()) - .build(); - } - @Nullable - private TransactionContext rejectRequest(RaftClientRequest request, - long containerId, long localId) { - if (containerController.isFinalizedBlockExist(containerId, localId)) { - TransactionContext ctxt = TransactionContext.newBuilder() - .setClientRequest(request) - .setStateMachine(this) - .setServerRole(RaftPeerRole.LEADER) + if (blockAlreadyFinalized) { + TransactionContext transactionContext = builder.build(); + transactionContext.setException(new StorageContainerException("Block already finalized", + ContainerProtos.Result.BLOCK_ALREADY_FINALIZED)); + return transactionContext; + } else { + final Context context = new Context(proto, proto); + return builder + .setStateMachineContext(context) + .setLogData(proto.toByteString()) .build(); - ctxt.setException(new IOException("Block already finalized")); - return ctxt; } - return null; + } + + private boolean shouldRejectRequest(ContainerProtos.DatanodeBlockID blockID) { + return containerController.isFinalizedBlockExist(blockID.getContainerID(), blockID.getLocalID()); } private static ContainerCommandRequestProto getContainerCommandRequestProto( diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 8b6ecb43f4b9..6e817fdce98a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hdds.scm.ByteStringConversion; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.utils.FaultInjector; import org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.Checksum; @@ -139,6 +140,7 @@ public class KeyValueHandler extends Handler { private final boolean validateChunkChecksumData; // A striped lock that is held during container creation. private final Striped<Lock> containerCreationLocks; + private static FaultInjector injector; public KeyValueHandler(ConfigurationSource config, String datanodeId, @@ -567,6 +569,10 @@ ContainerCommandResponseProto handlePutBlock( ContainerCommandResponseProto handleFinalizeBlock( ContainerCommandRequestProto request, KeyValueContainer kvContainer) { + ContainerCommandResponseProto responseProto = checkFaultInjector(request); + if (responseProto != null) { + return responseProto; + } if (!request.hasFinalizeBlock()) { if (LOG.isDebugEnabled()) { @@ -646,6 +652,12 @@ ContainerCommandResponseProto handleGetBlock( */ ContainerCommandResponseProto handleGetCommittedBlockLength( ContainerCommandRequestProto request, KeyValueContainer kvContainer) { + + ContainerCommandResponseProto responseProto = checkFaultInjector(request); + if (responseProto != null) { + return responseProto; + } + if (!request.hasGetCommittedBlockLength()) { if (LOG.isDebugEnabled()) { LOG.debug("Malformed Get Key request.
trace ID: {}", @@ -1444,8 +1456,34 @@ private void triggerVolumeScanAndThrowException(Container container, throw new StorageContainerException(msg, result); } + private ContainerCommandResponseProto checkFaultInjector(ContainerCommandRequestProto request) { + if (injector != null) { + Throwable ex = injector.getException(); + if (ex != null) { + // reset injector + injector = null; + return ContainerUtils.logAndReturnError(LOG, (StorageContainerException) ex, request); + } + try { + injector.pause(); + } catch (IOException e) { + // do nothing + } + } + return null; + } + public static Logger getLogger() { return LOG; } + @VisibleForTesting + public static FaultInjector getInjector() { + return injector; + } + + @VisibleForTesting + public static void setInjector(FaultInjector instance) { + injector = instance; + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java index 27957d162a55..32076abb3fb3 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java @@ -40,4 +40,13 @@ public void resume() throws IOException { @VisibleForTesting public void reset() throws IOException { } + + @VisibleForTesting + public void setException(Throwable e) { + } + + @VisibleForTesting + public Throwable getException() { + return null; + } } diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto index 367238a28552..0206a8ea71d4 100644 --- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto +++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto @@ -156,6 +156,7 @@ enum Result { DELETE_ON_NON_EMPTY_CONTAINER = 44; EXPORT_CONTAINER_METADATA_FAILED = 45; IMPORT_CONTAINER_METADATA_FAILED = 46; + BLOCK_ALREADY_FINALIZED = 47; } /** diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java index 824346058dec..1436c69bad47 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java @@ -1100,10 +1100,11 @@ EchoRPCResponse echoRPCReq(byte[] payloadReq, int payloadSizeResp, * @param volumeName - The volume name. * @param bucketName - The bucket name. * @param keyName - The key user want to recover. - * @return OmKeyInfo KeyInfo is file under recovery + * @param force - whether to forcibly recover the file. + * @return OmKeyInfo KeyInfo of file under recovery * @throws IOException if an error occurs */ - List<OmKeyInfo> recoverLease(String volumeName, String bucketName, String keyName) throws IOException; + OmKeyInfo recoverLease(String volumeName, String bucketName, String keyName, boolean force) throws IOException; /** * Update modification time and access time of a file. 
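As an aside, here is a minimal sketch of how a caller might drive the reworked recoverLease API above, assuming the FORCE_LEASE_RECOVERY_ENV system property (added to OzoneConsts earlier in this patch, and set via System.setProperty in the tests below) is how forced recovery is requested; the class and method names are illustrative only, not part of the patch:

import java.io.IOException;

import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;

/** Illustrative sketch only; not part of this patch. */
public final class RecoverLeaseSketch {
  private RecoverLeaseSketch() {
  }

  /** Recover the lease on an hsync'ed key, honoring the force property. */
  public static OmKeyInfo recover(OzoneManagerProtocol om, String volume,
      String bucket, String key) throws IOException {
    // FORCE_LEASE_RECOVERY_ENV resolves to "OZONE.CLIENT.RECOVER.LEASE.FORCE".
    boolean force = Boolean.parseBoolean(
        System.getProperty("OZONE.CLIENT.RECOVER.LEASE.FORCE", "false"));
    // The API now returns the single OmKeyInfo of the file under recovery
    // instead of the previous List<OmKeyInfo> pair of key and open key.
    return om.recoverLease(volume, bucket, key, force);
  }
}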
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index 84c33db925eb..21e2f7455ca0 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -2447,12 +2447,14 @@ public EchoRPCResponse echoRPCReq(byte[] payloadReq, int payloadSizeResp, } @Override - public List<OmKeyInfo> recoverLease(String volumeName, String bucketName, String keyName) throws IOException { + public OmKeyInfo recoverLease(String volumeName, String bucketName, String keyName, boolean force) + throws IOException { RecoverLeaseRequest recoverLeaseRequest = RecoverLeaseRequest.newBuilder() .setVolumeName(volumeName) .setBucketName(bucketName) .setKeyName(keyName) + .setForce(force) .build(); OMRequest omRequest = createOMRequest(Type.RecoverLease) @@ -2460,10 +2462,8 @@ public List<OmKeyInfo> recoverLease(String volumeName, String RecoverLeaseResponse recoverLeaseResponse = handleError(submitRequest(omRequest)).getRecoverLeaseResponse(); - ArrayList<OmKeyInfo> list = new ArrayList<OmKeyInfo>(); - list.add(OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getKeyInfo())); - list.add(OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getOpenKeyInfo())); - return list; + + return OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getKeyInfo()); } @Override diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java index 2285c4a59022..68c2d43471db 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java @@ -17,31 +17,50 @@ */ package org.apache.hadoop.fs.ozone; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.XceiverClientGrpc; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneTestUtils; import org.apache.hadoop.ozone.TestDataUtil; import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.utils.FaultInjectorImpl; +import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.event.Level; import java.io.IOException; import java.io.OutputStream; +import java.net.ConnectException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeoutException; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY; +import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER; @@ -65,17 +84,18 @@ public class TestLeaseRecovery { private OzoneClient client; private final OzoneConfiguration conf = new OzoneConfiguration(); + private String dir; + private Path file; /** * Closing the output stream after lease recovery throws because the key * is no longer open in OM. This is currently expected (see HDDS-9358). */ - public static void closeIgnoringKeyNotFound(OutputStream stream) - throws IOException { + public static void closeIgnoringKeyNotFound(OutputStream stream) { try { stream.close(); - } catch (OMException e) { - assertEquals(OMException.ResultCodes.KEY_NOT_FOUND, e.getResult()); + } catch (IOException e) { + assertEquals(OMException.ResultCodes.KEY_NOT_FOUND, ((OMException)e).getResult()); } } @@ -92,8 +112,10 @@ public void init() throws IOException, InterruptedException, conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true); conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, layout.name()); conf.set(OzoneConfigKeys.OZONE_OM_LEASE_SOFT_LIMIT, "0s"); + // make sure flush will write data to DN + conf.setBoolean("ozone.client.stream.buffer.flush.delay", false); cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(5) + .setNumDatanodes(3) .setTotalPipelineNumLimit(10) .setBlockSize(blockSize) .setChunkSize(chunkSize) @@ -109,6 +131,14 @@ public void init() throws IOException, InterruptedException, // create a volume and a bucket to be used by OzoneFileSystem bucket = TestDataUtil.createVolumeAndBucket(client, layout); + + GenericTestUtils.setLogLevel(XceiverClientGrpc.getLogger(), Level.DEBUG); + + // Set the fs.defaultFS + final String rootPath = String.format("%s://%s/", OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY)); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + dir = OZONE_ROOT + bucket.getVolumeName() + OZONE_URI_DELIMITER + bucket.getName(); + file = new Path(dir, "file"); } @AfterEach @@ -119,21 +149,12 @@ public void tearDown() { } } - @Test - public void testRecovery() throws Exception { - // Set the fs.defaultFS - final String rootPath = String.format("%s://%s/", - OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY)); - conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); - - final String dir = OZONE_ROOT + bucket.getVolumeName() - + OZONE_URI_DELIMITER + bucket.getName(); - final Path file = new Path(dir, "file"); - + @ParameterizedTest + @ValueSource(ints = {1 << 20, (1 << 20) + 1, (1 << 20) - 1}) + public void testRecovery(int dataSize) throws Exception { RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf); - final byte[] data = new byte[1 << 
20]; - ThreadLocalRandom.current().nextBytes(data); + final byte[] data = getData(dataSize); final FSDataOutputStream stream = fs.create(file, true); try { @@ -141,6 +162,10 @@ public void testRecovery() throws Exception { stream.hsync(); assertFalse(fs.isFileClosed(file)); + // write more data without hsync + stream.write(data); + stream.flush(); + int count = 0; while (count++ < 15 && !fs.recoverLease(file)) { Thread.sleep(1000); @@ -155,12 +180,7 @@ public void testRecovery() throws Exception { } // open it again, make sure the data is correct - byte[] readData = new byte[1 << 20]; - try (FSDataInputStream fdis = fs.open(file)) { - int readBytes = fdis.read(readData); - assertEquals(readBytes, 1 << 20); - assertArrayEquals(readData, data); - } + verifyData(data, dataSize * 2, file, fs); } @Test @@ -172,11 +192,263 @@ public void testOBSRecoveryShouldFail() throws Exception { conf.get(OZONE_OM_ADDRESS_KEY)); conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); - final String dir = OZONE_ROOT + bucket.getVolumeName() + + final String directory = OZONE_ROOT + bucket.getVolumeName() + OZONE_URI_DELIMITER + bucket.getName(); - final Path file = new Path(dir, "file"); + final Path f = new Path(directory, "file"); RootedOzoneFileSystem fs = (RootedOzoneFileSystem) FileSystem.get(conf); - assertThrows(IllegalArgumentException.class, () -> fs.recoverLease(file)); + assertThrows(IllegalArgumentException.class, () -> fs.recoverLease(f)); + } + + @Test + public void testFinalizeBlockFailure() throws Exception { + RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf); + int dataSize = 100; + final byte[] data = getData(dataSize); + + final FSDataOutputStream stream = fs.create(file, true); + try { + stream.write(data); + stream.hsync(); + assertFalse(fs.isFileClosed(file)); + + // write more data without hsync + stream.write(data); + stream.flush(); + + FaultInjectorImpl injector = new FaultInjectorImpl(); + KeyValueHandler.setInjector(injector); + StorageContainerException sce = new StorageContainerException( + "Requested operation not allowed as ContainerState is CLOSED", + ContainerProtos.Result.CLOSED_CONTAINER_IO); + injector.setException(sce); + GenericTestUtils.LogCapturer logs = + GenericTestUtils.LogCapturer.captureLogs(BasicRootedOzoneClientAdapterImpl.LOG); + + fs.recoverLease(file); + assertTrue(logs.getOutput().contains("Failed to execute finalizeBlock command")); + assertTrue(logs.getOutput().contains("Requested operation not allowed as ContainerState is CLOSED")); + + // The lease should have been recovered. 
+ assertTrue(fs.isFileClosed(file), "File should be closed"); + FileStatus fileStatus = fs.getFileStatus(file); + assertEquals(dataSize * 2, fileStatus.getLen()); + } finally { + closeIgnoringKeyNotFound(stream); + } + + // open it again, make sure the data is correct + verifyData(data, dataSize * 2, file, fs); + } + + @Test + public void testBlockPipelineClosed() throws Exception { + RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf); + int dataSize = 100; + final byte[] data = getData(dataSize); + + final FSDataOutputStream stream = fs.create(file, true); + try { + stream.write(data); + stream.hsync(); + assertFalse(fs.isFileClosed(file)); + + // write more data without hsync + stream.write(data); + stream.flush(); + + // close the pipeline + StorageContainerManager scm = cluster.getStorageContainerManager(); + ContainerInfo container = scm.getContainerManager().getContainers().get(0); + OzoneTestUtils.closeContainer(scm, container); + GenericTestUtils.waitFor(() -> { + try { + return scm.getPipelineManager().getPipeline(container.getPipelineID()).isClosed(); + } catch (PipelineNotFoundException e) { + throw new RuntimeException(e); + } + }, 200, 30000); + + fs.recoverLease(file); + + // The lease should have been recovered. + assertTrue(fs.isFileClosed(file), "File should be closed"); + FileStatus fileStatus = fs.getFileStatus(file); + assertEquals(dataSize * 2, fileStatus.getLen()); + } finally { + closeIgnoringKeyNotFound(stream); + } + + // open it again, make sure the data is correct + verifyData(data, dataSize * 2, file, fs); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testGetCommittedBlockLengthTimeout(boolean forceRecovery) throws Exception { + // reduce read timeout + conf.set(OZONE_CLIENT_READ_TIMEOUT, "2s"); + // set force recovery + System.setProperty(FORCE_LEASE_RECOVERY_ENV, String.valueOf(forceRecovery)); + RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf); + int dataSize = 100; + final byte[] data = getData(dataSize); + + final FSDataOutputStream stream = fs.create(file, true); + try { + stream.write(data); + stream.hsync(); + assertFalse(fs.isFileClosed(file)); + + // write more data without hsync + stream.write(data); + stream.flush(); + + // close the pipeline and container + ContainerInfo container = cluster.getStorageContainerManager().getContainerManager().getContainers().get(0); + OzoneTestUtils.closeContainer(cluster.getStorageContainerManager(), container); + // pause getCommittedBlockLength handling on all DNs to make sure all getCommittedBlockLength will time out + FaultInjectorImpl injector = new FaultInjectorImpl(); + KeyValueHandler.setInjector(injector); + GenericTestUtils.LogCapturer logs = + GenericTestUtils.LogCapturer.captureLogs(XceiverClientGrpc.getLogger()); + if (!forceRecovery) { + assertThrows(IOException.class, () -> fs.recoverLease(file)); + return; + } else { + fs.recoverLease(file); + } + assertEquals(3, StringUtils.countMatches(logs.getOutput(), + "Executing command cmdType: GetCommittedBlockLength")); + + // The lease should have been recovered. 
+ assertTrue(fs.isFileClosed(file), "File should be closed"); + FileStatus fileStatus = fs.getFileStatus(file); + // Since all DNs are unavailable, the length in the OM keyInfo will be used as the final file length + assertEquals(dataSize, fileStatus.getLen()); + } finally { + closeIgnoringKeyNotFound(stream); + KeyValueHandler.setInjector(null); + } + + // open it again, make sure the data is correct + verifyData(data, dataSize, file, fs); + } + + @Test + public void testGetCommittedBlockLengthWithException() throws Exception { + RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf); + int dataSize = 100; + final byte[] data = getData(dataSize); + + final FSDataOutputStream stream = fs.create(file, true); + try { + stream.write(data); + stream.hsync(); + assertFalse(fs.isFileClosed(file)); + + // write more data without hsync + stream.write(data); + stream.flush(); + + // close the pipeline and container + ContainerInfo container = cluster.getStorageContainerManager().getContainerManager().getContainers().get(0); + OzoneTestUtils.closeContainer(cluster.getStorageContainerManager(), container); + // throw exception on first DN getCommittedBlockLength handling + FaultInjectorImpl injector = new FaultInjectorImpl(); + KeyValueHandler.setInjector(injector); + StorageContainerException sce = new StorageContainerException( + "ContainerID " + container.getContainerID() + " does not exist", + ContainerProtos.Result.CONTAINER_NOT_FOUND); + injector.setException(sce); + + GenericTestUtils.LogCapturer logs = + GenericTestUtils.LogCapturer.captureLogs(XceiverClientGrpc.getLogger()); + fs.recoverLease(file); + + assertEquals(2, StringUtils.countMatches(logs.getOutput(), + "Executing command cmdType: GetCommittedBlockLength")); + assertEquals(1, StringUtils.countMatches(logs.getOutput(), + "Failed to execute command cmdType: GetCommittedBlockLength")); + + // The lease should have been recovered. 
+ assertTrue(fs.isFileClosed(file), "File should be closed"); + FileStatus fileStatus = fs.getFileStatus(file); + assertEquals(dataSize * 2, fileStatus.getLen()); + } finally { + closeIgnoringKeyNotFound(stream); + KeyValueHandler.setInjector(null); + } + + // open it again, make sure the data is correct + verifyData(data, dataSize * 2, file, fs); + } + + @Test + public void testOMConnectionFailure() throws Exception { + // reduce hadoop RPC retry max attempts + conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 5); + conf.setLong(OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY, 100); + RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf); + int dataSize = 100; + final byte[] data = getData(dataSize); + + final FSDataOutputStream stream = fs.create(file, true); + try { + stream.write(data); + stream.hsync(); + assertFalse(fs.isFileClosed(file)); + + // close OM + cluster.getOzoneManager().stop(); + assertThrows(ConnectException.class, () -> fs.recoverLease(file)); + } finally { + try { + stream.close(); + } catch (Throwable e) { + } + cluster.getOzoneManager().restart(); + cluster.waitForClusterToBeReady(); + assertTrue(fs.recoverLease(file)); + } + } + + @Test + public void testRecoverWrongFile() throws Exception { + final Path notExistFile = new Path(dir, "file1"); + + RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf); + int dataSize = 100; + final byte[] data = getData(dataSize); + + final FSDataOutputStream stream = fs.create(file, true); + try { + stream.write(data); + stream.hsync(); + assertFalse(fs.isFileClosed(file)); + + assertThrows(OMException.class, () -> fs.recoverLease(notExistFile)); + } finally { + closeIgnoringKeyNotFound(stream); + } + } + + private void verifyData(byte[] data, int dataSize, Path filePath, RootedOzoneFileSystem fs) throws IOException { + try (FSDataInputStream fdis = fs.open(filePath)) { + int bufferSize = dataSize > data.length ? 
dataSize / 2 : dataSize; + while (dataSize > 0) { + byte[] readData = new byte[bufferSize]; + int readBytes = fdis.read(readData); + assertEquals(readBytes, bufferSize); + assertArrayEquals(readData, data); + dataSize -= bufferSize; + } + } + } + + private byte[] getData(int dataSize) { + final byte[] data = new byte[dataSize]; + ThreadLocalRandom.current().nextBytes(data); + return data; } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java index d89e6a6c3600..c4b027074ff4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java @@ -158,7 +158,7 @@ public static void closeContainer(StorageContainerManager scm, throws IOException, TimeoutException, InterruptedException { Pipeline pipeline = scm.getPipelineManager() .getPipeline(container.getPipelineID()); - scm.getPipelineManager().closePipeline(pipeline, false); + scm.getPipelineManager().closePipeline(pipeline, true); GenericTestUtils.waitFor(() -> container.getState() == HddsProtos.LifeCycleState.CLOSED, 200, 30000); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java index e8aece20f4fd..3301320c0055 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java @@ -18,6 +18,11 @@ package org.apache.hadoop.ozone.client.rpc; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.ozone.RootedOzoneFileSystem; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationFactor; @@ -47,10 +52,13 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OMMetrics; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.S3SecretManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; @@ -67,15 +75,27 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.io.IOException; import java.time.Instant; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; +import 
static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV; +import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME; +import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT; +import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; +import static org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMIZED; +import static org.junit.Assert.assertThrows; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -126,6 +146,7 @@ public static void init() throws Exception { // constructed. conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT, OMConfigKeys.OZONE_BUCKET_LAYOUT_OBJECT_STORE); + conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(14) .setScmId(SCM_ID) @@ -159,7 +180,7 @@ public static void init() throws Exception { public void testPutKeySuccessWithBlockToken() throws Exception { testPutKeySuccessWithBlockTokenWithBucketLayout(BucketLayout.OBJECT_STORE); testPutKeySuccessWithBlockTokenWithBucketLayout( - BucketLayout.FILE_SYSTEM_OPTIMIZED); + FILE_SYSTEM_OPTIMIZED); } private void testPutKeySuccessWithBlockTokenWithBucketLayout( @@ -230,6 +251,148 @@ private void testPutKeySuccessWithBlockTokenWithBucketLayout( } } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testFileRecovery(boolean forceRecovery) throws Exception { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + + String value = "sample value"; + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName, + new BucketArgs.Builder().setBucketLayout(FILE_SYSTEM_OPTIMIZED).build()); + OzoneBucket bucket = volume.getBucket(bucketName); + + String keyName = UUID.randomUUID().toString(); + final String dir = OZONE_ROOT + bucket.getVolumeName() + OZONE_URI_DELIMITER + bucket.getName(); + final Path file = new Path(dir, keyName); + + // Set the fs.defaultFS + final String rootPath = String.format("%s://%s/", + OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY)); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + // force recovery file + System.setProperty(FORCE_LEASE_RECOVERY_ENV, String.valueOf(forceRecovery)); + conf.setBoolean(String.format("fs.%s.impl.disable.cache", OZONE_OFS_URI_SCHEME), true); + RootedOzoneFileSystem fs = (RootedOzoneFileSystem) FileSystem.get(conf); + OzoneOutputStream out = null; + try { + out = bucket.createKey(keyName, value.getBytes(UTF_8).length, ReplicationType.RATIS, + ReplicationFactor.THREE, new HashMap<>()); + out.write(value.getBytes(UTF_8)); + out.hsync(); + + if (forceRecovery) { + fs.recoverLease(file); + } else { + assertThrows(OMException.class, () -> fs.recoverLease(file)); + } + } finally { + if (out != null) { + if (forceRecovery) { + // close failure because the key is already committed + assertThrows(OMException.class, out::close); + } else { + out.close(); + } + } + } + } + + @ParameterizedTest + @ValueSource(ints = {1 << 24, (1 << 24) + 1, (1 << 24) - 1}) + public void testPreallocateFileRecovery(long dataSize) throws Exception { + cleanupDeletedTable(); + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + + final byte[] data = new byte[(int) 
dataSize]; + ThreadLocalRandom.current().nextBytes(data); + + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + long nsQuota = 100; + long spaceQuota = 1 * 1024 * 1024 * 1024; + volume.createBucket(bucketName, new BucketArgs.Builder().setBucketLayout(FILE_SYSTEM_OPTIMIZED) + .setQuotaInNamespace(nsQuota).setQuotaInBytes(spaceQuota).build()); + OzoneBucket bucket = volume.getBucket(bucketName); + + String keyName = UUID.randomUUID().toString(); + final String dir = OZONE_ROOT + bucket.getVolumeName() + OZONE_URI_DELIMITER + bucket.getName(); + final Path file = new Path(dir, keyName); + + // Set the fs.defaultFS + final String rootPath = String.format("%s://%s/", + OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY)); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + RootedOzoneFileSystem fs = (RootedOzoneFileSystem) FileSystem.get(conf); + OzoneOutputStream out = null; + long totalBlock = 10; + long usedBlock = (dataSize - 1) / fs.getDefaultBlockSize() + 1; + long fileSize = fs.getDefaultBlockSize() * totalBlock; + OMMetrics metrics = ozoneManager.getMetrics(); + long committedBytes = metrics.getDataCommittedBytes(); + try { + out = bucket.createKey(keyName, fileSize, ReplicationType.RATIS, + ReplicationFactor.THREE, new HashMap<>()); + // init used quota check + bucket = volume.getBucket(bucketName); + assertEquals(0, bucket.getUsedNamespace()); + assertEquals(0, bucket.getUsedBytes()); + + out.write(data); + out.hsync(); + fs.recoverLease(file); + + // check file length + FileStatus fileStatus = fs.getFileStatus(file); + assertEquals(dataSize, fileStatus.getLen()); + // check committed bytes + assertEquals(committedBytes + dataSize, + ozoneManager.getMetrics().getDataCommittedBytes()); + // check used quota + bucket = volume.getBucket(bucketName); + assertEquals(1, bucket.getUsedNamespace()); + assertEquals(dataSize * ReplicationFactor.THREE.getValue(), bucket.getUsedBytes()); + + // check unused pre-allocated blocks are reclaimed + Table<String, RepeatedOmKeyInfo> deletedTable = ozoneManager.getMetadataManager().getDeletedTable(); + try (TableIterator<String, ? extends Table.KeyValue<String, RepeatedOmKeyInfo>> + keyIter = deletedTable.iterator()) { + while (keyIter.hasNext()) { + Table.KeyValue<String, RepeatedOmKeyInfo> kv = keyIter.next(); + OmKeyInfo key = kv.getValue().getOmKeyInfoList().get(0); + assertEquals(totalBlock - usedBlock, key.getKeyLocationVersions().get(0).getLocationListCount()); + } + } + } finally { + if (out != null) { + // close failure because the key is already committed + assertThrows(OMException.class, out::close); + } + } + } + + private void cleanupDeletedTable() throws IOException { + Table<String, RepeatedOmKeyInfo> deletedTable = ozoneManager.getMetadataManager().getDeletedTable(); + List<String> nameList = new ArrayList<>(); + try (TableIterator<String, ? extends Table.KeyValue<String, RepeatedOmKeyInfo>> + keyIter = deletedTable.iterator()) { + while (keyIter.hasNext()) { + Table.KeyValue<String, RepeatedOmKeyInfo> kv = keyIter.next(); + nameList.add(kv.getKey()); + } + } + nameList.forEach(k -> { + try { + deletedTable.delete(k); + } catch (IOException e) { + // do nothing + } + }); + } + private void assertTokenIsNull(OmKeyInfo value) { value.getKeyLocationVersions() .forEach( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/debug/TestLeaseRecoverer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/debug/TestLeaseRecoverer.java index 1c066902eedc..784c7df8937e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/debug/TestLeaseRecoverer.java +++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/debug/TestLeaseRecoverer.java @@ -23,6 +23,8 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.fs.LeaseRecoverable; +import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -41,7 +43,6 @@ import org.apache.hadoop.ozone.TestDataUtil; import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; -import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME; @@ -49,6 +50,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Test cases for LeaseRecoverer. @@ -71,6 +73,9 @@ public static void init() throws Exception { conf = new OzoneConfiguration(); conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true); conf.set(OzoneConfigKeys.OZONE_OM_LEASE_SOFT_LIMIT, "0s"); + OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setStreamBufferFlushDelay(false); + conf.setFromObject(clientConfig); String clusterId = UUID.randomUUID().toString(); String scmId = UUID.randomUUID().toString(); String omId = UUID.randomUUID().toString(); @@ -128,18 +133,21 @@ public void testCLI() throws IOException { // make sure file is visible and closed FileStatus fileStatus = fs.getFileStatus(file); assertEquals(dataSize, fileStatus.getLen()); - // make sure the writer can not write again. - // TODO: write does not fail here. Looks like a bug. HDDS-8439 to fix it. 
+ // write data os.write(data); + // flush should fail since flush will call writeChunk and putBlock + assertThrows(IOException.class, os::flush); + fileStatus = fs.getFileStatus(file); assertEquals(dataSize, fileStatus.getLen()); // make sure hsync fails - assertThrows(OMException.class, os::hsync); + assertThrows(IOException.class, os::hsync); // make sure length remains the same fileStatus = fs.getFileStatus(file); assertEquals(dataSize, fileStatus.getLen()); - // make sure close fails - assertThrows(OMException.class, os::close); + // close succeeds since it's already closed in failure handling of flush + assertTrue(((LeaseRecoverable)fs).isFileClosed(file)); + os.close(); // make sure length remains the same fileStatus = fs.getFileStatus(file); assertEquals(dataSize, fileStatus.getLen()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java index 71c6bc1c9754..fdb363dbc712 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java @@ -50,6 +50,7 @@ import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServerConfig; import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.utils.FaultInjectorImpl; import org.apache.ozone.test.GenericTestUtils; import org.apache.ozone.test.tag.Unhealthy; import org.apache.ratis.server.protocol.TermIndex; @@ -79,7 +80,6 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -402,7 +402,7 @@ public void testInstallIncrementalSnapshot(@TempDir Path tempDir) OzoneManager followerOM = cluster.getOzoneManager(followerNodeId); // Set fault injector to pause before install - FaultInjector faultInjector = new SnapshotPauseInjector(); + FaultInjector faultInjector = new FaultInjectorImpl(); followerOM.getOmSnapshotProvider().setInjector(faultInjector); // Do some transactions so that the log index increases @@ -611,7 +611,7 @@ public void testInstallIncrementalSnapshotWithFailure() throws Exception { OzoneManager followerOM = cluster.getOzoneManager(followerNodeId); // Set fault injector to pause before install - FaultInjector faultInjector = new SnapshotPauseInjector(); + FaultInjector faultInjector = new FaultInjectorImpl(); followerOM.getOmSnapshotProvider().setInjector(faultInjector); // Do some transactions so that the log index increases @@ -1127,48 +1127,6 @@ public void exitSystem(int status, String message, Throwable throwable, } } - private static class SnapshotPauseInjector extends FaultInjector { - private CountDownLatch ready; - private CountDownLatch wait; - - SnapshotPauseInjector() { - init(); - } - - @Override - public void init() { - this.ready = new CountDownLatch(1); - this.wait = new CountDownLatch(1); - } - - @Override - public void pause() throws IOException { - ready.countDown(); - try { - wait.await(); - } catch (InterruptedException e) { - throw new IOException(e); - } - } - - @Override - public void resume() throws IOException { - // Make sure injector pauses before resuming. 
- try { - ready.await(); - } catch (InterruptedException e) { - e.printStackTrace(); - assertTrue(Fail.fail("resume interrupted")); - } - wait.countDown(); - } - - @Override - public void reset() throws IOException { - init(); - } - } - // Interrupts the tarball download process to test creation of // multiple tarballs as needed when the tarball size exceeds the // max. diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/utils/FaultInjectorImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/utils/FaultInjectorImpl.java new file mode 100644 index 000000000000..8656811fa87c --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/utils/FaultInjectorImpl.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.utils; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdds.utils.FaultInjector; +import org.assertj.core.api.Fail; +import org.junit.jupiter.api.Assertions; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +/** + * A general FaultInjector implementation. + */ +public class FaultInjectorImpl extends FaultInjector { + private CountDownLatch ready; + private CountDownLatch wait; + private Throwable ex; + + public FaultInjectorImpl() { + init(); + } + + @Override + public void init() { + this.ready = new CountDownLatch(1); + this.wait = new CountDownLatch(1); + } + + @Override + public void pause() throws IOException { + ready.countDown(); + try { + wait.await(); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + @Override + public void resume() throws IOException { + // Make sure injector pauses before resuming. + try { + ready.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + Assertions.assertTrue(Fail.fail("resume interrupted")); + } + wait.countDown(); + } + + @Override + public void reset() throws IOException { + init(); + } + + @VisibleForTesting + public void setException(Throwable e) { + ex = e; + } + + @VisibleForTesting + public Throwable getException() { + return ex; + } +} + diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index cf5630eceeb6..c8d804ae30b7 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -2082,7 +2082,6 @@ message RecoverLeaseRequest { message RecoverLeaseResponse { optional KeyInfo keyInfo = 1; - optional KeyInfo openKeyInfo = 2; } message SetTimesRequest { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 142b211b7386..516138607c7a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -4592,7 +4592,7 @@ public EchoRPCResponse echoRPCReq(byte[] payloadReq, int payloadSizeResp, } @Override - public List<OmKeyInfo> recoverLease(String volumeName, String bucketName, String keyName) { + public OmKeyInfo recoverLease(String volumeName, String bucketName, String keyName, boolean force) { return null; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java index f79baaf1cb79..addcc54977fc 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java @@ -20,6 +20,8 @@ import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; 
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.OzoneConsts; @@ -31,6 +33,8 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmFSOFile; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.om.request.key.OMKeyRequest; import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; import org.apache.hadoop.ozone.om.response.OMClientResponse; @@ -40,6 +44,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseResponse; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.READ; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.WRITE; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_LEASE_SOFT_LIMIT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_LEASE_SOFT_LIMIT_DEFAULT; import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.RecoverLease; @@ -55,7 +61,9 @@ import java.io.IOException; import java.nio.file.InvalidPathException; +import java.util.EnumSet; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -200,7 +208,7 @@ private RecoverLeaseResponse doWork(OzoneManager ozoneManager, keyInfo = getKey(dbFileKey); if (keyInfo == null) { - throw new OMException("Key:" + keyName + " not found in keyTable", KEY_NOT_FOUND); + throw new OMException("Key:" + keyName + " not found in keyTable.", KEY_NOT_FOUND); } final String writerId = keyInfo.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID); @@ -215,6 +223,7 @@ private RecoverLeaseResponse doWork(OzoneManager ozoneManager, throw new OMException("Open Key " + dbOpenFileKey + " not found in openKeyTable", KEY_NOT_FOUND); } + long openKeyModificationTime = openKeyInfo.getModificationTime(); if (openKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY)) { LOG.debug("Key: " + keyName + " is already under recovery"); } else { @@ -227,15 +236,44 @@ private RecoverLeaseResponse doWork(OzoneManager ozoneManager, openKeyInfo.getMetadata().put(OzoneConsts.LEASE_RECOVERY, "true"); openKeyInfo.setUpdateID(transactionLogIndex, ozoneManager.isRatisEnabled()); openKeyInfo.setModificationTime(Time.now()); - // Add to cache. + // add to cache. 
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry( new CacheKey<>(dbOpenFileKey), CacheValue.get(transactionLogIndex, openKeyInfo)); } + // override key name with normalizedKeyPath keyInfo.setKeyName(keyName); openKeyInfo.setKeyName(keyName); - RecoverLeaseResponse.Builder rb = RecoverLeaseResponse.newBuilder() - .setOpenKeyInfo(openKeyInfo.getNetworkProtobuf(getOmRequest().getVersion(), true)) - .setKeyInfo(keyInfo.getNetworkProtobuf(getOmRequest().getVersion(), true)); + + OmKeyLocationInfoGroup keyLatestVersionLocations = keyInfo.getLatestVersionLocations(); + List<OmKeyLocationInfo> keyLocationInfoList = keyLatestVersionLocations.getLocationList(); + OmKeyLocationInfoGroup openKeyLatestVersionLocations = openKeyInfo.getLatestVersionLocations(); + List<OmKeyLocationInfo> openKeyLocationInfoList = openKeyLatestVersionLocations.getLocationList(); + OmKeyLocationInfo finalBlock; + boolean returnKeyInfo = true; + if (openKeyLocationInfoList.size() > keyLocationInfoList.size() && + openKeyModificationTime > keyInfo.getModificationTime()) { + finalBlock = openKeyLocationInfoList.get(openKeyLocationInfoList.size() - 1); + returnKeyInfo = false; + } else { + finalBlock = keyLocationInfoList.get(keyLocationInfoList.size() - 1); + } + + // set token to last block if enabled + if (ozoneManager.isGrpcBlockTokenEnabled()) { + String remoteUser = getRemoteUser().getShortUserName(); + OzoneBlockTokenSecretManager secretManager = ozoneManager.getBlockTokenSecretManager(); + finalBlock.setToken(secretManager.generateToken(remoteUser, finalBlock.getBlockID(), + EnumSet.of(READ, WRITE), finalBlock.getLength())); + } + + // refresh last block pipeline + ContainerWithPipeline containerWithPipeline = + ozoneManager.getScmClient().getContainerClient().getContainerWithPipeline(finalBlock.getContainerID()); + finalBlock.setPipeline(containerWithPipeline.getPipeline()); + + RecoverLeaseResponse.Builder rb = RecoverLeaseResponse.newBuilder(); + rb.setKeyInfo(returnKeyInfo ? keyInfo.getNetworkProtobuf(getOmRequest().getVersion(), true) : + openKeyInfo.getNetworkProtobuf(getOmRequest().getVersion(), true)); return rb.build(); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java index 3e87984ac09f..c06aa186cc75 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java @@ -231,7 +231,7 @@ protected List< OmKeyLocationInfo > allocateBlock(ScmClient scmClient, /* Optimize ugi lookup for RPC operations to avoid a trip through * UGI.getCurrentUser which is synch'ed. */ - private UserGroupInformation getRemoteUser() throws IOException { + protected UserGroupInformation getRemoteUser() throws IOException { UserGroupInformation ugi = Server.getRemoteUser(); return (ugi != null) ?
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index 128dca57ee1c..2f28c54516e5 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -26,7 +26,10 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.IOmMetadataReader;
 import org.apache.hadoop.ozone.om.OMPerformanceMetrics;
@@ -71,6 +74,7 @@
 import org.apache.hadoop.ozone.om.ScmClient;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.util.Time;
+import org.mockito.Mockito;
 import org.slf4j.event.Level;
 
 import static org.apache.hadoop.ozone.om.request.OMRequestTestUtils.setupReplicationConfigValidation;
@@ -102,6 +106,7 @@ public class TestOMKeyRequest {
   protected ScmClient scmClient;
   protected OzoneBlockTokenSecretManager ozoneBlockTokenSecretManager;
   protected ScmBlockLocationProtocol scmBlockLocationProtocol;
+  protected StorageContainerLocationProtocol scmContainerLocationProtocol;
   protected OMPerformanceMetrics metrics;
 
   protected static final long CONTAINER_ID = 1000L;
@@ -165,6 +170,9 @@ public void setup() throws Exception {
     when(ozoneManager.getOMServiceId()).thenReturn(
         UUID.randomUUID().toString());
     when(scmClient.getBlockClient()).thenReturn(scmBlockLocationProtocol);
+    scmContainerLocationProtocol = Mockito.mock(StorageContainerLocationProtocol.class);
+    when(scmClient.getContainerClient()).thenReturn(scmContainerLocationProtocol);
+
     when(ozoneManager.getKeyManager()).thenReturn(keyManager);
     when(ozoneManager.getAccessAuthorizer())
         .thenReturn(new OzoneNativeAuthorizer());
@@ -205,6 +213,9 @@ public void setup() throws Exception {
       return allocatedBlocks;
     });
 
+    ContainerWithPipeline containerWithPipeline =
+        new ContainerWithPipeline(Mockito.mock(ContainerInfo.class), pipeline);
+    when(scmContainerLocationProtocol.getContainerWithPipeline(anyLong())).thenReturn(containerWithPipeline);
 
     volumeName = UUID.randomUUID().toString();
     bucketName = UUID.randomUUID().toString();
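Because the pipeline-refresh step in OMRecoverLeaseRequest now calls getContainerWithPipeline() on every recover-lease request, a test fixture exercising key requests must stub the container client; with a bare Mockito ScmClient the call would return null and fail. A minimal sketch of the required stubbing, assuming a setup method like TestOMKeyRequest.setup() with its static Mockito imports and an existing 'pipeline' object:

    // Sketch of the minimal stubbing a recover-lease test needs.
    StorageContainerLocationProtocol containerClient =
        Mockito.mock(StorageContainerLocationProtocol.class);
    when(scmClient.getContainerClient()).thenReturn(containerClient);
    when(containerClient.getContainerWithPipeline(anyLong())).thenReturn(
        new ContainerWithPipeline(Mockito.mock(ContainerInfo.class), pipeline));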
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index 44b7a2c61e54..28812a5a1a9d 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -36,12 +36,20 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.SafeModeAction;
+import org.apache.hadoop.hdds.client.ReplicatedReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.hdds.security.SecurityConfig;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.io.Text;
@@ -56,6 +64,8 @@
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -681,11 +691,11 @@ private SnapshotDiffReportOzone getSnapshotDiffReportOnceComplete(
   }
 
   @Override
-  public List<OmKeyInfo> recoverFilePrepare(final String pathStr) throws IOException {
+  public OmKeyInfo recoverFilePrepare(final String pathStr, boolean force) throws IOException {
     incrementCounter(Statistic.INVOCATION_RECOVER_FILE_PREPARE, 1);
 
     return ozoneClient.getProxy().getOzoneManagerClient().recoverLease(
-        volume.getName(), bucket.getName(), pathStr);
+        volume.getName(), bucket.getName(), pathStr, force);
   }
 
@@ -695,6 +705,53 @@ public void recoverFile(OmKeyArgs keyArgs) throws IOException {
     ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L);
   }
 
+  @Override
+  public long finalizeBlock(OmKeyLocationInfo block) throws IOException {
+    incrementCounter(Statistic.INVOCATION_FINALIZE_BLOCK, 1);
+    RpcClient rpcClient = (RpcClient) ozoneClient.getProxy();
+    XceiverClientFactory xceiverClientFactory = rpcClient.getXceiverClientManager();
+    Pipeline pipeline = block.getPipeline();
+    XceiverClientSpi client = null;
+    try {
+      // If pipeline is still open
+      if (pipeline.isOpen()) {
+        client = xceiverClientFactory.acquireClient(pipeline);
+        ContainerProtos.FinalizeBlockResponseProto finalizeBlockResponseProto =
+            ContainerProtocolCalls.finalizeBlock(client, block.getBlockID().getDatanodeBlockIDProtobuf(),
+                block.getToken());
+        return BlockData.getFromProtoBuf(finalizeBlockResponseProto.getBlockData()).getSize();
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to execute finalizeBlock command", e);
+    } finally {
+      if (client != null) {
+        xceiverClientFactory.releaseClient(client, false);
+        client = null; // avoid double release if acquiring the read client below fails
+      }
+    }
+
+    // Try to fetch the committed block length from the DN
+    ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+    if (!(replicationConfig instanceof ReplicatedReplicationConfig)) {
+      throw new IOException("ReplicationConfig type " + replicationConfig.getClass().getSimpleName() +
+          " is not supported in finalizeBlock");
+    }
+    StandaloneReplicationConfig newConfig = StandaloneReplicationConfig.getInstance(
+        ((ReplicatedReplicationConfig) replicationConfig).getReplicationFactor());
+    Pipeline.Builder builder = Pipeline.newBuilder().setReplicationConfig(newConfig).setId(PipelineID.randomId())
+        .setNodes(block.getPipeline().getNodes()).setState(Pipeline.PipelineState.OPEN);
+    try {
+      client = xceiverClientFactory.acquireClientForReadData(builder.build());
+      ContainerProtos.GetCommittedBlockLengthResponseProto responseProto =
+          ContainerProtocolCalls.getCommittedBlockLength(client, block.getBlockID(), block.getToken());
+      return responseProto.getBlockLength();
+    } finally {
+      if (client != null) {
+        xceiverClientFactory.releaseClient(client, false);
+      }
+    }
+  }
+
   @Override
   public void setTimes(String key, long mtime, long atime) throws IOException {
     incrementCounter(Statistic.INVOCATION_SET_TIMES, 1);
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index ebedb716e6ca..e1ed85cff171 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -43,13 +43,21 @@
 import org.apache.hadoop.fs.PathPermissionException;
 import org.apache.hadoop.fs.SafeModeAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdds.client.ReplicatedReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.hdds.security.SecurityConfig;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.io.Text;
@@ -67,6 +75,8 @@
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -1354,14 +1364,14 @@ public boolean isFileClosed(String pathStr) throws IOException {
   }
 
   @Override
-  public List<OmKeyInfo> recoverFilePrepare(final String pathStr) throws IOException {
+  public OmKeyInfo recoverFilePrepare(final String pathStr, boolean force) throws IOException {
     incrementCounter(Statistic.INVOCATION_RECOVER_FILE_PREPARE, 1);
     OFSPath ofsPath = new OFSPath(pathStr, config);
     OzoneVolume volume = objectStore.getVolume(ofsPath.getVolumeName());
     OzoneBucket bucket = getBucket(ofsPath, false);
 
     return ozoneClient.getProxy().getOzoneManagerClient().recoverLease(
-        volume.getName(), bucket.getName(), ofsPath.getKeyName());
+        volume.getName(), bucket.getName(), ofsPath.getKeyName(), force);
   }
 
@@ -1371,6 +1381,53 @@ public void recoverFile(OmKeyArgs keyArgs) throws IOException {
     ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L);
   }
 
+  @Override
+  public long finalizeBlock(OmKeyLocationInfo block) throws IOException {
+    incrementCounter(Statistic.INVOCATION_FINALIZE_BLOCK, 1);
+    RpcClient rpcClient = (RpcClient) ozoneClient.getProxy();
+    XceiverClientFactory xceiverClientFactory = rpcClient.getXceiverClientManager();
+    Pipeline pipeline = block.getPipeline();
+    XceiverClientSpi client = null;
+    try {
+      // If pipeline is still open
+      if (pipeline.isOpen()) {
+        client = xceiverClientFactory.acquireClient(pipeline);
+        ContainerProtos.FinalizeBlockResponseProto finalizeBlockResponseProto =
+            ContainerProtocolCalls.finalizeBlock(client, block.getBlockID().getDatanodeBlockIDProtobuf(),
+                block.getToken());
+        return BlockData.getFromProtoBuf(finalizeBlockResponseProto.getBlockData()).getSize();
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to execute finalizeBlock command", e);
+    } finally {
+      if (client != null) {
+        xceiverClientFactory.releaseClient(client, false);
+        client = null; // avoid double release if acquiring the read client below fails
+      }
+    }
+
+    // Try to fetch the committed block length from the DN
+    ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+    if (!(replicationConfig instanceof ReplicatedReplicationConfig)) {
+      throw new IOException("ReplicationConfig type " + replicationConfig.getClass().getSimpleName() +
+          " is not supported in finalizeBlock");
+    }
+    StandaloneReplicationConfig newConfig = StandaloneReplicationConfig.getInstance(
+        ((ReplicatedReplicationConfig) replicationConfig).getReplicationFactor());
+    Pipeline.Builder builder = Pipeline.newBuilder().setReplicationConfig(newConfig).setId(PipelineID.randomId())
+        .setNodes(block.getPipeline().getNodes()).setState(Pipeline.PipelineState.OPEN);
+    try {
+      client = xceiverClientFactory.acquireClientForReadData(builder.build());
+      ContainerProtos.GetCommittedBlockLengthResponseProto responseProto =
+          ContainerProtocolCalls.getCommittedBlockLength(client, block.getBlockID(), block.getToken());
+      return responseProto.getBlockLength();
+    } finally {
+      if (client != null) {
+        xceiverClientFactory.releaseClient(client, false);
+      }
+    }
+  }
+
   @Override
   public void setTimes(String key, long mtime, long atime) throws IOException {
     incrementCounter(Statistic.INVOCATION_SET_TIMES, 1);
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
index dbce02dd5665..c7444a389d9b 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
@@ -97,10 +98,12 @@ SnapshotDiffReport getSnapshotDiffReport(Path snapshotDir,
       String fromSnapshot, String toSnapshot)
       throws IOException, InterruptedException;
 
-  List<OmKeyInfo> recoverFilePrepare(String pathStr) throws IOException;
+  OmKeyInfo recoverFilePrepare(String pathStr, boolean force) throws IOException;
 
   void recoverFile(OmKeyArgs keyArgs) throws IOException;
 
+  long finalizeBlock(OmKeyLocationInfo block) throws IOException;
+
   void setTimes(String key, long mtime, long atime) throws IOException;
 
   boolean isFileClosed(String pathStr) throws IOException;
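Design note on finalizeBlock(): the adapter first attempts the new FinalizeBlock datanode command over the original write pipeline, but only while that pipeline is still open; if the command fails or the pipeline has already been closed, it falls back to asking a single replica for the committed block length over a fresh STANDALONE read pipeline built from the same nodes, since reading a committed length does not need a Ratis quorum. A hedged sketch of the calling pattern, assuming 'adapter' is an OzoneClientAdapter and 'keyInfo' came back from recoverFilePrepare(path, force):

    // Caller-side sketch: finalize the last block of an under-recovery key.
    List<OmKeyLocationInfo> blocks =
        keyInfo.getLatestVersionLocations().getLocationList();
    OmKeyLocationInfo lastBlock = blocks.get(blocks.size() - 1);
    // Either the FinalizeBlock result or the committed length from a replica.
    lastBlock.setLength(adapter.finalizeBlock(lastBlock));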
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
index 10abc570918b..f28f2b7d437c 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
@@ -80,6 +80,7 @@ public enum Statistic {
   INVOCATION_RECOVER_FILE_PREPARE("op_recover_file_prepare",
       "Calls of recoverFilePrepare()"),
   INVOCATION_RECOVER_FILE("op_recover_file", "Calls of recoverFile()"),
+  INVOCATION_FINALIZE_BLOCK("op_finalize_block", "Calls of finalizeBlock()"),
   INVOCATION_SET_SAFE_MODE("op_set_safe_mode",
       "Calls of setSafeMode()");
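One operational wrinkle before the filesystem changes below: despite the _ENV suffix on the constant, FORCE_LEASE_RECOVERY_ENV is read with System.getProperty(), so it is a JVM system property ("OZONE.CLIENT.RECOVER.LEASE.FORCE"), not an environment variable, and each FileSystem constructor samples it exactly once. A hedged usage sketch (the URI is an illustrative placeholder):

    // The property must be set before the FileSystem instance is created,
    // because the constructor caches it into the 'forceRecovery' field.
    System.setProperty("OZONE.CLIENT.RECOVER.LEASE.FORCE", "true");
    // newInstance() avoids FileSystem.get()'s cache, which could otherwise
    // hand back an instance constructed before the property was set.
    FileSystem fs = FileSystem.newInstance(
        URI.create("o3fs://bucket.volume.om-host/"), new OzoneConfiguration());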
diff --git a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index 1369fcc32105..415801a78970 100644
--- a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -23,6 +23,7 @@
 import java.net.URI;
 import java.util.List;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,8 +38,11 @@
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.DelegationTokenIssuer;
 
+import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
+
 /**
  * The Ozone Filesystem implementation.
  *
@@ -53,9 +57,12 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
     implements KeyProviderTokenIssuer, LeaseRecoverable, SafeMode {
 
   private OzoneFSStorageStatistics storageStatistics;
+  private boolean forceRecovery;
 
   public OzoneFileSystem() {
     this.storageStatistics = new OzoneFSStorageStatistics();
+    String force = System.getProperty(FORCE_LEASE_RECOVERY_ENV);
+    forceRecovery = Strings.isNullOrEmpty(force) ? false : Boolean.parseBoolean(force);
   }
 
   @Override
@@ -130,12 +137,14 @@ public boolean hasPathCapability(final Path path, final String capability)
   @Override
   public boolean recoverLease(Path f) throws IOException {
+    statistics.incrementWriteOps(1);
     LOG.trace("recoverLease() path:{}", f);
+
     Path qualifiedPath = makeQualified(f);
     String key = pathToKey(qualifiedPath);
-    List<OmKeyInfo> infoList = null;
+    OmKeyInfo keyInfo = null;
     try {
-      infoList = getAdapter().recoverFilePrepare(key);
+      keyInfo = getAdapter().recoverFilePrepare(key, forceRecovery);
     } catch (OMException e) {
       if (e.getResult() == OMException.ResultCodes.KEY_ALREADY_CLOSED) {
         // key is already closed, let's just return success
@@ -143,12 +152,26 @@ public boolean recoverLease(Path f) throws IOException {
       }
       throw e;
     }
-    // TODO: query DN to get the final block length
-    OmKeyInfo keyInfo = infoList.get(0);
+
+    // finalize the final block and get block length
+    List<OmKeyLocationInfo> locationInfoList = keyInfo.getLatestVersionLocations().getLocationList();
+    OmKeyLocationInfo block = locationInfoList.get(locationInfoList.size() - 1);
+    try {
+      block.setLength(getAdapter().finalizeBlock(block));
+    } catch (Throwable e) {
+      if (!forceRecovery) {
+        throw e;
+      }
+      LOG.warn("Failed to finalize block. Continue to recover the file since {} is enabled.",
+          FORCE_LEASE_RECOVERY_ENV, e);
+    }
+
+    // recover and commit file
+    long keyLength = locationInfoList.stream().mapToLong(OmKeyLocationInfo::getLength).sum();
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
         .setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
-        .setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
-        .setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+        .setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyLength)
+        .setLocationInfoList(locationInfoList)
         .build();
     getAdapter().recoverFile(keyArgs);
     return true;
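For reference, the public entry point is Hadoop's LeaseRecoverable interface, which OzoneFileSystem implements with the finalize-then-commit sequence above: prepare the recovery at the OM, finalize the last block at the datanodes, then commit the key with the recomputed length. A hedged caller sketch (the path is an illustrative assumption):

    // Recover the lease of a file whose writer died without close().
    Path file = new Path("/app/logs/events.log");   // illustrative path
    if (fs instanceof LeaseRecoverable) {
      // recoverLease() returns true once the key is committed; with this
      // implementation it either throws or succeeds in one call.
      boolean recovered = ((LeaseRecoverable) fs).recoverLease(file);
    }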
diff --git a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index 71d55f434835..f5216cb4516a 100644
--- a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.fs.ozone;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.fs.LeaseRecoverable;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.SafeMode;
@@ -32,6 +33,7 @@
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.DelegationTokenIssuer;
 
 import java.io.IOException;
@@ -39,6 +41,8 @@
 import java.net.URI;
 import java.util.List;
 
+import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
+
 /**
  * The Rooted Ozone Filesystem (OFS) implementation.
 *
@@ -53,9 +57,12 @@ public class RootedOzoneFileSystem extends BasicRootedOzoneFileSystem
     implements KeyProviderTokenIssuer, LeaseRecoverable, SafeMode {
 
   private OzoneFSStorageStatistics storageStatistics;
+  private boolean forceRecovery;
 
   public RootedOzoneFileSystem() {
     this.storageStatistics = new OzoneFSStorageStatistics();
+    String force = System.getProperty(FORCE_LEASE_RECOVERY_ENV);
+    forceRecovery = Strings.isNullOrEmpty(force) ? false : Boolean.parseBoolean(force);
   }
 
   @Override
@@ -139,9 +146,9 @@ public boolean recoverLease(final Path f) throws IOException {
     LOG.trace("recoverLease() path:{}", f);
     Path qualifiedPath = makeQualified(f);
     String key = pathToKey(qualifiedPath);
-    List<OmKeyInfo> infoList = null;
+    OmKeyInfo keyInfo = null;
     try {
-      infoList = getAdapter().recoverFilePrepare(key);
+      keyInfo = getAdapter().recoverFilePrepare(key, forceRecovery);
     } catch (OMException e) {
       if (e.getResult() == OMException.ResultCodes.KEY_ALREADY_CLOSED) {
         // key is already closed, let's just return success
@@ -149,12 +156,26 @@ public boolean recoverLease(final Path f) throws IOException {
       }
       throw e;
     }
-    // TODO: query DN to get the final block length
-    OmKeyInfo keyInfo = infoList.get(0);
+
+    // finalize the final block and get block length
+    List<OmKeyLocationInfo> locationInfoList = keyInfo.getLatestVersionLocations().getLocationList();
+    OmKeyLocationInfo block = locationInfoList.get(locationInfoList.size() - 1);
+    try {
+      block.setLength(getAdapter().finalizeBlock(block));
+    } catch (Throwable e) {
+      if (!forceRecovery) {
+        throw e;
+      }
+      LOG.warn("Failed to finalize block. Continue to recover the file since {} is enabled.",
+          FORCE_LEASE_RECOVERY_ENV, e);
+    }
+
+    // recover and commit file
+    long keyLength = locationInfoList.stream().mapToLong(OmKeyLocationInfo::getLength).sum();
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
         .setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
-        .setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
-        .setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+        .setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyLength)
+        .setLocationInfoList(locationInfoList)
         .build();
     getAdapter().recoverFile(keyArgs);
     return true;
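A note on the shared error handling: catch (Throwable) above also swallows Errors such as OutOfMemoryError when force recovery is enabled, which can mask faults unrelated to the datanode. If only I/O and runtime failures should be tolerated, a narrower variant would be the following sketch (an alternative suggestion, not what this patch does):

    try {
      block.setLength(getAdapter().finalizeBlock(block));
    } catch (IOException | RuntimeException e) {
      if (!forceRecovery) {
        throw e;
      }
      LOG.warn("Failed to finalize block. Continue to recover the file since {} is enabled.",
          FORCE_LEASE_RECOVERY_ENV, e);
    }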
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index 488d45da1c19..415801a78970 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -23,6 +23,7 @@
 import java.net.URI;
 import java.util.List;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,8 +38,11 @@
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.DelegationTokenIssuer;
 
+import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
+
 /**
  * The Ozone Filesystem implementation.
 *
@@ -53,9 +57,12 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
     implements KeyProviderTokenIssuer, LeaseRecoverable, SafeMode {
 
   private OzoneFSStorageStatistics storageStatistics;
+  private boolean forceRecovery;
 
   public OzoneFileSystem() {
     this.storageStatistics = new OzoneFSStorageStatistics();
+    String force = System.getProperty(FORCE_LEASE_RECOVERY_ENV);
+    forceRecovery = Strings.isNullOrEmpty(force) ? false : Boolean.parseBoolean(force);
   }
 
   @Override
@@ -130,12 +137,14 @@ public boolean hasPathCapability(final Path path, final String capability)
   @Override
   public boolean recoverLease(Path f) throws IOException {
-    LOG.trace("isFileClosed() path:{}", f);
+    statistics.incrementWriteOps(1);
+    LOG.trace("recoverLease() path:{}", f);
+
     Path qualifiedPath = makeQualified(f);
     String key = pathToKey(qualifiedPath);
-    List<OmKeyInfo> infoList = null;
+    OmKeyInfo keyInfo = null;
     try {
-      infoList = getAdapter().recoverFilePrepare(key);
+      keyInfo = getAdapter().recoverFilePrepare(key, forceRecovery);
     } catch (OMException e) {
       if (e.getResult() == OMException.ResultCodes.KEY_ALREADY_CLOSED) {
         // key is already closed, let's just return success
@@ -143,12 +152,26 @@ public boolean recoverLease(Path f) throws IOException {
       }
       throw e;
     }
-    // TODO: query DN to get the final block length
-    OmKeyInfo keyInfo = infoList.get(0);
+
+    // finalize the final block and get block length
+    List<OmKeyLocationInfo> locationInfoList = keyInfo.getLatestVersionLocations().getLocationList();
+    OmKeyLocationInfo block = locationInfoList.get(locationInfoList.size() - 1);
+    try {
+      block.setLength(getAdapter().finalizeBlock(block));
+    } catch (Throwable e) {
+      if (!forceRecovery) {
+        throw e;
+      }
+      LOG.warn("Failed to finalize block. Continue to recover the file since {} is enabled.",
+          FORCE_LEASE_RECOVERY_ENV, e);
+    }
+
+    // recover and commit file
+    long keyLength = locationInfoList.stream().mapToLong(OmKeyLocationInfo::getLength).sum();
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
         .setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
-        .setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
-        .setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+        .setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyLength)
+        .setLocationInfoList(locationInfoList)
         .build();
     getAdapter().recoverFile(keyArgs);
     return true;
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index c501e0652eaa..278462628709 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.fs.ozone;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.fs.LeaseRecoverable;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.SafeMode;
@@ -32,6 +33,7 @@
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.DelegationTokenIssuer;
 
 import java.io.IOException;
@@ -39,6 +41,8 @@
 import java.net.URI;
 import java.util.List;
 
+import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
+
 /**
  * The Rooted Ozone Filesystem (OFS) implementation.
 *
@@ -53,9 +57,12 @@ public class RootedOzoneFileSystem extends BasicRootedOzoneFileSystem
     implements KeyProviderTokenIssuer, LeaseRecoverable, SafeMode {
 
   private OzoneFSStorageStatistics storageStatistics;
+  private boolean forceRecovery;
 
   public RootedOzoneFileSystem() {
     this.storageStatistics = new OzoneFSStorageStatistics();
+    String force = System.getProperty(FORCE_LEASE_RECOVERY_ENV);
+    forceRecovery = Strings.isNullOrEmpty(force) ? false : Boolean.parseBoolean(force);
   }
 
   @Override
@@ -132,9 +139,9 @@ public boolean recoverLease(final Path f) throws IOException {
     LOG.trace("recoverLease() path:{}", f);
     Path qualifiedPath = makeQualified(f);
     String key = pathToKey(qualifiedPath);
-    List<OmKeyInfo> infoList = null;
+    OmKeyInfo keyInfo = null;
     try {
-      infoList = getAdapter().recoverFilePrepare(key);
+      keyInfo = getAdapter().recoverFilePrepare(key, forceRecovery);
     } catch (OMException e) {
       if (e.getResult() == OMException.ResultCodes.KEY_ALREADY_CLOSED) {
         // key is already closed, let's just return success
@@ -142,12 +149,26 @@ public boolean recoverLease(final Path f) throws IOException {
       }
       throw e;
     }
-    // TODO: query DN to get the final block length
-    OmKeyInfo keyInfo = infoList.get(0);
+
+    // finalize the final block and get block length
+    List<OmKeyLocationInfo> keyLocationInfoList = keyInfo.getLatestVersionLocations().getLocationList();
+    OmKeyLocationInfo block = keyLocationInfoList.get(keyLocationInfoList.size() - 1);
+    try {
+      block.setLength(getAdapter().finalizeBlock(block));
+    } catch (Throwable e) {
+      if (!forceRecovery) {
+        throw e;
+      }
+      LOG.warn("Failed to finalize block. Continue to recover the file since {} is enabled.",
+          FORCE_LEASE_RECOVERY_ENV, e);
+    }
+
+    // recover and commit file
+    long keyLength = keyLocationInfoList.stream().mapToLong(OmKeyLocationInfo::getLength).sum();
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
         .setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
-        .setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
-        .setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+        .setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyLength)
+        .setLocationInfoList(keyLocationInfoList)
         .build();
     getAdapter().recoverFile(keyArgs);
     return true;
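Finally, the same recoverLease() body is now duplicated across four classes: OzoneFileSystem and RootedOzoneFileSystem in both the ozonefs and ozonefs-hadoop3 modules. A shared helper in ozonefs-common would keep the copies from drifting; a hedged sketch under the assumption that the adapter interface is the only dependency (the method name and placement are invented here, not part of the patch):

    // Hypothetical shared helper mirroring the four duplicated bodies.
    static boolean recoverLeaseViaAdapter(OzoneClientAdapter adapter, String key,
        boolean forceRecovery) throws IOException {
      OmKeyInfo keyInfo;
      try {
        keyInfo = adapter.recoverFilePrepare(key, forceRecovery);
      } catch (OMException e) {
        if (e.getResult() == OMException.ResultCodes.KEY_ALREADY_CLOSED) {
          return true; // already closed, nothing to recover
        }
        throw e;
      }
      List<OmKeyLocationInfo> blocks = keyInfo.getLatestVersionLocations().getLocationList();
      OmKeyLocationInfo last = blocks.get(blocks.size() - 1);
      try {
        last.setLength(adapter.finalizeBlock(last));
      } catch (Throwable t) {
        if (!forceRecovery) {
          throw t;
        }
      }
      long keyLength = blocks.stream().mapToLong(OmKeyLocationInfo::getLength).sum();
      adapter.recoverFile(new OmKeyArgs.Builder()
          .setVolumeName(keyInfo.getVolumeName())
          .setBucketName(keyInfo.getBucketName())
          .setKeyName(keyInfo.getKeyName())
          .setReplicationConfig(keyInfo.getReplicationConfig())
          .setDataSize(keyLength)
          .setLocationInfoList(blocks)
          .build());
      return true;
    }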