Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
import java.sql.SQLException;
import javax.inject.Named;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
import org.apache.gobblin.service.FlowStatusId;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.helix.HelixManager;

Expand All @@ -46,56 +48,38 @@ public GobblinServiceFlowExecutionResourceHandlerWithWarmStandby(FlowExecutionRe
this.dagActionStore = dagActionStore;
}

@Override
public void resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
FlowStatusId id = key.getKey();
addDagAction(id.getFlowGroup(), id.getFlowName(), id.getFlowExecutionId(), DagActionStore.FlowActionType.RESUME);
}
Comment on lines +52 to +55
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is sort of how the class was designed, but I'm curious why we also don't return a 200 OK response for resume.

Also I'm wondering why we don't enforce that the dag exists before we attempt to kill/resume it, seems like it is a source for some pain in users

Copy link
Copy Markdown
Contributor Author

@phet phet Nov 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar enough w/ restli to know whether actions like resume can actually return a result the way HTTP methods like DELETE (kill) do. I see that the FlowExecutionResource at least expects the handlers to behave in the current way. that said, when I use curl to do a resume, I do observe a 200 OK coming back at me. (perhaps that's what's sent if a void method doesn't throw an exception?) in this case where we queue the work, strictly speaking, it should probably be HTTP 202 Accepted...

as for no 404, upon an entity not found, I intuitively agree w/ those semantics and can't think of a counter-argument. do we think it was merely forgotten? if so, I agree on adding, but suggest we do that on a separate commit, since these changes are merely a quick, unplanned bug fix


@Override
public void resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, EmptyRecord> key) {
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
Long flowExecutionId = key.getKey().getFlowExecutionId();
public UpdateResponse delete(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, EmptyRecord> key) {
FlowStatusId id = key.getKey();
addDagAction(id.getFlowGroup(), id.getFlowName(), id.getFlowExecutionId(), DagActionStore.FlowActionType.KILL);
return new UpdateResponse(HttpStatus.S_200_OK);
}

/** NOTE: may throw {@link RestLiServiceException}, see: https://linkedin.github.io/rest.li/user_guide/restli_server#returning-errors */
protected void addDagAction(String flowGroup, String flowName, Long flowExecutionId, DagActionStore.FlowActionType actionType) {
try {
// If an existing resume request is still pending then do not accept this request
if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
this.prepareError("There is already a pending RESUME action for this flow. Please wait to resubmit and wait "
if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId.toString(), actionType)) {
this.throwErrorResponse("There is already a pending " + actionType + " action for this flow. Please wait to resubmit and wait "
+ "for action to be completed.", HttpStatus.S_409_CONFLICT);
return;
}
this.dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME);
this.dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId.toString(), actionType);
} catch (IOException | SQLException e) {
log.warn(
String.format("Failed to add execution resume action for flow %s %s %s to dag action store due to", flowGroup,
String.format("Failed to add %s action for flow %s %s %s to dag action store due to:", actionType, flowGroup,
flowName, flowExecutionId), e);
this.prepareError(e.getMessage(), HttpStatus.S_500_INTERNAL_SERVER_ERROR);
}

}

private void prepareError(String exceptionMessage, HttpStatus errorType) {
if (errorType == HttpStatus.S_409_CONFLICT) {
throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, exceptionMessage);
} else if (errorType == HttpStatus.S_400_BAD_REQUEST) {
throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST);
this.throwErrorResponse(e.getMessage(), HttpStatus.S_500_INTERNAL_SERVER_ERROR);
}
throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, exceptionMessage);
}

@Override
public UpdateResponse delete(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, EmptyRecord> key) {

String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
Long flowExecutionId = key.getKey().getFlowExecutionId();
try {
// If an existing kill request is still pending then do not accept this request
if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.FlowActionType.KILL)) {
this.prepareError("There is already a pending KILL action for this flow. Please wait to resubmit and wait "
+ "for action to be completed.", HttpStatus.S_400_BAD_REQUEST);
}
this.dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.FlowActionType.KILL);
return new UpdateResponse(HttpStatus.S_200_OK);
} catch (IOException | SQLException e) {
this.prepareError(String.format("Failed to add execution delete action for flow %s %s %s to dag action store due to", flowGroup,
flowName, flowExecutionId), HttpStatus.S_500_INTERNAL_SERVER_ERROR);
return new UpdateResponse(HttpStatus.S_500_INTERNAL_SERVER_ERROR);
}
private void throwErrorResponse(String exceptionMessage, HttpStatus errorType) {
throw StringUtils.isBlank(exceptionMessage) ? new RestLiServiceException(errorType) : new RestLiServiceException(errorType, exceptionMessage);
}
}