From 7f36cb9924da3ce9ebdb038a7308b10bef923be1 Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Thu, 3 Feb 2022 08:16:05 +0100 Subject: [PATCH] feat(kafka-runner): add consumer on GlobalStateProcessor to be notified about every change --- .../kestra/runner/kafka/KafkaScheduler.java | 5 ++-- .../executors/ExecutorWorkerRunning.java | 2 +- .../kafka/streams/GlobalStateProcessor.java | 28 +++++++++++++++++-- 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaScheduler.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaScheduler.java index da66e1737ca..8d6e1344172 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaScheduler.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/KafkaScheduler.java @@ -31,6 +31,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.*; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.Stores; @@ -138,7 +139,7 @@ public StreamsBuilder topology() { JsonSerde.of(Executor.class) ), kafkaAdminService.getTopicName(Executor.class), - Consumed.with(Serdes.String(), JsonSerde.of(Executor.class)), + Consumed.with(Serdes.String(), JsonSerde.of(Executor.class)).withName("GlobalStore.Executor"), () -> new GlobalStateProcessor<>(STATESTORE_EXECUTOR) ); @@ -150,7 +151,7 @@ public StreamsBuilder topology() { JsonSerde.of(Trigger.class) ), kafkaAdminService.getTopicName(Trigger.class), - Consumed.with(Serdes.String(), JsonSerde.of(Trigger.class)), + Consumed.with(Serdes.String(), JsonSerde.of(Trigger.class)).withName("GlobalStore.Trigger"), () -> new GlobalStateLockProcessor<>(STATESTORE_TRIGGER, triggerLock) ); diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorWorkerRunning.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorWorkerRunning.java index cad48486180..f630dcd261e 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorWorkerRunning.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/executors/ExecutorWorkerRunning.java @@ -46,7 +46,7 @@ public StreamsBuilder topology() { JsonSerde.of(WorkerInstance.class) ), kafkaAdminService.getTopicName(KafkaStreamSourceService.TOPIC_EXECUTOR_WORKERINSTANCE), - Consumed.with(Serdes.String(), JsonSerde.of(WorkerInstance.class)).withName("GlobalStore.ExecutorWorkerInstace"), + Consumed.with(Serdes.String(), JsonSerde.of(WorkerInstance.class)).withName("GlobalStore.ExecutorWorkerInstance"), () -> new GlobalStateProcessor<>(WORKERINSTANCE_STATE_STORE_NAME) ); diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/GlobalStateProcessor.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/GlobalStateProcessor.java index e0b05cfe531..097547c69d3 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/GlobalStateProcessor.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/GlobalStateProcessor.java @@ -1,23 +1,36 @@ package io.kestra.runner.kafka.streams; +import com.google.common.collect.Streams; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; + @Slf4j public class GlobalStateProcessor implements Processor { private final String storeName; + private final Consumer> consumer; private KeyValueStore store; public GlobalStateProcessor(String storeName) { + this(storeName, null); + } + + public GlobalStateProcessor(String storeName, Consumer> consumer) { this.storeName = storeName; + this.consumer = consumer; } - @SuppressWarnings("unchecked") @Override public void init(ProcessorContext context) { - this.store = (KeyValueStore) context.getStateStore(this.storeName); + this.store = context.getStateStore(this.storeName); + + this.send(); } @Override @@ -27,6 +40,17 @@ public void process(String key, T value) { } else { this.store.put(key, value); } + + this.send(); + } + + @SuppressWarnings("UnstableApiUsage") + private void send() { + if (consumer != null) { + try (KeyValueIterator all = this.store.all()) { + consumer.accept(Streams.stream(all).map(e -> e.value).collect(Collectors.toList())); + } + } } @Override