Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/dev/seed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
deps = [
"//internal/services/keys",
"//pkg/array",
"//pkg/batch",
"//pkg/cli",
"//pkg/clickhouse",
"//pkg/clickhouse/schema",
Expand Down
16 changes: 8 additions & 8 deletions cmd/dev/seed/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ func seedLocal(ctx context.Context, cmd *cli.Command) error {
}

keyService, err := keys.New(keys.Config{
DB: database,
RateLimiter: nil,
RBAC: nil,
Clickhouse: nil,
Region: "local",
UsageLimiter: nil,
KeyCache: nil,
QuotaCache: nil,
DB: database,
RateLimiter: nil,
RBAC: nil,
KeyVerifications: nil,
Region: "local",
UsageLimiter: nil,
KeyCache: nil,
QuotaCache: nil,
})
if err != nil {
return fmt.Errorf("failed to create key service: %w", err)
Expand Down
35 changes: 22 additions & 13 deletions cmd/dev/seed/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math/rand/v2"
"time"

"github.com/unkeyed/unkey/pkg/batch"
"github.com/unkeyed/unkey/pkg/cli"
"github.com/unkeyed/unkey/pkg/clickhouse"
"github.com/unkeyed/unkey/pkg/clickhouse/schema"
Expand Down Expand Up @@ -46,6 +47,16 @@ func seedSentinel(ctx context.Context, cmd *cli.Command) error {
return fmt.Errorf("failed to connect to ClickHouse: %w", err)
}

sentinelRequests := clickhouse.NewBuffer[schema.SentinelRequest](ch, "default.sentinel_requests_raw_v1", clickhouse.BufferConfig{
Name: "seed-sentinel-requests",
BatchSize: 50_000,
BufferSize: 50_000,
FlushInterval: 5 * time.Second,
Consumers: 2,
Drop: true,
OnFlushError: nil,
})

// Get or find deployment ID
deploymentID := cmd.String("deployment-id")
if deploymentID == "" {
Expand All @@ -58,20 +69,20 @@ func seedSentinel(ctx context.Context, cmd *cli.Command) error {

// Create seeder and run
seeder := &SentinelSeeder{
deploymentID: deploymentID,
numRequests: cmd.RequireInt("num-requests"),
db: database,
clickhouse: ch,
deploymentID: deploymentID,
numRequests: cmd.RequireInt("num-requests"),
db: database,
sentinelRequests: sentinelRequests,
}

return seeder.Seed(ctx)
}

type SentinelSeeder struct {
deploymentID string
numRequests int
db db.Database
clickhouse clickhouse.ClickHouse
deploymentID string
numRequests int
db db.Database
sentinelRequests *batch.BatchProcessor[schema.SentinelRequest]
}

func (s *SentinelSeeder) Seed(ctx context.Context) error {
Expand Down Expand Up @@ -227,7 +238,7 @@ func (s *SentinelSeeder) generateRequests(
totalLatency := instanceLatency + sentinelLatency

// Buffer it IMMEDIATELY
s.clickhouse.BufferSentinelRequest(schema.SentinelRequest{
s.sentinelRequests.Buffer(schema.SentinelRequest{
RequestID: uid.New("req"),
Time: timestamp.UnixMilli(),
WorkspaceID: deployment.WorkspaceID,
Expand Down Expand Up @@ -263,10 +274,8 @@ func (s *SentinelSeeder) generateRequests(

log.Printf(" Buffered all %d requests, waiting for flush...", s.numRequests)

// Flush by closing (like verifications.go line 496)
if err := s.clickhouse.Close(); err != nil {
return fmt.Errorf("failed to close clickhouse: %w", err)
}
// Flush by closing the buffer
s.sentinelRequests.Close()

log.Printf(" All requests sent to ClickHouse")
return nil
Expand Down
40 changes: 24 additions & 16 deletions cmd/dev/seed/verifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/unkeyed/unkey/internal/services/keys"
"github.com/unkeyed/unkey/pkg/array"
"github.com/unkeyed/unkey/pkg/batch"
"github.com/unkeyed/unkey/pkg/cli"
"github.com/unkeyed/unkey/pkg/clickhouse"
"github.com/unkeyed/unkey/pkg/clickhouse/schema"
Expand Down Expand Up @@ -56,16 +57,26 @@ func seedVerifications(ctx context.Context, cmd *cli.Command) error {
return fmt.Errorf("failed to connect to ClickHouse: %w", err)
}

keyVerifications := clickhouse.NewBuffer[schema.KeyVerification](ch, "default.key_verifications_raw_v2", clickhouse.BufferConfig{
Name: "seed-key-verifications",
BatchSize: 50_000,
BufferSize: 50_000,
FlushInterval: 5 * time.Second,
Consumers: 2,
Drop: true,
OnFlushError: nil,
})
Comment thread
Flo4604 marked this conversation as resolved.

// Create key service for proper key generation
keyService, err := keys.New(keys.Config{
DB: database,
RateLimiter: nil,
RBAC: nil,
Clickhouse: ch,
Region: "test",
UsageLimiter: nil,
KeyCache: nil,
QuotaCache: nil,
DB: database,
RateLimiter: nil,
RBAC: nil,
KeyVerifications: keyVerifications,
Region: "test",
UsageLimiter: nil,
KeyCache: nil,
QuotaCache: nil,
})
if err != nil {
return fmt.Errorf("failed to create key service: %w", err)
Expand Down Expand Up @@ -93,7 +104,7 @@ func seedVerifications(ctx context.Context, cmd *cli.Command) error {
daysBack: cmd.Int("days-back"),
daysForward: cmd.Int("days-forward"),
db: database,
clickhouse: ch,
keyVerifications: keyVerifications,
keyService: keyService,
}

Expand All @@ -110,7 +121,7 @@ type Seeder struct {
daysBack int
daysForward int
db db.Database
clickhouse clickhouse.ClickHouse
keyVerifications *batch.BatchProcessor[schema.KeyVerification]
keyService keys.KeyService
}

Expand Down Expand Up @@ -464,8 +475,8 @@ func (s *Seeder) generateVerifications(_ context.Context, workspaceID string, ke
credit = 0
}

// Use BufferKeyVerification to let the clickhouse client batch automatically
s.clickhouse.BufferKeyVerification(schema.KeyVerification{
// Use batch processor to buffer key verifications
s.keyVerifications.Buffer(schema.KeyVerification{
RequestID: uid.New("req"),
Time: timestamp.UnixMilli(),
WorkspaceID: workspaceID,
Expand All @@ -488,10 +499,7 @@ func (s *Seeder) generateVerifications(_ context.Context, workspaceID string, ke

log.Printf(" Buffered all %d verifications, waiting for flush...", s.numVerifications)

err := s.clickhouse.Close()
if err != nil {
return fmt.Errorf("failed to close clickhouse: %w", err)
}
s.keyVerifications.Close()

log.Printf(" All verifications sent to ClickHouse")
return nil
Expand Down
2 changes: 1 addition & 1 deletion internal/services/keys/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ go_library(
"//internal/services/usagelimiter",
"//pkg/assert",
"//pkg/base58",
"//pkg/batch",
"//pkg/cache",
"//pkg/clickhouse",
"//pkg/clickhouse/schema",
"//pkg/codes",
"//pkg/db",
Expand Down
4 changes: 2 additions & 2 deletions internal/services/keys/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (s *service) Get(ctx context.Context, sess *zen.Session, sha256Hash string)
session: sess,
rBAC: s.rbac,
region: s.region,
clickhouse: s.clickhouse,
keyVerifications: s.keyVerifications,
rateLimiter: s.rateLimiter,
usageLimiter: s.usageLimiter,
AuthorizedWorkspaceID: key.WorkspaceID,
Expand Down Expand Up @@ -228,7 +228,7 @@ func (s *service) Get(ctx context.Context, sess *zen.Session, sha256Hash string)
kv := &KeyVerifier{
tags: []string{},
Key: key.FindKeyForVerificationRow,
clickhouse: s.clickhouse,
keyVerifications: s.keyVerifications,
rateLimiter: s.rateLimiter,
usageLimiter: s.usageLimiter,
AuthorizedWorkspaceID: key.WorkspaceID,
Expand Down
50 changes: 29 additions & 21 deletions internal/services/keys/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,35 @@ package keys
import (
"github.com/unkeyed/unkey/internal/services/ratelimit"
"github.com/unkeyed/unkey/internal/services/usagelimiter"
"github.com/unkeyed/unkey/pkg/batch"
"github.com/unkeyed/unkey/pkg/cache"
"github.com/unkeyed/unkey/pkg/clickhouse"
"github.com/unkeyed/unkey/pkg/clickhouse/schema"
"github.com/unkeyed/unkey/pkg/db"
"github.com/unkeyed/unkey/pkg/rbac"
)

// Config holds the configuration for creating a new keys service instance.
type Config struct {
DB db.Database // Database connection
RateLimiter ratelimit.Service // Rate limiting service
RBAC *rbac.RBAC // Role-based access control
Clickhouse clickhouse.ClickHouse // Clickhouse for telemetry
Region string // Geographic region identifier
UsageLimiter usagelimiter.Service // Redis Counter for usage limiting
DB db.Database // Database connection
RateLimiter ratelimit.Service // Rate limiting service
RBAC *rbac.RBAC // Role-based access control
Region string // Geographic region identifier
UsageLimiter usagelimiter.Service // Redis Counter for usage limiting

// KeyVerifications buffers key verification events for ClickHouse.
KeyVerifications *batch.BatchProcessor[schema.KeyVerification]

KeyCache cache.Cache[string, db.CachedKeyData] // Cache for key lookups with pre-parsed data
QuotaCache cache.Cache[string, db.Quotas] // Cache for workspace quota lookups
}

type service struct {
db db.Database
rateLimiter ratelimit.Service
usageLimiter usagelimiter.Service
rbac *rbac.RBAC
clickhouse clickhouse.ClickHouse
region string
db db.Database
rateLimiter ratelimit.Service
usageLimiter usagelimiter.Service
rbac *rbac.RBAC
keyVerifications *batch.BatchProcessor[schema.KeyVerification]
region string

// hash -> cached key data (includes pre-parsed IP whitelist)
keyCache cache.Cache[string, db.CachedKeyData]
Expand All @@ -39,15 +42,20 @@ type service struct {

// New creates a new keys service instance with the provided configuration.
func New(config Config) (*service, error) {
kv := config.KeyVerifications
if kv == nil {
kv = batch.NewNoop[schema.KeyVerification]()
}

return &service{
db: config.DB,
rbac: config.RBAC,
rateLimiter: config.RateLimiter,
usageLimiter: config.UsageLimiter,
clickhouse: config.Clickhouse,
region: config.Region,
keyCache: config.KeyCache,
quotaCache: config.QuotaCache,
db: config.DB,
rbac: config.RBAC,
rateLimiter: config.RateLimiter,
usageLimiter: config.UsageLimiter,
keyVerifications: kv,
region: config.Region,
keyCache: config.KeyCache,
quotaCache: config.QuotaCache,
}, nil
Comment thread
Flo4604 marked this conversation as resolved.
}

Expand Down
12 changes: 6 additions & 6 deletions internal/services/keys/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/unkeyed/unkey/internal/services/keys/metrics"
"github.com/unkeyed/unkey/internal/services/ratelimit"
"github.com/unkeyed/unkey/internal/services/usagelimiter"
"github.com/unkeyed/unkey/pkg/clickhouse"
"github.com/unkeyed/unkey/pkg/batch"
"github.com/unkeyed/unkey/pkg/clickhouse/schema"
"github.com/unkeyed/unkey/pkg/db"
"github.com/unkeyed/unkey/pkg/rbac"
Expand Down Expand Up @@ -52,10 +52,10 @@ type KeyVerifier struct {
spentCredits int64 // Credits spent during verification

// Services
rateLimiter ratelimit.Service // Rate limiting service
usageLimiter usagelimiter.Service // Usage limiting service
rBAC *rbac.RBAC // Role-based access control service
clickhouse clickhouse.ClickHouse // Clickhouse for telemetry
rateLimiter ratelimit.Service // Rate limiting service
usageLimiter usagelimiter.Service // Usage limiting service
rBAC *rbac.RBAC // Role-based access control service
keyVerifications *batch.BatchProcessor[schema.KeyVerification] // Buffer for key verification telemetry
}

// GetRatelimitConfigs returns the rate limit configurations
Expand Down Expand Up @@ -126,7 +126,7 @@ func (k *KeyVerifier) Verify(ctx context.Context, opts ...VerifyOption) error {
func (k *KeyVerifier) log() {
latency := time.Since(k.startTime).Milliseconds()

k.clickhouse.BufferKeyVerification(schema.KeyVerification{
k.keyVerifications.Buffer(schema.KeyVerification{
RequestID: k.session.RequestID(),
WorkspaceID: k.Key.WorkspaceID,
Time: time.Now().UnixMilli(),
Expand Down
7 changes: 5 additions & 2 deletions internal/services/ratelimit/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ import (
// go svc.replayRequests()
// }
func (s *service) replayRequests() {
for req := range s.replayBuffer.Consume() {
err := s.syncWithOrigin(context.Background(), req)
for ptr := range s.replayBuffer.Consume() {
if ptr == nil {
continue
}
err := s.syncWithOrigin(context.Background(), *ptr)
if err != nil {
logger.Error("failed to replay request", "error", err.Error())
}
Expand Down
15 changes: 11 additions & 4 deletions internal/services/usagelimiter/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,11 @@ func (s *counterService) initializeFromDatabase(ctx context.Context, req UsageRe

// replayRequests processes buffered credit changes and updates the database
func (s *counterService) replayRequests() {
for change := range s.replayBuffer.Consume() {
err := s.syncWithDB(context.Background(), change)
for ptr := range s.replayBuffer.Consume() {
Comment thread
Flo4604 marked this conversation as resolved.
if ptr == nil {
continue
}
err := s.syncWithDB(context.Background(), *ptr)
if err != nil {
logger.Error("failed to replay credit change", "error", err)
}
Expand Down Expand Up @@ -356,8 +359,12 @@ func (s *counterService) Close() error {

// Process remaining items directly
select {
case change := <-s.replayBuffer.Consume():
err := s.syncWithDB(context.Background(), change)
case ptr, ok := <-s.replayBuffer.Consume():
if !ok || ptr == nil {
logger.Debug("replay buffer channel closed, done draining")
return nil
}
err := s.syncWithDB(context.Background(), *ptr)
if err != nil {
logger.Error("failed to sync credit change during shutdown", "error", err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/batch/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "batch",
srcs = [
"doc.go",
"noop.go",
"process.go",
],
importpath = "github.com/unkeyed/unkey/pkg/batch",
Expand Down
Loading
Loading