Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions go/apps/api/routes/v2_ratelimit_limit/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,14 @@ func (h *Handler) Handle(ctx context.Context, s *zen.Session) error {
namespace, hit, err := h.RatelimitNamespaceCache.SWR(ctx,
cache.ScopedKey{WorkspaceID: auth.AuthorizedWorkspaceID, Key: namespaceKey},
func(ctx context.Context) (db.FindRatelimitNamespace, error) {
response, err := db.Query.FindRatelimitNamespace(ctx, h.DB.RO(), db.FindRatelimitNamespaceParams{
WorkspaceID: auth.AuthorizedWorkspaceID,
Namespace: namespaceKey,
})
result := db.FindRatelimitNamespace{} // nolint:exhaustruct

response, err := db.WithRetry(func() (db.FindRatelimitNamespaceRow, error) {
return db.Query.FindRatelimitNamespace(ctx, h.DB.RO(), db.FindRatelimitNamespaceParams{
WorkspaceID: auth.AuthorizedWorkspaceID,
Namespace: namespaceKey,
})
})
if err != nil {
return result, err
}
Expand Down Expand Up @@ -186,6 +189,7 @@ func (h *Handler) Handle(ctx context.Context, s *zen.Session) error {
Cost: cost,
Time: time.Time{},
}

if h.TestMode {
header := s.Request().Header.Get("X-Test-Time")
if header != "" {
Expand Down Expand Up @@ -216,6 +220,7 @@ func (h *Handler) Handle(ctx context.Context, s *zen.Session) error {
Passed: result.Success,
})
}

res := Response{
Meta: openapi.Meta{
RequestId: s.RequestID(),
Expand Down
6 changes: 2 additions & 4 deletions go/internal/services/caches/op.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package caches

import (
"database/sql"
"errors"

"github.com/unkeyed/unkey/go/pkg/cache"
"github.com/unkeyed/unkey/go/pkg/db"
)

// DefaultFindFirstOp returns the appropriate cache operation based on the sql error
Expand All @@ -14,7 +12,7 @@ func DefaultFindFirstOp(err error) cache.Op {
return cache.WriteValue
}

if errors.Is(err, sql.ErrNoRows) {
if db.IsNotFound(err) {
// the response is empty, we need to store that the row does not exist
return cache.WriteNull
}
Expand Down
6 changes: 5 additions & 1 deletion go/internal/services/keys/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,12 @@ func (s *service) Get(ctx context.Context, sess *zen.Session, rawKey string) (*K

h := hash.Sha256(rawKey)
key, hit, err := s.keyCache.SWR(ctx, h, func(ctx context.Context) (db.FindKeyForVerificationRow, error) {
return db.Query.FindKeyForVerification(ctx, s.db.RO(), h)
// Use database retry with exponential backoff, skipping non-transient errors
return db.WithRetry(func() (db.FindKeyForVerificationRow, error) {
return db.Query.FindKeyForVerification(ctx, s.db.RO(), h)
})
}, caches.DefaultFindFirstOp)

if err != nil {
if db.IsNotFound(err) {
// nolint:exhaustruct
Expand Down
4 changes: 3 additions & 1 deletion go/internal/services/usagelimiter/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ func (s *service) Limit(ctx context.Context, req UsageRequest) (UsageResponse, e
ctx, span := tracing.Start(ctx, "usagelimiter.Limit")
defer span.End()

limit, err := db.Query.FindKeyCredits(ctx, s.db.RW(), req.KeyId)
limit, err := db.WithRetry(func() (sql.NullInt32, error) {
return db.Query.FindKeyCredits(ctx, s.db.RO(), req.KeyId)
})
if err != nil {
if db.IsNotFound(err) {
return UsageResponse{Valid: false, Remaining: 0}, nil
Expand Down
5 changes: 4 additions & 1 deletion go/internal/services/usagelimiter/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,10 @@ func (s *counterService) initializeFromDatabase(ctx context.Context, req UsageRe
ctx, span := tracing.Start(ctx, "usagelimiter.counter.initializeFromDatabase")
defer span.End()

limit, err := db.Query.FindKeyCredits(ctx, s.db.RO(), req.KeyId)
limit, err := db.WithRetry(func() (sql.NullInt32, error) {
return db.Query.FindKeyCredits(ctx, s.db.RO(), req.KeyId)
})

if err != nil {
if db.IsNotFound(err) {
return UsageResponse{Valid: false, Remaining: 0}, nil
Expand Down
67 changes: 67 additions & 0 deletions go/pkg/db/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package db

import (
"time"

"github.com/unkeyed/unkey/go/pkg/retry"
)

const (
// DefaultBackoff is the base duration for exponential backoff in database retries
DefaultBackoff = 50 * time.Millisecond
)

// WithRetry executes a database operation with optimized retry configuration.
// It retries transient errors with exponential backoff but skips non-retryable errors
// like "not found" or "duplicate key" to avoid unnecessary delays.
//
// Configuration:
// - 3 attempts maximum
// - Exponential backoff: 50ms, 100ms, 200ms
// - Skips retries for "not found" and "duplicate key" errors
//
// Usage:
//
// result, err := db.WithRetry(func() (SomeType, error) {
// return db.Query.SomeOperation(ctx, db.RO(), params)
// })
func WithRetry[T any](fn func() (T, error)) (T, error) {
retrier := retry.New(
retry.Attempts(3),
retry.Backoff(func(n int) time.Duration {
// Predefined backoff delays: 50ms, 100ms, 200ms
delays := []time.Duration{
DefaultBackoff, // 50ms for attempt 1
DefaultBackoff * 2, // 100ms for attempt 2
DefaultBackoff * 4, // 200ms for attempt 3
}
if n <= 0 || n > len(delays) {
return DefaultBackoff // fallback to base delay
}
return delays[n-1]
}),
retry.ShouldRetry(func(err error) bool {
// Don't retry if resource is not found - this is a valid response
if IsNotFound(err) {
return false
}

// Don't retry duplicate key errors - these won't succeed on retry
if IsDuplicateKeyError(err) {
return false
}

// Retry all other errors (network issues, timeouts, deadlocks, etc.)
return true
}),
)

var result T
err := retrier.Do(func() error {
var retryErr error
result, retryErr = fn()
return retryErr
})

return result, err
}
Loading