Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.execution.OMExecutionFlow;
import org.apache.hadoop.ozone.om.ha.OMHAMetrics;
import org.apache.hadoop.ozone.om.ha.OMHANodeDetails;
import org.apache.hadoop.ozone.om.helpers.BasicOmKeyInfo;
Expand Down Expand Up @@ -422,6 +423,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private OzoneManagerProtocolServerSideTranslatorPB omServerProtocol;

private OzoneManagerRatisServer omRatisServer;
private OMExecutionFlow omExecutionFlow;
private OmRatisSnapshotProvider omRatisSnapshotProvider;
private OMNodeDetails omNodeDetails;
private final Map<String, OMNodeDetails> peerNodesMap;
Expand Down Expand Up @@ -714,6 +716,10 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
if (isOmGrpcServerEnabled) {
omS3gGrpcServer = getOmS3gGrpcServer(configuration);
}

// init om execution flow for request
omExecutionFlow = new OMExecutionFlow(this);

ShutdownHookManager.get().addShutdownHook(this::saveOmMetrics,
SHUTDOWN_HOOK_PRIORITY);

Expand Down Expand Up @@ -5031,4 +5037,8 @@ public void checkFeatureEnabled(OzoneManagerVersion feature) throws OMException
throw new OMException("Feature disabled: " + feature, OMException.ResultCodes.NOT_SUPPORTED_OPERATION);
}
}

public OMExecutionFlow getOmExecutionFlow() {
return omExecutionFlow;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.execution;

import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;

import com.google.protobuf.ServiceException;
import java.io.IOException;
import org.apache.hadoop.ozone.om.OMPerformanceMetrics;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OMAuditLogger;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;

/**
* entry for execution flow for write request.
*/
public class OMExecutionFlow {

private final OzoneManager ozoneManager;
private final OMPerformanceMetrics perfMetrics;

public OMExecutionFlow(OzoneManager om) {
this.ozoneManager = om;
this.perfMetrics = ozoneManager.getPerfMetrics();
}

/**
* External request handling.
*
* @param omRequest the request
* @return OMResponse the response of execution
* @throws ServiceException the exception on execution
*/
public OMResponse submit(OMRequest omRequest) throws ServiceException {
// TODO: currently have only execution after ratis submission, but with new flow can have switch later
return submitExecutionToRatis(omRequest);
}

private OMResponse submitExecutionToRatis(OMRequest request) throws ServiceException {
// 1. create client request and preExecute
OMClientRequest omClientRequest = null;
final OMRequest requestToSubmit;
try {
omClientRequest = OzoneManagerRatisUtils.createClientRequest(request, ozoneManager);
assert (omClientRequest != null);
final OMClientRequest finalOmClientRequest = omClientRequest;
requestToSubmit = captureLatencyNs(perfMetrics.getPreExecuteLatencyNs(),
() -> finalOmClientRequest.preExecute(ozoneManager));
} catch (IOException ex) {
if (omClientRequest != null) {
OMAuditLogger.log(omClientRequest.getAuditBuilder());
omClientRequest.handleRequestFailure(ozoneManager);
}
return OzoneManagerRatisUtils.createErrorResponse(request, ex);
}

// 2. submit request to ratis
OMResponse response = ozoneManager.getOmRatisServer().submitRequest(requestToSubmit);
if (!response.getSuccess()) {
omClientRequest.handleRequestFailure(ozoneManager);
}
return response;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This package contains classes for the OM execution implementation.
*/
package org.apache.hadoop.ozone.om.execution;
Original file line number Diff line number Diff line change
Expand Up @@ -517,4 +517,20 @@ public static OzoneManagerProtocolProtos.OMResponse submitRequest(
OzoneManager om, OMRequest omRequest, ClientId clientId, long callId) throws ServiceException {
return om.getOmRatisServer().submitRequest(omRequest, clientId, callId);
}

public static OzoneManagerProtocolProtos.OMResponse createErrorResponse(
OMRequest omRequest, IOException exception) {
// Added all write command types here, because in future if any of the
// preExecute is changed to return IOException, we can return the error
// OMResponse to the client.
OzoneManagerProtocolProtos.OMResponse.Builder omResponse = OzoneManagerProtocolProtos.OMResponse.newBuilder()
.setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception))
.setCmdType(omRequest.getCmdType())
.setTraceID(omRequest.getTraceID())
.setSuccess(false);
if (exception.getMessage() != null) {
omResponse.setMessage(exception.getMessage());
}
return omResponse.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.LEADER_AND_READY;
import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER;
import static org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils.createClientRequest;
import static org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils.createErrorResponse;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.PrepareStatus;
import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;

Expand All @@ -38,12 +38,10 @@
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.helpers.OMAuditLogger;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.om.request.validation.RequestValidations;
import org.apache.hadoop.ozone.om.request.validation.ValidationContext;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
Expand Down Expand Up @@ -173,43 +171,13 @@ private OMResponse internalProcessRequest(OMRequest request) throws ServiceExcep
return cached;
}

// process new request
OMClientRequest omClientRequest = null;
final OMRequest requestToSubmit;
try {
omClientRequest = createClientRequest(request, ozoneManager);
// TODO: Note: Due to HDDS-6055, createClientRequest() could now
// return null, which triggered the findbugs warning.
// Added the assertion.
assert (omClientRequest != null);
OMClientRequest finalOmClientRequest = omClientRequest;

requestToSubmit = preExecute(finalOmClientRequest);
this.lastRequestToSubmit = requestToSubmit;
} catch (IOException ex) {
if (omClientRequest != null) {
OMAuditLogger.log(omClientRequest.getAuditBuilder());
omClientRequest.handleRequestFailure(ozoneManager);
}
return createErrorResponse(request, ex);
}

final OMResponse response = omRatisServer.submitRequest(requestToSubmit);
if (!response.getSuccess()) {
omClientRequest.handleRequestFailure(ozoneManager);
}
return response;
this.lastRequestToSubmit = request;
return ozoneManager.getOmExecutionFlow().submit(request);
} finally {
OzoneManager.setS3Auth(null);
}
}

private OMRequest preExecute(OMClientRequest finalOmClientRequest)
throws IOException {
return captureLatencyNs(perfMetrics.getPreExecuteLatencyNs(),
() -> finalOmClientRequest.preExecute(ozoneManager));
}

@VisibleForTesting
public OMRequest getLastRequestToSubmit() {
return lastRequestToSubmit;
Expand Down Expand Up @@ -248,23 +216,6 @@ private ServiceException createLeaderNotReadyException() {
return new ServiceException(leaderNotReadyException);
}

/** @return an {@link OMResponse} from the given {@link OMRequest} and the given exception. */
private OMResponse createErrorResponse(
OMRequest omRequest, IOException exception) {
// Added all write command types here, because in future if any of the
// preExecute is changed to return IOException, we can return the error
// OMResponse to the client.
OMResponse.Builder omResponse = OMResponse.newBuilder()
.setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception))
.setCmdType(omRequest.getCmdType())
.setTraceID(omRequest.getTraceID())
.setSuccess(false);
if (exception.getMessage() != null) {
omResponse.setMessage(exception.getMessage());
}
return omResponse.build();
}

public static Logger getLog() {
return LOG;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.execution.OMExecutionFlow;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
Expand Down Expand Up @@ -109,6 +110,8 @@ public void testUnknownRequestHandling()
omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration,
ozoneManager);
when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
OMExecutionFlow omExecutionFlow = new OMExecutionFlow(ozoneManager);
when(ozoneManager.getOmExecutionFlow()).thenReturn(omExecutionFlow);
when(ozoneManager.getConfiguration()).thenReturn(ozoneConfiguration);
final OmConfig omConfig = ozoneConfiguration.getObject(OmConfig.class);
when(ozoneManager.getConfig()).thenReturn(omConfig);
Expand Down