diff --git a/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java b/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java index a063fdabd8b..3ed491b8093 100644 --- a/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java +++ b/core/src/main/java/io/kestra/core/metrics/MetricRegistry.java @@ -26,13 +26,13 @@ public class MetricRegistry { public final static String METRIC_WORKER_ENDED_COUNT = "worker.ended.count"; public final static String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration"; - public final static String KESTRA_EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count"; - public final static String KESTRA_EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count"; - public final static String KESTRA_EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration"; - public final static String KESTRA_EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count"; - public final static String KESTRA_EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count"; - public final static String KESTRA_EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count"; - public final static String METRIC_EXECUTOR_EXECUTION_DURATION = "executor.execution.duration"; + public final static String EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count"; + public final static String EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count"; + public final static String EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration"; + public final static String EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count"; + public final static String EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count"; + public final static String EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count"; + public final static String EXECUTOR_EXECUTION_DURATION = "executor.execution.duration"; public final static String METRIC_INDEXER_REQUEST_COUNT = "indexer.request.count"; public final static String METRIC_INDEXER_REQUEST_DURATION = "indexer.request.duration"; @@ -51,6 +51,8 @@ public class MetricRegistry { public final static String SCHEDULER_EXECUTION_RUNNING_DURATION = "scheduler.execution.running.duration"; public final static String SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration"; + public final static String STREAMS_STATE_COUNT = "stream.state.count"; + public final static String TAG_TASK_TYPE = "task_type"; public final static String TAG_FLOW_ID = "flow_id"; public final static String TAG_NAMESPACE_ID = "namespace_id"; diff --git a/core/src/main/java/io/kestra/core/runners/ExecutorService.java b/core/src/main/java/io/kestra/core/runners/ExecutorService.java index b4421c287b7..f24dc0e585a 100644 --- a/core/src/main/java/io/kestra/core/runners/ExecutorService.java +++ b/core/src/main/java/io/kestra/core/runners/ExecutorService.java @@ -8,7 +8,6 @@ import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; -import io.kestra.core.models.tasks.DynamicTask; import io.kestra.core.models.tasks.FlowableTask; import io.kestra.core.models.tasks.ResolvedTask; import io.kestra.core.models.tasks.Task; @@ -118,7 +117,7 @@ public Execution onNexts(Flow flow, Execution execution, List nexts) { if (execution.getState().getCurrent() == State.Type.CREATED) { metricRegistry - .counter(MetricRegistry.KESTRA_EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(execution)) + .counter(MetricRegistry.EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(execution)) .increment(); flow.logger().info( @@ -132,7 +131,7 @@ public Execution onNexts(Flow flow, Execution execution, List nexts) { } metricRegistry - .counter(MetricRegistry.KESTRA_EXECUTOR_TASKRUN_NEXT_COUNT, metricRegistry.tags(execution)) + .counter(MetricRegistry.EXECUTOR_TASKRUN_NEXT_COUNT, metricRegistry.tags(execution)) .increment(nexts.size()); return newExecution; @@ -201,7 +200,7 @@ private Optional childWorkerTaskTypeToWorkerTask( .peek(workerTaskResult -> { metricRegistry .counter( - MetricRegistry.KESTRA_EXECUTOR_WORKERTASKRESULT_COUNT, + MetricRegistry.EXECUTOR_WORKERTASKRESULT_COUNT, metricRegistry.tags(workerTaskResult) ) .increment(); @@ -296,11 +295,11 @@ private Executor onEnd(Executor executor) { } metricRegistry - .counter(MetricRegistry.KESTRA_EXECUTOR_EXECUTION_END_COUNT, metricRegistry.tags(newExecution)) + .counter(MetricRegistry.EXECUTOR_EXECUTION_END_COUNT, metricRegistry.tags(newExecution)) .increment(); metricRegistry - .timer(MetricRegistry.METRIC_EXECUTOR_EXECUTION_DURATION, metricRegistry.tags(newExecution)) + .timer(MetricRegistry.EXECUTOR_EXECUTION_DURATION, metricRegistry.tags(newExecution)) .record(newExecution.getState().getDuration()); return executor.withExecution(newExecution, "onEnd"); @@ -481,7 +480,7 @@ private Executor handleRestart(Executor executor) { } metricRegistry - .counter(MetricRegistry.KESTRA_EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(executor.getExecution())) + .counter(MetricRegistry.EXECUTOR_EXECUTION_STARTED_COUNT, metricRegistry.tags(executor.getExecution())) .increment(); executor.getFlow().logger().info( diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java index db444bf517d..0332b746a60 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamService.java @@ -7,6 +7,7 @@ import io.micronaut.context.annotation.Value; import io.micronaut.context.event.ApplicationEventPublisher; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ArrayUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -115,11 +116,23 @@ public KafkaStreamService.Stream of(Class clientId, Class groupId, Topolog public static class Stream extends KafkaStreams { private final Logger logger; + + private final MetricRegistry meterRegistry; + + private final String[] tags; + private KafkaStreamsMetrics metrics; + private boolean hasStarted = false; private Stream(Topology topology, Properties props, MetricRegistry meterRegistry, Logger logger) { super(topology, props); + this.meterRegistry = meterRegistry; + + tags = new String[]{ + "client_class_id", + (String) props.get(CommonClientConfigs.CLIENT_ID_CONFIG) + }; if (meterRegistry != null) { metrics = new KafkaStreamsMetrics( @@ -148,6 +161,18 @@ public synchronized void start(final KafkaStreams.StateListener listener) throws this.setGlobalStateRestoreListener(new StateRestoreLoggerListeners(logger)); this.setStateListener((newState, oldState) -> { + meterRegistry.gauge( + MetricRegistry.STREAMS_STATE_COUNT, + 0, + ArrayUtils.addAll(tags, "state", oldState.name()) + ); + + meterRegistry.gauge( + MetricRegistry.STREAMS_STATE_COUNT, + 1, + ArrayUtils.addAll(tags, "state", newState.name()) + ); + if (newState == State.RUNNING) { this.hasStarted = true; } diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/ExecutorJoinerTransformer.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/ExecutorJoinerTransformer.java index 7fa7d6c6277..096bab7b8f4 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/ExecutorJoinerTransformer.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/ExecutorJoinerTransformer.java @@ -86,14 +86,14 @@ public Executor transform(final String key, final Executor value) { if (workerTaskResult.getTaskRun().getState().isTerninated()) { metricRegistry .counter( - MetricRegistry.KESTRA_EXECUTOR_TASKRUN_ENDED_COUNT, + MetricRegistry.EXECUTOR_TASKRUN_ENDED_COUNT, metricRegistry.tags(workerTaskResult) ) .increment(); metricRegistry .timer( - MetricRegistry.KESTRA_EXECUTOR_TASKRUN_ENDED_DURATION, + MetricRegistry.EXECUTOR_TASKRUN_ENDED_DURATION, metricRegistry.tags(workerTaskResult) ) .record(workerTaskResult.getTaskRun().getState().getDuration()); diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java index e26f630004b..9f779e4ebc4 100644 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemoryExecutor.java @@ -299,11 +299,11 @@ private void workerTaskResultQueue(WorkerTaskResult message) { // send metrics on terminated if (message.getTaskRun().getState().isTerninated()) { metricRegistry - .counter(MetricRegistry.KESTRA_EXECUTOR_TASKRUN_ENDED_COUNT, metricRegistry.tags(message)) + .counter(MetricRegistry.EXECUTOR_TASKRUN_ENDED_COUNT, metricRegistry.tags(message)) .increment(); metricRegistry - .timer(MetricRegistry.KESTRA_EXECUTOR_TASKRUN_ENDED_DURATION, metricRegistry.tags(message)) + .timer(MetricRegistry.EXECUTOR_TASKRUN_ENDED_DURATION, metricRegistry.tags(message)) .record(message.getTaskRun().getState().getDuration()); }