From e660ed843e7455eb3854b35aa95c15a7f4043ef6 Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Mon, 31 Jan 2022 18:46:29 +0100 Subject: [PATCH] feat(kafka-runner): create state dir early to avoid concurrent access --- .../java/io/kestra/runner/kafka/KafkaExecutor.java | 1 - .../runner/kafka/services/KafkaStreamService.java | 12 +++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaExecutor.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaExecutor.java index 64902a86f5a..e4549a81a73 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaExecutor.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaExecutor.java @@ -66,7 +66,6 @@ public void run() { this.streams = this.kafkaExecutors .stream() - .parallel() .map(executor -> { Properties properties = new Properties(); // build diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java index 28b54894212..359e9e6b5e6 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java @@ -22,6 +22,7 @@ import io.kestra.runner.kafka.configs.StreamDefaultsConfig; import org.slf4j.Logger; +import java.io.File; import java.time.Duration; import java.util.List; import java.util.Properties; @@ -97,8 +98,17 @@ public KafkaStreamService.Stream of(Class clientId, Class groupId, Topolog ); } + if (properties.containsKey(StreamsConfig.STATE_DIR_CONFIG)) { + File stateDir = new File((String) properties.get(StreamsConfig.STATE_DIR_CONFIG)); + + if (!stateDir.exists()) { + //noinspection ResultOfMethodCallIgnored + stateDir.mkdirs(); + } + } + Stream stream = new Stream(topology, properties, metricsEnabled ? metricRegistry : null, logger); - eventPublisher.publishEvent(new KafkaStreamEndpoint.Event(clientId.getName(), stream)); + eventPublisher.publishEventAsync(new KafkaStreamEndpoint.Event(clientId.getName(), stream)); return stream; }