Skip to content

Workflow bulk deletion API and action module options #110

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 3, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,23 @@ BulkResponse terminate(
"Cannot process more than {max} workflows. Please use multiple requests.")
List<String> workflowIds,
String reason);

BulkResponse deleteWorkflow(
@NotEmpty(message = "WorkflowIds list cannot be null.")
@Size(
max = MAX_REQUEST_ITEMS,
message =
"Cannot process more than {max} workflows. Please use multiple requests.")
List<String> workflowIds,
boolean archiveWorkflow);

BulkResponse terminateRemove(
@NotEmpty(message = "WorkflowIds list cannot be null.")
@Size(
max = MAX_REQUEST_ITEMS,
message =
"Cannot process more than {max} workflows. Please use multiple requests.")
List<String> workflowIds,
String reason,
boolean archiveWorkflow);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ public class WorkflowBulkServiceImpl implements WorkflowBulkService {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowBulkService.class);
private final WorkflowExecutor workflowExecutor;
private final WorkflowService workflowService;

public WorkflowBulkServiceImpl(WorkflowExecutor workflowExecutor) {
public WorkflowBulkServiceImpl(
WorkflowExecutor workflowExecutor, WorkflowService workflowService) {
this.workflowExecutor = workflowExecutor;
this.workflowService = workflowService;
}

/**
Expand Down Expand Up @@ -164,4 +167,70 @@ public BulkResponse terminate(List<String> workflowIds, String reason) {
}
return bulkResponse;
}

/**
* Removes a list of workflows from the system.
*
* @param workflowIds List of WorkflowIDs of the workflows you want to remove from system.
* @param archiveWorkflow Archives the workflow and associated tasks instead of removing them.
*/
public BulkResponse deleteWorkflow(List<String> workflowIds, boolean archiveWorkflow) {
BulkResponse bulkResponse = new BulkResponse();
for (String workflowId : workflowIds) {
try {
workflowService.deleteWorkflow(
workflowId,
archiveWorkflow); // TODO: change this to method that cancels then deletes
bulkResponse.appendSuccessResponse(workflowId);
} catch (Exception e) {
LOGGER.error(
"bulk delete exception, workflowId {}, message: {} ",
workflowId,
e.getMessage(),
e);
bulkResponse.appendFailedResponse(workflowId, e.getMessage());
}
}
return bulkResponse;
}

/**
* Terminates execution for workflows in a list, then removes each workflow.
*
* @param workflowIds List of workflow IDs to terminate and delete.
* @param reason Reason for terminating the workflow.
* @param archiveWorkflow Archives the workflow and associated tasks instead of removing them.
* @return bulk response object containing a list of succeeded workflows and a list of failed
* ones with errors
*/
public BulkResponse terminateRemove(
List<String> workflowIds, String reason, boolean archiveWorkflow) {
BulkResponse bulkResponse = new BulkResponse();
for (String workflowId : workflowIds) {
try {
workflowExecutor.terminateWorkflow(workflowId, reason);
bulkResponse.appendSuccessResponse(workflowId);
} catch (Exception e) {
LOGGER.error(
"bulk terminate exception, workflowId {}, message: {} ",
workflowId,
e.getMessage(),
e);
bulkResponse.appendFailedResponse(workflowId, e.getMessage());
}

try {
workflowService.deleteWorkflow(workflowId, archiveWorkflow);
bulkResponse.appendSuccessResponse(workflowId);
} catch (Exception e) {
LOGGER.error(
"bulk delete exception, workflowId {}, message: {} ",
workflowId,
e.getMessage(),
e);
bulkResponse.appendFailedResponse(workflowId, e.getMessage());
}
}
return bulkResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,19 @@ void terminateWorkflow(
@NotEmpty(message = "WorkflowId cannot be null or empty.") String workflowId,
String reason);

/**
* Terminate workflow execution, and then remove it from the system. Acts as terminate and
* remove combined.
*
* @param workflowId WorkflowId of the workflow
* @param reason Reason for terminating the workflow.
* @param archiveWorkflow Archives the workflow and associated tasks instead of removing them.
*/
void terminateRemove(
@NotEmpty(message = "WorkflowId cannot be null or empty.") String workflowId,
String reason,
boolean archiveWorkflow);

/**
* Search for workflows based on payload and given parameters. Use sort options as sort ASCor
* DESC e.g. sort=name or sort=workflowId:DESC. If order is not specified, defaults to ASC.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,19 @@ public void deleteWorkflow(String workflowId, boolean archiveWorkflow) {
executionService.removeWorkflow(workflowId, archiveWorkflow);
}

/**
* Terminate workflow execution, and then remove it from the system. Acts as terminate and
* remove combined.
*
* @param workflowId WorkflowId of the workflow
* @param reason Reason for terminating the workflow.
* @param archiveWorkflow Archives the workflow and associated tasks instead of removing them.
*/
public void terminateRemove(String workflowId, String reason, boolean archiveWorkflow) {
workflowExecutor.terminateWorkflow(workflowId, reason);
executionService.removeWorkflow(workflowId, archiveWorkflow);
}

/**
* Retrieves all the running workflows.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,14 @@ WorkflowExecutor workflowExecutor() {
}

@Bean
public WorkflowBulkService workflowBulkService(WorkflowExecutor workflowExecutor) {
return new WorkflowBulkServiceImpl(workflowExecutor);
WorkflowService workflowService() {
return mock(WorkflowService.class);
}

@Bean
public WorkflowBulkService workflowBulkService(
WorkflowExecutor workflowExecutor, WorkflowService workflowService) {
return new WorkflowBulkServiceImpl(workflowExecutor, workflowService);
}
}

Expand Down Expand Up @@ -144,4 +150,28 @@ public void testTerminateNull() {
throw ex;
}
}

@Test(expected = ConstraintViolationException.class)
public void testDeleteWorkflowNull() {
try {
workflowBulkService.deleteWorkflow(null, false);
} catch (ConstraintViolationException ex) {
assertEquals(1, ex.getConstraintViolations().size());
Set<String> messages = getConstraintViolationMessages(ex.getConstraintViolations());
assertTrue(messages.contains("WorkflowIds list cannot be null."));
throw ex;
}
}

@Test(expected = ConstraintViolationException.class)
public void testTerminateRemoveNull() {
try {
workflowBulkService.terminateRemove(null, null, false);
} catch (ConstraintViolationException ex) {
assertEquals(1, ex.getConstraintViolations().size());
Set<String> messages = getConstraintViolationMessages(ex.getConstraintViolations());
assertTrue(messages.contains("WorkflowIds list cannot be null."));
throw ex;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.util.List;

import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
Expand Down Expand Up @@ -111,4 +112,33 @@ public BulkResponse terminate(
@RequestParam(value = "reason", required = false) String reason) {
return workflowBulkService.terminate(workflowIds, reason);
}

/**
* Delete the list of workflows.
*
* @param workflowIds - list of workflow Ids to be deleted
* @return bulk reponse object containing a list of successfully deleted workflows
*/
@DeleteMapping("/remove")
public BulkResponse deleteWorkflow(
@RequestBody List<String> workflowIds,
@RequestParam(value = "archiveWorkflow", defaultValue = "true", required = false)
boolean archiveWorkflow) {
return workflowBulkService.deleteWorkflow(workflowIds, archiveWorkflow);
}

/**
* Terminate then delete the list of workflows.
*
* @param workflowIds - list of workflow Ids to be deleted
* @return bulk response object containing a list of successfully deleted workflows
*/
@DeleteMapping("/terminate-remove")
public BulkResponse terminateRemove(
@RequestBody List<String> workflowIds,
@RequestParam(value = "archiveWorkflow", defaultValue = "true", required = false)
boolean archiveWorkflow,
@RequestParam(value = "reason", required = false) String reason) {
return workflowBulkService.terminateRemove(workflowIds, reason, archiveWorkflow);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,16 @@ public void terminate(
workflowService.terminateWorkflow(workflowId, reason);
}

@DeleteMapping("/{workflowId}/terminate-remove")
@Operation(summary = "Terminate workflow execution and remove the workflow from the system")
public void terminateRemove(
@PathVariable("workflowId") String workflowId,
@RequestParam(value = "reason", required = false) String reason,
@RequestParam(value = "archiveWorkflow", defaultValue = "true", required = false)
boolean archiveWorkflow) {
workflowService.terminateRemove(workflowId, reason, archiveWorkflow);
}

@Operation(
summary = "Search for workflows based on payload and other parameters",
description =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,13 @@ public void testTerminate() {
verify(mockWorkflowService, times(1)).terminateWorkflow(anyString(), anyString());
}

@Test
public void testTerminateRemove() {
workflowResource.terminateRemove("w123", "test", false);
verify(mockWorkflowService, times(1))
.terminateRemove(anyString(), anyString(), anyBoolean());
}

@Test
public void testSearch() {
workflowResource.search(0, 100, "asc", "*", "*");
Expand Down
17 changes: 17 additions & 0 deletions ui/src/data/bulkactions.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,20 @@ export const useBulkTerminateWithReasonAction = (callbacks) => {
});
}, callbacks);
};

export const useBulkDeleteAction = (callbacks) => {
const fetchContext = useFetchContext();

return useMutation((mutateParams) => {
const path = new Path("/workflow/bulk/remove");
path.search.append("archiveWorkflow", mutateParams.archiveWorkflow);

return fetchWithContext(path, fetchContext, {
method: "delete",
headers: {
"Content-Type": "application/json",
},
body: _.get(mutateParams, "body"),
});
}, callbacks);
};
26 changes: 25 additions & 1 deletion ui/src/pages/executions/BulkActionModule.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
useBulkPauseAction,
useBulkRetryAction,
useBulkTerminateWithReasonAction,
useBulkDeleteAction,
} from "../../data/bulkactions";

const useStyles = makeStyles({
Expand Down Expand Up @@ -59,6 +60,8 @@ export default function BulkActionModule({ selectedRows }) {
mutate: terminateWithReasonAction,
isLoading: terminateWithReasonLoading,
} = useBulkTerminateWithReasonAction({ onSuccess });
const { mutate: deleteAction, isLoading: deleteLoading } =
useBulkDeleteAction({ onSuccess });

const isLoading =
pauseLoading ||
Expand All @@ -67,7 +70,8 @@ export default function BulkActionModule({ selectedRows }) {
restartLatestLoading ||
retryLoading ||
terminateLoading ||
terminateWithReasonLoading;
terminateWithReasonLoading ||
deleteLoading;

function onSuccess(data, variables, context) {
const retval = {
Expand Down Expand Up @@ -133,6 +137,26 @@ export default function BulkActionModule({ selectedRows }) {
}
},
},
{
label: "Archive",
handler: () => {
const archiveWorkflow = "true";
deleteAction({
body: JSON.stringify(selectedIds),
archiveWorkflow,
});
},
},
{
label: "Delete",
handler: () => {
const archiveWorkflow = "false";
deleteAction({
body: JSON.stringify(selectedIds),
archiveWorkflow,
});
},
},
]}
>
Bulk Action
Expand Down
Loading