Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
chronark committed Feb 11, 2025
2 parents d8b3d8e + 2747350 commit 776bdbc
Show file tree
Hide file tree
Showing 65 changed files with 1,222 additions and 2,100 deletions.
9 changes: 5 additions & 4 deletions Taskfile.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
version: '3'
version: "3"

tasks:
pull:
cmds:
- docker compose -f ./deployment/docker-compose.yaml pull
build:
deps: [pull]
cmds:
- docker compose -f ./deployment/docker-compose.yaml build

Expand All @@ -13,7 +17,6 @@ tasks:
cmds:
- docker compose -f ./deployment/docker-compose.yaml up -d


migrate:
cmds:
- task: migrate-db
Expand All @@ -34,7 +37,6 @@ tasks:
cmds:
- goose down-to 0


migrate-db:
env:
DRIZZLE_DATABASE_URL: "mysql://unkey:password@localhost:3306/unkey"
Expand All @@ -49,7 +51,6 @@ tasks:
- task: seed
- pnpm test:integration


generate-sql:
dir: internal/db
cmds:
Expand Down
8 changes: 4 additions & 4 deletions apps/agent/services/ratelimit/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,18 @@ func (b bucketKey) toString() string {
// getBucket returns a bucket for the given key and will create one if it does not exist.
// It returns the bucket and a boolean indicating if the bucket existed before.
func (s *service) getBucket(key bucketKey) (*bucket, bool) {
s.bucketsLock.RLock()
s.bucketsMu.RLock()
b, ok := s.buckets[key.toString()]
s.bucketsLock.RUnlock()
s.bucketsMu.RUnlock()
if !ok {
b = &bucket{
limit: key.limit,
duration: key.duration,
windows: make(map[int64]*ratelimitv1.Window),
}
s.bucketsLock.Lock()
s.bucketsMu.Lock()
s.buckets[key.toString()] = b
s.bucketsLock.Unlock()
s.bucketsMu.Unlock()
}
return b, ok
}
Expand Down
2 changes: 1 addition & 1 deletion apps/agent/services/ratelimit/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type peer struct {
}

// getAllPeers returns clients for all nodes in the cluster except ourselves
func (s *service) getAllPeers(ctx context.Context) ([]peer, error) {
func (s *service) getAllPeers(context.Context) ([]peer, error) {
peers := []peer{}
for _, p := range s.cluster.Peers() {
if p.Id == s.cluster.NodeId() {
Expand Down
8 changes: 4 additions & 4 deletions apps/agent/services/ratelimit/ratelimit_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ func TestSync(t *testing.T) {

originIndex := 0
for i, n := range nodes {
n.srv.bucketsLock.RLock()
n.srv.bucketsMu.RLock()
buckets := len(n.srv.buckets)
n.srv.bucketsLock.RUnlock()
n.srv.bucketsMu.RUnlock()
t.Logf("node %d: found %d buckets", i, buckets)
if buckets > 0 {
originIndex = i
Expand All @@ -131,9 +131,9 @@ func TestSync(t *testing.T) {
}

require.Eventually(t, func() bool {
nodes[originIndex].srv.bucketsLock.RLock()
nodes[originIndex].srv.bucketsMu.RLock()
bucket, ok := nodes[originIndex].srv.getBucket(key)
nodes[originIndex].srv.bucketsLock.RUnlock()
nodes[originIndex].srv.bucketsMu.RUnlock()
require.True(t, ok)
bucket.RLock()
window := bucket.getCurrentWindow(now)
Expand Down
4 changes: 2 additions & 2 deletions apps/agent/services/ratelimit/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type service struct {

shutdownCh chan struct{}

bucketsLock sync.RWMutex
bucketsMu sync.RWMutex
// identifier+sequence -> bucket
buckets map[string]*bucket
leaseIdToKeyMapLock sync.RWMutex
Expand Down Expand Up @@ -61,7 +61,7 @@ func New(cfg Config) (*service, error) {
syncBuffer: nil,
mitigateBuffer: nil,
shutdownCh: make(chan struct{}),
bucketsLock: sync.RWMutex{},
bucketsMu: sync.RWMutex{},
buckets: make(map[string]*bucket),
leaseIdToKeyMapLock: sync.RWMutex{},
leaseIdToKeyMap: make(map[string]string),
Expand Down
8 changes: 4 additions & 4 deletions apps/agent/services/ratelimit/sliding_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ type commitLeaseRequest struct {
// removeExpiredIdentifiers removes buckets that are no longer relevant
// for ratelimit decisions
func (r *service) removeExpiredIdentifiers() {
r.bucketsLock.Lock()
defer r.bucketsLock.Unlock()
r.bucketsMu.Lock()
defer r.bucketsMu.Unlock()

activeRatelimits.Set(float64(len(r.buckets)))
now := time.Now()
Expand Down Expand Up @@ -260,8 +260,8 @@ func (r *service) SetCounter(ctx context.Context, requests ...setCounterRequest)
// return nil
// }

// r.bucketsLock.Lock()
// defer r.bucketsLock.Unlock()
// r.bucketsMu.Lock()
// defer r.bucketsMu.Unlock()
// window, ok := r.buckets[key]
// if !ok {
// r.logger.Warn().Str("key", key).Msg("key not found")
Expand Down
4 changes: 2 additions & 2 deletions apps/agent/services/ratelimit/sliding_window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func TestTakeCreatesWindows(t *testing.T) {
require.Equal(t, int64(0), res.previousWindow.Counter)
require.Equal(t, int64(1), res.currentWindow.Counter)

rl.bucketsLock.RLock()
rl.bucketsMu.RLock()
bucket, ok := rl.buckets[bucketKey{identifier, limit, duration}.toString()]
rl.bucketsLock.RUnlock()
rl.bucketsMu.RUnlock()
require.True(t, ok)

bucket.Lock()
Expand Down
Loading

0 comments on commit 776bdbc

Please sign in to comment.