diff --git a/README.md b/README.md index dd848c39..94cd6df9 100644 --- a/README.md +++ b/README.md @@ -242,6 +242,7 @@ Concurrency helpers: - [AttemptWhileWithDelay](#attemptwhilewithdelay) - [Debounce](#debounce) - [DebounceBy](#debounceby) +- [Throttle](#throttle) - [Synchronize](#synchronize) - [Async](#async) - [Transaction](#transaction) @@ -2529,6 +2530,27 @@ cancel("second key") [[play](https://go.dev/play/p/d3Vpt6pxhY8)] +### Throttle +`NewThrottle` creates a throttled instance that invokes given functions only once in every interval. +This returns 2 functions, First one is throttled function and Second one is purge function which invokes given functions immediately and reset interval timer. + +```go + +f := func() { + println("Called once in every 100ms") +} + +throttle, purge := lo.NewThrottle(100 * time.Millisecond, f) + +for j := 0; j < 10; j++ { + throttle() + time.Sleep(30 * time.Millisecond) +} + +purge() + +``` + ### Synchronize Wraps the underlying callback in a mutex. It receives an optional mutex. diff --git a/retry.go b/retry.go index 11f456d2..e5de921b 100644 --- a/retry.go +++ b/retry.go @@ -287,4 +287,55 @@ func (t *Transaction[T]) Process(state T) (T, error) { return state, err } -// throttle ? +type throttle struct { + mu *sync.Mutex + timer *time.Timer + needInvoke bool + interval time.Duration + callbacks []func() +} + +func (th *throttle) throttledFunc() { + th.mu.Lock() + defer th.mu.Unlock() + th.needInvoke = true + if th.timer == nil { + th.timer = time.AfterFunc(th.interval, func() { + th.purge(false) + }) + } +} + +func (th *throttle) purge(forcePurge bool) { + th.mu.Lock() + defer th.mu.Unlock() + + if th.timer != nil { + th.timer.Stop() + } + + if th.needInvoke || forcePurge { + for _, f := range th.callbacks { + f() + } + th.needInvoke = false + th.timer = time.AfterFunc(th.interval, func() { + th.purge(false) + }) + } else { + th.timer = nil + } +} + +// NewThrottle creates a throttled instance that invokes given functions only once in every interval. +// This returns 2 functions, First one is throttled function and Second one is purge function which invokes given functions immediately and reset interval timer. +func NewThrottle(interval time.Duration, f ...func()) (func(), func()) { + th := &throttle{ + mu: new(sync.Mutex), + interval: interval, + callbacks: f, + } + return th.throttledFunc, func() { + th.purge(true) + } +} diff --git a/retry_test.go b/retry_test.go index 1ac00703..d3faca2a 100644 --- a/retry_test.go +++ b/retry_test.go @@ -498,3 +498,37 @@ func TestTransaction(t *testing.T) { is.Equal(assert.AnError, err) } } + +func TestNewThrottle(t *testing.T) { + t.Parallel() + is := assert.New(t) + callCount := 0 + f1 := func() { + callCount++ + } + th, purge := NewThrottle(10*time.Millisecond, f1) + + is.Equal(0, callCount) + for i := 0; i < 7; i++ { + var wg sync.WaitGroup + for j := 0; j < 100; j++ { + wg.Add(1) + go func() { + defer wg.Done() + th() + }() + } + wg.Wait() + time.Sleep(5 * time.Millisecond) + } + // 35 ms passed + is.Equal(3, callCount) + + purge() + is.Equal(4, callCount) + + // pause a little bit without calling + time.Sleep(20 * time.Millisecond) + is.Equal(4, callCount) + +}