Skip to content

Conversation

@dpratt
Copy link

@dpratt dpratt commented Feb 27, 2020

  • Modify Scheduler to take responsibility for the lifecycle of
    the job ExecutorService only if it's not externally supplied.
  • Make the shutdown timeout be externally configurable by the user.
  • Additional logging for when the Scheduler does not shut down cleanly.
  • Add bits to the POM that allow for building under Java 9+
  • Have the tests use logback instead of slf4-simple for log output

This addresses #88

- Modify Scheduler to take responsibility for the lifecycle of
  the job ExecutorService only if it's not externally supplied.
- Make the shutdown timeout be externally configurable by the user.
- Additional logging for when the Scheduler does not shut down cleanly.
- Add bits to the POM that allow for building under Java 9+
- Have the tests use logback instead of slf4-simple for log output
@kagkarlsson
Copy link
Owner

Thanks for the contribution! Will try and have a look in the coming days :)

@kagkarlsson
Copy link
Owner

Bit of a busy week, haven't had the time to check this yet

Copy link
Owner

@kagkarlsson kagkarlsson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for contributing to db-scheduler! Looks good, but left a few comments :)

Comment on lines +1 to +15
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} %highlight(%-5level) %cyan(%logger{50}) - %msg%n%rootException</pattern>
</encoder>
</appender>

<logger level="INFO" name="com.github.kagkarlsson"/>

<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>

</configuration>
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Was not aware of those options (highlight, cyan), better looking output 👍

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I see I have a bit of a whitespace issue in the project. I will fix that afterwards, in another PR, to make it more consistent.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(the placement of this comment was probably better before I disabled whitespace changes... )

Comment on lines +89 to +92
// this.dueExecutor = Executors.newSingleThreadExecutor(defaultThreadFactoryWithPrefix(THREAD_PREFIX + "-execute-due-"));
// this.detectDeadExecutor = Executors.newSingleThreadExecutor(defaultThreadFactoryWithPrefix(THREAD_PREFIX + "-detect-dead-"));
// this.updateHeartbeatExecutor = Executors.newSingleThreadExecutor(defaultThreadFactoryWithPrefix(THREAD_PREFIX + "-update-heartbeat-"));

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be removed

Comment on lines +154 to +174
if(managedExecutorService) {
LOG.info("Letting running executions finish. Will wait up to {}.", executorShutdownWait);
if (ExecutorUtils.shutdownAndAwaitTermination(executorService, executorShutdownWait)) {
LOG.info("Scheduler stopped.");
} else {
if(!currentlyProcessing.isEmpty()) {
String runningJobs = currentlyProcessing.keySet().stream().map(Execution::toString).collect(Collectors.joining("\n"));
LOG.warn("Scheduler did not shut down within the timeout. Interrupting the following jobs:\n{}", runningJobs);
} else {
LOG.warn("Scheduler did not shut down cleanly within the timeout. Terminating the executor.");
}
if(!ExecutorUtils.shutdownNowAndAwaitTermination(executorService, Duration.ofSeconds(5))) {
LOG.warn("Scheduler did not terminate within the timeout. The Scheduler pool may be orphaned and have live threads.");
}
if(!currentlyProcessing.isEmpty()) {
String staleJobs = currentlyProcessing.keySet().stream().map(Execution::toString).collect(Collectors.joining("\n"));
LOG.warn("Scheduler stopped, but the following jobs were orphaned:\n{}", staleJobs);
}
}
} else {
LOG.info("Not shutting down externally managed job executor.");
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indent is a bit messed up. I will probably do a reformat to 4 spaces after this PR is merged.

return this;
}

public SchedulerBuilder executorServiceShutdownTimeout(Duration shutdownTimeout) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the name. Especially since it is not applicable for the builder-method executorService where you send in the externally managed one.

What about shutdownTimeout?

new LinkedBlockingQueue<>(),
defaultThreadFactoryWithPrefix(prefix)
);
executor.allowCoreThreadTimeOut(true);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, I don't think that the pool will ever be scaled to less than 3 threads? (Since currently all util-threads run until shutdown with their own sleep)

Comment on lines +84 to +87
ThreadPoolExecutor housekeepingExecutor = newThreadPool(THREAD_PREFIX + "-housekeeper-", 3, THREAD_POOL_TIMEOUT);
this.dueExecutor = housekeepingExecutor;
this.detectDeadExecutor = housekeepingExecutor;
this.updateHeartbeatExecutor = housekeepingExecutor;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about this. The util-threads currently run "forever" (internal sleep). The only difference now will be that the thread-names will be a bit less describing. What do you feel is the benefit here?

Maybe they should be made into a ScheduledExecutorService, but I suspect that generates a bigger change..

Comment on lines +54 to 56
public static ThreadFactory defaultThreadFactoryWithPrefix(String prefix, boolean useDaemonThreads) {
return new PrefixingDefaultThreadFactory(prefix, useDaemonThreads);
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't seem to find any references to this method (where daemon=true)? Was there meant to be?

@kagkarlsson
Copy link
Owner

Much of these changes have reached master via other PRs. Closing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants