Skip to content

Commit

Permalink
ONECOND-2342
Browse files Browse the repository at this point in the history
  • Loading branch information
sangeetanadgir committed Dec 19, 2024
1 parent 617d602 commit f13e6d8
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netflix.conductor.core.events.EventProcessor;
import com.netflix.conductor.core.events.queue.dyno.DynoEventQueueProvider;
import com.netflix.conductor.core.execution.WorkflowSweeper;
import com.netflix.conductor.core.execution.alerts.AlertScheduler;
import com.netflix.conductor.core.execution.appconfig.cache.AppConfig;
import com.netflix.conductor.core.execution.appconfig.cache.PriorityConfig;
import com.netflix.conductor.core.execution.batch.BatchSweeper;
Expand Down Expand Up @@ -64,6 +65,7 @@ protected void configure() {
bind(SetVariable.class).asEagerSingleton();
bind(AppConfig.class).asEagerSingleton();
bind(PriorityConfig.class).asEagerSingleton();
bind(AlertScheduler.class).asEagerSingleton();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.netflix.conductor.core.execution.alerts;

import com.netflix.conductor.dao.ExecutionDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.Map;


public class AlertProcessor {
private static final Logger logger = LoggerFactory.getLogger(AlertProcessor.class);

private ExecutionDAO edao;

@Inject
public AlertProcessor(ExecutionDAO edao) {
this.edao = edao;
}

public void processAlerts() {
logger.info("Fetching grouped alerts from alerts table...");
Map<Integer, Integer> groupedAlerts = edao.getGroupedAlerts();

groupedAlerts.forEach((alertLookupId, alertCount) -> {
Integer registryAlertCount = edao.getAlertCountFromRegistry(alertLookupId);
if (registryAlertCount != null && alertCount > registryAlertCount) {
logger.info("Alert threshold exceeded for lookup ID: {}. Count: {}, Threshold: {}",
alertLookupId, alertCount, registryAlertCount);
notifyService(alertLookupId, alertCount);
}
});
}

private void notifyService(Integer alertLookupId, int alertCount) {
logger.info("Notifying service for alertLookupId: {}, alertCount: {}", alertLookupId, alertCount);
// Add logic to call the notify service
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.netflix.conductor.core.execution.alerts;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AlertScheduler {
private static final Logger logger = LoggerFactory.getLogger(AlertScheduler.class);

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final AlertProcessor alertProcessor;

@Inject
public AlertScheduler(AlertProcessor alertProcessor) {
this.alertProcessor = alertProcessor;
}

public void start() {
logger.info("Starting AlertScheduler to process alerts every 10 minutes...");
scheduler.scheduleAtFixedRate(() -> {
try {
alertProcessor.processAlerts();
} catch (Exception e) {
logger.error("Error processing alerts", e);
}
}, 0, 10, TimeUnit.MINUTES);
}

public void stop() {
logger.info("Stopping AlertScheduler...");
scheduler.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -341,6 +342,10 @@ default List<String> getRunningWorkflowByName(String workflowName) {

void addAlert(Alert alert);

Integer getAlertCountFromRegistry(Integer lookupId);

Map<Integer, Integer> getGroupedAlerts();

List<WorkflowError> searchWorkflowErrorRegistry(WorkflowErrorRegistry workflowErrorRegistry);

List<WorkflowErrorRegistry> searchWorkflowErrorRegistryList(WorkflowErrorRegistry workflowErrorRegistry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,15 @@ public void addErrorRegistry(WorkflowErrorRegistry workflowErrorRegistry){}
@Override
public void addAlert(Alert alert){}

public Integer getAlertCountFromRegistry(Integer lookupId) {
return null;
}


public Map<Integer, Integer> getGroupedAlerts() {
return null;
}

@Override
public List<WorkflowError> searchWorkflowErrorRegistry(WorkflowErrorRegistry workflowErrorRegistry){
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,14 @@ public void addErrorRegistry(WorkflowErrorRegistry workflowErrorRegistry) {

public void addAlert(Alert alert){}

public Integer getAlertCountFromRegistry(Integer lookupId) {
return null;
}


public Map<Integer, Integer> getGroupedAlerts() {
return null;
}
public List<WorkflowError> searchWorkflowErrorRegistry(WorkflowErrorRegistry workflowErrorRegistry) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,29 @@ public ArrayList<AlertRegistry> apply(ResultSet rs) throws SQLException {
}
}

public Integer getAlertCountFromRegistry(Integer lookupId) {
String SQL = "SELECT alert_count FROM alert_registry WHERE id = ?";
return queryWithTransaction(SQL, q -> q.addParameter(lookupId).executeAndFetch(rs -> {
if (rs.next()) {
return rs.getInt("alert_count");
}
return null;
}));
}


public Map<Integer, Integer> getGroupedAlerts() {
String SQL = "SELECT alert_lookup_id, COUNT(*) AS count FROM alerts GROUP BY alert_lookup_id";
return queryWithTransaction(SQL, q -> q.executeAndFetch(rs -> {
Map<Integer, Integer> groupedAlerts = new HashMap<>();
while (rs.next()) {
groupedAlerts.put(rs.getInt("alert_lookup_id"), rs.getInt("count"));
}
return groupedAlerts;
}));
}


public List<WorkflowError> searchWorkflowErrorRegistry(WorkflowErrorRegistry workflowErrorRegistryEntry) {
StringBuilder SQL = new StringBuilder("SELECT meta_error_registry.isRequiredInReporting, meta_error_registry.id, meta_error_registry.lookup,COUNT(workflow_error_registry.id) AS numberOfErrors FROM workflow_error_registry \n" +
"LEFT JOIN meta_error_registry ON workflow_error_registry.error_lookup_id = meta_error_registry.id \n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,15 @@ public void addErrorRegistry(WorkflowErrorRegistry workflowErrorRegistry) {

public void addAlert(Alert alert){}

public Integer getAlertCountFromRegistry(Integer lookupId) {
return null;
}


public Map<Integer, Integer> getGroupedAlerts() {
return null;
}

public List<WorkflowError> searchWorkflowErrorRegistry(WorkflowErrorRegistry workflowErrorRegistry){
return null;
}
Expand Down

0 comments on commit f13e6d8

Please sign in to comment.