Skip to content

Commit

Permalink
scheduler: Debounce commit events
Browse files Browse the repository at this point in the history
When loading a state that contained large numbers of nodes and tasks,
but no ready nodes that could accept the tasks, swarmd used large
amounts of CPU repeatedly trying to schedule the full set of tasks. The
allocator caused many commits on startup (see moby#1286), and this produced
a large backlog of commit events, each one of which caused a full
scheduling pass.

To avoid this pathological behavior, debounce the commit events
similarly to how the dispatcher's Tasks loop debounces events. When a
commit event is received, that starts a 50 ms countdown to wait for
another commit event before running the scheduling pass. If commit
events keep being received and resetting this timer, the scheduler will
run the scheduling pass anyway after a second.

Signed-off-by: Aaron Lehmann <[email protected]>
  • Loading branch information
aaronlehmann committed Aug 1, 2016
1 parent 4e27e1b commit 5d5f5c4
Showing 1 changed file with 41 additions and 7 deletions.
48 changes: 41 additions & 7 deletions manager/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,32 @@ func (s *Scheduler) Run(ctx context.Context) error {
// Queue all unassigned tasks before processing changes.
s.tick(ctx)

const (
// commitDebounceGap is the amount of time to wait between
// commit events to debounce them.
commitDebounceGap = 50 * time.Millisecond
// maxLatency is a time limit on the debouncing.
maxLatency = time.Second
)
var (
debouncingStarted time.Time
commitDebounceTimer *time.Timer
commitDebounceTimeout <-chan time.Time
)

pendingChanges := 0

schedule := func() {
if len(s.preassignedTasks) > 0 {
s.processPreassignedTasks(ctx)
}
if pendingChanges > 0 {
before := time.Now()
s.tick(ctx)
pendingChanges = 0
}
}

// Watch for changes.
for {
select {
Expand All @@ -131,15 +155,25 @@ func (s *Scheduler) Run(ctx context.Context) error {
case state.EventDeleteNode:
s.nodeHeap.remove(v.Node.ID)
case state.EventCommit:
if len(s.preassignedTasks) > 0 {
s.processPreassignedTasks(ctx)
}
if pendingChanges > 0 {
s.tick(ctx)
pendingChanges = 0
if commitDebounceTimer != nil {
if time.Since(debouncingStarted) > maxLatency {
commitDebounceTimer.Stop()
commitDebounceTimer = nil
commitDebounceTimeout = nil
schedule()
} else {
commitDebounceTimer.Reset(commitDebounceGap)
}
} else {
commitDebounceTimer = time.NewTimer(commitDebounceGap)
commitDebounceTimeout = commitDebounceTimer.C
debouncingStarted = time.Now()
}
}

case <-commitDebounceTimeout:
schedule()
commitDebounceTimer = nil
commitDebounceTimeout = nil
case <-s.stopChan:
return nil
}
Expand Down

0 comments on commit 5d5f5c4

Please sign in to comment.