Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .github/workflows/job_test_agent_local.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: Test Agent Local
on:
workflow_call:



jobs:
test_agent_local:
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
- name: Install
uses: ./.github/actions/install
with:
go: true


- name: Build
run: task build
working-directory: apps/agent

- name: Test
run: go test -cover -json -timeout=60m -failfast ./pkg/... ./services/... | tparse -all -progress
working-directory: apps/agent
4 changes: 3 additions & 1 deletion .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ jobs:
name: Test API
uses: ./.github/workflows/job_test_api_local.yaml


test_agent_local:
name: Test Agent Local
uses: ./.github/workflows/job_test_agent_local.yaml
# test_agent_integration:
# name: Test Agent Integration
# runs-on: ubuntu-latest
Expand Down
1 change: 0 additions & 1 deletion apps/agent/pkg/circuitbreaker/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ func (cb *CB[Res]) preflight(ctx context.Context) error {
now := cb.config.clock.Now()

if now.After(cb.resetCountersAt) {
cb.logger.Info().Msg("resetting circuit breaker")
cb.requests = 0
cb.successes = 0
cb.failures = 0
Expand Down
28 changes: 6 additions & 22 deletions apps/agent/pkg/clock/real_clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,15 @@ package clock

import "time"

type TestClock struct {
now time.Time
type RealClock struct {
}

func NewTestClock(now ...time.Time) *TestClock {
if len(now) == 0 {
now = append(now, time.Now())
}
return &TestClock{now: now[0]}
func New() *RealClock {
return &RealClock{}
}

var _ Clock = &TestClock{}
var _ Clock = &RealClock{}

func (c *TestClock) Now() time.Time {
return c.now
}

// Tick advances the clock by the given duration and returns the new time.
func (c *TestClock) Tick(d time.Duration) time.Time {
c.now = c.now.Add(d)
return c.now
}

// Set sets the clock to the given time and returns the new time.
func (c *TestClock) Set(t time.Time) time.Time {
c.now = t
return c.now
func (c *RealClock) Now() time.Time {
return time.Now()
}
28 changes: 22 additions & 6 deletions apps/agent/pkg/clock/test_clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,31 @@ package clock

import "time"

type RealClock struct {
type TestClock struct {
now time.Time
}

func New() *RealClock {
return &RealClock{}
func NewTestClock(now ...time.Time) *TestClock {
if len(now) == 0 {
now = append(now, time.Now())
}
return &TestClock{now: now[0]}
}

var _ Clock = &RealClock{}
var _ Clock = &TestClock{}

func (c *RealClock) Now() time.Time {
return time.Now()
func (c *TestClock) Now() time.Time {
return c.now
}

// Tick advances the clock by the given duration and returns the new time.
func (c *TestClock) Tick(d time.Duration) time.Time {
c.now = c.now.Add(d)
return c.now
}

// Set sets the clock to the given time and returns the new time.
func (c *TestClock) Set(t time.Time) time.Time {
c.now = t
return c.now
}
19 changes: 11 additions & 8 deletions apps/agent/services/ratelimit/mitigate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ func (s *service) Mitigate(ctx context.Context, req *ratelimitv1.MitigateRequest
bucket, _ := s.getBucket(bucketKey{req.Identifier, req.Limit, duration})
bucket.Lock()
defer bucket.Unlock()

bucket.windows[req.Window.GetSequence()] = req.Window

return &ratelimitv1.MitigateResponse{}, nil
Expand Down Expand Up @@ -51,16 +50,20 @@ func (s *service) broadcastMitigation(req mitigateWindowRequest) {
return
}
for _, peer := range peers {
_, err := peer.client.Mitigate(ctx, connect.NewRequest(&ratelimitv1.MitigateRequest{
Identifier: req.identifier,
Limit: req.limit,
Duration: req.duration.Milliseconds(),
Window: req.window,
}))
_, err := s.mitigateCircuitBreaker.Do(ctx, func(innerCtx context.Context) (*connect.Response[ratelimitv1.MitigateResponse], error) {
innerCtx, cancel := context.WithTimeout(innerCtx, 10*time.Second)
defer cancel()
return peer.client.Mitigate(innerCtx, connect.NewRequest(&ratelimitv1.MitigateRequest{
Identifier: req.identifier,
Limit: req.limit,
Duration: req.duration.Milliseconds(),
Window: req.window,
}))
})
if err != nil {
s.logger.Err(err).Msg("failed to call mitigate")
} else {
s.logger.Info().Str("peerId", peer.id).Msg("broadcasted mitigation")
s.logger.Debug().Str("peerId", peer.id).Msg("broadcasted mitigation")
}
}
}
8 changes: 5 additions & 3 deletions apps/agent/services/ratelimit/ratelimit_mitigation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

func TestExceedingTheLimitShouldNotifyAllNodes(t *testing.T) {
t.Skip()

for _, clusterSize := range []int{1, 3, 5} {
t.Run(fmt.Sprintf("Cluster Size %d", clusterSize), func(t *testing.T) {
logger := logging.New(nil)
Expand Down Expand Up @@ -94,23 +94,25 @@ func TestExceedingTheLimitShouldNotifyAllNodes(t *testing.T) {
ctx := context.Background()

// Saturate the window
for i := int64(0); i <= limit; i++ {
for i := int64(0); i < limit; i++ {
rl := util.RandomElement(ratelimiters)
res, err := rl.Ratelimit(ctx, req)
require.NoError(t, err)
t.Logf("saturate res: %+v", res)
require.True(t, res.Success)

}

time.Sleep(time.Second * 5)

// Let's hit everry node again
// They should all be mitigated
for i, rl := range ratelimiters {

res, err := rl.Ratelimit(ctx, req)
require.NoError(t, err)
t.Logf("res from %d: %+v", i, res)
// require.False(t, res.Success)
require.False(t, res.Success)
}

})
Expand Down
6 changes: 2 additions & 4 deletions apps/agent/services/ratelimit/ratelimit_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (
"github.com/unkeyed/unkey/apps/agent/pkg/util"
)

func TestReplication(t *testing.T) {
t.Skip()
func TestSync(t *testing.T) {
type Node struct {
srv *service
cluster cluster.Cluster
Expand Down Expand Up @@ -106,7 +105,7 @@ func TestReplication(t *testing.T) {
}

// Figure out who is the origin
_, err := nodes[1].srv.Ratelimit(ctx, req)
_, err := nodes[0].srv.Ratelimit(ctx, req)
require.NoError(t, err)

time.Sleep(5 * time.Second)
Expand Down Expand Up @@ -138,7 +137,6 @@ func TestReplication(t *testing.T) {
require.True(t, ok)
bucket.RLock()
window := bucket.getCurrentWindow(now)
t.Logf("window on origin: %+v", window)
counter := window.Counter
bucket.RUnlock()

Expand Down
Loading