-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Improve Watcher test framework resiliency #40658
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
0cc878b
7a01c22
c81c129
908430f
26a6b5e
49f66e9
d7374de
3b89d6e
44fce8b
a47e774
8091110
5ea0551
723df70
77899f6
d5dde77
0fe11c4
4756aee
b3f0025
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,8 +21,10 @@ | |
| import java.time.ZonedDateTime; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ConcurrentMap; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
|
|
||
| /** | ||
| * A mock scheduler to help with unit testing. Provide {@link ScheduleTriggerEngineMock#trigger} method to manually trigger | ||
|
|
@@ -31,7 +33,8 @@ | |
| public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine { | ||
| private static final Logger logger = LogManager.getLogger(ScheduleTriggerEngineMock.class); | ||
|
|
||
| private final ConcurrentMap<String, Watch> watches = new ConcurrentHashMap<>(); | ||
| private final AtomicReference<Map<String, Watch>> watches = new AtomicReference<>(new ConcurrentHashMap<>()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Curious why using an AtomicReference here ?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nvmd.. I see now that you are swapping out the instances below. New question, why atomic swaps vs. shared lock (for this test code) ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An earlier version of this used a lock and it made the tests very slow (this version takes about ~2-3m to run the whole suite, I killed the version with locks at ~10m), I'm not entirely sure why. |
||
| private final AtomicBoolean paused = new AtomicBoolean(false); | ||
|
|
||
| public ScheduleTriggerEngineMock(ScheduleRegistry scheduleRegistry, Clock clock) { | ||
| super(scheduleRegistry, clock); | ||
|
|
@@ -50,37 +53,42 @@ public ScheduleTriggerEvent parseTriggerEvent(TriggerService service, String wat | |
|
|
||
| @Override | ||
| public void start(Collection<Watch> jobs) { | ||
|
||
| jobs.forEach(this::add); | ||
| Map<String, Watch> newWatches = new ConcurrentHashMap<>(); | ||
| jobs.forEach((watch) -> newWatches.put(watch.id(), watch)); | ||
| watches.set(newWatches); | ||
| paused.set(false); | ||
| } | ||
|
|
||
| @Override | ||
| public void stop() { | ||
| watches.clear(); | ||
| watches.set(new ConcurrentHashMap<>()); | ||
| } | ||
|
|
||
| @Override | ||
| public void add(Watch watch) { | ||
| logger.debug("adding watch [{}]", watch.id()); | ||
| watches.put(watch.id(), watch); | ||
| watches.get().put(watch.id(), watch); | ||
| } | ||
|
|
||
| @Override | ||
| public void pauseExecution() { | ||
| // No action is needed because this engine does not trigger watches on a schedule (instead | ||
| // they must be triggered manually). | ||
| paused.set(true); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean remove(String jobId) { | ||
| return watches.remove(jobId) != null; | ||
| return watches.get().remove(jobId) != null; | ||
| } | ||
|
|
||
| public boolean trigger(String jobName) { | ||
| return trigger(jobName, 1, null); | ||
| } | ||
|
|
||
| public boolean trigger(String jobName, int times, TimeValue interval) { | ||
| if (watches.containsKey(jobName) == false) { | ||
| if (paused.get()) { | ||
| return false; | ||
| } | ||
| if (watches.get().containsKey(jobName) == false) { | ||
| return false; | ||
| } | ||
|
|
||
|
|
@@ -89,7 +97,7 @@ public boolean trigger(String jobName, int times, TimeValue interval) { | |
| logger.debug("firing watch [{}] at [{}]", jobName, now); | ||
| ScheduleTriggerEvent event = new ScheduleTriggerEvent(jobName, now, now); | ||
| consumers.forEach(consumer -> consumer.accept(Collections.singletonList(event))); | ||
| if (interval != null) { | ||
| if (interval != null) { | ||
| if (clock instanceof ClockMock) { | ||
| ((ClockMock) clock).fastForward(interval); | ||
| } else { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.