Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement removeWorkflow for postgres index
Browse files Browse the repository at this point in the history
bjpirt authored and denniscodes committed Mar 5, 2024

Unverified

This user has not yet uploaded their public signing key.
1 parent 080c0eb commit 599cb7d
Showing 3 changed files with 108 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -33,6 +33,12 @@ public class PostgresProperties {

public boolean allowJsonQueries = true;

/** The maximum number of threads allowed in the async pool */
private int asyncMaxPoolSize = 12;

/** The size of the queue used for holding async indexing tasks */
private int asyncWorkerQueueSize = 100;

public Duration getTaskDefCacheRefreshInterval() {
return taskDefCacheRefreshInterval;
}
@@ -72,4 +78,20 @@ public boolean getAllowJsonQueries() {
public void setAllowJsonQueries(boolean allowJsonQueries) {
this.allowJsonQueries = allowJsonQueries;
}

public int getAsyncWorkerQueueSize() {
return asyncWorkerQueueSize;
}

public void setAsyncWorkerQueueSize(int asyncWorkerQueueSize) {
this.asyncWorkerQueueSize = asyncWorkerQueueSize;
}

public int getAsyncMaxPoolSize() {
return asyncMaxPoolSize;
}

public void setAsyncMaxPoolSize(int asyncMaxPoolSize) {
this.asyncMaxPoolSize = asyncMaxPoolSize;
}
}
Original file line number Diff line number Diff line change
@@ -18,6 +18,10 @@
import java.time.temporal.TemporalAccessor;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.sql.DataSource;

@@ -30,6 +34,7 @@
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.postgres.config.PostgresProperties;
import com.netflix.conductor.postgres.util.PostgresIndexQueryBuilder;

@@ -38,6 +43,10 @@
public class PostgresIndexDAO extends PostgresBaseDAO implements IndexDAO {

private final PostgresProperties properties;
private final ExecutorService executorService;

private static final int CORE_POOL_SIZE = 6;
private static final long KEEP_ALIVE_TIME = 1L;

public PostgresIndexDAO(
RetryTemplate retryTemplate,
@@ -46,6 +55,25 @@ public PostgresIndexDAO(
PostgresProperties properties) {
super(retryTemplate, objectMapper, dataSource);
this.properties = properties;

int maximumPoolSize = properties.getAsyncMaxPoolSize();
int workerQueueSize = properties.getAsyncWorkerQueueSize();

// Set up a workerpool for performing async operations.
this.executorService =
new ThreadPoolExecutor(
CORE_POOL_SIZE,
maximumPoolSize,
KEEP_ALIVE_TIME,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(workerQueueSize),
(runnable, executor) -> {
logger.warn(
"Request {} to async dao discarded in executor {}",
runnable,
executor);
Monitors.recordDiscardedIndexingCount("indexQueue");
});
}

@Override
@@ -208,13 +236,14 @@ public SearchResult<String> searchTasks(

@Override
public void removeWorkflow(String workflowId) {
logger.info("removeWorkflow is not supported for postgres indexing");
String REMOVE_WORKFLOW_SQL = "DELETE FROM workflow_index WHERE workflow_id = ?";

queryWithTransaction(REMOVE_WORKFLOW_SQL, q -> q.addParameter(workflowId).executeUpdate());
}

@Override
public CompletableFuture<Void> asyncRemoveWorkflow(String workflowId) {
logger.info("asyncRemoveWorkflow is not supported for postgres indexing");
return CompletableFuture.completedFuture(null);
return CompletableFuture.runAsync(() -> removeWorkflow(workflowId), executorService);
}

@Override
@@ -231,13 +260,17 @@ public CompletableFuture<Void> asyncUpdateWorkflow(

@Override
public void removeTask(String workflowId, String taskId) {
logger.info("removeTask is not supported for postgres indexing");
String REMOVE_TASK_SQL =
"WITH task_delete AS (DELETE FROM task_index WHERE task_id = ?)"
+ "DELETE FROM task_execution_logs WHERE task_id =?";

queryWithTransaction(
REMOVE_TASK_SQL, q -> q.addParameter(taskId).addParameter(taskId).executeUpdate());
}

@Override
public CompletableFuture<Void> asyncRemoveTask(String workflowId, String taskId) {
logger.info("asyncRemoveTask is not supported for postgres indexing");
return CompletableFuture.completedFuture(null);
return CompletableFuture.runAsync(() -> removeTask(workflowId, taskId), executorService);
}

@Override
Original file line number Diff line number Diff line change
@@ -403,4 +403,51 @@ public void testGetTaskExecutionLogs() throws SQLException {
assertEquals(logs.get(1).getLog(), records.get(1).getLog());
assertEquals(logs.get(1).getCreatedTime(), 1675845987000L);
}

@Test
public void testRemoveWorkflow() throws SQLException {
String workflowId = UUID.randomUUID().toString();
WorkflowSummary wfs = getMockWorkflowSummary(workflowId);
indexDAO.indexWorkflow(wfs);

List<Map<String, Object>> workflow_records =
queryDb("SELECT * FROM workflow_index WHERE workflow_id = '" + workflowId + "'");
assertEquals("Workflow index record was not created", 1, workflow_records.size());

indexDAO.removeWorkflow(workflowId);

workflow_records =
queryDb("SELECT * FROM workflow_index WHERE workflow_id = '" + workflowId + "'");
assertEquals("Workflow index record was not deleted", 0, workflow_records.size());
}

@Test
public void testRemoveTask() throws SQLException {
String workflowId = UUID.randomUUID().toString();

String taskId = UUID.randomUUID().toString();
TaskSummary ts = getMockTaskSummary(taskId);
indexDAO.indexTask(ts);

List<TaskExecLog> logs = new ArrayList<>();
logs.add(getMockTaskExecutionLog(taskId, new Date(1675845986000L).getTime(), "Log 1"));
logs.add(getMockTaskExecutionLog(taskId, new Date(1675845987000L).getTime(), "Log 2"));
indexDAO.addTaskExecutionLogs(logs);

List<Map<String, Object>> task_records =
queryDb("SELECT * FROM task_index WHERE task_id = '" + taskId + "'");
assertEquals("Task index record was not created", 1, task_records.size());

List<Map<String, Object>> log_records =
queryDb("SELECT * FROM task_execution_logs WHERE task_id = '" + taskId + "'");
assertEquals("Task execution logs were not created", 2, log_records.size());

indexDAO.removeTask(workflowId, taskId);

task_records = queryDb("SELECT * FROM task_index WHERE task_id = '" + taskId + "'");
assertEquals("Task index record was not deleted", 0, task_records.size());

log_records = queryDb("SELECT * FROM task_execution_logs WHERE task_id = '" + taskId + "'");
assertEquals("Task execution logs were not deleted", 0, log_records.size());
}
}

0 comments on commit 599cb7d

Please sign in to comment.