Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NewThrottle #427

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ Concurrency helpers:
- [AttemptWhileWithDelay](#attemptwhilewithdelay)
- [Debounce](#debounce)
- [DebounceBy](#debounceby)
- [Throttle](#throttle)
- [Synchronize](#synchronize)
- [Async](#async)
- [Transaction](#transaction)
Expand Down Expand Up @@ -2529,6 +2530,47 @@ cancel("second key")

[[play](https://go.dev/play/p/d3Vpt6pxhY8)]

### Throttle
`NewThrottle` creates a throttled instance that invokes given functions only once in every interval.
This returns 2 functions, First one is throttled function and Second one is a function to reset interval.

```go

f := func() {
println("Called once in every 100ms")
}

throttle, reset := lo.NewThrottle(100 * time.Millisecond, f)

for j := 0; j < 10; j++ {
throttle()
time.Sleep(30 * time.Millisecond)
}

reset()
throttle()

```

`NewThrottleWithCount` is NewThrottle with count limit, throttled function will be invoked count times in every interval.
```go

f := func() {
println("Called three times in every 100ms")
}

throttle, reset := lo.NewThrottle(100 * time.Millisecond, f)

for j := 0; j < 10; j++ {
throttle()
time.Sleep(30 * time.Millisecond)
}

reset()
throttle()

```

### Synchronize

Wraps the underlying callback in a mutex. It receives an optional mutex.
Expand Down
60 changes: 59 additions & 1 deletion retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,4 +287,62 @@ func (t *Transaction[T]) Process(state T) (T, error) {
return state, err
}

// throttle ?
type throttle struct {
mu *sync.Mutex
timer *time.Timer
interval time.Duration
callbacks []func()
countLimit int
count int
}

func (th *throttle) throttledFunc() {
th.mu.Lock()
defer th.mu.Unlock()
if th.count < th.countLimit {
th.count++

for _, f := range th.callbacks {
f()
}

}
if th.timer == nil {
th.timer = time.AfterFunc(th.interval, func() {
th.reset()
})
}
}

func (th *throttle) reset() {
th.mu.Lock()
defer th.mu.Unlock()

if th.timer != nil {
th.timer.Stop()
}

th.count = 0
th.timer = nil

}

// NewThrottle creates a throttled instance that invokes given functions only once in every interval.
// This returns 2 functions, First one is throttled function and Second one is a function to reset interval
func NewThrottle(interval time.Duration, f ...func()) (func(), func()) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also think about a NewThrottle(100ms, 3, cb) that would trigger the first 3 calls, then ignore others during 100ms. Like we do for a basic rate limiter.

Maybe it is a different helper.

WDYT?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also think about a NewThrottle(100ms, 3, cb) that would trigger the first 3 calls, then ignore others during 100ms. Like we do for a basic rate limiter.

Maybe it is a different helper.

WDYT?

That would be a interesting option, but I am also afraid that might makes the function's signature complicated.
So I think, it's better to be implemented in other helper or function with different signature.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 helpers NewThrottle and NewThrottleWithCount ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NewThrottleWithCount looks nice,
I'll add it

return NewThrottleWithCount(interval, 1, f...)
}

// NewThrottleWithCount is NewThrottle with count limit, throttled function will be invoked count times in every interval.
func NewThrottleWithCount(interval time.Duration, count int, f ...func()) (func(), func()) {
if count <= 0 {
count = 1
}
th := &throttle{
mu: new(sync.Mutex),
interval: interval,
callbacks: f,
countLimit: count,
}
return th.throttledFunc, th.reset
}
61 changes: 61 additions & 0 deletions retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,3 +498,64 @@ func TestTransaction(t *testing.T) {
is.Equal(assert.AnError, err)
}
}

func TestNewThrottle(t *testing.T) {
t.Parallel()
is := assert.New(t)
callCount := 0
f1 := func() {
callCount++
}
th, reset := NewThrottle(10*time.Millisecond, f1)

is.Equal(0, callCount)
for j := 0; j < 100; j++ {
th()
}
is.Equal(1, callCount)

time.Sleep(15 * time.Millisecond)

for j := 0; j < 100; j++ {
th()
}

is.Equal(2, callCount)

// reset counter
reset()
th()
is.Equal(3, callCount)

}

func TestNewThrottleWithCount(t *testing.T) {
t.Parallel()
is := assert.New(t)
callCount := 0
f1 := func() {
callCount++
}
th, reset := NewThrottleWithCount(10*time.Millisecond, 3, f1)

// the function does not throttle for initial count number
for i := 0; i < 20; i++ {
th()
}
is.Equal(3, callCount)

time.Sleep(11 * time.Millisecond)

for i := 0; i < 20; i++ {
th()
}

is.Equal(6, callCount)

reset()
for i := 0; i < 20; i++ {
th()
}

is.Equal(9, callCount)
}