Skip to content

Commit

Permalink
Merge pull request #356 from conductor-oss/feature/java-spring-client…
Browse files Browse the repository at this point in the history
…-metrics-collector

Add support for configurable MetricsCollector in Spring client
  • Loading branch information
jmigueprieto authored Jan 7, 2025
2 parents 12b483c + a43dbe4 commit fa9420f
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.autoconfigure.AutoConfiguration;
Expand All @@ -30,6 +31,7 @@
import com.netflix.conductor.client.http.ConductorClient;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.http.WorkflowClient;
import com.netflix.conductor.client.metrics.MetricsCollector;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.sdk.workflow.executor.WorkflowExecutor;
import com.netflix.conductor.sdk.workflow.executor.task.AnnotatedWorkerExecutor;
Expand Down Expand Up @@ -79,7 +81,8 @@ public AnnotatedWorkerExecutor annotatedWorkerExecutor(TaskClient taskClient) {
public TaskRunnerConfigurer taskRunnerConfigurer(Environment env,
TaskClient taskClient,
ClientProperties clientProperties,
List<Worker> workers) {
List<Worker> workers,
Optional<MetricsCollector> metricsCollector) {
Map<String, Integer> taskThreadCount = new HashMap<>();
for (Worker worker : workers) {
String key = "conductor.worker." + worker.getTaskDefName() + ".threadCount";
Expand All @@ -94,15 +97,16 @@ public TaskRunnerConfigurer taskRunnerConfigurer(Environment env,
clientProperties.setTaskThreadCount(taskThreadCount);
}

return new TaskRunnerConfigurer.Builder(taskClient, workers)
TaskRunnerConfigurer.Builder builder = new TaskRunnerConfigurer.Builder(taskClient, workers)
.withTaskThreadCount(clientProperties.getTaskThreadCount())
.withThreadCount(clientProperties.getThreadCount())
.withSleepWhenRetry((int) clientProperties.getSleepWhenRetryDuration().toMillis())
.withUpdateRetryCount(clientProperties.getUpdateRetryCount())
.withTaskToDomain(clientProperties.getTaskToDomain())
.withShutdownGracePeriodSeconds(clientProperties.getShutdownGracePeriodSeconds())
.withTaskPollTimeout(clientProperties.getTaskPollTimeout())
.build();
.withTaskPollTimeout(clientProperties.getTaskPollTimeout());
metricsCollector.ifPresent(builder::withMetricsCollector);
return builder.build();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.springframework.stereotype.Component;

import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.metrics.MetricsCollector;
import com.netflix.conductor.sdk.workflow.executor.task.AnnotatedWorkerExecutor;
import com.netflix.conductor.sdk.workflow.executor.task.WorkerConfiguration;

Expand All @@ -44,8 +45,12 @@ public void onApplicationEvent(ContextRefreshedEvent refreshedEvent) {
ApplicationContext applicationContext = refreshedEvent.getApplicationContext();
Environment environment = applicationContext.getEnvironment();
WorkerConfiguration configuration = new SpringWorkerConfiguration(environment);
AnnotatedWorkerExecutor annotatedWorkerExecutor = new AnnotatedWorkerExecutor(taskClient, configuration);

AnnotatedWorkerExecutor annotatedWorkerExecutor = new AnnotatedWorkerExecutor(taskClient, configuration);
String[] beanNames = applicationContext.getBeanNamesForType(MetricsCollector.class);
if (beanNames.length > 0) {
annotatedWorkerExecutor.setMetricsCollector(applicationContext.getBean(MetricsCollector.class));
}
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Component.class);
beans.values().forEach(annotatedWorkerExecutor::addBean);
annotatedWorkerExecutor.startPolling();
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=4.0.2-beta
version=4.0.4
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.netflix.conductor.client.automator.TaskRunnerConfigurer;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.metrics.MetricsCollector;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.sdk.workflow.task.WorkerTask;

Expand All @@ -50,6 +51,8 @@ public class AnnotatedWorkerExecutor {

protected Map<String, String> workerDomains = new HashMap<>();

private MetricsCollector metricsCollector;

private static final Set<String> scannedPackages = new HashSet<>();

private final WorkerConfiguration workerConfiguration;
Expand Down Expand Up @@ -197,12 +200,14 @@ public void startPolling() {
LOGGER.info("Starting workers with threadCount {}", workerToThreadCount);
LOGGER.info("Worker domains {}", workerDomains);

taskRunner =
new TaskRunnerConfigurer.Builder(taskClient, workers)
.withTaskThreadCount(workerToThreadCount)
.withTaskToDomain(workerDomains)
.build();
var builder = new TaskRunnerConfigurer.Builder(taskClient, workers)
.withTaskThreadCount(workerToThreadCount)
.withTaskToDomain(workerDomains);
if (metricsCollector != null) {
builder.withMetricsCollector(metricsCollector);
}

taskRunner = builder.build();
taskRunner.init();
}

Expand All @@ -215,4 +220,12 @@ List<Worker> getWorkers() {
TaskRunnerConfigurer getTaskRunner() {
return taskRunner;
}

public MetricsCollector getMetricsCollector() {
return metricsCollector;
}

public void setMetricsCollector(MetricsCollector metricsCollector) {
this.metricsCollector = metricsCollector;
}
}

0 comments on commit fa9420f

Please sign in to comment.