diff --git a/cmd/dev/seed/BUILD.bazel b/cmd/dev/seed/BUILD.bazel index 9b7f8ae92e..e3dccfffea 100644 --- a/cmd/dev/seed/BUILD.bazel +++ b/cmd/dev/seed/BUILD.bazel @@ -13,6 +13,7 @@ go_library( deps = [ "//internal/services/keys", "//pkg/array", + "//pkg/batch", "//pkg/cli", "//pkg/clickhouse", "//pkg/clickhouse/schema", diff --git a/cmd/dev/seed/local.go b/cmd/dev/seed/local.go index 193ad7ac93..c6266c3d19 100644 --- a/cmd/dev/seed/local.go +++ b/cmd/dev/seed/local.go @@ -42,14 +42,14 @@ func seedLocal(ctx context.Context, cmd *cli.Command) error { } keyService, err := keys.New(keys.Config{ - DB: database, - RateLimiter: nil, - RBAC: nil, - Clickhouse: nil, - Region: "local", - UsageLimiter: nil, - KeyCache: nil, - QuotaCache: nil, + DB: database, + RateLimiter: nil, + RBAC: nil, + KeyVerifications: nil, + Region: "local", + UsageLimiter: nil, + KeyCache: nil, + QuotaCache: nil, }) if err != nil { return fmt.Errorf("failed to create key service: %w", err) diff --git a/cmd/dev/seed/sentinel.go b/cmd/dev/seed/sentinel.go index 840755f087..01d6aaff73 100644 --- a/cmd/dev/seed/sentinel.go +++ b/cmd/dev/seed/sentinel.go @@ -8,6 +8,7 @@ import ( "math/rand/v2" "time" + "github.com/unkeyed/unkey/pkg/batch" "github.com/unkeyed/unkey/pkg/cli" "github.com/unkeyed/unkey/pkg/clickhouse" "github.com/unkeyed/unkey/pkg/clickhouse/schema" @@ -46,6 +47,16 @@ func seedSentinel(ctx context.Context, cmd *cli.Command) error { return fmt.Errorf("failed to connect to ClickHouse: %w", err) } + sentinelRequests := clickhouse.NewBuffer[schema.SentinelRequest](ch, "default.sentinel_requests_raw_v1", clickhouse.BufferConfig{ + Name: "seed-sentinel-requests", + BatchSize: 50_000, + BufferSize: 50_000, + FlushInterval: 5 * time.Second, + Consumers: 2, + Drop: true, + OnFlushError: nil, + }) + // Get or find deployment ID deploymentID := cmd.String("deployment-id") if deploymentID == "" { @@ -58,20 +69,20 @@ func seedSentinel(ctx context.Context, cmd *cli.Command) error { // Create seeder and 
run seeder := &SentinelSeeder{ - deploymentID: deploymentID, - numRequests: cmd.RequireInt("num-requests"), - db: database, - clickhouse: ch, + deploymentID: deploymentID, + numRequests: cmd.RequireInt("num-requests"), + db: database, + sentinelRequests: sentinelRequests, } return seeder.Seed(ctx) } type SentinelSeeder struct { - deploymentID string - numRequests int - db db.Database - clickhouse clickhouse.ClickHouse + deploymentID string + numRequests int + db db.Database + sentinelRequests *batch.BatchProcessor[schema.SentinelRequest] } func (s *SentinelSeeder) Seed(ctx context.Context) error { @@ -227,7 +238,7 @@ func (s *SentinelSeeder) generateRequests( totalLatency := instanceLatency + sentinelLatency // Buffer it IMMEDIATELY - s.clickhouse.BufferSentinelRequest(schema.SentinelRequest{ + s.sentinelRequests.Buffer(schema.SentinelRequest{ RequestID: uid.New("req"), Time: timestamp.UnixMilli(), WorkspaceID: deployment.WorkspaceID, @@ -263,10 +274,8 @@ func (s *SentinelSeeder) generateRequests( log.Printf(" Buffered all %d requests, waiting for flush...", s.numRequests) - // Flush by closing (like verifications.go line 496) - if err := s.clickhouse.Close(); err != nil { - return fmt.Errorf("failed to close clickhouse: %w", err) - } + // Flush by closing the buffer + s.sentinelRequests.Close() log.Printf(" All requests sent to ClickHouse") return nil diff --git a/cmd/dev/seed/verifications.go b/cmd/dev/seed/verifications.go index 0a3d4f7930..6a888e1c5e 100644 --- a/cmd/dev/seed/verifications.go +++ b/cmd/dev/seed/verifications.go @@ -12,6 +12,7 @@ import ( "github.com/unkeyed/unkey/internal/services/keys" "github.com/unkeyed/unkey/pkg/array" + "github.com/unkeyed/unkey/pkg/batch" "github.com/unkeyed/unkey/pkg/cli" "github.com/unkeyed/unkey/pkg/clickhouse" "github.com/unkeyed/unkey/pkg/clickhouse/schema" @@ -56,16 +57,26 @@ func seedVerifications(ctx context.Context, cmd *cli.Command) error { return fmt.Errorf("failed to connect to ClickHouse: %w", err) } + 
keyVerifications := clickhouse.NewBuffer[schema.KeyVerification](ch, "default.key_verifications_raw_v2", clickhouse.BufferConfig{ + Name: "seed-key-verifications", + BatchSize: 50_000, + BufferSize: 50_000, + FlushInterval: 5 * time.Second, + Consumers: 2, + Drop: true, + OnFlushError: nil, + }) + // Create key service for proper key generation keyService, err := keys.New(keys.Config{ - DB: database, - RateLimiter: nil, - RBAC: nil, - Clickhouse: ch, - Region: "test", - UsageLimiter: nil, - KeyCache: nil, - QuotaCache: nil, + DB: database, + RateLimiter: nil, + RBAC: nil, + KeyVerifications: keyVerifications, + Region: "test", + UsageLimiter: nil, + KeyCache: nil, + QuotaCache: nil, }) if err != nil { return fmt.Errorf("failed to create key service: %w", err) @@ -93,7 +104,7 @@ func seedVerifications(ctx context.Context, cmd *cli.Command) error { daysBack: cmd.Int("days-back"), daysForward: cmd.Int("days-forward"), db: database, - clickhouse: ch, + keyVerifications: keyVerifications, keyService: keyService, } @@ -110,7 +121,7 @@ type Seeder struct { daysBack int daysForward int db db.Database - clickhouse clickhouse.ClickHouse + keyVerifications *batch.BatchProcessor[schema.KeyVerification] keyService keys.KeyService } @@ -464,8 +475,8 @@ func (s *Seeder) generateVerifications(_ context.Context, workspaceID string, ke credit = 0 } - // Use BufferKeyVerification to let the clickhouse client batch automatically - s.clickhouse.BufferKeyVerification(schema.KeyVerification{ + // Use batch processor to buffer key verifications + s.keyVerifications.Buffer(schema.KeyVerification{ RequestID: uid.New("req"), Time: timestamp.UnixMilli(), WorkspaceID: workspaceID, @@ -488,10 +499,7 @@ func (s *Seeder) generateVerifications(_ context.Context, workspaceID string, ke log.Printf(" Buffered all %d verifications, waiting for flush...", s.numVerifications) - err := s.clickhouse.Close() - if err != nil { - return fmt.Errorf("failed to close clickhouse: %w", err) - } + 
s.keyVerifications.Close() log.Printf(" All verifications sent to ClickHouse") return nil diff --git a/internal/services/keys/BUILD.bazel b/internal/services/keys/BUILD.bazel index ccd8665b97..a759d2d081 100644 --- a/internal/services/keys/BUILD.bazel +++ b/internal/services/keys/BUILD.bazel @@ -24,8 +24,8 @@ go_library( "//internal/services/usagelimiter", "//pkg/assert", "//pkg/base58", + "//pkg/batch", "//pkg/cache", - "//pkg/clickhouse", "//pkg/clickhouse/schema", "//pkg/codes", "//pkg/db", diff --git a/internal/services/keys/get.go b/internal/services/keys/get.go index 3334ef1476..5a78e09780 100644 --- a/internal/services/keys/get.go +++ b/internal/services/keys/get.go @@ -160,7 +160,7 @@ func (s *service) Get(ctx context.Context, sess *zen.Session, sha256Hash string) session: sess, rBAC: s.rbac, region: s.region, - clickhouse: s.clickhouse, + keyVerifications: s.keyVerifications, rateLimiter: s.rateLimiter, usageLimiter: s.usageLimiter, AuthorizedWorkspaceID: key.WorkspaceID, @@ -228,7 +228,7 @@ func (s *service) Get(ctx context.Context, sess *zen.Session, sha256Hash string) kv := &KeyVerifier{ tags: []string{}, Key: key.FindKeyForVerificationRow, - clickhouse: s.clickhouse, + keyVerifications: s.keyVerifications, rateLimiter: s.rateLimiter, usageLimiter: s.usageLimiter, AuthorizedWorkspaceID: key.WorkspaceID, diff --git a/internal/services/keys/service.go b/internal/services/keys/service.go index e382845d05..5ea9690cad 100644 --- a/internal/services/keys/service.go +++ b/internal/services/keys/service.go @@ -3,32 +3,35 @@ package keys import ( "github.com/unkeyed/unkey/internal/services/ratelimit" "github.com/unkeyed/unkey/internal/services/usagelimiter" + "github.com/unkeyed/unkey/pkg/batch" "github.com/unkeyed/unkey/pkg/cache" - "github.com/unkeyed/unkey/pkg/clickhouse" + "github.com/unkeyed/unkey/pkg/clickhouse/schema" "github.com/unkeyed/unkey/pkg/db" "github.com/unkeyed/unkey/pkg/rbac" ) // Config holds the configuration for creating a new keys service 
instance. type Config struct { - DB db.Database // Database connection - RateLimiter ratelimit.Service // Rate limiting service - RBAC *rbac.RBAC // Role-based access control - Clickhouse clickhouse.ClickHouse // Clickhouse for telemetry - Region string // Geographic region identifier - UsageLimiter usagelimiter.Service // Redis Counter for usage limiting + DB db.Database // Database connection + RateLimiter ratelimit.Service // Rate limiting service + RBAC *rbac.RBAC // Role-based access control + Region string // Geographic region identifier + UsageLimiter usagelimiter.Service // Redis Counter for usage limiting + + // KeyVerifications buffers key verification events for ClickHouse. + KeyVerifications *batch.BatchProcessor[schema.KeyVerification] KeyCache cache.Cache[string, db.CachedKeyData] // Cache for key lookups with pre-parsed data QuotaCache cache.Cache[string, db.Quotas] // Cache for workspace quota lookups } type service struct { - db db.Database - rateLimiter ratelimit.Service - usageLimiter usagelimiter.Service - rbac *rbac.RBAC - clickhouse clickhouse.ClickHouse - region string + db db.Database + rateLimiter ratelimit.Service + usageLimiter usagelimiter.Service + rbac *rbac.RBAC + keyVerifications *batch.BatchProcessor[schema.KeyVerification] + region string // hash -> cached key data (includes pre-parsed IP whitelist) keyCache cache.Cache[string, db.CachedKeyData] @@ -39,15 +42,20 @@ type service struct { // New creates a new keys service instance with the provided configuration. 
func New(config Config) (*service, error) { + kv := config.KeyVerifications + if kv == nil { + kv = batch.NewNoop[schema.KeyVerification]() + } + return &service{ - db: config.DB, - rbac: config.RBAC, - rateLimiter: config.RateLimiter, - usageLimiter: config.UsageLimiter, - clickhouse: config.Clickhouse, - region: config.Region, - keyCache: config.KeyCache, - quotaCache: config.QuotaCache, + db: config.DB, + rbac: config.RBAC, + rateLimiter: config.RateLimiter, + usageLimiter: config.UsageLimiter, + keyVerifications: kv, + region: config.Region, + keyCache: config.KeyCache, + quotaCache: config.QuotaCache, }, nil } diff --git a/internal/services/keys/verifier.go b/internal/services/keys/verifier.go index cac2fec017..aa9bdc3b12 100644 --- a/internal/services/keys/verifier.go +++ b/internal/services/keys/verifier.go @@ -7,7 +7,7 @@ import ( "github.com/unkeyed/unkey/internal/services/keys/metrics" "github.com/unkeyed/unkey/internal/services/ratelimit" "github.com/unkeyed/unkey/internal/services/usagelimiter" - "github.com/unkeyed/unkey/pkg/clickhouse" + "github.com/unkeyed/unkey/pkg/batch" "github.com/unkeyed/unkey/pkg/clickhouse/schema" "github.com/unkeyed/unkey/pkg/db" "github.com/unkeyed/unkey/pkg/rbac" @@ -52,10 +52,10 @@ type KeyVerifier struct { spentCredits int64 // Credits spent during verification // Services - rateLimiter ratelimit.Service // Rate limiting service - usageLimiter usagelimiter.Service // Usage limiting service - rBAC *rbac.RBAC // Role-based access control service - clickhouse clickhouse.ClickHouse // Clickhouse for telemetry + rateLimiter ratelimit.Service // Rate limiting service + usageLimiter usagelimiter.Service // Usage limiting service + rBAC *rbac.RBAC // Role-based access control service + keyVerifications *batch.BatchProcessor[schema.KeyVerification] // Buffer for key verification telemetry } // GetRatelimitConfigs returns the rate limit configurations @@ -126,7 +126,7 @@ func (k *KeyVerifier) Verify(ctx context.Context, opts 
...VerifyOption) error { func (k *KeyVerifier) log() { latency := time.Since(k.startTime).Milliseconds() - k.clickhouse.BufferKeyVerification(schema.KeyVerification{ + k.keyVerifications.Buffer(schema.KeyVerification{ RequestID: k.session.RequestID(), WorkspaceID: k.Key.WorkspaceID, Time: time.Now().UnixMilli(), diff --git a/internal/services/ratelimit/replay.go b/internal/services/ratelimit/replay.go index da6ac92cbb..51605faec9 100644 --- a/internal/services/ratelimit/replay.go +++ b/internal/services/ratelimit/replay.go @@ -37,8 +37,11 @@ import ( // go svc.replayRequests() // } func (s *service) replayRequests() { - for req := range s.replayBuffer.Consume() { - err := s.syncWithOrigin(context.Background(), req) + for ptr := range s.replayBuffer.Consume() { + if ptr == nil { + continue + } + err := s.syncWithOrigin(context.Background(), *ptr) if err != nil { logger.Error("failed to replay request", "error", err.Error()) } diff --git a/internal/services/usagelimiter/redis.go b/internal/services/usagelimiter/redis.go index bcc837e1b0..78cbb212c1 100644 --- a/internal/services/usagelimiter/redis.go +++ b/internal/services/usagelimiter/redis.go @@ -283,8 +283,11 @@ func (s *counterService) initializeFromDatabase(ctx context.Context, req UsageRe // replayRequests processes buffered credit changes and updates the database func (s *counterService) replayRequests() { - for change := range s.replayBuffer.Consume() { - err := s.syncWithDB(context.Background(), change) + for ptr := range s.replayBuffer.Consume() { + if ptr == nil { + continue + } + err := s.syncWithDB(context.Background(), *ptr) if err != nil { logger.Error("failed to replay credit change", "error", err) } @@ -356,8 +359,12 @@ func (s *counterService) Close() error { // Process remaining items directly select { - case change := <-s.replayBuffer.Consume(): - err := s.syncWithDB(context.Background(), change) + case ptr, ok := <-s.replayBuffer.Consume(): + if !ok || ptr == nil { + logger.Debug("replay buffer 
channel closed, done draining") + return nil + } + err := s.syncWithDB(context.Background(), *ptr) if err != nil { logger.Error("failed to sync credit change during shutdown", "error", err) } diff --git a/pkg/batch/BUILD.bazel b/pkg/batch/BUILD.bazel index 196d4370d4..0488e1d669 100644 --- a/pkg/batch/BUILD.bazel +++ b/pkg/batch/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "batch", srcs = [ "doc.go", + "noop.go", "process.go", ], importpath = "github.com/unkeyed/unkey/pkg/batch", diff --git a/pkg/batch/noop.go b/pkg/batch/noop.go new file mode 100644 index 0000000000..74f868e50f --- /dev/null +++ b/pkg/batch/noop.go @@ -0,0 +1,22 @@ +package batch + +import ( + "context" + "time" +) + +// NewNoop creates a minimal no-op BatchProcessor that discards all buffered items. +// It allocates almost no memory (1-element channel with a noop flush) and is safe +// to call from any goroutine. Use this when ClickHouse (or any other sink) is not +// configured but the caller still needs a non-nil *BatchProcessor[T]. 
+func NewNoop[T any]() *BatchProcessor[T] { + return New(Config[T]{ + Name: "noop", + BatchSize: 1, + BufferSize: 1, + FlushInterval: time.Hour, + Drop: true, + Consumers: 1, + Flush: func(_ context.Context, _ []T) {}, + }) +} diff --git a/pkg/batch/process.go b/pkg/batch/process.go index 7030bb8a01..773f020f6c 100644 --- a/pkg/batch/process.go +++ b/pkg/batch/process.go @@ -21,7 +21,6 @@ import ( type BatchProcessor[T any] struct { name string buffer *buffer.Buffer[T] - batch []T config Config[T] flush func(ctx context.Context, batch []T, trigger string) } @@ -93,7 +92,6 @@ func New[T any](config Config[T]) *BatchProcessor[T] { Capacity: config.BufferSize, Drop: config.Drop, }), - batch: make([]T, 0, config.BatchSize), flush: func(ctx context.Context, batch []T, trigger string) { batchSize := len(batch) @@ -123,8 +121,11 @@ func New[T any](config Config[T]) *BatchProcessor[T] { // It reads items from the buffer channel and batches them until // either the batch is full or the flush interval elapses. func (bp *BatchProcessor[T]) process() { - - batch := make([]T, 0, bp.config.BatchSize) + // Start with zero capacity — the slice grows to actual usage on the first + // flush cycle and is reused (batch[:0]) afterwards, so steady-state traffic + // never re-allocates. This avoids pre-allocating BatchSize*sizeof(T) for + // buffers that may see far fewer items per interval. 
+ batch := make([]T, 0) t := time.NewTimer(bp.config.FlushInterval) flushAndReset := func(trigger string) { @@ -138,7 +139,7 @@ func (bp *BatchProcessor[T]) process() { c := bp.buffer.Consume() for { select { - case e, ok := <-c: + case ptr, ok := <-c: if !ok { // channel closed t.Stop() @@ -148,7 +149,7 @@ func (bp *BatchProcessor[T]) process() { } return } - batch = append(batch, e) + batch = append(batch, *ptr) if len(batch) >= bp.config.BatchSize { flushAndReset("size_limit") diff --git a/pkg/buffer/buffer.go b/pkg/buffer/buffer.go index addc38d3c9..dff03074be 100644 --- a/pkg/buffer/buffer.go +++ b/pkg/buffer/buffer.go @@ -12,9 +12,9 @@ import ( // Buffer represents a generic buffered channel that can store elements of type T. // It provides configuration for capacity and drop behavior when the buffer is full. type Buffer[T any] struct { - c chan T // The underlying channel storing elements - drop bool // Whether to drop new elements when buffer is full - name string // name of the buffer + c chan *T // Pointer-based channel — 8 bytes per slot instead of sizeof(T) + drop bool // Whether to drop new elements when buffer is full + name string // name of the buffer stopMetrics func() closeOnce sync.Once // Protects isClosed and stopMetrics @@ -53,7 +53,7 @@ func New[T any](config Config) *Buffer[T] { mu: sync.RWMutex{}, closeOnce: sync.Once{}, isClosed: false, - c: make(chan T, config.Capacity), + c: make(chan *T, config.Capacity), drop: config.Drop, name: config.Name, stopMetrics: func() {}, @@ -102,9 +102,12 @@ func (b *Buffer[T]) Buffer(t T) { return } + ptr := new(T) + *ptr = t + if b.drop { select { - case b.c <- t: + case b.c <- ptr: metrics.BufferState.WithLabelValues(b.name, "buffered").Inc() default: @@ -112,7 +115,7 @@ func (b *Buffer[T]) Buffer(t T) { metrics.BufferState.WithLabelValues(b.name, "dropped").Inc() } } else { - b.c <- t + b.c <- ptr metrics.BufferState.WithLabelValues(b.name, "buffered").Inc() } } @@ -133,10 +136,10 @@ func (b *Buffer[T]) 
Buffer(t T) { // go func() { // for event := range buffer.Consume() { // // Process each event -// fmt.Println(event) +// fmt.Println(*event) // } // }() -func (b *Buffer[T]) Consume() <-chan T { +func (b *Buffer[T]) Consume() <-chan *T { return b.c } diff --git a/pkg/buffer/buffer_test.go b/pkg/buffer/buffer_test.go index 69aa567634..a9a90db3a2 100644 --- a/pkg/buffer/buffer_test.go +++ b/pkg/buffer/buffer_test.go @@ -86,7 +86,7 @@ func TestBuffer(t *testing.T) { for { select { case v := <-b.c: - received = append(received, v) + received = append(received, *v) case <-timeout: break receiveLoop } @@ -143,7 +143,7 @@ func TestCustomTypes(t *testing.T) { select { case received := <-b.c: - assert.Equal(t, event, received, "received event should match buffered event") + assert.Equal(t, event, *received, "received event should match buffered event") default: t.Error("Expected to receive buffered event") } diff --git a/pkg/clickhouse/BUILD.bazel b/pkg/clickhouse/BUILD.bazel index e340ff6bb6..5e2ef65c29 100644 --- a/pkg/clickhouse/BUILD.bazel +++ b/pkg/clickhouse/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "billable_ratelimits.go", "billable_usage.go", "billable_verifications.go", + "buffer.go", "client.go", "deployment_request_count.go", "doc.go", @@ -22,10 +23,10 @@ go_library( deps = [ "//pkg/batch", "//pkg/circuitbreaker", - "//pkg/clickhouse/schema", "//pkg/codes", "//pkg/fault", "//pkg/logger", + "//pkg/ptr", "//pkg/retry", "@com_github_clickhouse_clickhouse_go_v2//:clickhouse-go", "@com_github_clickhouse_clickhouse_go_v2//lib/driver", diff --git a/pkg/clickhouse/billable_ratelimits.go b/pkg/clickhouse/billable_ratelimits.go index cf8c0a9747..c8a89d564b 100644 --- a/pkg/clickhouse/billable_ratelimits.go +++ b/pkg/clickhouse/billable_ratelimits.go @@ -16,7 +16,7 @@ import ( // return fmt.Errorf("failed to get billable ratelimits: %w", err) // } // fmt.Printf("Billable ratelimits: %d\n", count) -func (c *clickhouse) GetBillableRatelimits(ctx context.Context, workspaceID 
string, year, month int) (int64, error) { +func (c *Client) GetBillableRatelimits(ctx context.Context, workspaceID string, year, month int) (int64, error) { var count int64 query := ` diff --git a/pkg/clickhouse/billable_usage.go b/pkg/clickhouse/billable_usage.go index d5dbf89116..ad7a935913 100644 --- a/pkg/clickhouse/billable_usage.go +++ b/pkg/clickhouse/billable_usage.go @@ -13,7 +13,7 @@ import ( // // The query uses a CTE to combine verifications and ratelimits, then filters workspaces // where total usage >= minUsage. Returns a map from workspace ID to total usage. -func (c *clickhouse) GetBillableUsageAboveThreshold(ctx context.Context, year, month int, minUsage int64) (map[string]int64, error) { +func (c *Client) GetBillableUsageAboveThreshold(ctx context.Context, year, month int, minUsage int64) (map[string]int64, error) { // Use a single query with UNION ALL and GROUP BY to combine verifications and ratelimits, // then filter to only workspaces with usage >= minUsage query := ` diff --git a/pkg/clickhouse/billable_verifications.go b/pkg/clickhouse/billable_verifications.go index 9d991f8735..a815dd4916 100644 --- a/pkg/clickhouse/billable_verifications.go +++ b/pkg/clickhouse/billable_verifications.go @@ -16,7 +16,7 @@ import ( // return fmt.Errorf("failed to get billable verifications: %w", err) // } // fmt.Printf("Billable verifications: %d\n", count) -func (c *clickhouse) GetBillableVerifications(ctx context.Context, workspaceID string, year, month int) (int64, error) { +func (c *Client) GetBillableVerifications(ctx context.Context, workspaceID string, year, month int) (int64, error) { var count int64 query := ` diff --git a/pkg/clickhouse/buffer.go b/pkg/clickhouse/buffer.go new file mode 100644 index 0000000000..dd51fd8965 --- /dev/null +++ b/pkg/clickhouse/buffer.go @@ -0,0 +1,76 @@ +package clickhouse + +import ( + "context" + "time" + + "github.com/unkeyed/unkey/pkg/batch" + "github.com/unkeyed/unkey/pkg/logger" +) + +// BufferConfig configures a 
batch buffer created via NewBuffer. +type BufferConfig struct { + // Name identifies this buffer for logging and metrics. + Name string + + // BatchSize is the maximum number of items to collect before flushing. + BatchSize int + + // BufferSize is the capacity of the channel buffer holding incoming items. + BufferSize int + + // FlushInterval is the maximum time to wait before flushing a non-empty batch. + FlushInterval time.Duration + + // Consumers specifies how many goroutine workers should process the channel. + Consumers int + + // Drop determines whether to discard items when the buffer is full. + // When true, new items are silently dropped if the buffer is at capacity. + // When false (default), Buffer() will block until space becomes available. + Drop bool + + // OnFlushError is called when a flush fails. If nil, errors are logged + // via logger.Error (best-effort). Callers that need strict error handling + // can supply their own callback. + OnFlushError func(ctx context.Context, table string, rowCount int, err error) +} + +// NewBuffer creates a *batch.BatchProcessor[T] that flushes rows to the given +// ClickHouse table using the client's connection, retry policy, and circuit breaker. +// +// The caller owns the returned processor and must call Close() on it during shutdown +// (before closing the Client) to drain any buffered rows. 
+// +// Example: +// +// buf := clickhouse.NewBuffer[schema.SentinelRequest](client, "default.sentinel_requests_raw_v1", clickhouse.BufferConfig{ +// Name: "sentinel_requests", +// BatchSize: 50_000, +// BufferSize: 50_000, +// FlushInterval: 5 * time.Second, +// Consumers: 2, +// }) +// defer buf.Close() +func NewBuffer[T any](c *Client, table string, cfg BufferConfig) *batch.BatchProcessor[T] { + onErr := cfg.OnFlushError + if onErr == nil { + onErr = func(_ context.Context, tbl string, _ int, err error) { + logger.Error("failed to flush batch", "table", tbl, "error", err.Error()) + } + } + + return batch.New(batch.Config[T]{ + Name: cfg.Name, + Drop: cfg.Drop, + BatchSize: cfg.BatchSize, + BufferSize: cfg.BufferSize, + FlushInterval: cfg.FlushInterval, + Consumers: cfg.Consumers, + Flush: func(ctx context.Context, rows []T) { + if err := flush(c, ctx, table, rows); err != nil { + onErr(ctx, table, len(rows), err) + } + }, + }) +} diff --git a/pkg/clickhouse/client.go b/pkg/clickhouse/client.go index cdf44887e0..5fd05c8eaa 100644 --- a/pkg/clickhouse/client.go +++ b/pkg/clickhouse/client.go @@ -7,35 +7,25 @@ import ( "time" ch "github.com/ClickHouse/clickhouse-go/v2" - "github.com/unkeyed/unkey/pkg/batch" "github.com/unkeyed/unkey/pkg/circuitbreaker" - "github.com/unkeyed/unkey/pkg/clickhouse/schema" "github.com/unkeyed/unkey/pkg/fault" "github.com/unkeyed/unkey/pkg/logger" "github.com/unkeyed/unkey/pkg/retry" ) -// Clickhouse represents a client for interacting with a ClickHouse database. -// It provides batch processing for different event types to efficiently store -// high volumes of data while minimizing connection overhead. -type clickhouse struct { +// Client represents a client for interacting with a ClickHouse database. +// Batch processing for different event types is handled externally via +// NewBuffer[T], which wires a *batch.BatchProcessor to this client's +// connection, retry policy, and circuit breaker. 
+type Client struct { conn ch.Conn circuitBreaker *circuitbreaker.CB[struct{}] retry *retry.Retry - - // Batched processors for different event types - apiRequests *batch.BatchProcessor[schema.ApiRequest] - keyVerifications *batch.BatchProcessor[schema.KeyVerification] - ratelimits *batch.BatchProcessor[schema.Ratelimit] - buildSteps *batch.BatchProcessor[schema.BuildStepV1] - buildStepLogs *batch.BatchProcessor[schema.BuildStepLogV1] - sentinelRequests *batch.BatchProcessor[schema.SentinelRequest] } var ( - _ Bufferer = (*clickhouse)(nil) - _ Querier = (*clickhouse)(nil) - _ ClickHouse = (*clickhouse)(nil) + _ Querier = (*Client)(nil) + _ ClickHouse = (*Client)(nil) ) // Config contains the configuration options for the ClickHouse client. @@ -46,21 +36,20 @@ type Config struct { } // New creates a new ClickHouse client with the provided configuration. -// It establishes a connection to the ClickHouse server and initializes -// batch processors for different event types. -// -// The client uses batch processing to efficiently handle high volumes -// of events, automatically flushing based on batch size and time interval. +// It establishes a connection to the ClickHouse server but does not create +// any batch processors. Use NewBuffer[T] to create type-safe batch processors +// for specific event types. 
// // Example: // -// ch, err := clickhouse.New(clickhouse.Config{ -// URL: "clickhouse://user:pass@clickhouse.example.com:9000/db", +// client, err := clickhouse.New(clickhouse.Config{ +// URL: "clickhouse://user:pass@clickhouse.example.com:9000/db", // }) // if err != nil { // return fmt.Errorf("failed to initialize clickhouse: %w", err) // } -func New(config Config) (*clickhouse, error) { +// buf := clickhouse.NewBuffer[schema.ApiRequest](client, "default.api_requests_raw_v2", clickhouse.BufferConfig{...}) +func New(config Config) (*Client, error) { opts, err := ch.ParseDSN(config.URL) if err != nil { return nil, fault.Wrap(err, fault.Internal("parsing clickhouse DSN failed")) @@ -99,7 +88,7 @@ func New(config Config) (*clickhouse, error) { return nil, fault.Wrap(err, fault.Internal("pinging clickhouse failed")) } - c := &clickhouse{ + c := &Client{ conn: conn, circuitBreaker: circuitbreaker.New[struct{}]( "clickhouse_insert", @@ -117,104 +106,8 @@ func New(config Config) (*clickhouse, error) { return !isAuthenticationError(err) }), ), - apiRequests: nil, - keyVerifications: nil, - ratelimits: nil, - buildSteps: nil, - buildStepLogs: nil, - sentinelRequests: nil, } - c.apiRequests = batch.New(batch.Config[schema.ApiRequest]{ - Name: "api_requests", - Drop: true, - BatchSize: 50_000, - BufferSize: 200_000, - FlushInterval: 5 * time.Second, - Consumers: 2, - Flush: func(ctx context.Context, rows []schema.ApiRequest) { - table := "default.api_requests_raw_v2" - if err := flush(c, ctx, table, rows); err != nil { - logger.Error("failed to flush batch", "table", table, "error", err.Error()) - } - }, - }) - - c.keyVerifications = batch.New(batch.Config[schema.KeyVerification]{ - Name: "key_verifications_v2", - Drop: true, - BatchSize: 50_000, - BufferSize: 200_000, - FlushInterval: 5 * time.Second, - Consumers: 2, - Flush: func(ctx context.Context, rows []schema.KeyVerification) { - table := "default.key_verifications_raw_v2" - if err := flush(c, ctx, table, rows); err 
!= nil { - logger.Error("failed to flush batch", "table", table, "error", err.Error()) - } - }, - }) - - c.ratelimits = batch.New(batch.Config[schema.Ratelimit]{ - Name: "ratelimits", - Drop: true, - BatchSize: 50_000, - BufferSize: 200_000, - FlushInterval: 5 * time.Second, - Consumers: 2, - Flush: func(ctx context.Context, rows []schema.Ratelimit) { - table := "default.ratelimits_raw_v2" - if err := flush(c, ctx, table, rows); err != nil { - logger.Error("failed to flush batch", "table", table, "error", err.Error()) - } - }, - }) - - c.buildSteps = batch.New(batch.Config[schema.BuildStepV1]{ - Name: "build_steps_v1", - Drop: true, - BatchSize: 50_000, - BufferSize: 200_000, - FlushInterval: 2 * time.Second, - Consumers: 1, - Flush: func(ctx context.Context, rows []schema.BuildStepV1) { - table := "default.build_steps_v1" - if err := flush(c, ctx, table, rows); err != nil { - logger.Error("failed to flush batch", "table", table, "error", err.Error()) - } - }, - }) - - c.buildStepLogs = batch.New(batch.Config[schema.BuildStepLogV1]{ - Name: "build_step_logs_v1", - Drop: true, - BatchSize: 50_000, - BufferSize: 200_000, - FlushInterval: 2 * time.Second, - Consumers: 1, - Flush: func(ctx context.Context, rows []schema.BuildStepLogV1) { - table := "default.build_step_logs_v1" - if err := flush(c, ctx, table, rows); err != nil { - logger.Error("failed to flush batch", "table", table, "error", err.Error()) - } - }, - }) - - c.sentinelRequests = batch.New(batch.Config[schema.SentinelRequest]{ - Name: "sentinel_requests_v1", - Drop: true, - BatchSize: 50_000, - BufferSize: 200_000, - FlushInterval: 5 * time.Second, - Consumers: 2, - Flush: func(ctx context.Context, rows []schema.SentinelRequest) { - table := "default.sentinel_requests_raw_v1" - if err := flush(c, ctx, table, rows); err != nil { - logger.Error("failed to flush batch", "table", table, "error", err.Error()) - } - }, - }) - return c, nil } @@ -235,85 +128,7 @@ func isAuthenticationError(err error) bool { 
strings.Contains(errStr, "code: 517") // Wrong password } -// BufferApiRequest adds an API request event to the buffer for batch processing. -// The event will be flushed to ClickHouse automatically based on the configured -// batch size and flush interval. -// -// This method is non-blocking if the buffer has available capacity. If the buffer -// is full and the Drop option is enabled (which is the default), the event will -// be silently dropped. -// -// Example: -// -// ch.BufferApiRequest(schema.ApiRequest{ -// RequestID: requestID, -// Time: time.Now().UnixMilli(), -// WorkspaceID: workspaceID, -// Host: r.Host, -// Method: r.Method, -// Path: r.URL.Path, -// ResponseStatus: status, -// }) -func (c *clickhouse) BufferApiRequest(req schema.ApiRequest) { - c.apiRequests.Buffer(req) -} - -// BufferKeyVerification adds a key verification event to the buffer for batch processing. -// The event will be flushed to ClickHouse automatically based on the configured -// batch size and flush interval. -// -// This method is non-blocking if the buffer has available capacity. If the buffer -// is full and the Drop option is enabled (which is the default), the event will -// be silently dropped. -// -// Example: -// -// ch.BufferKeyVerificationV2(schema.KeyVerificationV2{ -// RequestID: requestID, -// Time: time.Now().UnixMilli(), -// WorkspaceID: workspaceID, -// KeyID: keyID, -// Outcome: "success", -// }) -func (c *clickhouse) BufferKeyVerification(req schema.KeyVerification) { - c.keyVerifications.Buffer(req) -} - -// BufferRatelimit adds a ratelimit event to the buffer for batch processing. -// The event will be flushed to ClickHouse automatically based on the configured -// batch size and flush interval. -// -// This method is non-blocking if the buffer has available capacity. If the buffer -// is full and the Drop option is enabled (which is the default), the event will -// be silently dropped. 
-// -// Example: -// -// ch.BufferRatelimit(schema.Ratelimit{ -// RequestID: requestID, -// Time: time.Now().UnixMilli(), -// WorkspaceID: workspaceID, -// NamespaceID: namespaceID, -// Identifier: identifier, -// Passed: passed, -// }) -func (c *clickhouse) BufferRatelimit(req schema.Ratelimit) { - c.ratelimits.Buffer(req) -} - -func (c *clickhouse) BufferBuildStep(req schema.BuildStepV1) { - c.buildSteps.Buffer(req) -} - -func (c *clickhouse) BufferBuildStepLog(req schema.BuildStepLogV1) { - c.buildStepLogs.Buffer(req) -} - -func (c *clickhouse) BufferSentinelRequest(req schema.SentinelRequest) { - c.sentinelRequests.Buffer(req) -} - -func (c *clickhouse) Conn() ch.Conn { +func (c *Client) Conn() ch.Conn { return c.conn } @@ -321,7 +136,7 @@ func (c *clickhouse) Conn() ch.Conn { // Each map represents a row with column names as keys and values as ch.Dynamic. // Returns fault-wrapped errors with appropriate codes for resource limits, // user query errors, and system errors. -func (c *clickhouse) QueryToMaps(ctx context.Context, query string, args ...any) ([]map[string]any, error) { +func (c *Client) QueryToMaps(ctx context.Context, query string, args ...any) ([]map[string]any, error) { rows, err := c.conn.Query(ctx, query, args...) if err != nil { return nil, WrapClickHouseError(err) @@ -360,25 +175,18 @@ func (c *clickhouse) QueryToMaps(ctx context.Context, query string, args ...any) // Exec executes a DDL or DML statement that doesn't return rows. // Used for CREATE, ALTER, DROP, GRANT, REVOKE, etc. -func (c *clickhouse) Exec(ctx context.Context, sql string, args ...any) error { +func (c *Client) Exec(ctx context.Context, sql string, args ...any) error { return c.conn.Exec(ctx, sql, args...) } -func (c *clickhouse) Ping(ctx context.Context) error { +func (c *Client) Ping(ctx context.Context) error { return c.conn.Ping(ctx) } -// Close gracefully shuts down the ClickHouse client. 
-// It closes all batch processors (waiting for them to flush remaining data), -// then closes the underlying ClickHouse connection. -func (c *clickhouse) Close() error { - c.apiRequests.Close() - c.keyVerifications.Close() - c.ratelimits.Close() - c.buildSteps.Close() - c.buildStepLogs.Close() - c.sentinelRequests.Close() - +// Close shuts down the ClickHouse connection. +// Any batch processors created via NewBuffer must be closed separately +// (and before this call) to ensure buffered rows are flushed. +func (c *Client) Close() error { err := c.conn.Close() if err != nil { return fault.Wrap(err, fault.Internal("clickhouse couldn't shut down")) diff --git a/pkg/clickhouse/deployment_request_count.go b/pkg/clickhouse/deployment_request_count.go index 26f9e90be3..407aba1639 100644 --- a/pkg/clickhouse/deployment_request_count.go +++ b/pkg/clickhouse/deployment_request_count.go @@ -17,7 +17,7 @@ import ( // relative to the current wall clock time. // // Returns 0 (not an error) if no requests exist for the deployment in the given window. -func (c *clickhouse) GetDeploymentRequestCount(ctx context.Context, req GetDeploymentRequestCountRequest) (int64, error) { +func (c *Client) GetDeploymentRequestCount(ctx context.Context, req GetDeploymentRequestCountRequest) (int64, error) { query := ` SELECT toInt64(count()) as count FROM default.sentinel_requests_raw_v1 diff --git a/pkg/clickhouse/flush.go b/pkg/clickhouse/flush.go index 9d1c70e05e..c6b7c0b8c0 100644 --- a/pkg/clickhouse/flush.go +++ b/pkg/clickhouse/flush.go @@ -17,7 +17,7 @@ import ( // - Circuit breaker protection // // Returns an error if any part of the batch operation fails after all retries. 
-func flush[T any](c *clickhouse, ctx context.Context, table string, rows []T) error { +func flush[T any](c *Client, ctx context.Context, table string, rows []T) error { // Apply async insert settings ctx = ch.Context(ctx, ch.WithSettings(ch.Settings{ "async_insert": "1", diff --git a/pkg/clickhouse/interface.go b/pkg/clickhouse/interface.go index 9ea5cd41e2..9a5e9eceac 100644 --- a/pkg/clickhouse/interface.go +++ b/pkg/clickhouse/interface.go @@ -4,41 +4,8 @@ import ( "context" ch "github.com/ClickHouse/clickhouse-go/v2" - "github.com/unkeyed/unkey/pkg/clickhouse/schema" ) -// Bufferer defines the interface for systems that can buffer events for -// batch processing. It provides methods to add different types of events -// to their respective buffers. -// -// This interface allows for different implementations, such as a real -// ClickHouse client or a no-op implementation for testing or development. -type Bufferer interface { - // BufferApiRequest adds an API request event to the buffer. - // These are typically HTTP requests to the API with request and response details. - BufferApiRequest(schema.ApiRequest) - - // BufferKeyVerification adds a key verification event to the buffer. - // These represent API key validation operations with their outcomes. - BufferKeyVerification(schema.KeyVerification) - - // BufferRatelimit adds a ratelimit event to the buffer. - // These represent API ratelimit operations with their outcome. - BufferRatelimit(schema.Ratelimit) - - // BufferRatelimit adds a ratelimit event to the buffer. - // These represent API ratelimit operations with their outcome. - BufferBuildStep(schema.BuildStepV1) - - // BufferRatelimit adds a ratelimit event to the buffer. - // These represent API ratelimit operations with their outcome. - BufferBuildStepLog(schema.BuildStepLogV1) - - // BufferSentinelRequest adds a sentinel request event to the buffer. - // These represent requests routed through sentinel to deployment instances. 
- BufferSentinelRequest(schema.SentinelRequest) -} - type Querier interface { // Conn returns a connection to the ClickHouse database. Conn() ch.Conn @@ -76,7 +43,6 @@ type Querier interface { } type ClickHouse interface { - Bufferer Querier // Closes the underlying ClickHouse connection. diff --git a/pkg/clickhouse/key_last_used.go b/pkg/clickhouse/key_last_used.go index b4b7f99951..323ff8ace1 100644 --- a/pkg/clickhouse/key_last_used.go +++ b/pkg/clickhouse/key_last_used.go @@ -30,7 +30,7 @@ type GetKeyLastUsedBatchRequest struct { // GetKeyLastUsedBatchPartitioned returns // keys whose cityHash64(key_id) % totalPartitions == partition. This allows // multiple workers to process disjoint slices of the keyspace concurrently. -func (c *clickhouse) GetKeyLastUsedBatchPartitioned(ctx context.Context, req GetKeyLastUsedBatchRequest) ([]KeyLastUsed, error) { +func (c *Client) GetKeyLastUsedBatchPartitioned(ctx context.Context, req GetKeyLastUsedBatchRequest) ([]KeyLastUsed, error) { query := `SELECT key_id, max(time) as last_used diff --git a/pkg/clickhouse/noop.go b/pkg/clickhouse/noop.go index 484eba8011..114f4083f5 100644 --- a/pkg/clickhouse/noop.go +++ b/pkg/clickhouse/noop.go @@ -4,48 +4,16 @@ import ( "context" ch "github.com/ClickHouse/clickhouse-go/v2" - "github.com/unkeyed/unkey/pkg/clickhouse/schema" ) -// noop implements the Bufferer interface but discards all events. +// noop implements the ClickHouse interface but discards all operations. // This is useful for testing or when ClickHouse functionality is not needed, // such as in development environments or when running integration tests. type noop struct{} -var ( - _ Bufferer = (*noop)(nil) -) - -func (n *noop) BufferApiRequest(schema.ApiRequest) { - // Intentionally empty - discards the event -} - -// BufferKeyVerification implements the Bufferer interface but discards the event. 
-func (n *noop) BufferKeyVerification(schema.KeyVerification) { - // Intentionally empty - discards the event -} - -// BufferRatelimit implements the Bufferer interface but discards the event. -func (n *noop) BufferRatelimit(req schema.Ratelimit) { - // Intentionally empty - discards the event -} +var _ ClickHouse = (*noop)(nil) -// BufferBuildStep implements the Bufferer interface but discards the event. -func (n *noop) BufferBuildStep(req schema.BuildStepV1) { - // Intentionally empty - discards the event -} - -// BufferBuildStepLog implements the Bufferer interface but discards the event. -func (n *noop) BufferBuildStepLog(req schema.BuildStepLogV1) { - // Intentionally empty - discards the event -} - -// BufferSentinelRequest implements the Bufferer interface but discards the event. -func (n *noop) BufferSentinelRequest(req schema.SentinelRequest) { - // Intentionally empty - discards the event -} - -// GetBillableVerifications implements the Bufferer interface but always returns 0. +// GetBillableVerifications implements the Querier interface but always returns 0. func (n *noop) GetBillableVerifications(ctx context.Context, workspaceID string, year, month int) (int64, error) { return 0, nil } @@ -98,30 +66,10 @@ func (n *noop) Close() error { return nil } -// NewNoop creates a new no-op implementation of the Bufferer interface. -// This implementation simply discards all events without processing them. -// -// This is useful for: -// - Development environments where ClickHouse is not available -// - Testing where analytics are not relevant -// - Scenarios where analytics are optional and not configured -// -// Example: +// NewNoop creates a new no-op implementation of the ClickHouse interface. +// This implementation discards all operations without processing them. 
// -// var bufferer clickhouse.Bufferer -// if config.ClickhouseURL != "" { -// ch, err := clickhouse.New(clickhouse.Config{ -// URL: config.ClickhouseURL, -// }) -// if err != nil { -// logger.Warn("Failed to initialize ClickHouse, analytics will be disabled") -// bufferer = clickhouse.NewNoop() -// } else { -// bufferer = ch -// } -// } else { -// bufferer = clickhouse.NewNoop() -// } +// For no-op batch processors, use batch.NewNoop[T]() instead. func NewNoop() *noop { return &noop{} } diff --git a/pkg/clickhouse/user.go b/pkg/clickhouse/user.go index b35cecc770..02b2b5c2ec 100644 --- a/pkg/clickhouse/user.go +++ b/pkg/clickhouse/user.go @@ -98,7 +98,7 @@ func getTimeRetentionFilter(tableName string, retentionDays int32) string { // ConfigureUser creates or updates a ClickHouse user with all necessary permissions, quotas, and settings. // This is idempotent - it can be run multiple times to update settings. -func (c *clickhouse) ConfigureUser(ctx context.Context, config UserConfig) error { +func (c *Client) ConfigureUser(ctx context.Context, config UserConfig) error { logger.Info("configuring clickhouse user", "workspace_id", config.WorkspaceID, "username", config.Username) // Validate all identifiers to prevent SQL injection diff --git a/pkg/zen/middleware_metrics.go b/pkg/zen/middleware_metrics.go index 09a272fed9..4c00ae6bf5 100644 --- a/pkg/zen/middleware_metrics.go +++ b/pkg/zen/middleware_metrics.go @@ -8,8 +8,10 @@ import ( "github.com/unkeyed/unkey/pkg/clickhouse/schema" ) -type EventBuffer interface { - BufferApiRequest(schema.ApiRequest) +// ApiRequestBuffer abstracts the method used by WithMetrics to buffer API request events. +// *batch.BatchProcessor[schema.ApiRequest] satisfies this interface. 
+type ApiRequestBuffer interface { + Buffer(schema.ApiRequest) } var skipHeaders = map[string]bool{ @@ -39,7 +41,7 @@ func formatHeader(key, value string) string { // []zen.Middleware{zen.WithMetrics(eventBuffer)}, // route, // ) -func WithMetrics(eventBuffer EventBuffer, info InstanceInfo) Middleware { +func WithMetrics(apiRequestBuffer ApiRequestBuffer, info InstanceInfo) Middleware { return func(next HandleFunc) HandleFunc { return func(ctx context.Context, s *Session) error { start := time.Now() @@ -67,7 +69,7 @@ func WithMetrics(eventBuffer EventBuffer, info InstanceInfo) Middleware { responseHeaders = append(responseHeaders, formatHeader(k, strings.Join(vv, ","))) } - eventBuffer.BufferApiRequest(schema.ApiRequest{ + apiRequestBuffer.Buffer(schema.ApiRequest{ WorkspaceID: s.WorkspaceID, RequestID: s.RequestID(), Time: start.UnixMilli(), diff --git a/pkg/zen/middleware_metrics_test.go b/pkg/zen/middleware_metrics_test.go index c7b13e4947..f80e085e76 100644 --- a/pkg/zen/middleware_metrics_test.go +++ b/pkg/zen/middleware_metrics_test.go @@ -351,7 +351,7 @@ type mockEventBuffer struct { requests []schema.ApiRequest } -func (m *mockEventBuffer) BufferApiRequest(req schema.ApiRequest) { +func (m *mockEventBuffer) Buffer(req schema.ApiRequest) { m.mu.Lock() defer m.mu.Unlock() m.requests = append(m.requests, req) diff --git a/svc/api/BUILD.bazel b/svc/api/BUILD.bazel index 5a308d9ae7..adf5d64211 100644 --- a/svc/api/BUILD.bazel +++ b/svc/api/BUILD.bazel @@ -19,8 +19,10 @@ go_library( "//internal/services/keys", "//internal/services/ratelimit", "//internal/services/usagelimiter", + "//pkg/batch", "//pkg/cache/clustering", "//pkg/clickhouse", + "//pkg/clickhouse/schema", "//pkg/clock", "//pkg/cluster", "//pkg/config", diff --git a/svc/api/internal/testutil/BUILD.bazel b/svc/api/internal/testutil/BUILD.bazel index eabd8a6fa8..ecbd551803 100644 --- a/svc/api/internal/testutil/BUILD.bazel +++ b/svc/api/internal/testutil/BUILD.bazel @@ -20,7 +20,9 @@ go_library( 
"//internal/services/keys", "//internal/services/ratelimit", "//internal/services/usagelimiter", + "//pkg/batch", "//pkg/clickhouse", + "//pkg/clickhouse/schema", "//pkg/clock", "//pkg/counter", "//pkg/db", diff --git a/svc/api/internal/testutil/http.go b/svc/api/internal/testutil/http.go index 7d90682b09..52b554f6fd 100644 --- a/svc/api/internal/testutil/http.go +++ b/svc/api/internal/testutil/http.go @@ -20,7 +20,9 @@ import ( "github.com/unkeyed/unkey/internal/services/ratelimit" "github.com/unkeyed/unkey/internal/services/usagelimiter" + "github.com/unkeyed/unkey/pkg/batch" "github.com/unkeyed/unkey/pkg/clickhouse" + "github.com/unkeyed/unkey/pkg/clickhouse/schema" "github.com/unkeyed/unkey/pkg/clock" "github.com/unkeyed/unkey/pkg/counter" "github.com/unkeyed/unkey/pkg/db" @@ -61,6 +63,8 @@ type Harness struct { UsageLimiter usagelimiter.Service Auditlogs auditlogs.AuditLogService ClickHouse clickhouse.ClickHouse + KeyVerifications *batch.BatchProcessor[schema.KeyVerification] + RatelimitEvents *batch.BatchProcessor[schema.Ratelimit] Ratelimit ratelimit.Service Vault vault.VaultServiceClient AnalyticsConnectionManager analytics.ConnectionManager @@ -116,6 +120,28 @@ func NewHarness(t *testing.T) *Harness { }) require.NoError(t, err) + keyVerifications := clickhouse.NewBuffer[schema.KeyVerification](ch, "default.key_verifications_raw_v2", clickhouse.BufferConfig{ + Name: "key_verifications", + BatchSize: 10, + BufferSize: 100, + FlushInterval: 100 * time.Millisecond, + Consumers: 2, + Drop: true, + OnFlushError: nil, + }) + t.Cleanup(keyVerifications.Close) + + ratelimitsfer := clickhouse.NewBuffer[schema.Ratelimit](ch, "default.ratelimits_raw_v2", clickhouse.BufferConfig{ + Name: "ratelimits", + BatchSize: 10, + BufferSize: 100, + FlushInterval: 100 * time.Millisecond, + Consumers: 2, + Drop: true, + OnFlushError: nil, + }) + t.Cleanup(ratelimitsfer.Close) + validator, err := validation.New() require.NoError(t, err) @@ -175,14 +201,14 @@ func NewHarness(t 
*testing.T) *Harness { require.NoError(t, err) keyService, err := keys.New(keys.Config{ - DB: database, - KeyCache: caches.VerificationKeyByHash, - QuotaCache: caches.WorkspaceQuota, - RateLimiter: ratelimitService, - RBAC: rbac.New(), - Clickhouse: ch, - Region: "test", - UsageLimiter: ulSvc, + DB: database, + KeyCache: caches.VerificationKeyByHash, + QuotaCache: caches.WorkspaceQuota, + RateLimiter: ratelimitService, + RBAC: rbac.New(), + KeyVerifications: keyVerifications, + Region: "test", + UsageLimiter: ulSvc, }) require.NoError(t, err) @@ -195,6 +221,8 @@ func NewHarness(t *testing.T) *Harness { Ratelimit: ratelimitService, Vault: v, ClickHouse: ch, + KeyVerifications: keyVerifications, + RatelimitEvents: ratelimitsfer, DB: database, seeder: seeder, Clock: clk, diff --git a/svc/api/routes/BUILD.bazel b/svc/api/routes/BUILD.bazel index 27bbfec6b8..036240daf0 100644 --- a/svc/api/routes/BUILD.bazel +++ b/svc/api/routes/BUILD.bazel @@ -17,7 +17,9 @@ go_library( "//internal/services/keys", "//internal/services/ratelimit", "//internal/services/usagelimiter", + "//pkg/batch", "//pkg/clickhouse", + "//pkg/clickhouse/schema", "//pkg/db", "//pkg/pprof", "//pkg/zen", diff --git a/svc/api/routes/register.go b/svc/api/routes/register.go index 3eab6640b5..85adf2026c 100644 --- a/svc/api/routes/register.go +++ b/svc/api/routes/register.go @@ -74,7 +74,7 @@ import ( // Conditional routes are registered based on [Services] configuration. 
func Register(srv *zen.Server, svc *Services, info zen.InstanceInfo) { withObservability := zen.WithObservability() - withMetrics := zen.WithMetrics(svc.ClickHouse, info) + withMetrics := zen.WithMetrics(svc.ApiRequests, info) withLogging := zen.WithLogging(zen.SkipPaths("/_unkey/internal/", "/health/")) withPanicRecovery := zen.WithPanicRecovery() withErrorHandling := middleware.WithErrorHandling() @@ -118,13 +118,13 @@ func Register(srv *zen.Server, svc *Services, info zen.InstanceInfo) { srv.RegisterRoute( defaultMiddlewares, &v2RatelimitLimit.Handler{ - DB: svc.Database, - Keys: svc.Keys, - ClickHouse: svc.ClickHouse, - Ratelimit: svc.Ratelimit, - NamespaceCache: svc.Caches.RatelimitNamespace, - Auditlogs: svc.Auditlogs, - TestMode: srv.Flags().TestMode, + DB: svc.Database, + Keys: svc.Keys, + RatelimitEvents: svc.RatelimitEvents, + Ratelimit: svc.Ratelimit, + NamespaceCache: svc.Caches.RatelimitNamespace, + Auditlogs: svc.Auditlogs, + TestMode: srv.Flags().TestMode, }, ) @@ -132,13 +132,13 @@ func Register(srv *zen.Server, svc *Services, info zen.InstanceInfo) { srv.RegisterRoute( defaultMiddlewares, &v2RatelimitMultiLimit.Handler{ - DB: svc.Database, - Keys: svc.Keys, - ClickHouse: svc.ClickHouse, - Ratelimit: svc.Ratelimit, - NamespaceCache: svc.Caches.RatelimitNamespace, - Auditlogs: svc.Auditlogs, - TestMode: srv.Flags().TestMode, + DB: svc.Database, + Keys: svc.Keys, + RatelimitEvents: svc.RatelimitEvents, + Ratelimit: svc.Ratelimit, + NamespaceCache: svc.Caches.RatelimitNamespace, + Auditlogs: svc.Auditlogs, + TestMode: srv.Flags().TestMode, }, ) @@ -406,11 +406,9 @@ func Register(srv *zen.Server, svc *Services, info zen.InstanceInfo) { srv.RegisterRoute( defaultMiddlewares, &v2KeysVerifyKey.Handler{ - - ClickHouse: svc.ClickHouse, - DB: svc.Database, - Keys: svc.Keys, - Auditlogs: svc.Auditlogs, + DB: svc.Database, + Keys: svc.Keys, + Auditlogs: svc.Auditlogs, }, ) @@ -593,7 +591,6 @@ func Register(srv *zen.Server, svc *Services, info zen.InstanceInfo) 
{ &v2AnalyticsGetVerifications.Handler{ DB: svc.Database, Keys: svc.Keys, - ClickHouse: svc.ClickHouse, AnalyticsConnectionManager: svc.AnalyticsConnectionManager, Caches: svc.Caches, }, diff --git a/svc/api/routes/services.go b/svc/api/routes/services.go index d0b039fd4d..a9085ed4c9 100644 --- a/svc/api/routes/services.go +++ b/svc/api/routes/services.go @@ -10,7 +10,9 @@ import ( "github.com/unkeyed/unkey/internal/services/ratelimit" "github.com/unkeyed/unkey/internal/services/usagelimiter" + "github.com/unkeyed/unkey/pkg/batch" "github.com/unkeyed/unkey/pkg/clickhouse" + "github.com/unkeyed/unkey/pkg/clickhouse/schema" "github.com/unkeyed/unkey/pkg/db" "github.com/unkeyed/unkey/pkg/zen/validation" ) @@ -30,10 +32,15 @@ type Services struct { // checks for incoming requests. Keys keys.KeyService - // ClickHouse stores analytics data including verification events, - // rate limit events, and request metrics. + // ClickHouse provides query access to ClickHouse for analytics. ClickHouse clickhouse.ClickHouse + // ApiRequests buffers API request events for ClickHouse. + ApiRequests *batch.BatchProcessor[schema.ApiRequest] + + // RatelimitEvents buffers ratelimit events for ClickHouse. + RatelimitEvents *batch.BatchProcessor[schema.Ratelimit] + // Validator performs request payload validation using struct tags. 
Validator *validation.Validator diff --git a/svc/api/routes/v2_analytics_get_verifications/200_test.go b/svc/api/routes/v2_analytics_get_verifications/200_test.go index beb126b879..ca9d1c3f3a 100644 --- a/svc/api/routes/v2_analytics_get_verifications/200_test.go +++ b/svc/api/routes/v2_analytics_get_verifications/200_test.go @@ -27,7 +27,7 @@ func Test200_Success(t *testing.T) { // Buffer some key verifications for i := range 5 { - h.ClickHouse.BufferKeyVerification(schema.KeyVerification{ + h.KeyVerifications.Buffer(schema.KeyVerification{ RequestID: uid.New(uid.RequestPrefix), Time: now - int64(i*1000), WorkspaceID: workspace.ID, @@ -43,7 +43,6 @@ func Test200_Success(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } @@ -88,7 +87,7 @@ func Test200_PermissionFiltersByApiId(t *testing.T) { // Buffer verifications for api1 for i := range 3 { - h.ClickHouse.BufferKeyVerification(schema.KeyVerification{ + h.KeyVerifications.Buffer(schema.KeyVerification{ RequestID: uid.New(uid.RequestPrefix), Time: now - int64(i*1000), WorkspaceID: workspace.ID, @@ -103,7 +102,7 @@ func Test200_PermissionFiltersByApiId(t *testing.T) { // Buffer verifications for api2 (should NOT be returned) for i := range 5 { - h.ClickHouse.BufferKeyVerification(schema.KeyVerification{ + h.KeyVerifications.Buffer(schema.KeyVerification{ RequestID: uid.New(uid.RequestPrefix), Time: now - int64(i*1000), WorkspaceID: workspace.ID, @@ -119,7 +118,6 @@ func Test200_PermissionFiltersByApiId(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } @@ -168,7 +166,7 @@ func Test200_PermissionFiltersByKeySpaceId(t *testing.T) { // Buffer verifications for api1 for i := range 3 { - h.ClickHouse.BufferKeyVerification(schema.KeyVerification{ + h.KeyVerifications.Buffer(schema.KeyVerification{ 
RequestID: uid.New(uid.RequestPrefix), Time: now - int64(i*1000), WorkspaceID: workspace.ID, @@ -183,7 +181,7 @@ func Test200_PermissionFiltersByKeySpaceId(t *testing.T) { // Buffer verifications for api2 (should NOT be returned) for i := range 5 { - h.ClickHouse.BufferKeyVerification(schema.KeyVerification{ + h.KeyVerifications.Buffer(schema.KeyVerification{ RequestID: uid.New(uid.RequestPrefix), Time: now - int64(i*1000), WorkspaceID: workspace.ID, @@ -199,7 +197,6 @@ func Test200_PermissionFiltersByKeySpaceId(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } @@ -249,7 +246,7 @@ func Test200_QueryWithin30DaysRetention(t *testing.T) { now := time.Now().UnixMilli() // Buffer verification from 7 days ago (within 30-day retention) - h.ClickHouse.BufferKeyVerification(schema.KeyVerification{ + h.KeyVerifications.Buffer(schema.KeyVerification{ RequestID: uid.New(uid.RequestPrefix), Time: now - (7 * 24 * 60 * 60 * 1000), // 7 days ago WorkspaceID: workspace.ID, @@ -264,7 +261,6 @@ func Test200_QueryWithin30DaysRetention(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } @@ -297,7 +293,6 @@ func Test200_QueryAtExact30DayRetentionLimit(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } @@ -328,7 +323,6 @@ func Test200_QueryWithCustomRetention90Days(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } @@ -374,7 +368,7 @@ func Test200_RLSWorkspaceIsolation(t *testing.T) { // Buffer data for workspace 1 for i := range 5 { - h.ClickHouse.BufferKeyVerification(schema.KeyVerification{ + 
h.KeyVerifications.Buffer(schema.KeyVerification{ RequestID: uid.New(uid.RequestPrefix), Time: now - int64(i*1000), WorkspaceID: workspace1.ID, @@ -389,7 +383,7 @@ func Test200_RLSWorkspaceIsolation(t *testing.T) { // Buffer data for workspace 2 (should NOT be accessible by workspace1's key) for i := range 10 { - h.ClickHouse.BufferKeyVerification(schema.KeyVerification{ + h.KeyVerifications.Buffer(schema.KeyVerification{ RequestID: uid.New(uid.RequestPrefix), Time: now - int64(i*1000), WorkspaceID: workspace2.ID, @@ -405,7 +399,6 @@ func Test200_RLSWorkspaceIsolation(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } @@ -444,7 +437,6 @@ func Test200_QueryWithoutTimeFilter_AutoAddsFilter(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } diff --git a/svc/api/routes/v2_analytics_get_verifications/400_test.go b/svc/api/routes/v2_analytics_get_verifications/400_test.go index ccabcc08c0..f477bbbbed 100644 --- a/svc/api/routes/v2_analytics_get_verifications/400_test.go +++ b/svc/api/routes/v2_analytics_get_verifications/400_test.go @@ -19,7 +19,6 @@ func Test400_EmptyQuery(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } @@ -51,7 +50,6 @@ func Test400_InvalidSQLSyntax(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } @@ -85,7 +83,6 @@ func Test400_UnknownColumn(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } @@ -117,7 +114,6 @@ func Test400_InvalidTable(t *testing.T) { route := &Handler{ DB: h.DB, Keys: 
h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } @@ -149,7 +145,6 @@ func Test400_NonSelectQuery(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } @@ -181,7 +176,6 @@ func Test400_QueryBeyond30Days(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } @@ -214,7 +208,6 @@ func Test400_QueryBeyondCustomRetention90Days(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } diff --git a/svc/api/routes/v2_analytics_get_verifications/401_test.go b/svc/api/routes/v2_analytics_get_verifications/401_test.go index 1edc66b188..19a4a673e7 100644 --- a/svc/api/routes/v2_analytics_get_verifications/401_test.go +++ b/svc/api/routes/v2_analytics_get_verifications/401_test.go @@ -14,7 +14,6 @@ func Test401_NoAuthHeader(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } @@ -38,7 +37,6 @@ func Test401_InvalidRootKey(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } diff --git a/svc/api/routes/v2_analytics_get_verifications/403_test.go b/svc/api/routes/v2_analytics_get_verifications/403_test.go index 993f3b5041..b13436695d 100644 --- a/svc/api/routes/v2_analytics_get_verifications/403_test.go +++ b/svc/api/routes/v2_analytics_get_verifications/403_test.go @@ -25,7 +25,6 @@ func Test403_NoAnalyticsPermission(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, 
} @@ -62,7 +61,6 @@ func Test403_WrongApiPermission(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } diff --git a/svc/api/routes/v2_analytics_get_verifications/404_test.go b/svc/api/routes/v2_analytics_get_verifications/404_test.go index 4b836ad05d..026dbc3277 100644 --- a/svc/api/routes/v2_analytics_get_verifications/404_test.go +++ b/svc/api/routes/v2_analytics_get_verifications/404_test.go @@ -23,7 +23,6 @@ func Test404_KeySpaceNotFound(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } diff --git a/svc/api/routes/v2_analytics_get_verifications/412_test.go b/svc/api/routes/v2_analytics_get_verifications/412_test.go index 098f71f4dc..8c6bef3685 100644 --- a/svc/api/routes/v2_analytics_get_verifications/412_test.go +++ b/svc/api/routes/v2_analytics_get_verifications/412_test.go @@ -24,7 +24,6 @@ func Test412_AnalyticsNotConfigured(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } diff --git a/svc/api/routes/v2_analytics_get_verifications/422_test.go b/svc/api/routes/v2_analytics_get_verifications/422_test.go index 536ba80369..f60a2b01e2 100644 --- a/svc/api/routes/v2_analytics_get_verifications/422_test.go +++ b/svc/api/routes/v2_analytics_get_verifications/422_test.go @@ -28,7 +28,7 @@ func Test422_ExceedsMaxMemory(t *testing.T) { // Buffer many verifications to ensure memory usage exceeds limit for i := range 50_000 { - h.ClickHouse.BufferKeyVerification(schema.KeyVerification{ + h.KeyVerifications.Buffer(schema.KeyVerification{ RequestID: uid.New(uid.RequestPrefix), Time: now - int64(i*1000), WorkspaceID: workspace.ID, @@ -44,7 +44,6 @@ func Test422_ExceedsMaxMemory(t *testing.T) { route := &Handler{ DB: h.DB, Keys: 
h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } diff --git a/svc/api/routes/v2_analytics_get_verifications/429_test.go b/svc/api/routes/v2_analytics_get_verifications/429_test.go index 51366674f9..5a4ac63c89 100644 --- a/svc/api/routes/v2_analytics_get_verifications/429_test.go +++ b/svc/api/routes/v2_analytics_get_verifications/429_test.go @@ -28,7 +28,7 @@ func Test429_QueryQuotaExceeded(t *testing.T) { // Buffer some key verifications for i := range 5 { - h.ClickHouse.BufferKeyVerification(schema.KeyVerification{ + h.KeyVerifications.Buffer(schema.KeyVerification{ RequestID: uid.New(uid.RequestPrefix), Time: now - int64(i*1000), WorkspaceID: workspace.ID, @@ -44,7 +44,6 @@ func Test429_QueryQuotaExceeded(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } diff --git a/svc/api/routes/v2_analytics_get_verifications/503_test.go b/svc/api/routes/v2_analytics_get_verifications/503_test.go index aca9023e48..4c71cd6540 100644 --- a/svc/api/routes/v2_analytics_get_verifications/503_test.go +++ b/svc/api/routes/v2_analytics_get_verifications/503_test.go @@ -53,7 +53,6 @@ func Test503_ClickHouseConnectionFailure(t *testing.T) { route := &Handler{ DB: h.DB, Keys: h.Keys, - ClickHouse: h.ClickHouse, AnalyticsConnectionManager: h.AnalyticsConnectionManager, Caches: h.Caches, } diff --git a/svc/api/routes/v2_analytics_get_verifications/handler.go b/svc/api/routes/v2_analytics_get_verifications/handler.go index 48862c36ea..5f6d18480c 100644 --- a/svc/api/routes/v2_analytics_get_verifications/handler.go +++ b/svc/api/routes/v2_analytics_get_verifications/handler.go @@ -49,7 +49,6 @@ var ( type Handler struct { DB db.Database Keys keys.KeyService - ClickHouse clickhouse.ClickHouse AnalyticsConnectionManager analytics.ConnectionManager Caches caches.Caches } diff --git 
a/svc/api/routes/v2_keys_verify_key/200_test.go b/svc/api/routes/v2_keys_verify_key/200_test.go index bf3fdd9b2c..32aab974ef 100644 --- a/svc/api/routes/v2_keys_verify_key/200_test.go +++ b/svc/api/routes/v2_keys_verify_key/200_test.go @@ -21,10 +21,9 @@ func TestSuccess(t *testing.T) { h := testutil.NewHarness(t) route := &handler.Handler{ - DB: h.DB, - Keys: h.Keys, - Auditlogs: h.Auditlogs, - ClickHouse: h.ClickHouse, + DB: h.DB, + Keys: h.Keys, + Auditlogs: h.Auditlogs, } h.Register(route) diff --git a/svc/api/routes/v2_keys_verify_key/400_test.go b/svc/api/routes/v2_keys_verify_key/400_test.go index 9de2103b0d..b99aa0f82a 100644 --- a/svc/api/routes/v2_keys_verify_key/400_test.go +++ b/svc/api/routes/v2_keys_verify_key/400_test.go @@ -17,10 +17,9 @@ func TestBadRequest(t *testing.T) { h := testutil.NewHarness(t) route := &handler.Handler{ - DB: h.DB, - Keys: h.Keys, - Auditlogs: h.Auditlogs, - ClickHouse: h.ClickHouse, + DB: h.DB, + Keys: h.Keys, + Auditlogs: h.Auditlogs, } h.Register(route) diff --git a/svc/api/routes/v2_keys_verify_key/401_test.go b/svc/api/routes/v2_keys_verify_key/401_test.go index 76b18465eb..77826420c7 100644 --- a/svc/api/routes/v2_keys_verify_key/401_test.go +++ b/svc/api/routes/v2_keys_verify_key/401_test.go @@ -15,10 +15,9 @@ func TestUnauthorized(t *testing.T) { h := testutil.NewHarness(t) route := &handler.Handler{ - DB: h.DB, - Keys: h.Keys, - Auditlogs: h.Auditlogs, - ClickHouse: h.ClickHouse, + DB: h.DB, + Keys: h.Keys, + Auditlogs: h.Auditlogs, } h.Register(route) diff --git a/svc/api/routes/v2_keys_verify_key/403_test.go b/svc/api/routes/v2_keys_verify_key/403_test.go index 1bc3e60777..cb465643a3 100644 --- a/svc/api/routes/v2_keys_verify_key/403_test.go +++ b/svc/api/routes/v2_keys_verify_key/403_test.go @@ -16,10 +16,9 @@ func TestForbidden_NoVerifyPermissions(t *testing.T) { h := testutil.NewHarness(t) route := &handler.Handler{ - DB: h.DB, - Keys: h.Keys, - Auditlogs: h.Auditlogs, - ClickHouse: h.ClickHouse, + DB: h.DB, + 
Keys: h.Keys, + Auditlogs: h.Auditlogs, } h.Register(route) diff --git a/svc/api/routes/v2_keys_verify_key/404_test.go b/svc/api/routes/v2_keys_verify_key/404_test.go index bc96aff281..f41d9e7c5b 100644 --- a/svc/api/routes/v2_keys_verify_key/404_test.go +++ b/svc/api/routes/v2_keys_verify_key/404_test.go @@ -17,10 +17,9 @@ func TestNotFound(t *testing.T) { h := testutil.NewHarness(t) route := &handler.Handler{ - DB: h.DB, - Keys: h.Keys, - Auditlogs: h.Auditlogs, - ClickHouse: h.ClickHouse, + DB: h.DB, + Keys: h.Keys, + Auditlogs: h.Auditlogs, } h.Register(route) diff --git a/svc/api/routes/v2_keys_verify_key/412_test.go b/svc/api/routes/v2_keys_verify_key/412_test.go index 3175613559..af742d21fa 100644 --- a/svc/api/routes/v2_keys_verify_key/412_test.go +++ b/svc/api/routes/v2_keys_verify_key/412_test.go @@ -17,10 +17,9 @@ func TestPreconditionFailed(t *testing.T) { h := testutil.NewHarness(t) route := &handler.Handler{ - DB: h.DB, - Keys: h.Keys, - Auditlogs: h.Auditlogs, - ClickHouse: h.ClickHouse, + DB: h.DB, + Keys: h.Keys, + Auditlogs: h.Auditlogs, } h.Register(route) diff --git a/svc/api/routes/v2_keys_verify_key/BUILD.bazel b/svc/api/routes/v2_keys_verify_key/BUILD.bazel index 5088fc3951..ce8d4f67f8 100644 --- a/svc/api/routes/v2_keys_verify_key/BUILD.bazel +++ b/svc/api/routes/v2_keys_verify_key/BUILD.bazel @@ -8,7 +8,6 @@ go_library( deps = [ "//internal/services/auditlogs", "//internal/services/keys", - "//pkg/clickhouse", "//pkg/codes", "//pkg/db", "//pkg/fault", diff --git a/svc/api/routes/v2_keys_verify_key/handler.go b/svc/api/routes/v2_keys_verify_key/handler.go index c8682e0a31..57a762666d 100644 --- a/svc/api/routes/v2_keys_verify_key/handler.go +++ b/svc/api/routes/v2_keys_verify_key/handler.go @@ -10,7 +10,6 @@ import ( "github.com/unkeyed/unkey/internal/services/auditlogs" "github.com/unkeyed/unkey/internal/services/keys" - "github.com/unkeyed/unkey/pkg/clickhouse" "github.com/unkeyed/unkey/pkg/codes" "github.com/unkeyed/unkey/pkg/db" 
"github.com/unkeyed/unkey/pkg/fault" @@ -30,10 +29,9 @@ const DefaultCost = 1 // Handler implements zen.Route interface for the v2 keys.verify endpoint type Handler struct { - DB db.Database - Keys keys.KeyService - Auditlogs auditlogs.AuditLogService - ClickHouse clickhouse.ClickHouse + DB db.Database + Keys keys.KeyService + Auditlogs auditlogs.AuditLogService } // Method returns the HTTP method this route responds to diff --git a/svc/api/routes/v2_keys_verify_key/migration_test.go b/svc/api/routes/v2_keys_verify_key/migration_test.go index 59dee5a4f1..59c5a993c5 100644 --- a/svc/api/routes/v2_keys_verify_key/migration_test.go +++ b/svc/api/routes/v2_keys_verify_key/migration_test.go @@ -31,10 +31,9 @@ func TestKeyVerificationWithMigration(t *testing.T) { } verifyRoute := &handler.Handler{ - DB: h.DB, - Keys: h.Keys, - Auditlogs: h.Auditlogs, - ClickHouse: h.ClickHouse, + DB: h.DB, + Keys: h.Keys, + Auditlogs: h.Auditlogs, } h.Register(verifyRoute) diff --git a/svc/api/routes/v2_keys_verify_key/multilimit_test.go b/svc/api/routes/v2_keys_verify_key/multilimit_test.go index 9a8ca1e0a6..abcacad5c2 100644 --- a/svc/api/routes/v2_keys_verify_key/multilimit_test.go +++ b/svc/api/routes/v2_keys_verify_key/multilimit_test.go @@ -17,10 +17,9 @@ func TestMultiLimit(t *testing.T) { h := testutil.NewHarness(t) route := &handler.Handler{ - DB: h.DB, - Keys: h.Keys, - Auditlogs: h.Auditlogs, - ClickHouse: h.ClickHouse, + DB: h.DB, + Keys: h.Keys, + Auditlogs: h.Auditlogs, } h.Register(route) diff --git a/svc/api/routes/v2_keys_verify_key/multiple_ratelimits_overcommit_test.go b/svc/api/routes/v2_keys_verify_key/multiple_ratelimits_overcommit_test.go index 8ed96f117a..d61c799b0e 100644 --- a/svc/api/routes/v2_keys_verify_key/multiple_ratelimits_overcommit_test.go +++ b/svc/api/routes/v2_keys_verify_key/multiple_ratelimits_overcommit_test.go @@ -32,10 +32,9 @@ func TestMultipleRatelimitsCounterLeakBug(t *testing.T) { h := testutil.NewHarness(t) route := &handler.Handler{ - DB: 
h.DB, - Keys: h.Keys, - Auditlogs: h.Auditlogs, - ClickHouse: h.ClickHouse, + DB: h.DB, + Keys: h.Keys, + Auditlogs: h.Auditlogs, } h.Register(route) diff --git a/svc/api/routes/v2_keys_verify_key/ratelimit_response_test.go b/svc/api/routes/v2_keys_verify_key/ratelimit_response_test.go index f1c84361ee..ba47ecb1cd 100644 --- a/svc/api/routes/v2_keys_verify_key/ratelimit_response_test.go +++ b/svc/api/routes/v2_keys_verify_key/ratelimit_response_test.go @@ -18,10 +18,9 @@ func TestRatelimitResponse(t *testing.T) { h := testutil.NewHarness(t) route := &handler.Handler{ - DB: h.DB, - Keys: h.Keys, - Auditlogs: h.Auditlogs, - ClickHouse: h.ClickHouse, + DB: h.DB, + Keys: h.Keys, + Auditlogs: h.Auditlogs, } h.Register(route) diff --git a/svc/api/routes/v2_keys_verify_key/resend_demo_test.go b/svc/api/routes/v2_keys_verify_key/resend_demo_test.go index 7dffb812b6..537e8105cf 100644 --- a/svc/api/routes/v2_keys_verify_key/resend_demo_test.go +++ b/svc/api/routes/v2_keys_verify_key/resend_demo_test.go @@ -23,10 +23,9 @@ func TestResendDemo(t *testing.T) { h := testutil.NewHarness(t) route := &handler.Handler{ - DB: h.DB, - Keys: h.Keys, - Auditlogs: h.Auditlogs, - ClickHouse: h.ClickHouse, + DB: h.DB, + Keys: h.Keys, + Auditlogs: h.Auditlogs, } h.Register(route) diff --git a/svc/api/routes/v2_ratelimit_limit/200_test.go b/svc/api/routes/v2_ratelimit_limit/200_test.go index aa0c1298d2..e46bb4fc0b 100644 --- a/svc/api/routes/v2_ratelimit_limit/200_test.go +++ b/svc/api/routes/v2_ratelimit_limit/200_test.go @@ -23,12 +23,12 @@ func TestLimitSuccessfully(t *testing.T) { h := testutil.NewHarness(t) route := &handler.Handler{ - Keys: h.Keys, - ClickHouse: h.ClickHouse, - Ratelimit: h.Ratelimit, - DB: h.DB, - NamespaceCache: h.Caches.RatelimitNamespace, - Auditlogs: h.Auditlogs, + Keys: h.Keys, + RatelimitEvents: h.RatelimitEvents, + Ratelimit: h.Ratelimit, + DB: h.DB, + NamespaceCache: h.Caches.RatelimitNamespace, + Auditlogs: h.Auditlogs, } h.Register(route) diff --git 
a/svc/api/routes/v2_ratelimit_limit/BUILD.bazel b/svc/api/routes/v2_ratelimit_limit/BUILD.bazel index 8111ed6c01..1bd08ed73e 100644 --- a/svc/api/routes/v2_ratelimit_limit/BUILD.bazel +++ b/svc/api/routes/v2_ratelimit_limit/BUILD.bazel @@ -12,8 +12,8 @@ go_library( "//internal/services/ratelimit", "//internal/services/ratelimit/namespace", "//pkg/auditlog", + "//pkg/batch", "//pkg/cache", - "//pkg/clickhouse", "//pkg/clickhouse/schema", "//pkg/codes", "//pkg/db", diff --git a/svc/api/routes/v2_ratelimit_limit/accuracy_test.go b/svc/api/routes/v2_ratelimit_limit/accuracy_test.go index 776858ad78..9d3ff04388 100644 --- a/svc/api/routes/v2_ratelimit_limit/accuracy_test.go +++ b/svc/api/routes/v2_ratelimit_limit/accuracy_test.go @@ -56,12 +56,12 @@ func TestRateLimitAccuracy(t *testing.T) { h := testutil.NewHarness(t) route := &handler.Handler{ - Keys: h.Keys, - ClickHouse: h.ClickHouse, - Ratelimit: h.Ratelimit, - DB: h.DB, - NamespaceCache: h.Caches.RatelimitNamespace, - Auditlogs: h.Auditlogs, + Keys: h.Keys, + RatelimitEvents: h.RatelimitEvents, + Ratelimit: h.Ratelimit, + DB: h.DB, + NamespaceCache: h.Caches.RatelimitNamespace, + Auditlogs: h.Auditlogs, } h.Register(route) ctx := context.Background() diff --git a/svc/api/routes/v2_ratelimit_limit/handler.go b/svc/api/routes/v2_ratelimit_limit/handler.go index 58a4d14a91..4bb65cddaa 100644 --- a/svc/api/routes/v2_ratelimit_limit/handler.go +++ b/svc/api/routes/v2_ratelimit_limit/handler.go @@ -13,8 +13,8 @@ import ( "github.com/unkeyed/unkey/internal/services/ratelimit" "github.com/unkeyed/unkey/internal/services/ratelimit/namespace" "github.com/unkeyed/unkey/pkg/auditlog" + "github.com/unkeyed/unkey/pkg/batch" "github.com/unkeyed/unkey/pkg/cache" - "github.com/unkeyed/unkey/pkg/clickhouse" "github.com/unkeyed/unkey/pkg/clickhouse/schema" "github.com/unkeyed/unkey/pkg/codes" "github.com/unkeyed/unkey/pkg/db" @@ -36,14 +36,14 @@ type ( // Handler implements zen.Route interface for the v2 ratelimit limit endpoint 
type Handler struct { - DB db.Database - Keys keys.KeyService - ClickHouse clickhouse.Bufferer - Ratelimit ratelimit.Service - NamespaceCache cache.Cache[cache.ScopedKey, db.FindRatelimitNamespace] - Auditlogs auditlogs.AuditLogService - TestMode bool - createFlight sf.Group[db.FindRatelimitNamespace] + DB db.Database + Keys keys.KeyService + RatelimitEvents *batch.BatchProcessor[schema.Ratelimit] + Ratelimit ratelimit.Service + NamespaceCache cache.Cache[cache.ScopedKey, db.FindRatelimitNamespace] + Auditlogs auditlogs.AuditLogService + TestMode bool + createFlight sf.Group[db.FindRatelimitNamespace] } // Method returns the HTTP method this route responds to @@ -166,7 +166,7 @@ func (h *Handler) Handle(ctx context.Context, s *zen.Session) error { } latency := time.Since(t0).Milliseconds() if s.ShouldLogRequestToClickHouse() { - h.ClickHouse.BufferRatelimit(schema.Ratelimit{ + h.RatelimitEvents.Buffer(schema.Ratelimit{ RequestID: s.RequestID(), WorkspaceID: auth.AuthorizedWorkspaceID, Time: time.Now().UnixMilli(), diff --git a/svc/api/routes/v2_ratelimit_multi_limit/200_test.go b/svc/api/routes/v2_ratelimit_multi_limit/200_test.go index 098a4a0dcb..31976f9763 100644 --- a/svc/api/routes/v2_ratelimit_multi_limit/200_test.go +++ b/svc/api/routes/v2_ratelimit_multi_limit/200_test.go @@ -23,12 +23,12 @@ func TestLimitSuccessfully(t *testing.T) { h := testutil.NewHarness(t) route := &handler.Handler{ - Keys: h.Keys, - ClickHouse: h.ClickHouse, - Ratelimit: h.Ratelimit, - DB: h.DB, - NamespaceCache: h.Caches.RatelimitNamespace, - Auditlogs: h.Auditlogs, + Keys: h.Keys, + RatelimitEvents: h.RatelimitEvents, + Ratelimit: h.Ratelimit, + DB: h.DB, + NamespaceCache: h.Caches.RatelimitNamespace, + Auditlogs: h.Auditlogs, } h.Register(route) diff --git a/svc/api/routes/v2_ratelimit_multi_limit/BUILD.bazel b/svc/api/routes/v2_ratelimit_multi_limit/BUILD.bazel index 35d4eb2e96..b64d494d52 100644 --- a/svc/api/routes/v2_ratelimit_multi_limit/BUILD.bazel +++ 
b/svc/api/routes/v2_ratelimit_multi_limit/BUILD.bazel @@ -12,8 +12,8 @@ go_library( "//internal/services/ratelimit", "//internal/services/ratelimit/namespace", "//pkg/auditlog", + "//pkg/batch", "//pkg/cache", - "//pkg/clickhouse", "//pkg/clickhouse/schema", "//pkg/codes", "//pkg/db", diff --git a/svc/api/routes/v2_ratelimit_multi_limit/handler.go b/svc/api/routes/v2_ratelimit_multi_limit/handler.go index 3d170751e0..b95f291226 100644 --- a/svc/api/routes/v2_ratelimit_multi_limit/handler.go +++ b/svc/api/routes/v2_ratelimit_multi_limit/handler.go @@ -13,8 +13,8 @@ import ( "github.com/unkeyed/unkey/internal/services/ratelimit" "github.com/unkeyed/unkey/internal/services/ratelimit/namespace" "github.com/unkeyed/unkey/pkg/auditlog" + "github.com/unkeyed/unkey/pkg/batch" "github.com/unkeyed/unkey/pkg/cache" - "github.com/unkeyed/unkey/pkg/clickhouse" "github.com/unkeyed/unkey/pkg/clickhouse/schema" "github.com/unkeyed/unkey/pkg/codes" "github.com/unkeyed/unkey/pkg/db" @@ -35,13 +35,13 @@ type ( // Handler implements zen.Route interface for the v2 ratelimit multiLimit endpoint type Handler struct { - DB db.Database - Keys keys.KeyService - ClickHouse clickhouse.Bufferer - Ratelimit ratelimit.Service - NamespaceCache cache.Cache[cache.ScopedKey, db.FindRatelimitNamespace] - Auditlogs auditlogs.AuditLogService - TestMode bool + DB db.Database + Keys keys.KeyService + RatelimitEvents *batch.BatchProcessor[schema.Ratelimit] + Ratelimit ratelimit.Service + NamespaceCache cache.Cache[cache.ScopedKey, db.FindRatelimitNamespace] + Auditlogs auditlogs.AuditLogService + TestMode bool } // Method returns the HTTP method this route responds to @@ -201,7 +201,7 @@ func (h *Handler) Handle(ctx context.Context, s *zen.Session) error { if s.ShouldLogRequestToClickHouse() { for i, result := range results { meta := checkMetadata[i] - h.ClickHouse.BufferRatelimit(schema.Ratelimit{ + h.RatelimitEvents.Buffer(schema.Ratelimit{ RequestID: s.RequestID(), WorkspaceID: 
auth.AuthorizedWorkspaceID, Time: start.UnixMilli(), diff --git a/svc/api/run.go b/svc/api/run.go index b6e37f4a91..49743a4e54 100644 --- a/svc/api/run.go +++ b/svc/api/run.go @@ -23,8 +23,10 @@ import ( "github.com/unkeyed/unkey/internal/services/ratelimit" "github.com/unkeyed/unkey/internal/services/usagelimiter" + "github.com/unkeyed/unkey/pkg/batch" "github.com/unkeyed/unkey/pkg/cache/clustering" "github.com/unkeyed/unkey/pkg/clickhouse" + "github.com/unkeyed/unkey/pkg/clickhouse/schema" "github.com/unkeyed/unkey/pkg/clock" "github.com/unkeyed/unkey/pkg/cluster" "github.com/unkeyed/unkey/pkg/counter" @@ -122,13 +124,52 @@ func Run(ctx context.Context, cfg Config) error { } var ch clickhouse.ClickHouse = clickhouse.NewNoop() + apiRequests := batch.NewNoop[schema.ApiRequest]() + keyVerifications := batch.NewNoop[schema.KeyVerification]() + ratelimits := batch.NewNoop[schema.Ratelimit]() + if cfg.ClickHouse.URL != "" { - ch, err = clickhouse.New(clickhouse.Config{ + chClient, chErr := clickhouse.New(clickhouse.Config{ URL: cfg.ClickHouse.URL, }) - if err != nil { - return fmt.Errorf("unable to create clickhouse: %w", err) + if chErr != nil { + return fmt.Errorf("unable to create clickhouse: %w", chErr) } + ch = chClient + + apiRequests = clickhouse.NewBuffer[schema.ApiRequest](chClient, "default.api_requests_raw_v2", clickhouse.BufferConfig{ + Name: "api_requests", + BatchSize: 10_000, + BufferSize: 20_000, + FlushInterval: 5 * time.Second, + Consumers: 2, + Drop: true, + OnFlushError: nil, + }) + keyVerifications = clickhouse.NewBuffer[schema.KeyVerification](chClient, "default.key_verifications_raw_v2", clickhouse.BufferConfig{ + Name: "key_verifications", + BatchSize: 10_000, + BufferSize: 20_000, + FlushInterval: 5 * time.Second, + Consumers: 2, + Drop: true, + OnFlushError: nil, + }) + ratelimits = clickhouse.NewBuffer[schema.Ratelimit](chClient, "default.ratelimits_raw_v2", clickhouse.BufferConfig{ + Name: "ratelimits", + BatchSize: 10_000, + BufferSize: 
20_000, + FlushInterval: 5 * time.Second, + Consumers: 2, + Drop: true, + OnFlushError: nil, + }) + + // Close connection last (LIFO: first registered closes last) + r.Defer(chClient.Close) + r.Defer(func() error { apiRequests.Close(); return nil }) + r.Defer(func() error { keyVerifications.Close(); return nil }) + r.Defer(func() error { ratelimits.Close(); return nil }) } // Caches will be created after invalidation consumer is set up @@ -269,14 +310,14 @@ func Run(ctx context.Context, cfg Config) error { } keySvc, err := keys.New(keys.Config{ - DB: database, - KeyCache: caches.VerificationKeyByHash, - QuotaCache: caches.WorkspaceQuota, - RateLimiter: rlSvc, - RBAC: rbac.New(), - Clickhouse: ch, - Region: cfg.Region, - UsageLimiter: ulSvc, + DB: database, + KeyCache: caches.VerificationKeyByHash, + QuotaCache: caches.WorkspaceQuota, + RateLimiter: rlSvc, + RBAC: rbac.New(), + KeyVerifications: keyVerifications, + Region: cfg.Region, + UsageLimiter: ulSvc, }) if err != nil { return fmt.Errorf("unable to create key service: %w", err) } @@ -323,6 +364,8 @@ func Run(ctx context.Context, cfg Config) error { routes.Register(srv, &routes.Services{ Database: database, ClickHouse: ch, + ApiRequests: apiRequests, + RatelimitEvents: ratelimits, Keys: keySvc, Validator: validator, Ratelimit: rlSvc, diff --git a/svc/ctrl/integration/harness/BUILD.bazel b/svc/ctrl/integration/harness/BUILD.bazel index 6911a1c258..ebad01ea88 100644 --- a/svc/ctrl/integration/harness/BUILD.bazel +++ b/svc/ctrl/integration/harness/BUILD.bazel @@ -8,7 +8,9 @@ go_library( deps = [ "//gen/proto/hydra/v1:hydra", "//gen/rpc/vault", + "//pkg/batch", "//pkg/clickhouse", + "//pkg/clickhouse/schema", "//pkg/db", "//pkg/dockertest", "//pkg/healthcheck", diff --git a/svc/ctrl/integration/harness/harness.go b/svc/ctrl/integration/harness/harness.go index 8c61466dec..93b058769d 100644 --- a/svc/ctrl/integration/harness/harness.go +++ b/svc/ctrl/integration/harness/harness.go @@ -20,7 +20,9 @@ import (
"github.com/stretchr/testify/require" hydrav1 "github.com/unkeyed/unkey/gen/proto/hydra/v1" "github.com/unkeyed/unkey/gen/rpc/vault" + "github.com/unkeyed/unkey/pkg/batch" "github.com/unkeyed/unkey/pkg/clickhouse" + "github.com/unkeyed/unkey/pkg/clickhouse/schema" "github.com/unkeyed/unkey/pkg/db" "github.com/unkeyed/unkey/pkg/dockertest" "github.com/unkeyed/unkey/pkg/healthcheck" @@ -204,6 +206,8 @@ func New(t *testing.T, opts ...Option) *Harness { GitHub: nil, DepotConfig: deploy.DepotConfig{APIUrl: "", ProjectRegion: ""}, + BuildSteps: batch.NewNoop[schema.BuildStepV1](), + BuildStepLogs: batch.NewNoop[schema.BuildStepLogV1](), RegistryConfig: deploy.RegistryConfig{URL: "", Username: "", Password: ""}, BuildPlatform: deploy.BuildPlatform{Platform: "", Architecture: ""}, AllowUnauthenticatedDeployments: false, diff --git a/svc/ctrl/worker/BUILD.bazel b/svc/ctrl/worker/BUILD.bazel index 98ad607303..e97639ceda 100644 --- a/svc/ctrl/worker/BUILD.bazel +++ b/svc/ctrl/worker/BUILD.bazel @@ -14,8 +14,10 @@ go_library( "//gen/proto/vault/v1/vaultv1connect", "//gen/rpc/vault", "//pkg/assert", + "//pkg/batch", "//pkg/cache", "//pkg/clickhouse", + "//pkg/clickhouse/schema", "//pkg/clock", "//pkg/config", "//pkg/db", diff --git a/svc/ctrl/worker/deploy/BUILD.bazel b/svc/ctrl/worker/deploy/BUILD.bazel index c2eca01804..a20ebc844c 100644 --- a/svc/ctrl/worker/deploy/BUILD.bazel +++ b/svc/ctrl/worker/deploy/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//gen/proto/vault/v1:vault", "//gen/rpc/vault", "//pkg/assert", + "//pkg/batch", "//pkg/clickhouse", "//pkg/clickhouse/schema", "//pkg/db", diff --git a/svc/ctrl/worker/deploy/build.go b/svc/ctrl/worker/deploy/build.go index f85c9fd845..ef803051d4 100644 --- a/svc/ctrl/worker/deploy/build.go +++ b/svc/ctrl/worker/deploy/build.go @@ -518,7 +518,7 @@ func (w *Workflow) processBuildStatus( if vertex.Completed != nil && !completed[vertex.Digest] { completed[vertex.Digest] = true - w.clickhouse.BufferBuildStep(schema.BuildStepV1{ + 
w.buildSteps.Buffer(schema.BuildStepV1{ Error: vertex.Error, StartedAt: ptr.SafeDeref(vertex.Started).UnixMilli(), CompletedAt: ptr.SafeDeref(vertex.Completed).UnixMilli(), @@ -534,7 +534,7 @@ func (w *Workflow) processBuildStatus( } for _, log := range status.Logs { - w.clickhouse.BufferBuildStepLog(schema.BuildStepLogV1{ + w.buildStepLogs.Buffer(schema.BuildStepLogV1{ WorkspaceID: workspaceID, ProjectID: projectID, DeploymentID: deploymentID, diff --git a/svc/ctrl/worker/deploy/service.go b/svc/ctrl/worker/deploy/service.go index e51cefe356..d6c65e0d2b 100644 --- a/svc/ctrl/worker/deploy/service.go +++ b/svc/ctrl/worker/deploy/service.go @@ -3,7 +3,9 @@ package deploy import ( hydrav1 "github.com/unkeyed/unkey/gen/proto/hydra/v1" "github.com/unkeyed/unkey/gen/rpc/vault" + "github.com/unkeyed/unkey/pkg/batch" "github.com/unkeyed/unkey/pkg/clickhouse" + "github.com/unkeyed/unkey/pkg/clickhouse/schema" "github.com/unkeyed/unkey/pkg/db" githubclient "github.com/unkeyed/unkey/svc/ctrl/worker/github" ) @@ -53,6 +55,8 @@ type Workflow struct { registryConfig RegistryConfig buildPlatform BuildPlatform clickhouse clickhouse.ClickHouse + buildSteps *batch.BatchProcessor[schema.BuildStepV1] + buildStepLogs *batch.BatchProcessor[schema.BuildStepLogV1] allowUnauthenticatedDeployments bool dashboardURL string } @@ -85,9 +89,15 @@ type Config struct { // BuildPlatform specifies the target platform for all builds. BuildPlatform BuildPlatform - // Clickhouse receives build step telemetry for observability. + // Clickhouse provides query access for deployment request counts. Clickhouse clickhouse.ClickHouse + // BuildSteps buffers build step events for ClickHouse. + BuildSteps *batch.BatchProcessor[schema.BuildStepV1] + + // BuildStepLogs buffers build step log events for ClickHouse. + BuildStepLogs *batch.BatchProcessor[schema.BuildStepLogV1] + // AllowUnauthenticatedDeployments controls whether builds can skip GitHub authentication. 
// Set to true only for local development with public repositories. AllowUnauthenticatedDeployments bool @@ -111,6 +121,8 @@ func New(cfg Config) *Workflow { registryConfig: cfg.RegistryConfig, buildPlatform: cfg.BuildPlatform, clickhouse: cfg.Clickhouse, + buildSteps: cfg.BuildSteps, + buildStepLogs: cfg.BuildStepLogs, allowUnauthenticatedDeployments: cfg.AllowUnauthenticatedDeployments, dashboardURL: cfg.DashboardURL, } diff --git a/svc/ctrl/worker/run.go b/svc/ctrl/worker/run.go index 0748916277..6aeedfbd1a 100644 --- a/svc/ctrl/worker/run.go +++ b/svc/ctrl/worker/run.go @@ -17,8 +17,10 @@ import ( hydrav1 "github.com/unkeyed/unkey/gen/proto/hydra/v1" "github.com/unkeyed/unkey/gen/proto/vault/v1/vaultv1connect" "github.com/unkeyed/unkey/gen/rpc/vault" + "github.com/unkeyed/unkey/pkg/batch" "github.com/unkeyed/unkey/pkg/cache" "github.com/unkeyed/unkey/pkg/clickhouse" + "github.com/unkeyed/unkey/pkg/clickhouse/schema" "github.com/unkeyed/unkey/pkg/clock" "github.com/unkeyed/unkey/pkg/db" "github.com/unkeyed/unkey/pkg/healthcheck" @@ -147,6 +149,9 @@ func Run(ctx context.Context, cfg Config) error { } var ch clickhouse.ClickHouse = clickhouse.NewNoop() + buildSteps := batch.NewNoop[schema.BuildStepV1]() + buildStepLogs := batch.NewNoop[schema.BuildStepLogV1]() + if cfg.ClickHouse.URL != "" { chClient, chErr := clickhouse.New(clickhouse.Config{ URL: cfg.ClickHouse.URL, @@ -155,6 +160,30 @@ func Run(ctx context.Context, cfg Config) error { logger.Error("failed to create clickhouse client, continuing with noop", "error", chErr) } else { ch = chClient + + buildSteps = clickhouse.NewBuffer[schema.BuildStepV1](chClient, "default.build_steps_v1", clickhouse.BufferConfig{ + Name: "build_steps", + BatchSize: 1_000, + BufferSize: 2_000, + FlushInterval: 2 * time.Second, + Consumers: 1, + Drop: true, + OnFlushError: nil, + }) + buildStepLogs = clickhouse.NewBuffer[schema.BuildStepLogV1](chClient, "default.build_step_logs_v1", clickhouse.BufferConfig{ + Name: 
"build_step_logs", + BatchSize: 1_000, + BufferSize: 2_000, + FlushInterval: 2 * time.Second, + Consumers: 1, + Drop: true, + OnFlushError: nil, + }) + + // Close connection last (LIFO: first registered closes last) + r.Defer(chClient.Close) + r.Defer(func() error { buildSteps.Close(); return nil }) + r.Defer(func() error { buildStepLogs.Close(); return nil }) } } @@ -177,6 +206,8 @@ func Run(ctx context.Context, cfg Config) error { BuildPlatform: deploy.BuildPlatform(buildPlatform), DepotConfig: deploy.DepotConfig(cfg.GetDepotConfig()), Clickhouse: ch, + BuildSteps: buildSteps, + BuildStepLogs: buildStepLogs, AllowUnauthenticatedDeployments: cfg.GitHub.AllowUnauthenticatedDeployments, DashboardURL: cfg.DashboardURL, }), diff --git a/svc/krane/internal/sentinel/apply.go b/svc/krane/internal/sentinel/apply.go index 43d2022d7a..d04afbe00c 100644 --- a/svc/krane/internal/sentinel/apply.go +++ b/svc/krane/internal/sentinel/apply.go @@ -138,7 +138,10 @@ func (c *Controller) ensureSentinelExists(ctx context.Context, sentinel *ctrlv1. 
ReadonlyReplica: "${UNKEY_DATABASE_REPLICA}", }, ClickHouse: sentinelcfg.ClickHouseConfig{ - URL: "${UNKEY_CLICKHOUSE_URL}", + URL: "${UNKEY_CLICKHOUSE_URL}", + BatchSize: 0, + BufferSize: 0, + Consumers: 0, }, Redis: sentinelcfg.RedisConfig{ URL: "${UNKEY_REDIS_URL}", diff --git a/svc/sentinel/BUILD.bazel b/svc/sentinel/BUILD.bazel index 3532bdb05c..06f556077f 100644 --- a/svc/sentinel/BUILD.bazel +++ b/svc/sentinel/BUILD.bazel @@ -12,9 +12,11 @@ go_library( "//internal/services/keys", "//internal/services/ratelimit", "//internal/services/usagelimiter", + "//pkg/batch", "//pkg/cache", "//pkg/cache/clustering", "//pkg/clickhouse", + "//pkg/clickhouse/schema", "//pkg/clock", "//pkg/cluster", "//pkg/config", diff --git a/svc/sentinel/config.go b/svc/sentinel/config.go index 3b45015f37..e144248f86 100644 --- a/svc/sentinel/config.go +++ b/svc/sentinel/config.go @@ -12,6 +12,19 @@ type ClickHouseConfig struct { // URL is the ClickHouse connection string. // Example: "clickhouse://default:password@clickhouse:9000?secure=false&skip_verify=true" URL string `toml:"url"` + + // BatchSize is the maximum number of items to collect before flushing to ClickHouse. + // Applies to all event buffers (sentinel requests, key verifications). + // Defaults to 5000. + BatchSize int `toml:"batch_size" config:"default=5000,min=1"` + + // BufferSize is the capacity of the channel buffer holding incoming items. + // When full, new items are silently dropped. Defaults to 10000. + BufferSize int `toml:"buffer_size" config:"default=10000,min=1"` + + // Consumers is the number of goroutines that drain each buffer. + // Defaults to 1. 
+ Consumers int `toml:"consumers" config:"default=1,min=1"` } // RedisConfig configures the Redis connection used for rate limiting diff --git a/svc/sentinel/engine/BUILD.bazel b/svc/sentinel/engine/BUILD.bazel index 5649ec0aaa..08a8134a82 100644 --- a/svc/sentinel/engine/BUILD.bazel +++ b/svc/sentinel/engine/BUILD.bazel @@ -41,7 +41,6 @@ go_test( "//internal/services/ratelimit", "//internal/services/usagelimiter", "//pkg/cache", - "//pkg/clickhouse", "//pkg/clock", "//pkg/counter", "//pkg/db", diff --git a/svc/sentinel/engine/integration_test.go b/svc/sentinel/engine/integration_test.go index 85c7d7df26..3bcd21e2f2 100644 --- a/svc/sentinel/engine/integration_test.go +++ b/svc/sentinel/engine/integration_test.go @@ -15,7 +15,6 @@ import ( "github.com/unkeyed/unkey/internal/services/ratelimit" "github.com/unkeyed/unkey/internal/services/usagelimiter" "github.com/unkeyed/unkey/pkg/cache" - "github.com/unkeyed/unkey/pkg/clickhouse" "github.com/unkeyed/unkey/pkg/clock" "github.com/unkeyed/unkey/pkg/counter" "github.com/unkeyed/unkey/pkg/db" @@ -97,13 +96,14 @@ func newTestHarness(t *testing.T) *testHarness { require.NoError(t, err) keyService, err := keys.New(keys.Config{ - DB: database, - RateLimiter: rateLimiter, - RBAC: rbac.New(), - Clickhouse: clickhouse.NewNoop(), - Region: "test", - UsageLimiter: usageLimiter, - KeyCache: keyCache, + DB: database, + RateLimiter: rateLimiter, + RBAC: rbac.New(), + KeyVerifications: nil, + Region: "test", + UsageLimiter: usageLimiter, + KeyCache: keyCache, + QuotaCache: nil, }) require.NoError(t, err) diff --git a/svc/sentinel/middleware/BUILD.bazel b/svc/sentinel/middleware/BUILD.bazel index ea36946ea4..7bd22cd8fc 100644 --- a/svc/sentinel/middleware/BUILD.bazel +++ b/svc/sentinel/middleware/BUILD.bazel @@ -11,7 +11,7 @@ go_library( importpath = "github.com/unkeyed/unkey/svc/sentinel/middleware", visibility = ["//visibility:public"], deps = [ - "//pkg/clickhouse", + "//pkg/batch", "//pkg/clickhouse/schema", "//pkg/clock", 
"//pkg/codes", diff --git a/svc/sentinel/middleware/logging.go b/svc/sentinel/middleware/logging.go index c44e02513d..2aaad34c8a 100644 --- a/svc/sentinel/middleware/logging.go +++ b/svc/sentinel/middleware/logging.go @@ -5,7 +5,7 @@ import ( "net/http" "strings" - "github.com/unkeyed/unkey/pkg/clickhouse" + "github.com/unkeyed/unkey/pkg/batch" "github.com/unkeyed/unkey/pkg/clickhouse/schema" "github.com/unkeyed/unkey/pkg/clock" "github.com/unkeyed/unkey/pkg/zen" @@ -14,7 +14,7 @@ import ( // WithSentinelLogging logs completed sentinel requests to ClickHouse. // Timing/response data populated by handler via context during proxy execution. -func WithSentinelLogging(ch clickhouse.ClickHouse, clk clock.Clock, sentinelID, region string) zen.Middleware { +func WithSentinelLogging(buf *batch.BatchProcessor[schema.SentinelRequest], clk clock.Clock, sentinelID, region string) zen.Middleware { return func(next zen.HandleFunc) zen.HandleFunc { return func(ctx context.Context, s *zen.Session) error { // nolint:exhaustruct @@ -37,7 +37,7 @@ func WithSentinelLogging(ch clickhouse.ClickHouse, clk clock.Clock, sentinelID, req := s.Request() - ch.BufferSentinelRequest(schema.SentinelRequest{ + buf.Buffer(schema.SentinelRequest{ RequestID: tracking.RequestID, Time: tracking.StartTime.UnixMilli(), WorkspaceID: tracking.Deployment.WorkspaceID, diff --git a/svc/sentinel/routes/BUILD.bazel b/svc/sentinel/routes/BUILD.bazel index 5740a7ba9e..313a8d37af 100644 --- a/svc/sentinel/routes/BUILD.bazel +++ b/svc/sentinel/routes/BUILD.bazel @@ -9,7 +9,8 @@ go_library( importpath = "github.com/unkeyed/unkey/svc/sentinel/routes", visibility = ["//visibility:public"], deps = [ - "//pkg/clickhouse", + "//pkg/batch", + "//pkg/clickhouse/schema", "//pkg/clock", "//pkg/config", "//pkg/pprof", diff --git a/svc/sentinel/routes/register.go b/svc/sentinel/routes/register.go index c1180cff49..9f2afdf075 100644 --- a/svc/sentinel/routes/register.go +++ b/svc/sentinel/routes/register.go @@ -14,7 +14,7 @@ 
import ( func Register(srv *zen.Server, svc *Services) { withPanicRecovery := zen.WithPanicRecovery() withObservability := middleware.WithObservability(svc.EnvironmentID, svc.Region) - withSentinelLogging := middleware.WithSentinelLogging(svc.ClickHouse, svc.Clock, svc.SentinelID, svc.Region) + withSentinelLogging := middleware.WithSentinelLogging(svc.SentinelRequests, svc.Clock, svc.SentinelID, svc.Region) withProxyErrorHandling := middleware.WithProxyErrorHandling() withTimeout := zen.WithTimeout(svc.RequestTimeout) withLogging := zen.WithLogging(zen.SkipPaths("/_unkey/internal/", "/health/")) diff --git a/svc/sentinel/routes/services.go b/svc/sentinel/routes/services.go index 3b35a31ca5..e3f505427c 100644 --- a/svc/sentinel/routes/services.go +++ b/svc/sentinel/routes/services.go @@ -3,7 +3,8 @@ package routes import ( "time" - "github.com/unkeyed/unkey/pkg/clickhouse" + "github.com/unkeyed/unkey/pkg/batch" + "github.com/unkeyed/unkey/pkg/clickhouse/schema" "github.com/unkeyed/unkey/pkg/clock" "github.com/unkeyed/unkey/pkg/config" "github.com/unkeyed/unkey/svc/sentinel/engine" @@ -18,7 +19,7 @@ type Services struct { EnvironmentID string SentinelID string Region string - ClickHouse clickhouse.ClickHouse + SentinelRequests *batch.BatchProcessor[schema.SentinelRequest] MaxRequestBodySize int64 RequestTimeout time.Duration Engine engine.Evaluator diff --git a/svc/sentinel/run.go b/svc/sentinel/run.go index 1ff7fd3af0..ab8b84db02 100644 --- a/svc/sentinel/run.go +++ b/svc/sentinel/run.go @@ -12,9 +12,11 @@ import ( "github.com/unkeyed/unkey/internal/services/keys" "github.com/unkeyed/unkey/internal/services/ratelimit" "github.com/unkeyed/unkey/internal/services/usagelimiter" + "github.com/unkeyed/unkey/pkg/batch" "github.com/unkeyed/unkey/pkg/cache" "github.com/unkeyed/unkey/pkg/cache/clustering" "github.com/unkeyed/unkey/pkg/clickhouse" + "github.com/unkeyed/unkey/pkg/clickhouse/schema" "github.com/unkeyed/unkey/pkg/clock" "github.com/unkeyed/unkey/pkg/cluster" 
"github.com/unkeyed/unkey/pkg/counter" @@ -109,15 +111,41 @@ func Run(ctx context.Context, cfg Config) error { } r.Defer(database.Close) - var ch clickhouse.ClickHouse = clickhouse.NewNoop() + sentinelRequests := batch.NewNoop[schema.SentinelRequest]() + keyVerifications := batch.NewNoop[schema.KeyVerification]() + + var chClient *clickhouse.Client if cfg.ClickHouse.URL != "" { - ch, err = clickhouse.New(clickhouse.Config{ + chClient, err = clickhouse.New(clickhouse.Config{ URL: cfg.ClickHouse.URL, }) if err != nil { return fmt.Errorf("unable to create clickhouse: %w", err) } - r.Defer(ch.Close) + + sentinelRequests = clickhouse.NewBuffer[schema.SentinelRequest](chClient, "default.sentinel_requests_raw_v1", clickhouse.BufferConfig{ + Name: "sentinel_requests", + BatchSize: cfg.ClickHouse.BatchSize, + BufferSize: cfg.ClickHouse.BufferSize, + FlushInterval: 5 * time.Second, + Consumers: cfg.ClickHouse.Consumers, + Drop: true, + OnFlushError: nil, + }) + keyVerifications = clickhouse.NewBuffer[schema.KeyVerification](chClient, "default.key_verifications_raw_v2", clickhouse.BufferConfig{ + Name: "key_verifications", + BatchSize: cfg.ClickHouse.BatchSize, + BufferSize: cfg.ClickHouse.BufferSize, + FlushInterval: 5 * time.Second, + Consumers: cfg.ClickHouse.Consumers, + Drop: true, + OnFlushError: nil, + }) + + // Close buffers before connection (LIFO) + r.Defer(func() error { sentinelRequests.Close(); return nil }) + r.Defer(func() error { keyVerifications.Close(); return nil }) + r.Defer(chClient.Close) } // Initialize gossip-based cache invalidation @@ -173,7 +201,7 @@ func Run(ctx context.Context, cfg Config) error { // Initialize middleware engine for KeyAuth and other sentinel policies. // Uses Redis if configured, in-memory counters otherwise. 
- middlewareEngine, err := initMiddlewareEngine(r, cfg, database, ch, clk) + middlewareEngine, err := initMiddlewareEngine(r, cfg, database, keyVerifications, clk) if err != nil { return fmt.Errorf("unable to create middleware engine: %w", err) } @@ -185,7 +213,7 @@ func Run(ctx context.Context, cfg Config) error { EnvironmentID: cfg.EnvironmentID, SentinelID: cfg.SentinelID, Region: cfg.Region, - ClickHouse: ch, + SentinelRequests: sentinelRequests, MaxRequestBodySize: maxRequestBodySize, RequestTimeout: cfg.RequestTimeout, Engine: middlewareEngine, @@ -239,7 +267,7 @@ func Run(ctx context.Context, cfg Config) error { // at startup does not prevent the engine from being created — the Redis client // reconnects lazily, and both the rate limiter and usage limiter degrade // gracefully (local windows / DB fallback) when Redis is temporarily unavailable. -func initMiddlewareEngine(r *runner.Runner, cfg Config, database db.Database, ch clickhouse.ClickHouse, clk clock.Clock) (engine.Evaluator, error) { +func initMiddlewareEngine(r *runner.Runner, cfg Config, database db.Database, keyVerifications *batch.BatchProcessor[schema.KeyVerification], clk clock.Clock) (engine.Evaluator, error) { var ctr counter.Counter if cfg.Redis.URL != "" { redisCtr, redisErr := counter.NewRedis(counter.RedisConfig{ @@ -303,14 +331,14 @@ func initMiddlewareEngine(r *runner.Runner, cfg Config, database db.Database, ch } keyService, err := keys.New(keys.Config{ - DB: database, - RateLimiter: rateLimiter, - RBAC: rbac.New(), - Clickhouse: ch, - Region: cfg.Region, - UsageLimiter: usageLimiter, - KeyCache: keyCache, - QuotaCache: nil, + DB: database, + RateLimiter: rateLimiter, + RBAC: rbac.New(), + KeyVerifications: keyVerifications, + Region: cfg.Region, + UsageLimiter: usageLimiter, + KeyCache: keyCache, + QuotaCache: nil, }) if err != nil { return nil, fmt.Errorf("failed to create key service: %w", err)