Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@

import static org.apache.hadoop.ozone.OzoneConsts.OM_S3_CALLER_CONTEXT_PREFIX;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.SCM_IN_SAFE_MODE;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelPrepareRequest;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelPrepareResponse;
Expand Down Expand Up @@ -258,6 +259,10 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
= new ThreadLocal<>();

private boolean s3AuthCheck;

public static final int BLOCK_ALLOCATION_RETRY_COUNT = 5;
public static final int BLOCK_ALLOCATION_RETRY_WAIT_TIME_MS = 3000;

public OzoneManagerProtocolClientSideTranslatorPB(OmTransport omTransport,
String clientId) {
this.clientID = clientId;
Expand Down Expand Up @@ -728,8 +733,7 @@ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
.setCreateKeyRequest(req)
.build();

CreateKeyResponse keyResponse =
handleError(submitRequest(omRequest)).getCreateKeyResponse();
CreateKeyResponse keyResponse = handleSubmitRequestAndSCMSafeModeRetry(omRequest).getCreateKeyResponse();
return new OpenKeySession(keyResponse.getID(),
OmKeyInfo.getFromProtobuf(keyResponse.getKeyInfo()),
keyResponse.getOpenVersion());
Expand Down Expand Up @@ -774,8 +778,7 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientId,
.setAllocateBlockRequest(req)
.build();

AllocateBlockResponse resp = handleError(submitRequest(omRequest))
.getAllocateBlockResponse();
AllocateBlockResponse resp = handleSubmitRequestAndSCMSafeModeRetry(omRequest).getAllocateBlockResponse();
return OmKeyLocationInfo.getFromProtobuf(resp.getKeyLocation());
}

Expand Down Expand Up @@ -2243,12 +2246,38 @@ public OpenKeySession createFile(OmKeyArgs args,
OMRequest omRequest = createOMRequest(Type.CreateFile)
.setCreateFileRequest(createFileRequest)
.build();
CreateFileResponse resp =
handleError(submitRequest(omRequest)).getCreateFileResponse();
CreateFileResponse resp = handleSubmitRequestAndSCMSafeModeRetry(omRequest).getCreateFileResponse();

return new OpenKeySession(resp.getID(),
OmKeyInfo.getFromProtobuf(resp.getKeyInfo()), resp.getOpenVersion());
}


@Nonnull
private OMResponse handleSubmitRequestAndSCMSafeModeRetry(OMRequest omRequest) throws IOException {
int retryCount = BLOCK_ALLOCATION_RETRY_COUNT;
while (true) {
try {
return handleError(submitRequest(omRequest));
} catch (OMException e) {
if (e.getResult().equals(SCM_IN_SAFE_MODE) && retryCount > 0) {
System.err.println("SCM is in safe mode. Will retry in " +
BLOCK_ALLOCATION_RETRY_WAIT_TIME_MS + "ms");
retryCount--;
try {
Thread.sleep(BLOCK_ALLOCATION_RETRY_WAIT_TIME_MS);
continue;
} catch (InterruptedException ex) {
throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_SAFE_MODE);
}
} else if (e.getResult().equals(SCM_IN_SAFE_MODE) && retryCount == 0) {
throw new OMException(e.getMessage(), ResultCodes.SCM_IN_SAFE_MODE);
}
throw e;
}
}
}

@Override
public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
String startKey, long numEntries, boolean allowPartialPrefixes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
*/
package org.apache.hadoop.ozone.om;

import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.SafeMode;
import org.apache.hadoop.fs.SafeModeAction;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
Expand Down Expand Up @@ -55,11 +61,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -334,4 +343,49 @@ public void testSCMSafeModeDisabled() throws Exception {
cluster.restartStorageContainerManager(true);
assertFalse(scm.isInSafeMode());
}

@Test
public void testCreateRetryWhileSCMSafeMode() throws Exception {
// Test1: Test safe mode when there are no containers in system.
cluster.stop();

try {
cluster = builder.build();
} catch (IOException e) {
fail("Cluster startup failed.");
}

final String rootPath = String.format("%s://%s/",
OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY));
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
OMMetrics omMetrics = cluster.getOzoneManager().getMetrics();
long allocateBlockReqCount = omMetrics.getNumBlockAllocateFails();

try (FileSystem fs = FileSystem.get(conf)) {
assertTrue(((SafeMode)fs).setSafeMode(SafeModeAction.GET));

Thread t = new Thread(() -> {
try {
LOG.info("Wait for allocate block fails at least once");
GenericTestUtils.waitFor(() -> omMetrics.getNumBlockAllocateFails() > allocateBlockReqCount,
100, 10000);

cluster.startHddsDatanodes();
cluster.waitForClusterToBeReady();
cluster.waitTobeOutOfSafeMode();
} catch (InterruptedException | TimeoutException e) {
throw new RuntimeException(e);
}
});
t.start();

final Path file = new Path("file");
try (FSDataOutputStream outputStream = fs.create(file, true)) {
LOG.info("Successfully created a file");
}
t.join();
}

assertFalse(cluster.getStorageContainerManager().isInSafeMode());
}
}