Skip to content

Commit

Permalink
add support for DeepFreeze
Browse files Browse the repository at this point in the history
In most testing scenarios,
it's sufficient to freeze time at a fixed point without stopping Tickers,
as doing so could disrupt background goroutines that rely on them.

However, there are cases where we need to completely freeze time,
including all background tasks.
  • Loading branch information
adamluzsi committed Aug 4, 2024
1 parent 06fa756 commit eff987b
Show file tree
Hide file tree
Showing 6 changed files with 379 additions and 62 deletions.
143 changes: 125 additions & 18 deletions clock/Clock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clock

import (
"runtime"
"sync"
"time"

Expand Down Expand Up @@ -28,21 +29,38 @@ func After(d time.Duration) <-chan struct{} {
return ch
}
go func() {
timeTravel := make(chan struct{})
timeTravel := make(chan internal.TimeTravelEvent)
defer internal.Notify(timeTravel)()
defer close(ch)
var handleTimeTravel func(tt internal.TimeTravelEvent) bool
handleTimeTravel = func(tt internal.TimeTravelEvent) bool {
deadline := startedAt.Add(d)
if tt.When.After(deadline) || tt.When.Equal(deadline) {
return true
}
if tt.Deep && tt.Freeze {
// wait for next time travel, since during deep freeze, the flow of time is frozen
return handleTimeTravel(<-timeTravel)
}
return false
}
if tt, ok := internal.Check(); ok && tt.Deep && tt.Freeze {
if handleTimeTravel(tt) {
return
}
}
var onWait = func() (_restart bool) {
c, td := after(internal.RemainingDuration(startedAt, d))
defer td()
select {
case tt := <-timeTravel:
return !handleTimeTravel(tt)
case <-c:
return false
case <-timeTravel:
return true
}
}
for onWait() {
}
close(ch)
}()
return ch
}
Expand All @@ -59,7 +77,7 @@ type Ticker struct {
d time.Duration

onInit sync.Once
lock sync.RWMutex
lock sync.RWMutex // is lock really needed if only the background goroutine reads the values from it?
done chan struct{}
pulse chan struct{}
ticker *time.Ticker
Expand All @@ -74,33 +92,118 @@ func (t *Ticker) init() {
t.ticker = time.NewTicker(t.getScaledDuration())
t.updateLastTickedAt()
go func() {
timeTravel := make(chan struct{})
timeTravel := make(chan internal.TimeTravelEvent)
defer internal.Notify(timeTravel)()

if tt, ok := internal.Check(); ok { // trigger initial time travel awareness
if !t.handleTimeTravel(timeTravel, tt) {
return
}
}

for {
if !t.ticking(timeTravel, t.ticker.C) {
if !t.ticking(timeTravel, t.ticker.C, tickingOption{}) {
break
}
}
}()
})
}

func (t *Ticker) ticking(timeTravel <-chan struct{}, tick <-chan time.Time) bool {
type tickingOption struct {
// OnEvent will be executed when an event is received during waiting for ticking
OnEvent func()
}

func (h tickingOption) onEvent() {
if h.OnEvent == nil {
return
}
h.OnEvent()
}

func (t *Ticker) ticking(timeTravel <-chan internal.TimeTravelEvent, tick <-chan time.Time, o tickingOption) bool {
select {
case <-t.done:
o.onEvent()
return false

case <-timeTravel: // on time travel, we reset the ticker according to the new time
defer t.resetTicker()
c, td := after(internal.RemainingDuration(t.getLastTickedAt(), t.getRealDuration()))
defer td()
return t.ticking(timeTravel, c) // wait the remaining time from the current tick
case tt := <-timeTravel: // on time travel, we reset the ticker according to the new time
o.onEvent()
return t.handleTimeTravel(timeTravel, tt)

case <-tick: // on timeout, we notify the listener
now := t.updateLastTickedAt()
t.C <- now
case <-tick: // on time.Ticker tick, we also tick
o.onEvent()
select {
case tt := <-timeTravel:
return t.handleTimeTravel(timeTravel, tt)
case t.C <- t.updateLastTickedAt():
}
return true

}
}

func (t *Ticker) handleTimeTravel(timeTravel <-chan internal.TimeTravelEvent, tt internal.TimeTravelEvent) bool {
var (
opt = tickingOption{}
prev = tt.Prev
when = tt.When
)
if lastTickedAt := t.getLastTickedAt(); lastTickedAt.Before(prev) {
prev = lastTickedAt
}
if fn := t.fastForwardTicksTo(prev, when); fn != nil {
opt.OnEvent = fn
}
if tt.Deep && tt.Freeze {
return t.ticking(timeTravel, nil, opt) // wait for unfreeze
}
defer t.resetTicker()
c, td := after(internal.RemainingDuration(t.getLastTickedAt(), t.getRealDuration()))
defer td()
return t.ticking(timeTravel, c, opt) // wait the remaining time from the current tick
}

func (t *Ticker) fastForwardTicksTo(from, till time.Time) func() {
var travelledDuration = till.Sub(from)

if travelledDuration <= 0 {
return nil
}

var (
doneBeforeNextEvent = make(chan struct{})
fastforwardWG = &sync.WaitGroup{}
timeBetweenTicks = t.getRealDuration()
missingTicks = int(travelledDuration / timeBetweenTicks)
)
var OnBeforeEvent = func() {
close(doneBeforeNextEvent)
fastforwardWG.Wait()
}

// fast forward last ticked at position to the time after the ticks
t.updateLastTickedAtTo(from.Add(timeBetweenTicks * time.Duration(missingTicks)))

fastforwardWG.Add(1)
go func(tickedAt time.Time) {
defer fastforwardWG.Done()

fastForward:
for i := 0; i < missingTicks; i++ {
tickedAt = tickedAt.Add(timeBetweenTicks) // move to the next tick time
select {
case <-doneBeforeNextEvent:
break fastForward
case t.C <- tickedAt: // tick!
continue fastForward
}
}
}(from)
runtime.Gosched()

return OnBeforeEvent
}

// Stop turns off a ticker. After Stop, no more ticks will be sent.
Expand Down Expand Up @@ -153,14 +256,18 @@ func (t *Ticker) getLastTickedAt() time.Time {
}

func (t *Ticker) updateLastTickedAt() time.Time {
return t.updateLastTickedAtTo(Now())
}

func (t *Ticker) updateLastTickedAtTo(at time.Time) time.Time {
t.lock.RLock()
defer t.lock.RUnlock()
t.lastTickedAt = Now()
t.lastTickedAt = at
return t.lastTickedAt
}

func after(d time.Duration) (<-chan time.Time, func()) {
if d == 0 {
if d <= 0 {
var ch = make(chan time.Time)
close(ch)
return ch, func() {}
Expand Down
Loading

0 comments on commit eff987b

Please sign in to comment.