From b518a7b9d5fc08651bd774eceb9ee19f535de82c Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Mon, 27 Jan 2025 10:52:05 +0100 Subject: [PATCH] fix(webserver): ensure queues are not closed in nioEventLoop --- .../io/kestra/core/services/ExecutionLogService.java | 10 +++++++--- .../webserver/controllers/api/ExecutionController.java | 8 +++++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/io/kestra/core/services/ExecutionLogService.java b/core/src/main/java/io/kestra/core/services/ExecutionLogService.java index 194813e1308..2d070a28fb2 100644 --- a/core/src/main/java/io/kestra/core/services/ExecutionLogService.java +++ b/core/src/main/java/io/kestra/core/services/ExecutionLogService.java @@ -11,6 +11,7 @@ import org.slf4j.event.Level; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; +import reactor.core.scheduler.Schedulers; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -40,6 +41,7 @@ public Flux> streamExecutionLogs(final String tenantId, final AtomicReference disposable = new AtomicReference<>(); return Flux.>create(emitter -> { + // send a first "empty" event so the SSE is correctly initialized in the frontend in case there are no logs emitter.next(Event.of(LogEntry.builder().build()).id("start")); @@ -65,9 +67,11 @@ public Flux> streamExecutionLogs(final String tenantId, })); }, FluxSink.OverflowStrategy.BUFFER) .doFinally(ignored -> { - if (disposable.get() != null) { - disposable.get().run(); - } + Schedulers.boundedElastic().schedule(() -> { + if (disposable.get() != null) { + disposable.get().run(); + } + }); }); } diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/api/ExecutionController.java b/webserver/src/main/java/io/kestra/webserver/controllers/api/ExecutionController.java index 132688e2ba4..a4c247df3da 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/api/ExecutionController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/api/ExecutionController.java @@ -1518,9 +1518,11 @@ public Flux> follow( cancel.set(receive); }, FluxSink.OverflowStrategy.BUFFER) .doFinally(ignored -> { - if (cancel.get() != null) { - cancel.get().run(); - } + Schedulers.boundedElastic().schedule(() -> { + if (cancel.get() != null) { + cancel.get().run(); + } + }); }); }