diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestRangerBGSyncService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestRangerBGSyncService.java index abc21ed43514..a173bd9222e5 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestRangerBGSyncService.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestRangerBGSyncService.java @@ -80,6 +80,7 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -233,7 +234,7 @@ public void setUp() throws IOException { ozoneManager.getMetadataManager().getMetaTable().put( OzoneConsts.RANGER_OZONE_SERVICE_VERSION_KEY, String.valueOf(v)); return null; - }).when(omRatisServer).submitRequest(any(), any()); + }).when(omRatisServer).submitRequest(any(), any(), anyLong()); } catch (ServiceException e) { throw new RuntimeException(e); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index b6903ca9e91f..0038bca2e32a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -2065,6 +2065,7 @@ private void addOMNodeToPeers(String newOMNodeId) throws IOException { } catch (IOException e) { LOG.error("{}: Couldn't add OM {} to peer list.", getOMNodeId(), newOMNodeId); + return; } if (omRatisSnapshotProvider == null) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java index 6e1c9da34cbc..bd462224e9d8 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/TrashOzoneFileSystem.java @@ -17,7 +17,6 @@ package org.apache.hadoop.ozone.om; import com.google.common.base.Preconditions; -import com.google.protobuf.RpcController; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.ClientVersion; import org.apache.hadoop.ozone.om.exceptions.OMException; @@ -35,15 +34,12 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus; -import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.request.OMClientRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,8 +65,6 @@ */ public class TrashOzoneFileSystem extends FileSystem { - private static final RpcController NULL_RPC_CONTROLLER = null; - private static final int OZONE_FS_ITERATE_BATCH_SIZE = 100; private static final int OZONE_MAX_LIST_KEYS_SIZE = 10000; @@ -97,34 +91,15 @@ public TrashOzoneFileSystem(OzoneManager ozoneManager) throws IOException { ozoneConfiguration = OzoneConfiguration.of(getConf()); } - private RaftClientRequest getRatisRequest( - OzoneManagerProtocolProtos.OMRequest omRequest) { - return RaftClientRequest.newBuilder() - .setClientId(CLIENT_ID) - .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId()) - .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId()) - .setCallId(runCount.getAndIncrement()) - .setMessage( - Message.valueOf( - OMRatisHelper.convertRequestToByteString(omRequest))) - .setType(RaftClientRequest.writeRequestType()) - .build(); - - } - private void submitRequest(OzoneManagerProtocolProtos.OMRequest omRequest) throws Exception { ozoneManager.getMetrics().incNumTrashWriteRequests(); if (ozoneManager.isRatisEnabled()) { - OMClientRequest omClientRequest = - OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager); + // perform preExecute as ratis submit do no perform preExecute + OMClientRequest omClientRequest = OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager); omRequest = omClientRequest.preExecute(ozoneManager); - RaftClientRequest req = getRatisRequest(omRequest); - ozoneManager.getOmRatisServer().submitRequest(omRequest, req); - } else { - ozoneManager.getOmServerProtocol(). - submitRequest(NULL_RPC_CONTROLLER, omRequest); } + OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, CLIENT_ID, runCount.getAndIncrement()); } @Override diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java index 78d6ed89d2d1..af4d42ad68a9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java @@ -301,15 +301,23 @@ private RaftClientRequest createRaftRequest(OMRequest omRequest) { } /** - * API used internally from OzoneManager Server when requests needs to be - * submitted to ratis, where the crafted RaftClientRequest is passed along. + * API used internally from OzoneManager Server when requests need to be submitted. * @param omRequest - * @param raftClientRequest + * @param cliId + * @param callId * @return OMResponse * @throws ServiceException */ - public OMResponse submitRequest(OMRequest omRequest, - RaftClientRequest raftClientRequest) throws ServiceException { + public OMResponse submitRequest(OMRequest omRequest, ClientId cliId, long callId) throws ServiceException { + RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder() + .setClientId(cliId) + .setServerId(getRaftPeerId()) + .setGroupId(getRaftGroupId()) + .setCallId(callId) + .setMessage(Message.valueOf( + OMRatisHelper.convertRequestToByteString(omRequest))) + .setType(RaftClientRequest.writeRequestType()) + .build(); RaftClientReply raftClientReply = submitRequestToRatis(raftClientRequest); return createOmResponse(omRequest, raftClientReply); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java index 5dc640c742cc..ffaedaa06a99 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java @@ -19,6 +19,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import java.io.File; import java.nio.file.InvalidPathException; @@ -98,6 +99,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; import org.apache.ratis.grpc.GrpcTlsConfig; +import org.apache.ratis.protocol.ClientId; import org.rocksdb.RocksDBException; import java.io.IOException; @@ -117,6 +119,7 @@ public final class OzoneManagerRatisUtils { private static final Logger LOG = LoggerFactory .getLogger(OzoneManagerRatisUtils.class); + private static final RpcController NULL_RPC_CONTROLLER = null; private OzoneManagerRatisUtils() { } @@ -502,4 +505,13 @@ public static GrpcTlsConfig createServerTlsConfig(SecurityConfig conf, return null; } + + public static OzoneManagerProtocolProtos.OMResponse submitRequest( + OzoneManager om, OMRequest omRequest, ClientId clientId, long callId) throws ServiceException { + if (om.isRatisEnabled()) { + return om.getOmRatisServer().submitRequest(omRequest, clientId, callId); + } else { + return om.getOmServerProtocol().submitRequest(NULL_RPC_CONTROLLER, omRequest); + } + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java index 154bd474b6de..2c2d16bf14c7 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java @@ -33,11 +33,11 @@ import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.SnapshotChainManager; -import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys; @@ -48,8 +48,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; import org.apache.hadoop.util.Time; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.util.Preconditions; import java.io.IOException; @@ -247,10 +245,7 @@ private int submitPurgeKeysRequest(List results, // Submit PurgeKeys request to OM try { - RaftClientRequest raftClientRequest = - createRaftClientRequestForPurge(omRequest); - ozoneManager.getOmRatisServer().submitRequest(omRequest, - raftClientRequest); + OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get()); } catch (ServiceException e) { LOG.error("PurgeKey request failed. Will retry at next run."); return 0; @@ -259,20 +254,6 @@ private int submitPurgeKeysRequest(List results, return deletedCount; } - protected RaftClientRequest createRaftClientRequestForPurge( - OMRequest omRequest) { - return RaftClientRequest.newBuilder() - .setClientId(clientId) - .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId()) - .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId()) - .setCallId(runCount.get()) - .setMessage( - Message.valueOf( - OMRatisHelper.convertRequestToByteString(omRequest))) - .setType(RaftClientRequest.writeRequestType()) - .build(); - } - /** * Parse Volume and Bucket Name from ObjectKey and add it to given map of * keys to be purged per bucket. @@ -311,15 +292,7 @@ protected void submitPurgePaths(List requests, // Submit Purge paths request to OM try { - if (isRatisEnabled()) { - RaftClientRequest raftClientRequest = - createRaftClientRequestForPurge(omRequest); - ozoneManager.getOmRatisServer().submitRequest(omRequest, - raftClientRequest); - } else { - getOzoneManager().getOmServerProtocol() - .submitRequest(null, omRequest); - } + OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get()); } catch (ServiceException e) { LOG.error("PurgePaths request failed. Will retry at next run."); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index aa2eb6720a3c..5e622cb17019 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -42,8 +42,7 @@ import org.apache.hadoop.ozone.om.OmSnapshot; import org.apache.hadoop.ozone.om.OmSnapshotManager; import org.apache.hadoop.ozone.om.OzoneManager; -import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; -import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize; @@ -67,8 +66,6 @@ import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -481,24 +478,7 @@ private void updateDeepCleanedSnapshots(List deepCleanedSnapshots) { public void submitRequest(OMRequest omRequest, ClientId clientId) { try { - if (isRatisEnabled()) { - OzoneManagerRatisServer server = getOzoneManager().getOmRatisServer(); - - RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder() - .setClientId(clientId) - .setServerId(server.getRaftPeerId()) - .setGroupId(server.getRaftGroupId()) - .setCallId(getRunCount().get()) - .setMessage(Message.valueOf( - OMRatisHelper.convertRequestToByteString(omRequest))) - .setType(RaftClientRequest.writeRequestType()) - .build(); - - server.submitRequest(omRequest, raftClientRequest); - } else { - getOzoneManager().getOmServerProtocol() - .submitRequest(null, omRequest); - } + OzoneManagerRatisUtils.submitRequest(getOzoneManager(), omRequest, clientId, getRunCount().get()); } catch (ServiceException e) { LOG.error("Snapshot deep cleaning request failed. " + "Will retry at next run.", e); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java index 1199a0c65068..f1084155e98e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java @@ -29,16 +29,13 @@ import org.apache.hadoop.ozone.om.KeyManager; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OzoneManager; -import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; -import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadsExpiredAbortRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; import org.apache.hadoop.util.Time; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -208,24 +205,7 @@ private OMRequest createRequest(List private void submitRequest(OMRequest omRequest) { try { - if (isRatisEnabled()) { - OzoneManagerRatisServer server = ozoneManager.getOmRatisServer(); - - RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder() - .setClientId(clientId) - .setServerId(server.getRaftPeerId()) - .setGroupId(server.getRaftGroupId()) - .setCallId(runCount.get()) - .setMessage(Message.valueOf( - OMRatisHelper.convertRequestToByteString(omRequest))) - .setType(RaftClientRequest.writeRequestType()) - .build(); - - server.submitRequest(omRequest, raftClientRequest); - } else { - ozoneManager.getOmServerProtocol().submitRequest(null, - omRequest); - } + OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get()); } catch (ServiceException e) { LOG.error("Expired multipart info delete request failed. " + "Will retry at next run.", e); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java index 45112037c1b9..768c77ad16e2 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java @@ -47,7 +47,6 @@ import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException; -import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo; import org.apache.hadoop.ozone.om.helpers.OmDBTenantState; import org.apache.hadoop.ozone.om.multitenant.AuthorizerLock; @@ -55,12 +54,11 @@ import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController; import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Policy; import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Role; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetRangerServiceVersionRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -375,19 +373,6 @@ long getRangerOzoneServicePolicyVersion() throws IOException { return policyVersion; } - private RaftClientRequest newRaftClientRequest(OMRequest omRequest) { - return RaftClientRequest.newBuilder() - .setClientId(CLIENT_ID) - .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId()) - .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId()) - .setCallId(runCount.get()) - .setMessage( - Message.valueOf( - OMRatisHelper.convertRequestToByteString(omRequest))) - .setType(RaftClientRequest.writeRequestType()) - .build(); - } - public void setOMDBRangerServiceVersion(long version) throws ServiceException { // OM DB update goes through Ratis @@ -402,9 +387,7 @@ public void setOMDBRangerServiceVersion(long version) .build(); try { - RaftClientRequest raftClientRequest = newRaftClientRequest(omRequest); - ozoneManager.getOmRatisServer().submitRequest(omRequest, - raftClientRequest); + OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, CLIENT_ID, runCount.get()); } catch (ServiceException e) { LOG.error("SetRangerServiceVersion request failed. " + "Will retry at next run."); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java index ab5562301940..c0d958f61213 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java @@ -31,8 +31,7 @@ import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; -import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; -import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteOpenKeysRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; @@ -41,8 +40,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; import org.apache.hadoop.util.Time; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -268,24 +265,7 @@ private OMRequest createDeleteOpenKeysRequest( private OMResponse submitRequest(OMRequest omRequest) { try { - if (isRatisEnabled()) { - OzoneManagerRatisServer server = ozoneManager.getOmRatisServer(); - - RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder() - .setClientId(clientId) - .setServerId(server.getRaftPeerId()) - .setGroupId(server.getRaftGroupId()) - .setCallId(runCount.get()) - .setMessage(Message.valueOf( - OMRatisHelper.convertRequestToByteString(omRequest))) - .setType(RaftClientRequest.writeRequestType()) - .build(); - - return server.submitRequest(omRequest, raftClientRequest); - } else { - return ozoneManager.getOmServerProtocol().submitRequest( - null, omRequest); - } + return OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get()); } catch (ServiceException e) { LOG.error("Open key " + omRequest.getCmdType() + " request failed. Will retry at next run.", e); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java index c043a6a72f24..1a29ee8d96bf 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/QuotaRepairTask.java @@ -51,14 +51,11 @@ import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.BucketLayout; -import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; -import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientRequest; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -194,22 +191,7 @@ private void repairActiveDb( private OzoneManagerProtocolProtos.OMResponse submitRequest( OzoneManagerProtocolProtos.OMRequest omRequest, ClientId clientId) throws Exception { try { - if (om.isRatisEnabled()) { - OzoneManagerRatisServer server = om.getOmRatisServer(); - RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder() - .setClientId(clientId) - .setServerId(om.getOmRatisServer().getRaftPeerId()) - .setGroupId(om.getOmRatisServer().getRaftGroupId()) - .setCallId(RUN_CNT.getAndIncrement()) - .setMessage(Message.valueOf(OMRatisHelper.convertRequestToByteString(omRequest))) - .setType(RaftClientRequest.writeRequestType()) - .build(); - return server.submitRequest(omRequest, raftClientRequest); - } else { - RUN_CNT.getAndIncrement(); - return om.getOmServerProtocol().submitRequest( - null, omRequest); - } + return OzoneManagerRatisUtils.submitRequest(om, omRequest, clientId, RUN_CNT.getAndIncrement()); } catch (ServiceException e) { LOG.error("repair quota count " + omRequest.getCmdType() + " request failed.", e); throw e; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java index a98081c63a17..f85bd781b050 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java @@ -40,7 +40,6 @@ import org.apache.hadoop.ozone.om.OmSnapshotManager; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.SnapshotChainManager; -import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; @@ -48,7 +47,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; -import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest; @@ -58,8 +57,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; import org.apache.hadoop.util.Time; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -562,23 +559,7 @@ public void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo, public void submitRequest(OMRequest omRequest) { try { - if (isRatisEnabled()) { - OzoneManagerRatisServer server = ozoneManager.getOmRatisServer(); - - RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder() - .setClientId(clientId) - .setServerId(server.getRaftPeerId()) - .setGroupId(server.getRaftGroupId()) - .setCallId(getRunCount().get()) - .setMessage(Message.valueOf( - OMRatisHelper.convertRequestToByteString(omRequest))) - .setType(RaftClientRequest.writeRequestType()) - .build(); - - server.submitRequest(omRequest, raftClientRequest); - } else { - ozoneManager.getOmServerProtocol().submitRequest(null, omRequest); - } + OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, getRunCount().get()); } catch (ServiceException e) { LOG.error("Snapshot Deleting request failed. " + "Will retry at next run.", e); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java index 9746b4421b77..26d5d24a8a03 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java @@ -34,13 +34,12 @@ import org.apache.hadoop.ozone.om.OmSnapshotManager; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.SnapshotChainManager; -import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; -import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.request.file.OMFileRequest; import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; @@ -48,8 +47,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientRequest; import java.io.IOException; import java.util.ArrayList; @@ -436,25 +433,7 @@ private void updateDeepCleanSnapshotDir(String snapshotKeyTable) { public void submitRequest(OMRequest omRequest, ClientId clientId) { try { - if (isRatisEnabled()) { - OzoneManagerRatisServer server = - getOzoneManager().getOmRatisServer(); - - RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder() - .setClientId(clientId) - .setServerId(server.getRaftPeerId()) - .setGroupId(server.getRaftGroupId()) - .setCallId(getRunCount().get()) - .setMessage(Message.valueOf( - OMRatisHelper.convertRequestToByteString(omRequest))) - .setType(RaftClientRequest.writeRequestType()) - .build(); - - server.submitRequest(omRequest, raftClientRequest); - } else { - getOzoneManager().getOmServerProtocol() - .submitRequest(null, omRequest); - } + OzoneManagerRatisUtils.submitRequest(getOzoneManager(), omRequest, clientId, getRunCount().get()); } catch (ServiceException e) { LOG.error("Snapshot deep cleaning request failed. " + "Will retry at next run.", e);