diff --git a/go/pkg/clock/cached_clock.go b/go/pkg/clock/cached_clock.go new file mode 100644 index 0000000000..650d52dedf --- /dev/null +++ b/go/pkg/clock/cached_clock.go @@ -0,0 +1,73 @@ +package clock + +import ( + "sync/atomic" + "time" +) + +// This clock implementation takes inspiration from +// https://github.com/agilira/go-timecache +// but works with our clock.Clock interface + +// CachedClock implements the Clock interface using a cached atomic value +// to avoid the overhead of system calls from time.Now(). +type CachedClock struct { + nanos atomic.Int64 + resolution time.Duration + ticker *time.Ticker + close chan struct{} +} + +// NewCachedClock creates a new CachedClock that uses the system time cached every [resolution]. +// +// Example: +// +// clock := clock.NewCachedClock(time.Millisecond) +// currentTime := clock.Now() +func NewCachedClock(resolution time.Duration) *CachedClock { + + close := make(chan struct{}) + ticker := time.NewTicker(resolution) + + c := &CachedClock{ + nanos: atomic.Int64{}, + resolution: resolution, + ticker: ticker, + close: close, + } + + // Initialize with current time + c.nanos.Store(time.Now().UnixNano()) + + go func() { + for { + select { + case <-ticker.C: + c.nanos.Store(time.Now().UnixNano()) + case <-close: + ticker.Stop() + return + } + } + }() + + return c + +} + +// Ensure CachedClock implements the Clock interface +var _ Clock = &CachedClock{} + +// Now returns the current system time. +// This implementation returns the cached time value. +func (c *CachedClock) Now() time.Time { + return time.Unix(0, c.nanos.Load()) +} + +// Close stops the background goroutine that updates the cached time. +// After calling Close, the clock will continue to return the last cached time +// but will no longer update. This method should be called to clean up resources +// when the CachedClock is no longer needed. +func (c *CachedClock) Close() { + close(c.close) +} diff --git a/go/pkg/clock/cached_clock_test.go b/go/pkg/clock/cached_clock_test.go new file mode 100644 index 0000000000..48423c9bbf --- /dev/null +++ b/go/pkg/clock/cached_clock_test.go @@ -0,0 +1,128 @@ +package clock + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestCachedClock(t *testing.T) { + t.Run("NewCachedClock creates working clock", func(t *testing.T) { + resolution := 10 * time.Millisecond + clock := NewCachedClock(resolution) + defer clock.Close() + + // Initial time should be set quickly + time.Sleep(2 * time.Millisecond) + now := clock.Now() + + // Should be within reasonable bounds of current time + require.WithinDuration(t, time.Now(), now, 50*time.Millisecond) + }) + + t.Run("Now returns cached time within resolution", func(t *testing.T) { + resolution := 50 * time.Millisecond + clock := NewCachedClock(resolution) + defer clock.Close() + + // Get initial time + time.Sleep(resolution + 5*time.Millisecond) // Ensure cache is populated + t1 := clock.Now() + realTime1 := time.Now() + + // Wait less than resolution, time should be the same + time.Sleep(10 * time.Millisecond) + t2 := clock.Now() + realTime2 := time.Now() + + // Cached time should be the same (within nanoseconds) + require.True(t, t1.Equal(t2), "cached time should not change within resolution") + + // Real time should have advanced + require.True(t, realTime2.After(realTime1), "real time should advance") + + // Wait for resolution to pass, cached time should update + time.Sleep(resolution + 5*time.Millisecond) + t3 := clock.Now() + + require.True(t, t3.After(t2), "cached time should update after resolution") + }) + + t.Run("Concurrent access is safe", func(t *testing.T) { + resolution := 1 * time.Millisecond + clock := NewCachedClock(resolution) + defer clock.Close() + + const numGoroutines = 100 + const numCalls = 100 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < numCalls; j++ { + _ = clock.Now() + } + }() + } + + wg.Wait() + // If we reach here without panic/race, concurrent access is safe + }) + + t.Run("Close stops background goroutine", func(t *testing.T) { + resolution := 10 * time.Millisecond + clock := NewCachedClock(resolution) + + // Let it run for a bit + time.Sleep(resolution + 5*time.Millisecond) + t1 := clock.Now() + + // Close the clock + clock.Close() + + // Wait longer than resolution + time.Sleep(resolution * 3) + + t2 := clock.Now() + + // Time should be the same after close (no more updates) + require.True(t, t1.Equal(t2), "time should not change after Close()") + }) + + t.Run("Very small resolution works", func(t *testing.T) { + resolution := 100 * time.Microsecond + clock := NewCachedClock(resolution) + defer clock.Close() + + time.Sleep(1 * time.Millisecond) + now := clock.Now() + + require.WithinDuration(t, time.Now(), now, 5*time.Millisecond) + }) + + t.Run("Large resolution works", func(t *testing.T) { + resolution := 100 * time.Millisecond + clock := NewCachedClock(resolution) + defer clock.Close() + + // Get initial time + time.Sleep(resolution + 10*time.Millisecond) + t1 := clock.Now() + + // Wait less than resolution + time.Sleep(10 * time.Millisecond) + t2 := clock.Now() + + // Should be the same cached value + require.True(t, t1.Equal(t2), "time should not change within large resolution") + }) + + t.Run("CachedClock implements Clock interface", func(t *testing.T) { + var _ Clock = &CachedClock{} // nolint:exhaustruct + }) +} diff --git a/go/pkg/clock/clock_benchmarks_test.go b/go/pkg/clock/clock_benchmarks_test.go new file mode 100644 index 0000000000..e6b7538ab0 --- /dev/null +++ b/go/pkg/clock/clock_benchmarks_test.go @@ -0,0 +1,40 @@ +package clock + +import ( + "testing" + "time" +) + +func BenchmarkCachedClockNow(b *testing.B) { + resolutions := []time.Duration{ + time.Microsecond, + 500 * time.Microsecond, + time.Millisecond, + 10 * time.Millisecond, + } + + for _, resolution := range resolutions { + b.Run(resolution.String(), func(b *testing.B) { + clock := NewCachedClock(resolution) + defer clock.Close() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _ = clock.Now() + } + }) + }) + } +} + +func BenchmarkRealClockNow(b *testing.B) { + clock := New() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _ = clock.Now() + } + }) +} diff --git a/go/pkg/clock/interface.go b/go/pkg/clock/interface.go index 94f8b59057..edb1f21a4c 100644 --- a/go/pkg/clock/interface.go +++ b/go/pkg/clock/interface.go @@ -15,21 +15,4 @@ type Clock interface { // In test implementations, this returns a controlled time that // can be manipulated for testing purposes. Now() time.Time - - // NewTicker returns a new Ticker containing a channel that will send - // the current time on the channel after each tick. - // In production implementations, this creates a real ticker. - // In test implementations, this creates a controllable ticker. - NewTicker(d time.Duration) Ticker -} - -// Ticker represents a ticker that sends the current time at regular intervals. -// This interface abstracts the standard library's time.Ticker to enable -// deterministic testing of ticker-based code. -type Ticker interface { - // C returns the channel on which the ticks are delivered. - C() <-chan time.Time - - // Stop turns off a ticker. After Stop, no more ticks will be sent. - Stop() } diff --git a/go/pkg/clock/real_clock.go b/go/pkg/clock/real_clock.go index cf9c4ab7d7..ca7aa8fad7 100644 --- a/go/pkg/clock/real_clock.go +++ b/go/pkg/clock/real_clock.go @@ -27,25 +27,3 @@ var _ Clock = &RealClock{} func (c *RealClock) Now() time.Time { return time.Now() } - -// NewTicker returns a new real ticker that sends the current time -// on the channel after each tick. This implementation delegates -// to time.NewTicker(). -func (c *RealClock) NewTicker(d time.Duration) Ticker { - return &realTicker{ticker: time.NewTicker(d)} -} - -// realTicker wraps time.Ticker to implement the Ticker interface -type realTicker struct { - ticker *time.Ticker -} - -// C returns the channel on which the ticks are delivered. -func (t *realTicker) C() <-chan time.Time { - return t.ticker.C -} - -// Stop turns off the ticker. -func (t *realTicker) Stop() { - t.ticker.Stop() -} diff --git a/go/pkg/clock/test_clock.go b/go/pkg/clock/test_clock.go index dcff8f9d0b..65b601084a 100644 --- a/go/pkg/clock/test_clock.go +++ b/go/pkg/clock/test_clock.go @@ -9,9 +9,8 @@ import ( // It allows tests to manually set and advance time to create deterministic // test scenarios for time-dependent code. type TestClock struct { - mu sync.RWMutex - now time.Time - tickers []*testTicker + mu sync.RWMutex + now time.Time } // NewTestClock creates a new TestClock instance. @@ -30,7 +29,7 @@ func NewTestClock(now ...time.Time) *TestClock { if len(now) == 0 { now = append(now, time.Now()) } - return &TestClock{mu: sync.RWMutex{}, now: now[0], tickers: []*testTicker{}} + return &TestClock{mu: sync.RWMutex{}, now: now[0]} } // Ensure TestClock implements the Clock interface @@ -47,8 +46,7 @@ func (c *TestClock) Now() time.Time { // Tick advances the clock by the given duration and returns the new time. // This method is particularly useful for testing time-dependent behavior -// without waiting for real time to pass. It also triggers any tickers -// that should fire during this time advancement. +// without waiting for real time to pass. // // Example: // @@ -64,18 +62,12 @@ func (c *TestClock) Tick(d time.Duration) time.Time { defer c.mu.Unlock() c.now = c.now.Add(d) - // Notify all tickers about the time advancement - for _, ticker := range c.tickers { - ticker.checkForTick(c.now) - } - return c.now } // Set changes the clock to the given time and returns the new time. // This allows tests to jump to specific points in time for testing -// time-dependent behavior. It also triggers any tickers that should -// fire during this time change. +// time-dependent behavior. // // Example: // @@ -89,97 +81,5 @@ func (c *TestClock) Set(t time.Time) time.Time { defer c.mu.Unlock() c.now = t - // Notify all tickers about the time change - for _, ticker := range c.tickers { - ticker.checkForTick(c.now) - } - return c.now } - -// NewTicker returns a new test ticker that can be manually controlled. -// The ticker will only send ticks when the clock is advanced using Tick() or Set(). -// This enables deterministic testing of ticker-based code. -func (c *TestClock) NewTicker(d time.Duration) Ticker { - c.mu.Lock() - defer c.mu.Unlock() - - ch := make(chan time.Time, 1) // Buffered to prevent blocking - ticker := &testTicker{ - mu: sync.Mutex{}, - clock: c, - interval: d, - lastTick: c.now, - ch: ch, - stopped: false, - } - - // Register this ticker with the clock - c.tickers = append(c.tickers, ticker) - - return ticker -} - -// testTicker implements a controllable ticker for testing -type testTicker struct { - clock *TestClock - interval time.Duration - lastTick time.Time - ch chan time.Time - stopped bool - mu sync.Mutex -} - -// C returns the channel on which the ticks are delivered. -func (t *testTicker) C() <-chan time.Time { - return t.ch -} - -// Stop turns off the ticker. -func (t *testTicker) Stop() { - t.mu.Lock() - defer t.mu.Unlock() - t.stopped = true - close(t.ch) - - // Remove this ticker from the clock's list - t.clock.removeTicker(t) -} - -// removeTicker removes a ticker from the clock's list (internal method) -func (c *TestClock) removeTicker(tickerToRemove *testTicker) { - c.mu.Lock() - defer c.mu.Unlock() - - // Find and remove the ticker - for i, ticker := range c.tickers { - if ticker == tickerToRemove { - // Remove by swapping with last element and truncating - c.tickers[i] = c.tickers[len(c.tickers)-1] - c.tickers = c.tickers[:len(c.tickers)-1] - break - } - } -} - -// checkForTick checks if enough time has passed to send a tick. -// This is called internally when the clock advances. -func (t *testTicker) checkForTick(currentTime time.Time) { - t.mu.Lock() - defer t.mu.Unlock() - - if t.stopped { - return - } - - // Check if enough time has passed since last tick - if currentTime.Sub(t.lastTick) >= t.interval { - // Send tick if channel has space (non-blocking) - select { - case t.ch <- currentTime: - t.lastTick = currentTime - default: - // Channel is full, skip this tick (mimics real ticker behavior) - } - } -}