From eff987bd22b79706ed25ce26ee78d31134a2e7a6 Mon Sep 17 00:00:00 2001 From: Adam Luzsi Date: Sun, 4 Aug 2024 20:29:48 +0200 Subject: [PATCH] add support for DeepFreeze In most testing scenarios, it's sufficient to freeze time at a fixed point without stopping Tickers, as doing so could disrupt background goroutines that rely on them. However, there are cases where we need to completely freeze time, including all background tasks. --- clock/Clock.go | 143 ++++++++++++++++++++---- clock/Clock_test.go | 198 ++++++++++++++++++++++++++++++---- clock/internal/chronos.go | 46 +++++--- clock/internal/notify.go | 41 +++++-- clock/timecop/timecop.go | 11 ++ internal/teardown/Teardown.go | 2 + 6 files changed, 379 insertions(+), 62 deletions(-) diff --git a/clock/Clock.go b/clock/Clock.go index a1d37ab..353f01c 100644 --- a/clock/Clock.go +++ b/clock/Clock.go @@ -1,6 +1,7 @@ package clock import ( + "runtime" "sync" "time" @@ -28,21 +29,38 @@ func After(d time.Duration) <-chan struct{} { return ch } go func() { - timeTravel := make(chan struct{}) + timeTravel := make(chan internal.TimeTravelEvent) defer internal.Notify(timeTravel)() + defer close(ch) + var handleTimeTravel func(tt internal.TimeTravelEvent) bool + handleTimeTravel = func(tt internal.TimeTravelEvent) bool { + deadline := startedAt.Add(d) + if tt.When.After(deadline) || tt.When.Equal(deadline) { + return true + } + if tt.Deep && tt.Freeze { + // wait for next time travel, since during deep freeze, the flow of time is frozen + return handleTimeTravel(<-timeTravel) + } + return false + } + if tt, ok := internal.Check(); ok && tt.Deep && tt.Freeze { + if handleTimeTravel(tt) { + return + } + } var onWait = func() (_restart bool) { c, td := after(internal.RemainingDuration(startedAt, d)) defer td() select { + case tt := <-timeTravel: + return !handleTimeTravel(tt) case <-c: return false - case <-timeTravel: - return true } } for onWait() { } - close(ch) }() return ch } @@ -59,7 +77,7 @@ type Ticker struct { d time.Duration onInit sync.Once - lock sync.RWMutex + lock sync.RWMutex // is lock really needed if only the background goroutine reads the values from it? 
done chan struct{} pulse chan struct{} ticker *time.Ticker @@ -74,10 +92,17 @@ func (t *Ticker) init() { t.ticker = time.NewTicker(t.getScaledDuration()) t.updateLastTickedAt() go func() { - timeTravel := make(chan struct{}) + timeTravel := make(chan internal.TimeTravelEvent) defer internal.Notify(timeTravel)() + + if tt, ok := internal.Check(); ok { // trigger initial time travel awareness + if !t.handleTimeTravel(timeTravel, tt) { + return + } + } + for { - if !t.ticking(timeTravel, t.ticker.C) { + if !t.ticking(timeTravel, t.ticker.C, tickingOption{}) { break } } @@ -85,22 +110,100 @@ func (t *Ticker) init() { }) } -func (t *Ticker) ticking(timeTravel <-chan struct{}, tick <-chan time.Time) bool { +type tickingOption struct { + // OnEvent will be executed when an event is received during waiting for ticking + OnEvent func() +} + +func (h tickingOption) onEvent() { + if h.OnEvent == nil { + return + } + h.OnEvent() +} + +func (t *Ticker) ticking(timeTravel <-chan internal.TimeTravelEvent, tick <-chan time.Time, o tickingOption) bool { select { case <-t.done: + o.onEvent() return false - case <-timeTravel: // on time travel, we reset the ticker according to the new time - defer t.resetTicker() - c, td := after(internal.RemainingDuration(t.getLastTickedAt(), t.getRealDuration())) - defer td() - return t.ticking(timeTravel, c) // wait the remaining time from the current tick + case tt := <-timeTravel: // on time travel, we reset the ticker according to the new time + o.onEvent() + return t.handleTimeTravel(timeTravel, tt) - case <-tick: // on timeout, we notify the listener - now := t.updateLastTickedAt() - t.C <- now + case <-tick: // on time.Ticker tick, we also tick + o.onEvent() + select { + case tt := <-timeTravel: + return t.handleTimeTravel(timeTravel, tt) + case t.C <- t.updateLastTickedAt(): + } return true + + } +} + +func (t *Ticker) handleTimeTravel(timeTravel <-chan internal.TimeTravelEvent, tt internal.TimeTravelEvent) bool { + var ( + opt = tickingOption{} + prev = tt.Prev + when = tt.When + ) + if lastTickedAt := t.getLastTickedAt(); lastTickedAt.Before(prev) { + prev = lastTickedAt + } + if fn := t.fastForwardTicksTo(prev, when); fn != nil { + opt.OnEvent = fn + } + if tt.Deep && tt.Freeze { + return t.ticking(timeTravel, nil, opt) // wait for unfreeze + } + defer t.resetTicker() + c, td := after(internal.RemainingDuration(t.getLastTickedAt(), t.getRealDuration())) + defer td() + return t.ticking(timeTravel, c, opt) // wait the remaining time from the current tick +} + +func (t *Ticker) fastForwardTicksTo(from, till time.Time) func() { + var travelledDuration = till.Sub(from) + + if travelledDuration <= 0 { + return nil + } + + var ( + doneBeforeNextEvent = make(chan struct{}) + fastforwardWG = &sync.WaitGroup{} + timeBetweenTicks = t.getRealDuration() + missingTicks = int(travelledDuration / timeBetweenTicks) + ) + var OnBeforeEvent = func() { + close(doneBeforeNextEvent) + fastforwardWG.Wait() } + + // fast forward last ticked at position to the time after the ticks + t.updateLastTickedAtTo(from.Add(timeBetweenTicks * time.Duration(missingTicks))) + + fastforwardWG.Add(1) + go func(tickedAt time.Time) { + defer fastforwardWG.Done() + + fastForward: + for i := 0; i < missingTicks; i++ { + tickedAt = tickedAt.Add(timeBetweenTicks) // move to the next tick time + select { + case <-doneBeforeNextEvent: + break fastForward + case t.C <- tickedAt: // tick! + continue fastForward + } + } + }(from) + runtime.Gosched() + + return OnBeforeEvent } // Stop turns off a ticker. 
After Stop, no more ticks will be sent. @@ -153,14 +256,18 @@ func (t *Ticker) getLastTickedAt() time.Time { } func (t *Ticker) updateLastTickedAt() time.Time { + return t.updateLastTickedAtTo(Now()) +} + +func (t *Ticker) updateLastTickedAtTo(at time.Time) time.Time { t.lock.RLock() defer t.lock.RUnlock() - t.lastTickedAt = Now() + t.lastTickedAt = at return t.lastTickedAt } func after(d time.Duration) (<-chan time.Time, func()) { - if d == 0 { + if d <= 0 { var ch = make(chan time.Time) close(ch) return ch, func() {} diff --git a/clock/Clock_test.go b/clock/Clock_test.go index d539155..c99ba21 100644 --- a/clock/Clock_test.go +++ b/clock/Clock_test.go @@ -9,10 +9,12 @@ import ( "time" "go.llib.dev/testcase/let" + "go.llib.dev/testcase/pp" "go.llib.dev/testcase" "go.llib.dev/testcase/assert" "go.llib.dev/testcase/clock" + "go.llib.dev/testcase/clock/internal" "go.llib.dev/testcase/clock/timecop" ) @@ -236,6 +238,36 @@ func TestAfter(t *testing.T) { <-clock.After(0) }, "expected to finish instantly") }) + + s.Test("deep freezing things before calling After will make the newly made after not moving", func(t *testcase.T) { + timecop.Travel(t, time.Duration(0), timecop.DeepFreeze) + + var ( + duration = time.Microsecond + assertTimeout = time.Millisecond + afterChannel = clock.After(duration) + ) + + var tryReadChannel = func(ctx context.Context) { + select { + case <-afterChannel: + case <-ctx.Done(): + } + } + + assert.NotWithin(t, assertTimeout, tryReadChannel, + "expected that channel is not readable due to deep freeze") + + timecop.Travel(t, duration/2, timecop.DeepFreeze) + assert.NotWithin(t, assertTimeout, tryReadChannel, + "even after travelling a shorter duration than the After(timeout)", + "it should be still not ticking off") + + timecop.Travel(t, duration/2+time.Nanosecond, timecop.DeepFreeze) + assert.Within(t, assertTimeout, tryReadChannel, + "after time travel went to a time where the after should have ended already") + + }) } func Test_testTimeWithMinusDuration(t *testing.T) { @@ -261,7 +293,10 @@ func Test_race(t *testing.T) { } func TestNewTicker(t *testing.T) { - const failureRateMultiplier = 0.80 + const failureRateMultiplier float64 = 0.70 + var adjust = func(n int64) int64 { + return int64(float64(n) * failureRateMultiplier) + } s := testcase.NewSpec(t) duration := testcase.Let[time.Duration](s, nil) @@ -272,7 +307,7 @@ func TestNewTicker(t *testing.T) { }) s.Test("by default, clock.Ticker behaves as time.Ticker", func(t *testcase.T) { - duration.Set(t, time.Second/100) + duration.Set(t, time.Second/1000) var ( clockTicks, timeTicks int64 @@ -314,13 +349,13 @@ func TestNewTicker(t *testing.T) { }, ) - time.Sleep(time.Second / 4) + time.Sleep(time.Second / 10) close(done) wg.Wait() assert.True(t, 10 < timeTicks) - assert.True(t, 100/4*failureRateMultiplier <= timeTicks) - assert.True(t, 100/4*failureRateMultiplier <= clockTicks) + assert.True(t, adjust(100/10) <= timeTicks) + assert.True(t, adjust(100/10) <= clockTicks) }) s.Test("time travelling affect ticks", func(t *testcase.T) { @@ -333,7 +368,7 @@ func TestNewTicker(t *testing.T) { go func() { select { case at := <-ticker.Get(t).C: - t.Log("ticker ticked") + t.Log("ticker ticked", pp.Format(at)) atomic.AddInt64(&now, at.Unix()) case <-done: return @@ -362,6 +397,133 @@ func TestNewTicker(t *testing.T) { }) }) + s.Test("freezing will not affect the frequency of the ticks only the returned time, as ticks often used for background scheduling", func(t *testcase.T) { + timecop.Travel(t, time.Duration(0), timecop.Freeze) 
+ duration.Set(t, time.Second/10) + + var ticks int64 + go func() { + for { + select { + case <-ticker.Get(t).C: + atomic.AddInt64(&ticks, 1) + case <-t.Done(): + return + } + } + }() + + time.Sleep(duration.Get(t) * 2) + assert.True(t, 0 < atomic.LoadInt64(&ticks)) + + const additionalTicks = 10000 + timecop.Travel(t, duration.Get(t)*additionalTicks) + runtime.Gosched() + + assert.Eventually(t, 2*duration.Get(t), func(t assert.It) { + currentTicks := atomic.LoadInt64(&ticks) + expMinTicks := int64(additionalTicks * failureRateMultiplier) + t.Log("additional ticks:", additionalTicks) + t.Log("current ticks:", currentTicks) + t.Log("min exp ticks:", expMinTicks) + assert.True(t, expMinTicks < currentTicks) + }) + }) + + s.Test("deep freeze that happened before the creation of ticker will make them halted from the get go", func(t *testcase.T) { + timecop.Travel(t, time.Duration(0), timecop.DeepFreeze) + duration.Set(t, time.Microsecond) + + var ( + ticks int64 + done = make(chan struct{}) + ) + defer close(done) + go func() { + for { + select { + case <-ticker.Get(t).C: + atomic.AddInt64(&ticks, 1) + case <-done: + return + } + } + }() + + time.Sleep(time.Second / 10) + assert.Equal(t, atomic.LoadInt64(&ticks), 0) + }) + + s.Test("deep freeze that happened during the ticker's lifetime will affect the frequency of the ticks as it will make it halt", func(t *testcase.T) { + duration.Set(t, time.Second) + timecop.Travel(t, time.Duration(0), timecop.DeepFreeze) + _, ok := internal.Check() + assert.True(t, ok) + + var ticks int64 + go func() { + for { + select { + case <-ticker.Get(t).C: + atomic.AddInt64(&ticks, 1) + case <-t.Done(): + return + } + } + }() + + // time.Sleep(time.Second) + // travel 3 tick ahead + timecop.Travel(t, 3*duration.Get(t)+time.Nanosecond, timecop.DeepFreeze) + + time.Sleep(3 * time.Second) + + assert.Eventually(t, time.Second, func(t assert.It) { + assert.Equal(t, atomic.LoadInt64(&ticks), 3) + }) + }) + + s.TODO("travelling backwards will make the ticks freeze in time until the last ticked at is reached") + + s.Test("travelling forward with deep freeze flag will cause the ticker to tick the amount it should have if the time was spent", func(t *testcase.T) { + duration.Set(t, time.Microsecond) + + var ( + ticks int64 + done = make(chan struct{}) + ) + defer close(done) + go func() { + for { + select { + case <-ticker.Get(t).C: + atomic.AddInt64(&ticks, 1) + case <-done: + return + } + } + }() + + time.Sleep(time.Second / 20) + + timecop.Travel(t, time.Duration(0), timecop.DeepFreeze) + time.Sleep(2 * duration.Get(t)) + + var ticksAfterFreezing int64 + assert.Eventually(t, time.Second, func(t assert.It) { + curTicks := atomic.LoadInt64(&ticks) + if ticksAfterFreezing == curTicks { + return + } + ticksAfterFreezing = curTicks + runtime.Gosched() + time.Sleep(time.Nanosecond) + }) + + time.Sleep(time.Second / 20) + assert.True(t, int64(float64(ticksAfterFreezing)*failureRateMultiplier) <= atomic.LoadInt64(&ticks)) + }) + s.Test("ticks are continous", func(t *testcase.T) { duration.Set(t, time.Second/100) @@ -381,8 +543,8 @@ func TestNewTicker(t *testing.T) { } }() - time.Sleep(time.Second / 2) - assert.True(t, 100/2*failureRateMultiplier <= atomic.LoadInt64(&ticks)) + time.Sleep(time.Second / 10) + assert.True(t, adjust(100/10) <= atomic.LoadInt64(&ticks)) }) s.Test("duration is scaled", func(t *testcase.T) { @@ -405,8 +567,8 @@ func TestNewTicker(t *testing.T) { } }() - time.Sleep(time.Second / 4) - assert.True(t, 100/4*failureRateMultiplier <= atomic.LoadInt64(&ticks)) 
+ time.Sleep(time.Second / 10) + assert.True(t, adjust(100/10) <= atomic.LoadInt64(&ticks)) }) s.Test("duration is scaled midflight", func(t *testcase.T) { @@ -429,28 +591,24 @@ func TestNewTicker(t *testing.T) { }() t.Log("ticks:", atomic.LoadInt64(&ticks)) - time.Sleep(time.Second/4 + time.Microsecond) + time.Sleep(time.Second/10 + time.Microsecond) runtime.Gosched() - var expectedTickCount int64 = 100 / 4 * failureRateMultiplier + var expectedTickCount int64 = adjust(100 / 10) t.Log("exp:", expectedTickCount, "got:", atomic.LoadInt64(&ticks)) assert.True(t, expectedTickCount <= atomic.LoadInt64(&ticks)) timecop.SetSpeed(t, 1000) // 100x times faster - time.Sleep(time.Second/4 + time.Microsecond) + time.Sleep(time.Second/10 + time.Microsecond) runtime.Gosched() - - // TODO: flaky assertion - // - // FLAKY* - expectedTickCount += 100 / 4 * 1000 * failureRateMultiplier + expectedTickCount += adjust(100 / 10 * 1000) t.Log("exp:", expectedTickCount, "got:", atomic.LoadInt64(&ticks)) assert.True(t, expectedTickCount <= atomic.LoadInt64(&ticks)) // *FLAKY }) t.Run("race", func(t *testing.T) { - ticker := clock.NewTicker(time.Minute) - const timeout = 100 * time.Millisecond + ticker := clock.NewTicker(time.Nanosecond) + const timeout = 50 * time.Millisecond testcase.Race( func() { diff --git a/clock/internal/chronos.go b/clock/internal/chronos.go index 7a29eea..5984ac4 100644 --- a/clock/internal/chronos.go +++ b/clock/internal/chronos.go @@ -4,18 +4,22 @@ import ( "time" ) -func init() { - chrono.Speed = 1 +var chrono struct{ Timeline Timeline } + +func init() { chrono.Timeline.Speed = 1 } + +type Timeline struct { + Altered bool + SetAt time.Time + When time.Time + Prev time.Time + Frozen bool + Deep bool + Speed float64 } -var chrono struct { - Timeline struct { - Altered bool - SetAt time.Time - When time.Time - Frozen bool - } - Speed float64 +func (tl Timeline) IsZero() bool { + return tl == Timeline{} } func SetSpeed(s float64) func() { @@ -23,12 +27,12 @@ func SetSpeed(s float64) func() { defer lock()() frozen := chrono.Timeline.Frozen td := setTime(getTime(), Option{Freeze: frozen}) - og := chrono.Speed - chrono.Speed = s + og := chrono.Timeline.Speed + chrono.Timeline.Speed = s return func() { defer notify() defer lock()() - chrono.Speed = og + chrono.Timeline.Speed = og td() } } @@ -36,6 +40,7 @@ func SetSpeed(s float64) func() { type Option struct { Freeze bool Unfreeze bool + Deep bool } func SetTime(target time.Time, opt Option) func() { @@ -50,16 +55,22 @@ func SetTime(target time.Time, opt Option) func() { } func setTime(target time.Time, opt Option) func() { + prev := getTime() og := chrono.Timeline n := chrono.Timeline n.Altered = true n.SetAt = time.Now() + n.Prev = prev n.When = target if opt.Freeze { n.Frozen = true } + if opt.Deep { + n.Deep = true + } if opt.Unfreeze { n.Frozen = false + n.Deep = false } chrono.Timeline = n return func() { chrono.Timeline = og } @@ -76,7 +87,7 @@ func scaledDuration(d time.Duration) time.Duration { if !chrono.Timeline.Altered { return d } - return time.Duration(float64(d) / chrono.Speed) + return time.Duration(float64(d) / chrono.Timeline.Speed) } func RemainingDuration(from time.Time, nonScaledDuration time.Duration) time.Duration { @@ -103,10 +114,11 @@ func getTime() time.Time { if !chrono.Timeline.Altered { return now } + setAt := chrono.Timeline.SetAt if chrono.Timeline.Frozen { - chrono.Timeline.SetAt = now + setAt = now } - delta := now.Sub(chrono.Timeline.SetAt) - delta = time.Duration(float64(delta) * chrono.Speed) + delta := 
now.Sub(setAt)
+	delta = time.Duration(float64(delta) * chrono.Timeline.Speed)
 	return chrono.Timeline.When.Add(delta)
 }
diff --git a/clock/internal/notify.go b/clock/internal/notify.go
index 5e38eeb..03c7a7f 100644
--- a/clock/internal/notify.go
+++ b/clock/internal/notify.go
@@ -1,8 +1,19 @@
 package internal
 
-var handlers = map[int]chan<- struct{}{}
+import (
+	"time"
+)
 
-func Notify(c chan<- struct{}) func() {
+var handlers = make(map[int]chan<- TimeTravelEvent)
+
+type TimeTravelEvent struct {
+	Deep   bool
+	Freeze bool
+	When   time.Time
+	Prev   time.Time
+}
+
+func Notify(c chan<- TimeTravelEvent) func() {
 	if c == nil {
 		panic("clock: Notify using nil channel")
 	}
@@ -21,12 +32,28 @@ func Notify(c chan<- struct{}) func() {
 	}
 }
 
+func Check() (TimeTravelEvent, bool) {
+	defer rlock()()
+	return lookupTimeTravelEvent()
+}
+
+func lookupTimeTravelEvent() (TimeTravelEvent, bool) {
+	return TimeTravelEvent{
+		Deep:   chrono.Timeline.Deep,
+		Freeze: chrono.Timeline.Frozen,
+		When:   chrono.Timeline.When,
+		Prev:   chrono.Timeline.Prev,
+	}, !chrono.Timeline.IsZero()
+}
+
 func notify() {
 	defer rlock()()
-	for index, ch := range handlers {
-		go func(i int, ch chan<- struct{}) {
-			defer recover()
-			ch <- struct{}{}
-		}(index, ch)
+	tt, _ := lookupTimeTravelEvent()
+	var publish = func(channel chan<- TimeTravelEvent) {
+		defer recover()
+		channel <- tt
+	}
+	for _, ch := range handlers {
+		go publish(ch)
 	}
 }
diff --git a/clock/timecop/timecop.go b/clock/timecop/timecop.go
index de7599d..c66cd81 100644
--- a/clock/timecop/timecop.go
+++ b/clock/timecop/timecop.go
@@ -57,6 +57,17 @@ func (freeze) configure(o *internal.Option) {
 	o.Freeze = true
 }
 
+// DeepFreeze is a Travel TravelOption, and it instructs travel to freeze the time wherever it lands after travelling.
+// It is a stronger level of freezing, and it will also force tickers and timers to halt immediately.
+const DeepFreeze = deepFreeze(1)
+
+type deepFreeze int
+
+func (deepFreeze) configure(o *internal.Option) {
+	o.Freeze = true
+	o.Deep = true
+}
+
 // Unfreeze is a Travel TravelOption, and it instruct travel that after the time travelling, the flow of time should continue.
 const Unfreeze = unfreeze(0)
 
diff --git a/internal/teardown/Teardown.go b/internal/teardown/Teardown.go
index db05b24..8cf4bf4 100644
--- a/internal/teardown/Teardown.go
+++ b/internal/teardown/Teardown.go
@@ -119,6 +119,8 @@ func (td *Teardown) Finish() {
 }
 
 func (td *Teardown) isEmpty() bool {
+	td.mutex.Lock()
+	defer td.mutex.Unlock()
 	return len(td.fns) == 0
 }
 
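
To make the behaviour described in the commit message concrete, here is a condensed usage sketch distilled from the Ticker tests above. It is not part of the patch; the test name, package name, and concrete durations are illustrative only.

package example_test

import (
	"sync/atomic"
	"testing"
	"time"

	"go.llib.dev/testcase/clock"
	"go.llib.dev/testcase/clock/timecop"
)

// TestDeepFreezeTicker is an illustrative sketch, not part of the patch:
// while deep-frozen a clock.Ticker produces no ticks, and travelling
// forward delivers the ticks that would have happened during the travelled span.
func TestDeepFreezeTicker(t *testing.T) {
	timecop.Travel(t, time.Duration(0), timecop.DeepFreeze)

	ticker := clock.NewTicker(time.Millisecond)
	defer ticker.Stop()

	var ticks int64
	done := make(chan struct{})
	defer close(done)
	go func() {
		for {
			select {
			case <-ticker.C:
				atomic.AddInt64(&ticks, 1)
			case <-done:
				return
			}
		}
	}()

	time.Sleep(time.Second / 10)
	if n := atomic.LoadInt64(&ticks); n != 0 {
		t.Fatalf("expected no ticks while deep-frozen, got %d", n)
	}

	// travel three tick durations ahead; the ticker catches up with the missed ticks
	timecop.Travel(t, 3*time.Millisecond+time.Nanosecond, timecop.DeepFreeze)
	time.Sleep(time.Second / 10)
	if n := atomic.LoadInt64(&ticks); n < 3 {
		t.Fatalf("expected the travelled ticks to be delivered, got %d", n)
	}
}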
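
A similar sketch for clock.After, mirroring the After test in the diff; again the names and durations are illustrative, and the assert/timecop calls are the ones already used by the patch's own tests.

package example_test

import (
	"context"
	"testing"
	"time"

	"go.llib.dev/testcase/assert"
	"go.llib.dev/testcase/clock"
	"go.llib.dev/testcase/clock/timecop"
)

// TestDeepFreezeAfter is an illustrative sketch, not part of the patch:
// under DeepFreeze the channel returned by clock.After stays silent until
// a time travel moves past its deadline.
func TestDeepFreezeAfter(t *testing.T) {
	timecop.Travel(t, time.Duration(0), timecop.DeepFreeze)

	ch := clock.After(time.Microsecond)
	tryRead := func(ctx context.Context) {
		select {
		case <-ch:
		case <-ctx.Done():
		}
	}

	// frozen time: the deadline is never reached
	assert.NotWithin(t, time.Millisecond, tryRead)

	// jumping past the deadline releases the channel
	timecop.Travel(t, time.Microsecond+time.Nanosecond, timecop.DeepFreeze)
	assert.Within(t, time.Millisecond, tryRead)
}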