Skip to content

Commit

Permalink
Revert "Implementation of Postgres PollDataDAO with read and write ca…
Browse files Browse the repository at this point in the history
…ching (conductor-oss#82)"

This reverts commit 7342f62.
  • Loading branch information
matiasbur committed May 3, 2024
1 parent 7342f62 commit 67fe156
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 702 deletions.
50 changes: 0 additions & 50 deletions docs/documentation/advanced/postgresql.md

This file was deleted.

1 change: 0 additions & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ nav:
- documentation/advanced/azureblob-storage.md
- documentation/advanced/externalpayloadstorage.md
- documentation/advanced/redis.md
- documentation/advanced/postgresql.md
- Client SDKs:
- documentation/clientsdks/index.md
- documentation/clientsdks/java-sdk.md
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import com.netflix.conductor.postgres.dao.*;
import com.netflix.conductor.postgres.dao.PostgresExecutionDAO;
import com.netflix.conductor.postgres.dao.PostgresIndexDAO;
import com.netflix.conductor.postgres.dao.PostgresMetadataDAO;
import com.netflix.conductor.postgres.dao.PostgresQueueDAO;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.*;
Expand Down Expand Up @@ -82,15 +85,6 @@ public PostgresExecutionDAO postgresExecutionDAO(
return new PostgresExecutionDAO(retryTemplate, objectMapper, dataSource);
}

@Bean
@DependsOn({"flywayForPrimaryDb"})
public PostgresPollDataDAO postgresPollDataDAO(
@Qualifier("postgresRetryTemplate") RetryTemplate retryTemplate,
ObjectMapper objectMapper,
PostgresProperties properties) {
return new PostgresPollDataDAO(retryTemplate, objectMapper, dataSource, properties);
}

@Bean
@DependsOn({"flywayForPrimaryDb"})
public PostgresQueueDAO postgresQueueDAO(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ public class PostgresProperties {

private Integer deadlockRetryMax = 3;

@DurationUnit(ChronoUnit.MILLIS)
private Duration pollDataFlushInterval = Duration.ofMillis(0);

@DurationUnit(ChronoUnit.MILLIS)
private Duration pollDataCacheValidityPeriod = Duration.ofMillis(0);

public String schema = "public";

public Duration getTaskDefCacheRefreshInterval() {
Expand All @@ -58,52 +52,4 @@ public String getSchema() {
public void setSchema(String schema) {
this.schema = schema;
}

public boolean getAllowFullTextQueries() {
return allowFullTextQueries;
}

public void setAllowFullTextQueries(boolean allowFullTextQueries) {
this.allowFullTextQueries = allowFullTextQueries;
}

public boolean getAllowJsonQueries() {
return allowJsonQueries;
}

public void setAllowJsonQueries(boolean allowJsonQueries) {
this.allowJsonQueries = allowJsonQueries;
}

public int getAsyncWorkerQueueSize() {
return asyncWorkerQueueSize;
}

public void setAsyncWorkerQueueSize(int asyncWorkerQueueSize) {
this.asyncWorkerQueueSize = asyncWorkerQueueSize;
}

public int getAsyncMaxPoolSize() {
return asyncMaxPoolSize;
}

public void setAsyncMaxPoolSize(int asyncMaxPoolSize) {
this.asyncMaxPoolSize = asyncMaxPoolSize;
}

public Duration getPollDataFlushInterval() {
return pollDataFlushInterval;
}

public void setPollDataFlushInterval(Duration interval) {
this.pollDataFlushInterval = interval;
}

public Duration getPollDataCacheValidityPeriod() {
return pollDataCacheValidityPeriod;
}

public void setPollDataCacheValidityPeriod(Duration period) {
this.pollDataCacheValidityPeriod = period;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.sql.Connection;
import java.sql.Date;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Executors;
Expand All @@ -26,10 +27,12 @@
import org.springframework.retry.support.RetryTemplate;

import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.tasks.PollData;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.core.exception.NonTransientException;
import com.netflix.conductor.dao.ConcurrentExecutionLimitDAO;
import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.PollDataDAO;
import com.netflix.conductor.dao.RateLimitingDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;
Expand All @@ -44,7 +47,7 @@
import jakarta.annotation.*;

public class PostgresExecutionDAO extends PostgresBaseDAO
implements ExecutionDAO, RateLimitingDAO, ConcurrentExecutionLimitDAO {
implements ExecutionDAO, RateLimitingDAO, PollDataDAO, ConcurrentExecutionLimitDAO {

private final ScheduledExecutorService scheduledExecutorService;

Expand Down Expand Up @@ -555,6 +558,45 @@ public List<EventExecution> getEventExecutions(
}
}

@Override
public void updateLastPollData(String taskDefName, String domain, String workerId) {
Preconditions.checkNotNull(taskDefName, "taskDefName name cannot be null");
PollData pollData = new PollData(taskDefName, domain, workerId, System.currentTimeMillis());
String effectiveDomain = (domain == null) ? "DEFAULT" : domain;
withTransaction(tx -> insertOrUpdatePollData(tx, pollData, effectiveDomain));
}

@Override
public PollData getPollData(String taskDefName, String domain) {
Preconditions.checkNotNull(taskDefName, "taskDefName name cannot be null");
String effectiveDomain = (domain == null) ? "DEFAULT" : domain;
return getWithRetriedTransactions(tx -> readPollData(tx, taskDefName, effectiveDomain));
}

@Override
public List<PollData> getPollData(String taskDefName) {
Preconditions.checkNotNull(taskDefName, "taskDefName name cannot be null");
return readAllPollData(taskDefName);
}

@Override
public List<PollData> getAllPollData() {
try (Connection tx = dataSource.getConnection()) {
boolean previousAutoCommitMode = tx.getAutoCommit();
tx.setAutoCommit(true);
try {
String GET_ALL_POLL_DATA = "SELECT json_data FROM poll_data ORDER BY queue_name";
return query(tx, GET_ALL_POLL_DATA, q -> q.executeAndFetch(PollData.class));
} catch (Throwable th) {
throw new NonTransientException(th.getMessage(), th);
} finally {
tx.setAutoCommit(previousAutoCommitMode);
}
} catch (SQLException ex) {
throw new NonTransientException(ex.getMessage(), ex);
}
}

private List<TaskModel> getTasks(Connection connection, List<String> taskIds) {
if (taskIds.isEmpty()) {
return Lists.newArrayList();
Expand Down Expand Up @@ -985,6 +1027,56 @@ private EventExecution readEventExecution(
.executeAndFetchFirst(EventExecution.class));
}

private void insertOrUpdatePollData(Connection connection, PollData pollData, String domain) {
/*
* Most times the row will be updated so let's try the update first. This used to be an 'INSERT/ON CONFLICT do update' sql statement. The problem with that
* is that if we try the INSERT first, the sequence will be increased even if the ON CONFLICT happens. Since polling happens *a lot*, the sequence can increase
* dramatically even though it won't be used.
*/
String UPDATE_POLL_DATA =
"UPDATE poll_data SET json_data=?, modified_on=CURRENT_TIMESTAMP WHERE queue_name=? AND domain=?";
int rowsUpdated =
query(
connection,
UPDATE_POLL_DATA,
q ->
q.addJsonParameter(pollData)
.addParameter(pollData.getQueueName())
.addParameter(domain)
.executeUpdate());

if (rowsUpdated == 0) {
String INSERT_POLL_DATA =
"INSERT INTO poll_data (queue_name, domain, json_data, modified_on) VALUES (?, ?, ?, CURRENT_TIMESTAMP) ON CONFLICT (queue_name,domain) DO UPDATE SET json_data=excluded.json_data, modified_on=excluded.modified_on";
execute(
connection,
INSERT_POLL_DATA,
q ->
q.addParameter(pollData.getQueueName())
.addParameter(domain)
.addJsonParameter(pollData)
.executeUpdate());
}
}

private PollData readPollData(Connection connection, String queueName, String domain) {
String GET_POLL_DATA =
"SELECT json_data FROM poll_data WHERE queue_name = ? AND domain = ?";
return query(
connection,
GET_POLL_DATA,
q ->
q.addParameter(queueName)
.addParameter(domain)
.executeAndFetchFirst(PollData.class));
}

private List<PollData> readAllPollData(String queueName) {
String GET_ALL_POLL_DATA = "SELECT json_data FROM poll_data WHERE queue_name = ?";
return queryWithTransaction(
GET_ALL_POLL_DATA, q -> q.addParameter(queueName).executeAndFetch(PollData.class));
}

private List<String> findAllTasksInProgressInOrderOfArrival(TaskModel task, int limit) {
String GET_IN_PROGRESS_TASKS_WITH_LIMIT =
"SELECT task_id FROM task_in_progress WHERE task_def_name = ? ORDER BY created_on LIMIT ?";
Expand Down
Loading

0 comments on commit 67fe156

Please sign in to comment.