Skip to content

Commit

Permalink
Merge branch 'Lee-Minjea-feat/throttle'
Browse files Browse the repository at this point in the history
  • Loading branch information
samber committed Jan 26, 2025
2 parents d587677 + 5cd3266 commit 19d8355
Show file tree
Hide file tree
Showing 4 changed files with 378 additions and 1 deletion.
60 changes: 60 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

# lo - Iterate over slices, maps, channels...

[![tag](https://img.shields.io/github/tag/samber/lo.svg)](https://github.com/samber/lo/releases)
Expand Down Expand Up @@ -297,6 +298,7 @@ Concurrency helpers:
- [AttemptWhileWithDelay](#attemptwhilewithdelay)
- [Debounce](#debounce)
- [DebounceBy](#debounceby)
- [Throttle](#throttle)
- [Synchronize](#synchronize)
- [Async](#async)
- [Transaction](#transaction)
Expand Down Expand Up @@ -3417,6 +3419,64 @@ cancel("second key")

[[play](https://go.dev/play/p/d3Vpt6pxhY8)]

### Throttle

Creates a throttled instance that invokes given functions only once in every interval.

This returns 2 functions, First one is throttled function and Second one is a function to reset interval.

```go
f := func() {
println("Called once in every 100ms")
}

throttle, reset := lo.NewThrottle(100 * time.Millisecond, f)

for j := 0; j < 10; j++ {
throttle()
time.Sleep(30 * time.Millisecond)
}

reset()
throttle()
```

`NewThrottleWithCount` is NewThrottle with count limit, throttled function will be invoked count times in every interval.

```go
f := func() {
println("Called three times in every 100ms")
}

throttle, reset := lo.NewThrottleWithCount(100 * time.Millisecond, f)

for j := 0; j < 10; j++ {
throttle()
time.Sleep(30 * time.Millisecond)
}

reset()
throttle()
```

`NewThrottleBy` and `NewThrottleByWithCount` are NewThrottle with sharding key, throttled function will be invoked count times in every interval.

```go
f := func(key string) {
println(key, "Called three times in every 100ms")
}

throttle, reset := lo.NewThrottleByWithCount(100 * time.Millisecond, f)

for j := 0; j < 10; j++ {
throttle("foo")
time.Sleep(30 * time.Millisecond)
}

reset()
throttle()
```

### Synchronize

Wraps the underlying callback in a mutex. It receives an optional mutex.
Expand Down
87 changes: 86 additions & 1 deletion retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,4 +287,89 @@ func (t *Transaction[T]) Process(state T) (T, error) {
return state, err
}

// throttle ?
// @TODO: single mutex per key ?
type throttleBy[T comparable] struct {
mu *sync.Mutex
timer *time.Timer
interval time.Duration
callbacks []func(key T)
countLimit int
count map[T]int
}

func (th *throttleBy[T]) throttledFunc(key T) {
th.mu.Lock()
defer th.mu.Unlock()

if _, ok := th.count[key]; !ok {
th.count[key] = 0
}

if th.count[key] < th.countLimit {
th.count[key]++

for _, f := range th.callbacks {
f(key)
}

}
if th.timer == nil {
th.timer = time.AfterFunc(th.interval, func() {
th.reset()
})
}
}

func (th *throttleBy[T]) reset() {
th.mu.Lock()
defer th.mu.Unlock()

if th.timer != nil {
th.timer.Stop()
}

th.count = map[T]int{}
th.timer = nil
}

// NewThrottle creates a throttled instance that invokes given functions only once in every interval.
// This returns 2 functions, First one is throttled function and Second one is a function to reset interval
func NewThrottle(interval time.Duration, f ...func()) (throttle func(), reset func()) {
return NewThrottleWithCount(interval, 1, f...)
}

// NewThrottleWithCount is NewThrottle with count limit, throttled function will be invoked count times in every interval.
func NewThrottleWithCount(interval time.Duration, count int, f ...func()) (throttle func(), reset func()) {
callbacks := Map(f, func(item func(), _ int) func(struct{}) {
return func(struct{}) {
item()
}
})

throttleFn, reset := NewThrottleByWithCount[struct{}](interval, count, callbacks...)
return func() {
throttleFn(struct{}{})
}, reset
}

// NewThrottleBy creates a throttled instance that invokes given functions only once in every interval.
// This returns 2 functions, First one is throttled function and Second one is a function to reset interval
func NewThrottleBy[T comparable](interval time.Duration, f ...func(key T)) (throttle func(key T), reset func()) {
return NewThrottleByWithCount[T](interval, 1, f...)
}

// NewThrottleByWithCount is NewThrottleBy with count limit, throttled function will be invoked count times in every interval.
func NewThrottleByWithCount[T comparable](interval time.Duration, count int, f ...func(key T)) (throttle func(key T), reset func()) {
if count <= 0 {
count = 1
}

th := &throttleBy[T]{
mu: new(sync.Mutex),
interval: interval,
callbacks: f,
countLimit: count,
count: map[T]int{},
}
return th.throttledFunc, th.reset
}
89 changes: 89 additions & 0 deletions retry_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,92 @@ func ExampleTransaction_error() {
// -5
// error
}

func ExampleNewThrottle() {
throttle, reset := NewThrottle(100*time.Millisecond, func() {
fmt.Println("Called once in every 100ms")
})

for j := 0; j < 10; j++ {
throttle()
time.Sleep(30 * time.Millisecond)
}

reset()

// Output:
// Called once in every 100ms
// Called once in every 100ms
// Called once in every 100ms
}

func ExampleNewThrottleWithCount() {
throttle, reset := NewThrottleWithCount(100*time.Millisecond, 2, func() {
fmt.Println("Called once in every 100ms")
})

for j := 0; j < 10; j++ {
throttle()
time.Sleep(30 * time.Millisecond)
}

reset()

// Output:
// Called once in every 100ms
// Called once in every 100ms
// Called once in every 100ms
// Called once in every 100ms
// Called once in every 100ms
// Called once in every 100ms
}

func ExampleNewThrottleBy() {
throttle, reset := NewThrottleBy(100*time.Millisecond, func(key string) {
fmt.Println(key, "Called once in every 100ms")
})

for j := 0; j < 10; j++ {
throttle("foo")
throttle("bar")
time.Sleep(30 * time.Millisecond)
}

reset()

// Output:
// foo Called once in every 100ms
// bar Called once in every 100ms
// foo Called once in every 100ms
// bar Called once in every 100ms
// foo Called once in every 100ms
// bar Called once in every 100ms
}

func ExampleNewThrottleByWithCount() {
throttle, reset := NewThrottleByWithCount(100*time.Millisecond, 2, func(key string) {
fmt.Println(key, "Called once in every 100ms")
})

for j := 0; j < 10; j++ {
throttle("foo")
throttle("bar")
time.Sleep(30 * time.Millisecond)
}

reset()

// Output:
// foo Called once in every 100ms
// bar Called once in every 100ms
// foo Called once in every 100ms
// bar Called once in every 100ms
// foo Called once in every 100ms
// bar Called once in every 100ms
// foo Called once in every 100ms
// bar Called once in every 100ms
// foo Called once in every 100ms
// bar Called once in every 100ms
// foo Called once in every 100ms
// bar Called once in every 100ms
}
Loading

0 comments on commit 19d8355

Please sign in to comment.