Skip to content

Commit

Permalink
Fix latency issues with FORK/JOIN. Work In Progress.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmigueprieto committed Jan 4, 2025
1 parent 5ef888a commit 8c75eda
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,13 @@ public boolean execute(
@Override
public Optional<Long> getEvaluationOffset(TaskModel taskModel, long defaultOffset) {
int index = taskModel.getPollCount() > 0 ? taskModel.getPollCount() - 1 : 0;
if (index == 0) {
// Assuming SystemTaskWorker 50ms pollInterval this will cause the join to be evaluated
// continuously during the first 5 seconds and the FORK/JOIN will end without any delay.
if (index <= 100) {
return Optional.of(0L);
}
return Optional.of(Math.min((long) Math.pow(2, index), defaultOffset));
// Changed the base to 1.2 to reduce the steepness of the exponential growth.
return Optional.of(Math.min((long) Math.pow(1.2, index - 100), defaultOffset));
}

public boolean isAsync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ void pollAndExecute(WorkflowSystemTask systemTask, String queueName) {

LOGGER.debug("Polling queue: {} with {} slots acquired", queueName, messagesToAcquire);

List<String> polledTaskIds = queueDAO.pop(queueName, messagesToAcquire, 200);
// TODO remove hardcoded values
List<String> polledTaskIds =
queueDAO.pop(
queueName, messagesToAcquire, 0); // 0 requires changes in orkes-queues

Monitors.recordTaskPoll(queueName);
LOGGER.debug("Polling queue:{}, got {} tasks", queueName, polledTaskIds.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ public ResponseEntity<List<Task>> batchPoll(
@RequestParam(value = "workerid", required = false) String workerId,
@RequestParam(value = "domain", required = false) String domain,
@RequestParam(value = "count", defaultValue = "1") int count,
@RequestParam(value = "timeout", defaultValue = "100") int timeout) {
@RequestParam(value = "timeout", defaultValue = "0") int timeout) {
// for backwards compatibility with 2.x client which expects a 204 when no Task is found
return Optional.ofNullable(
taskService.batchPoll(taskType, workerId, domain, count, timeout))
// "long polling" is not working as expected
return Optional.ofNullable(taskService.batchPoll(taskType, workerId, domain, count, 0))
.map(ResponseEntity::ok)
.orElse(ResponseEntity.noContent().build());
}
Expand Down

0 comments on commit 8c75eda

Please sign in to comment.