Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
38cf1bf
Implement debouncing throttler
window9u Feb 22, 2025
7386170
Add tests
window9u Feb 22, 2025
87a50e9
Rename package `throttle` -> `limit`
window9u Feb 24, 2025
ca8485e
Solve concurrent error in test
window9u Feb 24, 2025
f7b9d6d
Refactor test
window9u Feb 24, 2025
3782427
Rename package `throttle` -> `limit`
window9u Feb 24, 2025
f044456
Rename components
window9u Feb 25, 2025
b9ab305
Change `debouncing` timing to after callback end
window9u Feb 25, 2025
8bf4303
Remove Schedule
window9u Feb 26, 2025
68e05c9
Add limit publisher
window9u Feb 26, 2025
1cd6410
Filter self produced event in client
window9u Feb 26, 2025
99f0ef0
Merge branch 'main' into rate-limiter
hackerwins Feb 28, 2025
9e73b0a
Revert
window9u Mar 3, 2025
90114a7
Remove limiter package
window9u Mar 3, 2025
ac19906
Refactor limiter
window9u Mar 5, 2025
7e7efc0
Add webhook manager component
window9u Mar 6, 2025
d9be94d
Lint
window9u Mar 6, 2025
02ea226
Fix error handling
window9u Mar 6, 2025
1222c09
Rename constant
window9u Mar 7, 2025
5bdad0c
Execute remain job when closing
window9u Mar 7, 2025
982bcf5
Add expire batch configuration option
window9u Mar 7, 2025
b16b7ef
Add NewEventWebhookInfo
window9u Mar 7, 2025
5dcab93
Add close event webhook manager
window9u Mar 7, 2025
5ed551c
Remove golang rate package
window9u Mar 7, 2025
9e93ec1
Lint
window9u Mar 7, 2025
dbf0817
Check Event Webhook Requirement
window9u Mar 11, 2025
38f408d
Decrease Throttle window and Debounce time
window9u Mar 11, 2025
01f655e
Refactor event webhook test as spec changed
window9u Mar 11, 2025
e2c5efa
Merge branch 'main' into rate-limiter
window9u Mar 11, 2025
acd606c
Wait previous debouncing before flushing
window9u Mar 11, 2025
73b6c25
Refactor tests
window9u Mar 11, 2025
e794ac6
Add wait group in test
window9u Mar 11, 2025
7d05a69
Add detail stimulation test
window9u Mar 12, 2025
261ef28
refactor test with `occurs` type
window9u Mar 12, 2025
b141293
Add comment
window9u Mar 12, 2025
526b370
Set `verifySignature` to helper function
window9u Mar 12, 2025
48e7f65
Add bench test for webhook
window9u Mar 12, 2025
f839b16
Refactor limit event stream test
window9u Mar 12, 2025
bd7c930
Lint
window9u Mar 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 60 additions & 2 deletions api/types/event_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@

package types

import (
"encoding/json"
"fmt"
"time"
)

// DateFormat defines the standard format used for timestamps.
const DateFormat = "2006-01-02T15:04:05.000Z"

// EventWebhookType represents event webhook type
type EventWebhookType string

Expand All @@ -29,14 +38,63 @@ func IsValidEventType(eventType string) bool {
return eventType == string(DocRootChanged)
}

// EventWebhookAttribute represents the attribute of the webhook.
// EventWebhookAttribute represents metadata associated with a webhook event.
type EventWebhookAttribute struct {
Key string `json:"key"`
IssuedAt string `json:"issuedAt"`
}

// EventWebhookRequest represents the request of the webhook.
// EventWebhookRequest represents a webhook event request payload.
type EventWebhookRequest struct {
Type EventWebhookType `json:"type"`
Attributes EventWebhookAttribute `json:"attributes"`
}

// NewRequestBody builds the JSON request body for a webhook event.
func NewRequestBody(docKey string, event EventWebhookType) ([]byte, error) {
req := EventWebhookRequest{
Type: event,
Attributes: EventWebhookAttribute{
Key: docKey,
IssuedAt: time.Now().UTC().Format(DateFormat),
},
}

body, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshal event webhook request: %w", err)
}
return body, nil
}

// EventWebhookInfo holds the webhook EventRefKey and its associated Attribute.
type EventWebhookInfo struct {
EventRefKey EventRefKey
Attribute WebhookAttribute
}

// NewEventWebhookInfo initializes an EventWebhookInfo with the given parameters.
func NewEventWebhookInfo(
docRefKey DocRefKey,
event EventWebhookType,
signingKey, url, docKey string,
) EventWebhookInfo {
return EventWebhookInfo{
EventRefKey: EventRefKey{
DocRefKey: docRefKey,
EventWebhookType: event,
},
Attribute: WebhookAttribute{
SigningKey: signingKey,
URL: url,
DocKey: docKey,
},
}
}

// WebhookAttribute defines attributes necessary for webhook handling.
type WebhookAttribute struct {
SigningKey string
URL string
DocKey string
}
11 changes: 11 additions & 0 deletions api/types/resource_ref_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,14 @@ type SnapshotRefKey struct {
func (r SnapshotRefKey) String() string {
return fmt.Sprintf("Snapshot (%s.%s.%d)", r.ProjectID, r.DocID, r.ServerSeq)
}

// EventRefKey represents an identifier used to reference an event.
type EventRefKey struct {
DocRefKey
EventWebhookType
}

// String returns the string representation of the given EventRefKey.
func (r EventRefKey) String() string {
return fmt.Sprintf("DocEvent (%s.%s.%s)", r.ProjectID, r.DocID, r.EventWebhookType)
}
28 changes: 28 additions & 0 deletions pkg/limit/bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package limit

import "time"

// Bucket represents a single-token bucket that refills every specified time window.
type Bucket struct {
window time.Duration // The interval at which the bucket refills.
last time.Time // The last time a token was granted.
}

// NewBucket creates a new Bucket with the given initial time and refill window.
func NewBucket(now time.Time, window time.Duration) Bucket {
return Bucket{
window: window,
last: now,
}
}

// Allow checks if a token can be granted at the given time.
// It returns true if the time has advanced past the refill window, otherwise false.
func (b *Bucket) Allow(now time.Time) bool {
if now.Before(b.last.Add(b.window)) {
return false
}

b.last = now
return true
}
181 changes: 181 additions & 0 deletions pkg/limit/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright 2025 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package limit provides rate-limiting functionality with debouncing support.
package limit

import (
"container/list"
"sync"
"time"
)

// Limiter provides rate limiting functionality with a debouncing callback.
// It maintains a single token bucket.
type Limiter[K comparable] struct {
mu sync.Mutex
wg sync.WaitGroup
closing chan struct{}

expireInterval time.Duration
throttleWindow time.Duration
debouncingTime time.Duration
expireBatchSize int

// evictionList holds the limiter entries in order of recency.
evictionList *list.List
// entries maps keys to their corresponding list element for quick lookup.
entries map[K]*list.Element
}

// NewLimiter creates and returns a new Limiter instance.
// Parameters:
//
// expireInterval: How often to check for expired entries.
// throttleWindow: The time window for rate limiting.
// debouncingTime: The time-to-live for each rate bucket entry.
func NewLimiter[K comparable](expireNum int, expire, throttle, debouncing time.Duration) *Limiter[K] {
lim := &Limiter[K]{
closing: make(chan struct{}),
expireInterval: expire,
throttleWindow: throttle,
debouncingTime: debouncing,
expireBatchSize: expireNum,
evictionList: list.New(),
entries: make(map[K]*list.Element),
}

// Start the background expiration process.
lim.wg.Add(1)
go lim.expirationLoop()
return lim
}

// limiterEntry represents an entry in the Limiter for a specific key.
type limiterEntry[K comparable] struct {
key K
bucket Bucket
expireTime time.Time
debouncingCallback func()
}

// Allow checks if an event is allowed for the given key based on the rate bucket.
// If allowed, it clears any pending debouncing callback; otherwise, it stores the provided callback.
// It returns true if the event is allowed immediately.
func (l *Limiter[K]) Allow(key K, callback func()) bool {
l.mu.Lock()
defer l.mu.Unlock()

now := time.Now()
if elem, exists := l.entries[key]; exists {
entry := elem.Value.(*limiterEntry[K])
allowed := entry.bucket.Allow(now)
if allowed {
entry.debouncingCallback = nil
} else {
entry.debouncingCallback = callback
}
// Update recency and extend TTL.
l.evictionList.MoveToFront(elem)
entry.expireTime = now.Add(l.throttleWindow + l.debouncingTime)
return allowed
}

// Create a new rate bucket for a new key.
bucket := NewBucket(now, l.throttleWindow)
entry := &limiterEntry[K]{
key: key,
bucket: bucket,
expireTime: now.Add(l.throttleWindow + l.debouncingTime),
}
elem := l.evictionList.PushFront(entry)
l.entries[key] = elem
return true
}

// expirationLoop runs in a separate goroutine to periodically remove expired entries.
func (l *Limiter[K]) expirationLoop() {
ticker := time.NewTicker(l.expireInterval)
defer func() {
ticker.Stop()
l.wg.Done()
}()

for {
select {
case <-ticker.C:
expiredEntries := l.collectEntries(true)
l.runDebounce(expiredEntries)
case <-l.closing:
return
}
}
}

// collectEntries gathers expired entries and removes them from the limiter.
func (l *Limiter[K]) collectEntries(onlyExpired bool) []*limiterEntry[K] {
now := time.Now()
expiredEntries := make([]*limiterEntry[K], 0, l.expireBatchSize)

l.mu.Lock()
defer l.mu.Unlock()

for range l.expireBatchSize {
Comment thread
window9u marked this conversation as resolved.
elem := l.evictionList.Back()
if elem == nil {
break
}

entry := elem.Value.(*limiterEntry[K])
if onlyExpired && now.Before(entry.expireTime) {
break
}

if entry.debouncingCallback != nil {
expiredEntries = append(expiredEntries, entry)
}
l.evictionList.Remove(elem)
delete(l.entries, entry.key)
}

return expiredEntries
}

Comment on lines +153 to +156

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential duplicated Close definition starts here (lines 153-156).

The file later has lines 168–180, which also define func (l *Limiter[K]) Close(). This duplication likely breaks compilation. Merge them into a single method.

// runDebounce runs the debouncing callbacks for expired entries asynchronously.
func (l *Limiter[K]) runDebounce(entries []*limiterEntry[K]) {
l.wg.Add(1)
go func() {
defer l.wg.Done()
for _, entry := range entries {
entry.debouncingCallback()
}
}()
}

// Close terminates the expiration loop and cleans up resources.
func (l *Limiter[K]) Close() {
close(l.closing)

// Wait for all previous expiration job done.
l.wg.Wait()

for l.evictionList.Len() > 0 {
expiredEntries := l.collectEntries(false)
l.runDebounce(expiredEntries)
}

l.wg.Wait()
}
Loading