diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java index bd40dfcf0240..7d029ba044de 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java @@ -229,6 +229,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.OM_S3_CALLER_CONTEXT_PREFIX; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.SCM_IN_SAFE_MODE; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER; import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelPrepareRequest; import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelPrepareResponse; @@ -258,6 +259,10 @@ public final class OzoneManagerProtocolClientSideTranslatorPB = new ThreadLocal<>(); private boolean s3AuthCheck; + + public static final int BLOCK_ALLOCATION_RETRY_COUNT = 5; + public static final int BLOCK_ALLOCATION_RETRY_WAIT_TIME_MS = 3000; + public OzoneManagerProtocolClientSideTranslatorPB(OmTransport omTransport, String clientId) { this.clientID = clientId; @@ -728,8 +733,7 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException { .setCreateKeyRequest(req) .build(); - CreateKeyResponse keyResponse = - handleError(submitRequest(omRequest)).getCreateKeyResponse(); + CreateKeyResponse keyResponse = handleSubmitRequestAndSCMSafeModeRetry(omRequest).getCreateKeyResponse(); return new OpenKeySession(keyResponse.getID(), OmKeyInfo.getFromProtobuf(keyResponse.getKeyInfo()), keyResponse.getOpenVersion()); @@ -774,8 +778,7 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientId, .setAllocateBlockRequest(req) .build(); - AllocateBlockResponse resp = handleError(submitRequest(omRequest)) - .getAllocateBlockResponse(); + AllocateBlockResponse resp = handleSubmitRequestAndSCMSafeModeRetry(omRequest).getAllocateBlockResponse(); return OmKeyLocationInfo.getFromProtobuf(resp.getKeyLocation()); } @@ -2243,12 +2246,38 @@ public OpenKeySession createFile(OmKeyArgs args, OMRequest omRequest = createOMRequest(Type.CreateFile) .setCreateFileRequest(createFileRequest) .build(); - CreateFileResponse resp = - handleError(submitRequest(omRequest)).getCreateFileResponse(); + CreateFileResponse resp = handleSubmitRequestAndSCMSafeModeRetry(omRequest).getCreateFileResponse(); + return new OpenKeySession(resp.getID(), OmKeyInfo.getFromProtobuf(resp.getKeyInfo()), resp.getOpenVersion()); } + + @Nonnull + private OMResponse handleSubmitRequestAndSCMSafeModeRetry(OMRequest omRequest) throws IOException { + int retryCount = BLOCK_ALLOCATION_RETRY_COUNT; + while (true) { + try { + return handleError(submitRequest(omRequest)); + } catch (OMException e) { + if (e.getResult().equals(SCM_IN_SAFE_MODE) && retryCount > 0) { + System.err.println("SCM is in safe mode. Will retry in " + + BLOCK_ALLOCATION_RETRY_WAIT_TIME_MS + "ms"); + retryCount--; + try { + Thread.sleep(BLOCK_ALLOCATION_RETRY_WAIT_TIME_MS); + continue; + } catch (InterruptedException ex) { + throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_SAFE_MODE); + } + } else if (e.getResult().equals(SCM_IN_SAFE_MODE) && retryCount == 0) { + throw new OMException(e.getMessage(), ResultCodes.SCM_IN_SAFE_MODE); + } + throw e; + } + } + } + @Override public List listStatus(OmKeyArgs args, boolean recursive, String startKey, long numEntries, boolean allowPartialPrefixes) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java index 14b1a30b44f1..a5e3c69a5368 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java @@ -17,6 +17,12 @@ */ package org.apache.hadoop.ozone.om; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.SafeMode; +import org.apache.hadoop.fs.SafeModeAction; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.HddsConfigKeys; @@ -55,11 +61,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeoutException; import static org.apache.hadoop.hdds.client.ReplicationType.RATIS; import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -334,4 +343,49 @@ public void testSCMSafeModeDisabled() throws Exception { cluster.restartStorageContainerManager(true); assertFalse(scm.isInSafeMode()); } + + @Test + public void testCreateRetryWhileSCMSafeMode() throws Exception { + // Test1: Test safe mode when there are no containers in system. + cluster.stop(); + + try { + cluster = builder.build(); + } catch (IOException e) { + fail("Cluster startup failed."); + } + + final String rootPath = String.format("%s://%s/", + OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY)); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + OMMetrics omMetrics = cluster.getOzoneManager().getMetrics(); + long allocateBlockReqCount = omMetrics.getNumBlockAllocateFails(); + + try (FileSystem fs = FileSystem.get(conf)) { + assertTrue(((SafeMode)fs).setSafeMode(SafeModeAction.GET)); + + Thread t = new Thread(() -> { + try { + LOG.info("Wait for allocate block fails at least once"); + GenericTestUtils.waitFor(() -> omMetrics.getNumBlockAllocateFails() > allocateBlockReqCount, + 100, 10000); + + cluster.startHddsDatanodes(); + cluster.waitForClusterToBeReady(); + cluster.waitTobeOutOfSafeMode(); + } catch (InterruptedException | TimeoutException e) { + throw new RuntimeException(e); + } + }); + t.start(); + + final Path file = new Path("file"); + try (FSDataOutputStream outputStream = fs.create(file, true)) { + LOG.info("Successfully created a file"); + } + t.join(); + } + + assertFalse(cluster.getStorageContainerManager().isInSafeMode()); + } }