Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce FORK/JOIN latency #352

Merged
merged 3 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.springframework.util.unit.DataSize;
import org.springframework.util.unit.DataUnit;

import com.netflix.conductor.model.TaskModel;

@ConfigurationProperties("conductor.app")
public class ConductorProperties {

Expand Down Expand Up @@ -226,6 +228,25 @@ public class ConductorProperties {
/** Used to limit the size of task execution logs. */
private int taskExecLogSizeLimit = 10;

/**
* This threshold defines the default number of executions after which SystemTasks implementing
* getEvaluationOffset should begin postponing execution.
*
* @see
* com.netflix.conductor.core.execution.tasks.WorkflowSystemTask#getEvaluationOffset(TaskModel,
* long)
* @see com.netflix.conductor.core.execution.tasks.Join#getEvaluationOffset(TaskModel, long)
*/
private int systemTaskPostponeThreshold =
200; // 10 seconds based on default systemTaskWorkerPollInterval of 50ms

/**
* Timeout in milliseconds used by {@link
* com.netflix.conductor.core.execution.tasks.SystemTaskWorker} when polling, i.e. the timeout
* passed to {@link com.netflix.conductor.dao.QueueDAO#pop(String, int, int)}.
*/
private int systemTaskQueuePopTimeout = 200;

public String getStack() {
return stack;
}
Expand Down Expand Up @@ -567,4 +588,20 @@ public Map<String, Object> getAll() {
props.forEach((key, value) -> map.put(key.toString(), value));
return map;
}

/**
 * Sets the number of executions after which system tasks that implement {@code
 * getEvaluationOffset} (for example JOIN) should begin postponing their next execution.
 *
 * @param systemTaskPostponeThreshold the new threshold; stored as given, without validation
 */
public void setSystemTaskPostponeThreshold(int systemTaskPostponeThreshold) {
this.systemTaskPostponeThreshold = systemTaskPostponeThreshold;
}

/**
 * Returns the number of executions after which system tasks that implement {@code
 * getEvaluationOffset} (for example JOIN) should begin postponing their next execution.
 *
 * @return the configured postpone threshold
 */
public int getSystemTaskPostponeThreshold() {
return systemTaskPostponeThreshold;
}

/**
 * Returns the timeout, in milliseconds, that the system task worker passes to the queue {@code
 * pop} call when polling.
 *
 * @return the configured queue pop timeout in milliseconds
 */
public int getSystemTaskQueuePopTimeout() {
return systemTaskQueuePopTimeout;
}

/**
 * Sets the timeout, in milliseconds, that the system task worker passes to the queue {@code
 * pop} call when polling.
 *
 * @param systemTaskQueuePopTimeout the new timeout in milliseconds; stored as given, without
 *     validation
 */
public void setSystemTaskQueuePopTimeout(int systemTaskQueuePopTimeout) {
this.systemTaskQueuePopTimeout = systemTaskQueuePopTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import org.springframework.stereotype.Component;

import com.netflix.conductor.annotations.VisibleForTesting;
import com.netflix.conductor.common.utils.TaskUtils;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;
Expand All @@ -29,8 +31,13 @@
@Component(TASK_TYPE_JOIN)
public class Join extends WorkflowSystemTask {

public Join() {
@VisibleForTesting static final double EVALUATION_OFFSET_BASE = 1.2;

private final ConductorProperties properties;

public Join(ConductorProperties properties) {
super(TASK_TYPE_JOIN);
this.properties = properties;
}

@Override
Expand Down Expand Up @@ -117,12 +124,17 @@ public boolean execute(
}

@Override
public Optional<Long> getEvaluationOffset(TaskModel taskModel, long defaultOffset) {
int index = taskModel.getPollCount() > 0 ? taskModel.getPollCount() - 1 : 0;
if (index == 0) {
public Optional<Long> getEvaluationOffset(TaskModel taskModel, long maxOffset) {
int pollCount = taskModel.getPollCount();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To reduce the delay of the JOIN task once all forked tasks have completed, the JOIN is not postponed until pollCount exceeds a configurable threshold.

Once pollCount is greater than that threshold, the delay grows exponentially until it reaches maxOffset.

// Assuming pollInterval = 50ms and systemTaskPostponeThreshold = 200, a JOIN task is
// evaluated continuously (no postponement) for roughly the first 10 seconds, so the
// FORK/JOIN ends with minimal delay.
if (pollCount <= properties.getSystemTaskPostponeThreshold()) {
return Optional.of(0L);
}
return Optional.of(Math.min((long) Math.pow(2, index), defaultOffset));

double exp = pollCount - properties.getSystemTaskPostponeThreshold();
return Optional.of(Math.min((long) Math.pow(EVALUATION_OFFSET_BASE, exp), maxOffset));
}

public boolean isAsync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ void pollAndExecute(WorkflowSystemTask systemTask, String queueName) {

LOGGER.debug("Polling queue: {} with {} slots acquired", queueName, messagesToAcquire);

List<String> polledTaskIds = queueDAO.pop(queueName, messagesToAcquire, 200);
List<String> polledTaskIds =
queueDAO.pop(
queueName,
messagesToAcquire,
properties.getSystemTaskQueuePopTimeout());

Monitors.recordTaskPoll(queueName);
LOGGER.debug("Polling queue:{}, got {} tasks", queueName, polledTaskIds.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,19 @@ public boolean execute(
*/
public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {}

public Optional<Long> getEvaluationOffset(TaskModel taskModel, long defaultOffset) {
/**
* Determines the time in seconds by which the next execution of a task will be postponed after
* an execution. By default, this method returns {@code Optional.empty()}.
*
* <p>WorkflowSystemTasks may override this method to define a custom evaluation offset based on
* the task's behavior or requirements.
*
* @param taskModel task model
* @param maxOffset the max recommended offset value to use
* @return an {@code Optional<Long>} specifying the evaluation offset in seconds, or {@code
* Optional.empty()} if no postponement is required
*/
public Optional<Long> getEvaluationOffset(TaskModel taskModel, long maxOffset) {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public Switch switchTask() {

@Bean(TASK_TYPE_JOIN)
public Join join() {
return new Join();
return new Join(new ConductorProperties());
}

@Bean
Expand Down Expand Up @@ -595,7 +595,8 @@ public void testOptionalWithDynamicFork() {

assertEquals(TaskModel.Status.SCHEDULED, outcome.tasksToBeScheduled.get(0).getStatus());
System.out.println(outcome.tasksToBeScheduled.get(0));
new Join().execute(workflow, outcome.tasksToBeScheduled.get(0), null);
new Join(new ConductorProperties())
.execute(workflow, outcome.tasksToBeScheduled.get(0), null);
assertEquals(TaskModel.Status.COMPLETED, outcome.tasksToBeScheduled.get(0).getStatus());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.junit.Test;

import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;
Expand All @@ -27,6 +28,9 @@
import static org.mockito.Mockito.mock;

public class TestJoin {

private final ConductorProperties properties = new ConductorProperties();

private final WorkflowExecutor executor = mock(WorkflowExecutor.class);

private TaskModel createTask(
Expand Down Expand Up @@ -65,7 +69,7 @@ public void testShouldNotMarkJoinAsCompletedWithErrorsWhenNotDone() {
// task2 is not scheduled yet, so the join is not completed
var wfJoinPair = createJoinWorkflow(List.of(task1), "task2");

var join = new Join();
var join = new Join(properties);
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertFalse(result);
}
Expand All @@ -77,7 +81,7 @@ public void testJoinCompletesSuccessfullyWhenAllTasksSucceed() {

var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

var join = new Join();
var join = new Join(properties);
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertTrue("Join task should execute successfully when all tasks succeed", result);
assertEquals(
Expand All @@ -93,7 +97,7 @@ public void testJoinWaitsWhenAnyTaskIsNotTerminal() {

var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

var join = new Join();
var join = new Join(properties);
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertFalse("Join task should wait when any task is not in terminal state", result);
}
Expand All @@ -107,7 +111,7 @@ public void testJoinFailsWhenMandatoryTaskFails() {

var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

var join = new Join();
var join = new Join(properties);
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertTrue("Join task should be executed when a mandatory task fails", result);
assertEquals(
Expand All @@ -125,7 +129,7 @@ public void testJoinCompletesWithErrorsWhenOnlyOptionalTasksFail() {

var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

var join = new Join();
var join = new Join(properties);
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertTrue("Join task should be executed when only optional tasks fail", result);
assertEquals(
Expand All @@ -143,7 +147,7 @@ public void testJoinAggregatesFailureReasonsCorrectly() {

var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

var join = new Join();
var join = new Join(properties);
var result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertTrue("Join task should be executed when tasks fail", result);
assertEquals(
Expand Down Expand Up @@ -174,7 +178,7 @@ public void testJoinWaitsForAllTasksBeforeFailingDueToPermissiveTaskFailure() {
var wfJoinPair = createJoinWorkflow(List.of(task1, task2));

// First execution: Task 2 is not yet terminal.
var join = new Join();
var join = new Join(properties);
boolean result = join.execute(wfJoinPair.getLeft(), wfJoinPair.getRight(), executor);
assertFalse("Join task should wait as not all tasks are terminal", result);

Expand All @@ -189,4 +193,33 @@ public void testJoinWaitsForAllTasksBeforeFailingDueToPermissiveTaskFailure() {
TaskModel.Status.FAILED,
wfJoinPair.getRight().getStatus());
}

/** A JOIN polled fewer times than the postpone threshold must be re-evaluated immediately. */
@Test
public void testEvaluationOffsetWhenPollCountIsBelowThreshold() {
    // One poll short of the configured threshold.
    var belowThreshold = properties.getSystemTaskPostponeThreshold() - 1;
    var joinTask = createTask("join1", TaskModel.Status.COMPLETED, false, false);
    joinTask.setPollCount(belowThreshold);

    var offset = new Join(properties).getEvaluationOffset(joinTask, 30L);

    // No postponement is expected: the offset is exactly zero.
    assertEquals(0L, (long) offset.orElseThrow());
}

/**
 * Once the poll count passes the postpone threshold, the offset must grow as {@code
 * EVALUATION_OFFSET_BASE ^ (pollCount - threshold)} and be capped at the supplied maximum.
 */
@Test
public void testEvaluationOffsetWhenPollCountIsAboveThreshold() {
    final long cap = 30L;
    final int threshold = properties.getSystemTaskPostponeThreshold();
    var joinTask = createTask("join1", TaskModel.Status.COMPLETED, false, false);
    var join = new Join(properties);

    // One poll past the threshold: base^1 truncates to 1.
    joinTask.setPollCount(threshold + 1);
    assertEquals(1L, (long) join.getEvaluationOffset(joinTask, cap).orElseThrow());

    // Ten polls past the threshold: the offset is base^10, truncated to a long.
    joinTask.setPollCount(threshold + 10);
    long tenPast = (long) Math.pow(Join.EVALUATION_OFFSET_BASE, 10);
    assertEquals(tenPast, (long) join.getEvaluationOffset(joinTask, cap).orElseThrow());

    // Forty polls past the threshold: base^40 exceeds the cap, so the cap is returned.
    joinTask.setPollCount(threshold + 40);
    assertEquals(cap, (long) join.getEvaluationOffset(joinTask, cap).orElseThrow());
}
}
Loading