From bb6a425ff79ce0308eeec7144e2259693a3fe1ae Mon Sep 17 00:00:00 2001 From: YannC <37600690+Skraye@users.noreply.github.com> Date: Mon, 27 Feb 2023 22:26:37 +0100 Subject: [PATCH] feat(core): Implement bulk delete/disable for flows & templates (#1008) --- .../io/kestra/core/models/flows/Flow.java | 1 + .../core/schedulers/DefaultScheduler.java | 2 +- .../io/kestra/core/services/FlowService.java | 10 ++ ui/src/components/flows/Flows.vue | 66 ++++++++++ ui/src/components/templates/Templates.vue | 37 +++++- ui/src/stores/flow.js | 12 ++ ui/src/stores/template.js | 6 + ui/src/translations.json | 22 ++++ .../controllers/ExecutionController.java | 17 +-- .../webserver/controllers/FlowController.java | 120 ++++++++++++++++-- .../controllers/TemplateController.java | 38 ++++++ .../responses/BulkErrorResponse.java | 16 +++ .../webserver/responses/BulkResponse.java | 12 ++ .../controllers/FlowControllerTest.java | 111 +++++++++++++++- .../controllers/TemplateControllerTest.java | 80 ++++++++++++ 15 files changed, 522 insertions(+), 28 deletions(-) create mode 100644 webserver/src/main/java/io/kestra/webserver/responses/BulkErrorResponse.java create mode 100644 webserver/src/main/java/io/kestra/webserver/responses/BulkResponse.java diff --git a/core/src/main/java/io/kestra/core/models/flows/Flow.java b/core/src/main/java/io/kestra/core/models/flows/Flow.java index edcf3b8c37e..af0aae6ec4a 100644 --- a/core/src/main/java/io/kestra/core/models/flows/Flow.java +++ b/core/src/main/java/io/kestra/core/models/flows/Flow.java @@ -30,6 +30,7 @@ import javax.validation.Valid; import javax.validation.constraints.*; + @SuperBuilder(toBuilder = true) @Getter @AllArgsConstructor diff --git a/core/src/main/java/io/kestra/core/schedulers/DefaultScheduler.java b/core/src/main/java/io/kestra/core/schedulers/DefaultScheduler.java index 7eabf6f6594..7166786e45e 100644 --- a/core/src/main/java/io/kestra/core/schedulers/DefaultScheduler.java +++ b/core/src/main/java/io/kestra/core/schedulers/DefaultScheduler.java @@ -49,7 +49,7 @@ public void run() { }); triggerQueue.receive(trigger -> { - if (trigger.getExecutionId() != null) { + if (trigger != null && trigger.getExecutionId() != null) { this.watchingTrigger.put(trigger.getExecutionId(), trigger); } }); diff --git a/core/src/main/java/io/kestra/core/services/FlowService.java b/core/src/main/java/io/kestra/core/services/FlowService.java index 37f298fe7d8..9f9d6c8898c 100644 --- a/core/src/main/java/io/kestra/core/services/FlowService.java +++ b/core/src/main/java/io/kestra/core/services/FlowService.java @@ -15,6 +15,7 @@ import io.kestra.core.utils.ListUtils; import java.util.*; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; import jakarta.inject.Inject; @@ -215,6 +216,15 @@ public static String cleanupSource(String source) { return source.replaceFirst("(?m)^revision: \\d+\n?",""); } + public static String injectDisabledTrue(String source) { + Pattern p = Pattern.compile("^disabled\\s*:\\s*false\\s*", Pattern.MULTILINE); + if (p.matcher(source).find()) { + return p.matcher(source).replaceAll("disabled: true\n"); + } + + return source + "\ndisabled: true"; + } + @AllArgsConstructor @Getter private static class FlowWithTrigger { diff --git a/ui/src/components/flows/Flows.vue b/ui/src/components/flows/Flows.vue index 1ea200bef5d..2a346785229 100644 --- a/ui/src/components/flows/Flows.vue +++ b/ui/src/components/flows/Flows.vue @@ -109,6 +109,12 @@ {{ $t('export') }} + + {{ $t('delete') }} + + + {{ $t('disable') }} +
  • @@ -152,6 +158,8 @@ import Plus from "vue-material-design-icons/Plus.vue"; import TextBoxSearch from "vue-material-design-icons/TextBoxSearch.vue"; import Download from "vue-material-design-icons/Download.vue"; + import TrashCan from "vue-material-design-icons/TrashCan.vue"; + import FileDocumentRemoveOutline from "vue-material-design-icons/FileDocumentRemoveOutline.vue"; diff --git a/ui/src/stores/flow.js b/ui/src/stores/flow.js index 14bd56393c5..5fe9cc015dd 100644 --- a/ui/src/stores/flow.js +++ b/ui/src/stores/flow.js @@ -180,6 +180,18 @@ export default { }, importFlows(_, options) { return this.$http.post("/api/v1/flows/import", options, {headers: {"Content-Type": "multipart/form-data"}}) + }, + disableFlowByIds(_, options) { + return this.$http.post("/api/v1/flows/disable/by-ids", options.ids) + }, + disableFlowByQuery(_, options) { + return this.$http.post("/api/v1/flows/disable/by-query", options, {params: options}) + }, + deleteFlowByIds(_, options) { + return this.$http.delete("/api/v1/flows/delete/by-ids", {data: options.ids}) + }, + deleteFlowByQuery(_, options) { + return this.$http.delete("/api/v1/flows/delete/by-query", options, {params: options}) } }, mutations: { diff --git a/ui/src/stores/template.js b/ui/src/stores/template.js index a48c82e4053..42088ea63c6 100644 --- a/ui/src/stores/template.js +++ b/ui/src/stores/template.js @@ -75,6 +75,12 @@ export default { }, importTemplates(_, options) { return this.$http.post("/api/v1/templates/import", options, {headers: {"Content-Type": "multipart/form-data"}}) + }, + deleteTemplateByIds(_, options) { + return this.$http.delete("/api/v1/templates/delete/by-ids", {data: options.ids}) + }, + deleteTemplateByQuery(_, options) { + return this.$http.delete("/api/v1/templates/delete/by-query", options, {params: options}) } }, mutations: { diff --git a/ui/src/translations.json b/ui/src/translations.json index 1e15d07e223..cf99ba882d8 100644 --- a/ui/src/translations.json +++ b/ui/src/translations.json @@ -282,6 +282,13 @@ "flows exported": "Flows exported", "export all flows": "Export all flows", "import": "Import", + "disable": "Disable", + "template delete": "Are you sure you want to delete {templateCount} template(s)?", + "flow delete": "Are you sure you want to delete {flowCount} flow(s)?", + "templates deleted": "{count} Template(s) deleted", + "flows deleted": "{count} Flow(s) deleted", + "flow disable": "Are you sure you want to disable {flowCount} flow(s)?", + "flows disabled": "{count} Flow(s) disabled", "dependencies": "Dependencies", "see dependencies": "See dependencies", "dependencies missing acls": "No permissions on this flow", @@ -571,6 +578,13 @@ "flows exported": "Flows exportés", "export all flows": "Exporter tous les flows", "import": "Importer", + "disable": "Désactiver", + "template delete": "Êtes vous sûr de vouloir supprimer {templateCount} template(s)?", + "flow delete": "Êtes vous sûr de vouloir supprimer {flowCount} flow(s)?", + "templates deleted": "{count} Template(s) supprimé(s)", + "flows deleted": "{count} Flow(s) supprimé(s)", + "flow disable": "Êtes vous sûr de vouloir désactiver {flowCount} flow(s)?", + "flows disabled": "{count} Flow(s) désactivé(s)", "dependencies": "Dépendances", "see dependencies": "Voir les dépendances", "dependencies missing acls": "Aucune permission sur ce flow", @@ -859,6 +873,14 @@ "flows exported": "Flows exportiert", "export all flows": "Exportieren Sie alle Flows", "import": "Importieren", + "disable": "Deaktivieren", + "template delete": "Sind Sie sicher, dass Sie {templateCount} Template(n) löschen möchten?", + "flow delete": "Sind Sie sicher, dass Sie {flowCount} Flows löschen möchten?", + "templates deleted": "Template(s) gelöscht", + "flows deleted": "Flow(s) gelöscht", + "flow disable": "Sind Sie sicher, dass Sie {flowCount} Flows deaktivieren möchten?", + "flows disabled": "Flow(s) deaktiviert", + "import": "Importieren", "dependencies": "Abhängigkeiten", "see dependencies": "Siehe Abhängigkeiten", "dependencies missing acls": "Keinen Zugriff für diesen Flow", diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java b/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java index a829850f862..9f8f470de00 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/ExecutionController.java @@ -5,6 +5,8 @@ import io.kestra.core.models.validations.ManualConstraintViolation; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; +import io.kestra.webserver.responses.BulkErrorResponse; +import io.kestra.webserver.responses.BulkResponse; import io.micronaut.context.annotation.Value; import io.micronaut.context.event.ApplicationEventPublisher; import io.micronaut.core.convert.format.Format; @@ -109,21 +111,6 @@ public class ExecutionController { @Inject private RunContextFactory runContextFactory; - @SuperBuilder - @Getter - @NoArgsConstructor - public static class BulkResponse { - Integer count; - } - - @SuperBuilder - @Getter - @NoArgsConstructor - public static class BulkErrorResponse { - String message; - Set> invalids; - } - @ExecuteOn(TaskExecutors.IO) @Get(uri = "executions/search", produces = MediaType.TEXT_JSON) @Operation(tags = {"Executions"}, summary = "Search for executions") diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/FlowController.java b/webserver/src/main/java/io/kestra/webserver/controllers/FlowController.java index 1fa6cf4b77f..2e695bfc8cb 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/FlowController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/FlowController.java @@ -15,9 +15,11 @@ import io.kestra.core.repositories.FlowRepositoryInterface; import io.kestra.core.repositories.FlowTopologyRepositoryInterface; import io.kestra.core.serializers.YamlFlowParser; +import io.kestra.core.services.FlowService; import io.kestra.core.services.TaskDefaultService; import io.kestra.core.topologies.FlowTopologyService; import io.kestra.webserver.controllers.domain.IdWithNamespace; +import io.kestra.webserver.responses.BulkResponse; import io.kestra.webserver.responses.PagedResults; import io.kestra.webserver.utils.PageableUtils; import io.kestra.webserver.utils.RequestUtils; @@ -112,7 +114,7 @@ public Flow index( return source ? flowRepository .findByIdWithSource(namespace, id) - .orElse(null): + .orElse(null) : flowRepository .findById(namespace, id) .orElse(null); @@ -308,13 +310,13 @@ private List updateCompleteNamespace(String namespace, List !ids.contains(flow.getId())) .map(flow -> { flowRepository.delete(flow); - return FlowWithSource.of(flow, flow.generateSource()); + return FlowWithSource.of(flow, flow.generateSource()); }) .collect(Collectors.toList()); } // update or create flows - List updatedOrCreated = IntStream.range(0, flows.size()) + List updatedOrCreated = IntStream.range(0, flows.size()) .mapToObj(index -> { Flow flow = flows.get(index); String source = sources.get(index); @@ -438,7 +440,7 @@ public FlowTopologyGraph dependencies( @Post(uri = "validate", produces = MediaType.TEXT_JSON, consumes = MediaType.APPLICATION_YAML) @Operation(tags = {"Flows"}, summary = "Validate a list of flows") public List validateFlows( - @Parameter(description= "A list of flows") @Body String flows + @Parameter(description = "A list of flows") @Body String flows ) { AtomicInteger index = new AtomicInteger(0); return Stream @@ -455,7 +457,7 @@ public List validateFlows( modelValidator.validate(taskDefaultService.injectDefaults(flowParse)); - } catch (ConstraintViolationException e){ + } catch (ConstraintViolationException e) { validateConstraintViolationBuilder.constraints(e.getMessage()); } return validateConstraintViolationBuilder.build(); @@ -497,10 +499,10 @@ public HttpResponse exportByIds( } private static byte[] zipFlows(List flows) throws IOException { - try(ByteArrayOutputStream bos = new ByteArrayOutputStream(); - ZipOutputStream archive = new ZipOutputStream(bos)) { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ZipOutputStream archive = new ZipOutputStream(bos)) { - for(var flow : flows) { + for (var flow : flows) { var zipEntry = new ZipEntry(flow.getNamespace() + "." + flow.getId() + ".yml"); archive.putNextEntry(zipEntry); archive.write(flow.getSource().getBytes()); @@ -512,6 +514,108 @@ private static byte[] zipFlows(List flows) throws IOException { } } + @ExecuteOn(TaskExecutors.IO) + @Delete(uri = "/delete/by-query") + @Operation( + tags = {"Flows"}, + summary = "Delete flows returned by the query parameters." + ) + public HttpResponse deleteByQuery( + @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query, + @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace, + @Parameter(description = "A labels filter") @Nullable @QueryValue List labels + ) { + List list = flowRepository + .findWithSource(query, namespace, RequestUtils.toMap(labels)) + .stream() + .peek(flowRepository::delete) + .collect(Collectors.toList()); + + return HttpResponse.ok(BulkResponse.builder().count(list.size()).build()); + } + + @ExecuteOn(TaskExecutors.IO) + @Delete(uri = "/delete/by-ids") + @Operation( + tags = {"Flows"}, + summary = "Delete flows by their IDs." + ) + public HttpResponse deleteByIds( + @Parameter(description = "A list of tuple flow ID and namespace as flow identifiers") @Body List ids + ) { + List list = ids + .stream() + .map(id -> flowRepository.findByIdWithSource(id.getNamespace(), id.getId()).orElseThrow()) + .peek(flowRepository::delete) + .collect(Collectors.toList()); + + return HttpResponse.ok(BulkResponse.builder().count(list.size()).build()); + } + + @ExecuteOn(TaskExecutors.IO) + @Post(uri = "/disable/by-query") + @Operation( + tags = {"Flows"}, + summary = "Disable flows returned by the query parameters." + ) + public HttpResponse disableByQuery( + @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query, + @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace, + @Parameter(description = "A labels filter") @Nullable @QueryValue List labels + ) { + List list = flowRepository + .findWithSource(query, namespace, RequestUtils.toMap(labels)) + .stream() + .filter(flowWithSource -> !flowWithSource.isDisabled()) + .peek(flow -> { + FlowWithSource flowUpdated = flow.toBuilder() + .disabled(true) + .source(FlowService.injectDisabledTrue(flow.getSource())) + .build(); + + flowRepository.update( + flowUpdated, + flow, + flowUpdated.getSource(), + taskDefaultService.injectDefaults(flowUpdated) + ); + }) + .collect(Collectors.toList()); + + return HttpResponse.ok(BulkResponse.builder().count(list.size()).build()); + } + + @ExecuteOn(TaskExecutors.IO) + @Post(uri = "/disable/by-ids") + @Operation( + tags = {"Flows"}, + summary = "Disable flows by their IDs." + ) + public HttpResponse disableByIds( + @Parameter(description = "A list of tuple flow ID and namespace as flow identifiers") @Body List ids + ) { + List list = ids + .stream() + .map(id -> flowRepository.findByIdWithSource(id.getNamespace(), id.getId()).orElseThrow()) + .filter(flowWithSource -> !flowWithSource.isDisabled()) + .peek(flow -> { + FlowWithSource flowUpdated = flow.toBuilder() + .disabled(true) + .source(FlowService.injectDisabledTrue(flow.getSource())) + .build(); + + flowRepository.update( + flowUpdated, + flow, + flowUpdated.getSource(), + taskDefaultService.injectDefaults(flowUpdated) + ); + }) + .collect(Collectors.toList()); + + return HttpResponse.ok(BulkResponse.builder().count(list.size()).build()); + } + @ExecuteOn(TaskExecutors.IO) @Post(uri = "/import", consumes = MediaType.MULTIPART_FORM_DATA) @Operation( diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/TemplateController.java b/webserver/src/main/java/io/kestra/webserver/controllers/TemplateController.java index d5cabea45c3..7a3d7dac442 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/TemplateController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/TemplateController.java @@ -7,6 +7,7 @@ import io.kestra.core.repositories.TemplateRepositoryInterface; import io.kestra.core.serializers.YamlFlowParser; import io.kestra.webserver.controllers.domain.IdWithNamespace; +import io.kestra.webserver.responses.BulkResponse; import io.kestra.webserver.responses.PagedResults; import io.kestra.webserver.utils.PageableUtils; import io.micronaut.core.annotation.Nullable; @@ -278,6 +279,43 @@ public HttpResponse exportByIds( return HttpResponse.ok(bytes).header("Content-Disposition", "attachment; filename=\"templates.zip\""); } + @ExecuteOn(TaskExecutors.IO) + @Delete(uri = "/delete/by-query") + @Operation( + tags = {"Flows"}, + summary = "Delete flows returned by the query parameters." + ) + public HttpResponse deleteByQuery( + @Parameter(description = "A string filter") @Nullable @QueryValue(value = "q") String query, + @Parameter(description = "A namespace filter prefix") @Nullable @QueryValue String namespace + ){ + List