diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 2d955e7cea6..eda35255be6 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -226,6 +226,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException; import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException; +import org.apache.hadoop.ozone.om.execution.OMExecutionFlow; import org.apache.hadoop.ozone.om.ha.OMHAMetrics; import org.apache.hadoop.ozone.om.ha.OMHANodeDetails; import org.apache.hadoop.ozone.om.helpers.BasicOmKeyInfo; @@ -422,6 +423,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private OzoneManagerProtocolServerSideTranslatorPB omServerProtocol; private OzoneManagerRatisServer omRatisServer; + private OMExecutionFlow omExecutionFlow; private OmRatisSnapshotProvider omRatisSnapshotProvider; private OMNodeDetails omNodeDetails; private final Map peerNodesMap; @@ -714,6 +716,10 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption) if (isOmGrpcServerEnabled) { omS3gGrpcServer = getOmS3gGrpcServer(configuration); } + + // init om execution flow for request + omExecutionFlow = new OMExecutionFlow(this); + ShutdownHookManager.get().addShutdownHook(this::saveOmMetrics, SHUTDOWN_HOOK_PRIORITY); @@ -5031,4 +5037,8 @@ public void checkFeatureEnabled(OzoneManagerVersion feature) throws OMException throw new OMException("Feature disabled: " + feature, OMException.ResultCodes.NOT_SUPPORTED_OPERATION); } } + + public OMExecutionFlow getOmExecutionFlow() { + return omExecutionFlow; + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/OMExecutionFlow.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/OMExecutionFlow.java new file mode 100644 index 00000000000..4ce714ab3dc --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/OMExecutionFlow.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.execution; + +import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs; + +import com.google.protobuf.ServiceException; +import java.io.IOException; +import org.apache.hadoop.ozone.om.OMPerformanceMetrics; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.helpers.OMAuditLogger; +import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; +import org.apache.hadoop.ozone.om.request.OMClientRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; + +/** + * entry for execution flow for write request. + */ +public class OMExecutionFlow { + + private final OzoneManager ozoneManager; + private final OMPerformanceMetrics perfMetrics; + + public OMExecutionFlow(OzoneManager om) { + this.ozoneManager = om; + this.perfMetrics = ozoneManager.getPerfMetrics(); + } + + /** + * External request handling. + * + * @param omRequest the request + * @return OMResponse the response of execution + * @throws ServiceException the exception on execution + */ + public OMResponse submit(OMRequest omRequest) throws ServiceException { + // TODO: currently have only execution after ratis submission, but with new flow can have switch later + return submitExecutionToRatis(omRequest); + } + + private OMResponse submitExecutionToRatis(OMRequest request) throws ServiceException { + // 1. create client request and preExecute + OMClientRequest omClientRequest = null; + final OMRequest requestToSubmit; + try { + omClientRequest = OzoneManagerRatisUtils.createClientRequest(request, ozoneManager); + assert (omClientRequest != null); + final OMClientRequest finalOmClientRequest = omClientRequest; + requestToSubmit = captureLatencyNs(perfMetrics.getPreExecuteLatencyNs(), + () -> finalOmClientRequest.preExecute(ozoneManager)); + } catch (IOException ex) { + if (omClientRequest != null) { + OMAuditLogger.log(omClientRequest.getAuditBuilder()); + omClientRequest.handleRequestFailure(ozoneManager); + } + return OzoneManagerRatisUtils.createErrorResponse(request, ex); + } + + // 2. submit request to ratis + OMResponse response = ozoneManager.getOmRatisServer().submitRequest(requestToSubmit); + if (!response.getSuccess()) { + omClientRequest.handleRequestFailure(ozoneManager); + } + return response; + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/package-info.java new file mode 100644 index 00000000000..713f8348978 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains classes for the OM execution implementation. + */ +package org.apache.hadoop.ozone.om.execution; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java index 7a2c66e81d2..5548be7bd8b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java @@ -517,4 +517,20 @@ public static OzoneManagerProtocolProtos.OMResponse submitRequest( OzoneManager om, OMRequest omRequest, ClientId clientId, long callId) throws ServiceException { return om.getOmRatisServer().submitRequest(omRequest, clientId, callId); } + + public static OzoneManagerProtocolProtos.OMResponse createErrorResponse( + OMRequest omRequest, IOException exception) { + // Added all write command types here, because in future if any of the + // preExecute is changed to return IOException, we can return the error + // OMResponse to the client. + OzoneManagerProtocolProtos.OMResponse.Builder omResponse = OzoneManagerProtocolProtos.OMResponse.newBuilder() + .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception)) + .setCmdType(omRequest.getCmdType()) + .setTraceID(omRequest.getTraceID()) + .setSuccess(false); + if (exception.getMessage() != null) { + omResponse.setMessage(exception.getMessage()); + } + return omResponse.build(); + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 13403cadce1..6b7bccd4ab9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -19,7 +19,7 @@ import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.LEADER_AND_READY; import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER; -import static org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils.createClientRequest; +import static org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils.createErrorResponse; import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.PrepareStatus; import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs; @@ -38,12 +38,10 @@ import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException; -import org.apache.hadoop.ozone.om.helpers.OMAuditLogger; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus; import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; -import org.apache.hadoop.ozone.om.request.OMClientRequest; import org.apache.hadoop.ozone.om.request.validation.RequestValidations; import org.apache.hadoop.ozone.om.request.validation.ValidationContext; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; @@ -173,43 +171,13 @@ private OMResponse internalProcessRequest(OMRequest request) throws ServiceExcep return cached; } - // process new request - OMClientRequest omClientRequest = null; - final OMRequest requestToSubmit; - try { - omClientRequest = createClientRequest(request, ozoneManager); - // TODO: Note: Due to HDDS-6055, createClientRequest() could now - // return null, which triggered the findbugs warning. - // Added the assertion. - assert (omClientRequest != null); - OMClientRequest finalOmClientRequest = omClientRequest; - - requestToSubmit = preExecute(finalOmClientRequest); - this.lastRequestToSubmit = requestToSubmit; - } catch (IOException ex) { - if (omClientRequest != null) { - OMAuditLogger.log(omClientRequest.getAuditBuilder()); - omClientRequest.handleRequestFailure(ozoneManager); - } - return createErrorResponse(request, ex); - } - - final OMResponse response = omRatisServer.submitRequest(requestToSubmit); - if (!response.getSuccess()) { - omClientRequest.handleRequestFailure(ozoneManager); - } - return response; + this.lastRequestToSubmit = request; + return ozoneManager.getOmExecutionFlow().submit(request); } finally { OzoneManager.setS3Auth(null); } } - private OMRequest preExecute(OMClientRequest finalOmClientRequest) - throws IOException { - return captureLatencyNs(perfMetrics.getPreExecuteLatencyNs(), - () -> finalOmClientRequest.preExecute(ozoneManager)); - } - @VisibleForTesting public OMRequest getLastRequestToSubmit() { return lastRequestToSubmit; @@ -248,23 +216,6 @@ private ServiceException createLeaderNotReadyException() { return new ServiceException(leaderNotReadyException); } - /** @return an {@link OMResponse} from the given {@link OMRequest} and the given exception. */ - private OMResponse createErrorResponse( - OMRequest omRequest, IOException exception) { - // Added all write command types here, because in future if any of the - // preExecute is changed to return IOException, we can return the error - // OMResponse to the client. - OMResponse.Builder omResponse = OMResponse.newBuilder() - .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception)) - .setCmdType(omRequest.getCmdType()) - .setTraceID(omRequest.getTraceID()) - .setSuccess(false); - if (exception.getMessage() != null) { - omResponse.setMessage(exception.getMessage()); - } - return omResponse.build(); - } - public static Logger getLog() { return LOG; } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisRequest.java index f8bb6b3227e..7226cf6e91f 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisRequest.java @@ -38,6 +38,7 @@ import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.execution.OMExecutionFlow; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; @@ -109,6 +110,8 @@ public void testUnknownRequestHandling() omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration, ozoneManager); when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager); + OMExecutionFlow omExecutionFlow = new OMExecutionFlow(ozoneManager); + when(ozoneManager.getOmExecutionFlow()).thenReturn(omExecutionFlow); when(ozoneManager.getConfiguration()).thenReturn(ozoneConfiguration); final OmConfig omConfig = ozoneConfiguration.getObject(OmConfig.class); when(ozoneManager.getConfig()).thenReturn(omConfig);