Skip to content

Commit c4fc9c6

Browse files
committed
Add option to only index workflows on Postgres when their status changes
1 parent 4008a07 commit c4fc9c6

File tree

3 files changed

+228
-22
lines changed

3 files changed

+228
-22
lines changed

postgres-persistence/src/main/java/com/netflix/conductor/postgres/config/PostgresProperties.java

+10
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ public class PostgresProperties {
2727

2828
private Integer deadlockRetryMax = 3;
2929

30+
private boolean onlyIndexOnStatusChange = false;
31+
3032
public String schema = "public";
3133

3234
public boolean allowFullTextQueries = true;
@@ -47,6 +49,14 @@ public void setTaskDefCacheRefreshInterval(Duration taskDefCacheRefreshInterval)
4749
this.taskDefCacheRefreshInterval = taskDefCacheRefreshInterval;
4850
}
4951

52+
public boolean getOnlyIndexOnStatusChange() {
53+
return onlyIndexOnStatusChange;
54+
}
55+
56+
public void setOnlyIndexOnStatusChange(boolean onlyIndexOnStatusChange) {
57+
this.onlyIndexOnStatusChange = onlyIndexOnStatusChange;
58+
}
59+
5060
public Integer getDeadlockRetryMax() {
5161
return deadlockRetryMax;
5262
}

postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java

+37-22
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,16 @@ public class PostgresIndexDAO extends PostgresBaseDAO implements IndexDAO {
4848
private static final int CORE_POOL_SIZE = 6;
4949
private static final long KEEP_ALIVE_TIME = 1L;
5050

51+
private boolean onlyIndexOnStatusChange;
52+
5153
public PostgresIndexDAO(
5254
RetryTemplate retryTemplate,
5355
ObjectMapper objectMapper,
5456
DataSource dataSource,
5557
PostgresProperties properties) {
5658
super(retryTemplate, objectMapper, dataSource);
5759
this.properties = properties;
60+
this.onlyIndexOnStatusChange = properties.getOnlyIndexOnStatusChange();
5861

5962
int maximumPoolSize = properties.getAsyncMaxPoolSize();
6063
int workerQueueSize = properties.getAsyncWorkerQueueSize();
@@ -84,19 +87,25 @@ public void indexWorkflow(WorkflowSummary workflow) {
8487
+ "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, "
8588
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data";
8689

90+
if (onlyIndexOnStatusChange) {
91+
INSERT_WORKFLOW_INDEX_SQL += " WHERE workflow_index.status != EXCLUDED.status";
92+
}
93+
8794
TemporalAccessor ta = DateTimeFormatter.ISO_INSTANT.parse(workflow.getStartTime());
8895
Timestamp startTime = Timestamp.from(Instant.from(ta));
8996

90-
queryWithTransaction(
91-
INSERT_WORKFLOW_INDEX_SQL,
92-
q ->
93-
q.addParameter(workflow.getWorkflowId())
94-
.addParameter(workflow.getCorrelationId())
95-
.addParameter(workflow.getWorkflowType())
96-
.addParameter(startTime)
97-
.addParameter(workflow.getStatus().toString())
98-
.addJsonParameter(workflow)
99-
.executeUpdate());
97+
int rowsUpdated =
98+
queryWithTransaction(
99+
INSERT_WORKFLOW_INDEX_SQL,
100+
q ->
101+
q.addParameter(workflow.getWorkflowId())
102+
.addParameter(workflow.getCorrelationId())
103+
.addParameter(workflow.getWorkflowType())
104+
.addParameter(startTime)
105+
.addParameter(workflow.getStatus().toString())
106+
.addJsonParameter(workflow)
107+
.executeUpdate());
108+
logger.debug("Postgres index workflow rows updated: {}", rowsUpdated);
100109
}
101110

102111
@Override
@@ -128,24 +137,30 @@ public void indexTask(TaskSummary task) {
128137
+ "DO UPDATE SET task_type = EXCLUDED.task_type, task_def_name = EXCLUDED.task_def_name, "
129138
+ "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data";
130139

140+
if (onlyIndexOnStatusChange) {
141+
INSERT_TASK_INDEX_SQL += " WHERE task_index.status != EXCLUDED.status";
142+
}
143+
131144
TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(task.getUpdateTime());
132145
Timestamp updateTime = Timestamp.from(Instant.from(updateTa));
133146

134147
TemporalAccessor startTa = DateTimeFormatter.ISO_INSTANT.parse(task.getStartTime());
135148
Timestamp startTime = Timestamp.from(Instant.from(startTa));
136149

137-
queryWithTransaction(
138-
INSERT_TASK_INDEX_SQL,
139-
q ->
140-
q.addParameter(task.getTaskId())
141-
.addParameter(task.getTaskType())
142-
.addParameter(task.getTaskDefName())
143-
.addParameter(task.getStatus().toString())
144-
.addParameter(startTime)
145-
.addParameter(updateTime)
146-
.addParameter(task.getWorkflowType())
147-
.addJsonParameter(task)
148-
.executeUpdate());
150+
int rowsUpdated =
151+
queryWithTransaction(
152+
INSERT_TASK_INDEX_SQL,
153+
q ->
154+
q.addParameter(task.getTaskId())
155+
.addParameter(task.getTaskType())
156+
.addParameter(task.getTaskDefName())
157+
.addParameter(task.getStatus().toString())
158+
.addParameter(startTime)
159+
.addParameter(updateTime)
160+
.addParameter(task.getWorkflowType())
161+
.addJsonParameter(task)
162+
.executeUpdate());
163+
logger.debug("Postgres index task rows updated: {}", rowsUpdated);
149164
}
150165

151166
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
* Copyright 2023 Conductor Authors.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package com.netflix.conductor.postgres.dao;
14+
15+
import java.sql.Connection;
16+
import java.sql.SQLException;
17+
import java.util.*;
18+
19+
import javax.sql.DataSource;
20+
21+
import org.flywaydb.core.Flyway;
22+
import org.junit.Before;
23+
import org.junit.Test;
24+
import org.junit.runner.RunWith;
25+
import org.springframework.beans.factory.annotation.Autowired;
26+
import org.springframework.beans.factory.annotation.Qualifier;
27+
import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration;
28+
import org.springframework.boot.test.context.SpringBootTest;
29+
import org.springframework.test.context.ContextConfiguration;
30+
import org.springframework.test.context.TestPropertySource;
31+
import org.springframework.test.context.junit4.SpringRunner;
32+
33+
import com.netflix.conductor.common.config.TestObjectMapperConfiguration;
34+
import com.netflix.conductor.common.metadata.tasks.Task;
35+
import com.netflix.conductor.common.run.TaskSummary;
36+
import com.netflix.conductor.common.run.Workflow;
37+
import com.netflix.conductor.common.run.WorkflowSummary;
38+
import com.netflix.conductor.postgres.config.PostgresConfiguration;
39+
import com.netflix.conductor.postgres.util.Query;
40+
41+
import com.fasterxml.jackson.databind.ObjectMapper;
42+
43+
import static org.junit.Assert.assertEquals;
44+
45+
@ContextConfiguration(
46+
classes = {
47+
TestObjectMapperConfiguration.class,
48+
PostgresConfiguration.class,
49+
FlywayAutoConfiguration.class
50+
})
51+
@RunWith(SpringRunner.class)
52+
@TestPropertySource(
53+
properties = {
54+
"conductor.app.asyncIndexingEnabled=false",
55+
"conductor.elasticsearch.version=0",
56+
"conductor.indexing.type=postgres",
57+
"conductor.postgres.onlyIndexOnStatusChange=true",
58+
"spring.flyway.clean-disabled=false"
59+
})
60+
@SpringBootTest
61+
public class PostgresIndexDAOStatusChangeOnlyTest {
62+
63+
@Autowired private PostgresIndexDAO indexDAO;
64+
65+
@Autowired private ObjectMapper objectMapper;
66+
67+
@Qualifier("dataSource")
68+
@Autowired
69+
private DataSource dataSource;
70+
71+
@Autowired Flyway flyway;
72+
73+
// clean the database between tests.
74+
@Before
75+
public void before() {
76+
flyway.migrate();
77+
}
78+
79+
private WorkflowSummary getMockWorkflowSummary(String id) {
80+
WorkflowSummary wfs = new WorkflowSummary();
81+
wfs.setWorkflowId(id);
82+
wfs.setCorrelationId("correlation-id");
83+
wfs.setWorkflowType("workflow-type");
84+
wfs.setStartTime("2023-02-07T08:42:45Z");
85+
wfs.setStatus(Workflow.WorkflowStatus.RUNNING);
86+
return wfs;
87+
}
88+
89+
private TaskSummary getMockTaskSummary(String taskId) {
90+
TaskSummary ts = new TaskSummary();
91+
ts.setTaskId(taskId);
92+
ts.setTaskType("task-type");
93+
ts.setTaskDefName("task-def-name");
94+
ts.setStatus(Task.Status.SCHEDULED);
95+
ts.setStartTime("2023-02-07T09:41:45Z");
96+
ts.setUpdateTime("2023-02-07T09:42:45Z");
97+
ts.setWorkflowType("workflow-type");
98+
return ts;
99+
}
100+
101+
private List<Map<String, Object>> queryDb(String query) throws SQLException {
102+
try (Connection c = dataSource.getConnection()) {
103+
try (Query q = new Query(objectMapper, c, query)) {
104+
return q.executeAndFetchMap();
105+
}
106+
}
107+
}
108+
109+
public void checkWorkflow(String workflowId, String status, String correlationId)
110+
throws SQLException {
111+
List<Map<String, Object>> result =
112+
queryDb(
113+
String.format(
114+
"SELECT * FROM workflow_index WHERE workflow_id = '%s'",
115+
workflowId));
116+
assertEquals("Wrong number of rows returned", 1, result.size());
117+
assertEquals("Wrong status returned", status, result.get(0).get("status"));
118+
assertEquals(
119+
"Correlation id does not match",
120+
correlationId,
121+
result.get(0).get("correlation_id"));
122+
}
123+
124+
public void checkTask(String taskId, String status, String updateTime) throws SQLException {
125+
List<Map<String, Object>> result =
126+
queryDb(String.format("SELECT * FROM task_index WHERE task_id = '%s'", taskId));
127+
assertEquals("Wrong number of rows returned", 1, result.size());
128+
assertEquals("Wrong status returned", status, result.get(0).get("status"));
129+
assertEquals(
130+
"Update time does not match",
131+
updateTime,
132+
result.get(0).get("update_time").toString());
133+
}
134+
135+
@Test
136+
public void testIndexWorkflowOnlyStatusChange() throws SQLException {
137+
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id");
138+
indexDAO.indexWorkflow(wfs);
139+
140+
// retrieve the record, make sure it exists
141+
checkWorkflow("workflow-id", "RUNNING", "correlation-id");
142+
143+
// Change the record, but not the status, and re-index
144+
wfs.setCorrelationId("new-correlation-id");
145+
indexDAO.indexWorkflow(wfs);
146+
147+
// retrieve the record, make sure it hasn't changed
148+
checkWorkflow("workflow-id", "RUNNING", "correlation-id");
149+
150+
// Change the status and re-index
151+
wfs.setStatus(Workflow.WorkflowStatus.FAILED);
152+
indexDAO.indexWorkflow(wfs);
153+
154+
// retrieve the record, make sure it has changed
155+
checkWorkflow("workflow-id", "FAILED", "new-correlation-id");
156+
}
157+
158+
@Test
159+
public void testIndexTaskOnlyStatusChange() throws SQLException {
160+
TaskSummary ts = getMockTaskSummary("task-id");
161+
162+
indexDAO.indexTask(ts);
163+
164+
// retrieve the record, make sure it exists
165+
checkTask("task-id", "SCHEDULED", "2023-02-07 09:42:45.0");
166+
167+
// Change the record, but not the status
168+
ts.setUpdateTime("2023-02-07T10:42:45Z");
169+
indexDAO.indexTask(ts);
170+
171+
// retrieve the record, make sure it hasn't changed
172+
checkTask("task-id", "SCHEDULED", "2023-02-07 09:42:45.0");
173+
174+
// Change the status and re-index
175+
ts.setStatus(Task.Status.FAILED);
176+
indexDAO.indexTask(ts);
177+
178+
// retrieve the record, make sure it has changed
179+
checkTask("task-id", "FAILED", "2023-02-07 10:42:45.0");
180+
}
181+
}

0 commit comments

Comments
 (0)