Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public synchronized void start()
}
});

// when query is done or any time a stage completes, attempt to transition query to "final query info ready"
// when query is done, attempt to transition query to "final query info ready"
queryStateMachine.addStateChangeListener(state -> {
if (!state.isDone()) {
return;
Expand Down Expand Up @@ -629,6 +629,8 @@ public void run()

Optional<Throwable> failure = Optional.empty();
try {
// schedule() is the main logic, but expensive, so we do not want to call it after every event.
// Process events for some time (measured by schedulingDelayer) before invoking schedule() next time.
Copy link
Copy Markdown
Member

@losipiuk losipiuk Nov 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we are processing all the events from queue (up to EVENT_BUFFER_CAPACITY) - and then calling schedule.
scheduleDelayer is only to pause calling schedule in case we observed failures which we believe may impact newly scheduled tasks too.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, processEvents() processes all existing events. Not even limited by EVENT_BUFFER_CAPACITY (but it does that in EVENT_BUFFER_CAPACITY portions, repeatedly)
but then, provided that getRemainingDelayInMillis() > 0, we call `processEvents()' waiting for new events (for 1m)

if (schedule()) {
while (processEvents()) {
if (schedulingDelayer.getRemainingDelayInMillis() > 0) {
Expand Down Expand Up @@ -672,6 +674,9 @@ private Optional<Throwable> closeAndAddSuppressed(Optional<Throwable> existingFa
return existingFailure;
}

/**
* @return whether processing should continue
*/
private boolean processEvents()
{
try {
Expand Down Expand Up @@ -705,6 +710,9 @@ private boolean processEvents()
}
}

/**
* @return whether processing should continue
*/
private boolean schedule()
{
if (checkComplete()) {
Expand Down Expand Up @@ -1452,6 +1460,7 @@ else if (taskState == TaskState.FAILED) {
List<PrioritizedScheduledTask> replacementTasks = stageExecution.taskFailed(taskId, failureInfo, taskStatus);
replacementTasks.forEach(schedulingQueue::addOrUpdate);

// When tasks fail for some intermittent reason, delay scheduling retries
if (shouldDelayScheduling(failureInfo.getErrorCode())) {
schedulingDelayer.startOrProlongDelayIfNecessary();
scheduledExecutorService.schedule(() -> eventQueue.add(Event.WAKE_UP), schedulingDelayer.getRemainingDelayInMillis(), MILLISECONDS);
Expand Down