Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go/apps/api/integration/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ func (h *Harness) RunAPI(config ApiConfig) *ApiCluster {
VaultS3: nil,
KafkaBrokers: kafkaBrokers, // Use host brokers for test runner connections
DebugCacheHeaders: true, // Enable cache debug headers for integration tests
PprofEnabled: true,
PprofUsername: "unkey",
PprofPassword: "password",
}

// Start API server in goroutine
Expand Down
4 changes: 2 additions & 2 deletions go/apps/api/routes/chproxy_metrics/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (h *Handler) Handle(ctx context.Context, s *zen.Session) error {
fault.Public("The provided token is invalid."))
}

events, err := zen.BindBody[[]schema.ApiRequestV1](s)
events, err := zen.BindBody[[]schema.ApiRequest](s)
if err != nil {
return err
}
Expand All @@ -59,7 +59,7 @@ func (h *Handler) Handle(ctx context.Context, s *zen.Session) error {

// Buffer all events to ClickHouse
for _, event := range events {
h.ClickHouse.BufferRequest(event)
h.ClickHouse.BufferApiRequest(event)
}

return s.JSON(http.StatusOK, map[string]string{"status": "OK"})
Expand Down
2 changes: 1 addition & 1 deletion go/apps/api/routes/chproxy_ratelimits/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (h *Handler) Handle(ctx context.Context, s *zen.Session) error {
fault.Public("The provided token is invalid."))
}

events, err := zen.BindBody[[]schema.RatelimitRequestV1](s)
events, err := zen.BindBody[[]schema.Ratelimit](s)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go/apps/api/routes/chproxy_verifications/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (h *Handler) Handle(ctx context.Context, s *zen.Session) error {
fault.Public("The provided token is invalid."))
}

events, err := zen.BindBody[[]schema.KeyVerificationRequestV1](s)
events, err := zen.BindBody[[]schema.KeyVerification](s)
if err != nil {
return err
}
Expand Down
5 changes: 2 additions & 3 deletions go/apps/api/routes/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,9 @@ import (

// here we register all of the routes.
// this function runs during startup.
func Register(srv *zen.Server, svc *Services) {
func Register(srv *zen.Server, svc *Services, info zen.InstanceInfo) {
withObservability := zen.WithObservability()
withMetrics := zen.WithMetrics(svc.ClickHouse)

withMetrics := zen.WithMetrics(svc.ClickHouse, info)
withLogging := zen.WithLogging(svc.Logger)
withPanicRecovery := zen.WithPanicRecovery(svc.Logger)
withErrorHandling := zen.WithErrorHandling(svc.Logger)
Expand Down
10 changes: 5 additions & 5 deletions go/apps/api/routes/v2_analytics_get_verifications/200_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func Test200_Success(t *testing.T) {

// Buffer some key verifications
for i := range 5 {
h.ClickHouse.BufferKeyVerification(schema.KeyVerificationRequestV1{
h.ClickHouse.BufferKeyVerification(schema.KeyVerification{
RequestID: uid.New(uid.RequestPrefix),
Time: now - int64(i*1000),
WorkspaceID: workspace.ID,
Expand Down Expand Up @@ -89,7 +89,7 @@ func Test200_PermissionFiltersByApiId(t *testing.T) {

// Buffer verifications for api1
for i := range 3 {
h.ClickHouse.BufferKeyVerification(schema.KeyVerificationRequestV1{
h.ClickHouse.BufferKeyVerification(schema.KeyVerification{
RequestID: uid.New(uid.RequestPrefix),
Time: now - int64(i*1000),
WorkspaceID: workspace.ID,
Expand All @@ -104,7 +104,7 @@ func Test200_PermissionFiltersByApiId(t *testing.T) {

// Buffer verifications for api2 (should NOT be returned)
for i := range 5 {
h.ClickHouse.BufferKeyVerification(schema.KeyVerificationRequestV1{
h.ClickHouse.BufferKeyVerification(schema.KeyVerification{
RequestID: uid.New(uid.RequestPrefix),
Time: now - int64(i*1000),
WorkspaceID: workspace.ID,
Expand Down Expand Up @@ -170,7 +170,7 @@ func Test200_PermissionFiltersByKeySpaceId(t *testing.T) {

// Buffer verifications for api1
for i := range 3 {
h.ClickHouse.BufferKeyVerification(schema.KeyVerificationRequestV1{
h.ClickHouse.BufferKeyVerification(schema.KeyVerification{
RequestID: uid.New(uid.RequestPrefix),
Time: now - int64(i*1000),
WorkspaceID: workspace.ID,
Expand All @@ -185,7 +185,7 @@ func Test200_PermissionFiltersByKeySpaceId(t *testing.T) {

// Buffer verifications for api2 (should NOT be returned)
for i := range 5 {
h.ClickHouse.BufferKeyVerification(schema.KeyVerificationRequestV1{
h.ClickHouse.BufferKeyVerification(schema.KeyVerification{
RequestID: uid.New(uid.RequestPrefix),
Time: now - int64(i*1000),
WorkspaceID: workspace.ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func Test422_ExceedsMaxMemory(t *testing.T) {

// Buffer many verifications to ensure memory usage exceeds limit
for i := range 50_000 {
h.ClickHouse.BufferKeyVerification(schema.KeyVerificationRequestV1{
h.ClickHouse.BufferKeyVerification(schema.KeyVerification{
RequestID: uid.New(uid.RequestPrefix),
Time: now - int64(i*1000),
WorkspaceID: workspace.ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func Test429_QueryQuotaExceeded(t *testing.T) {
now := h.Clock.Now().UnixMilli()

// Buffer some key verifications
for i := 0; i < 5; i++ {
h.ClickHouse.BufferKeyVerification(schema.KeyVerificationRequestV1{
for i := range 5 {
h.ClickHouse.BufferKeyVerification(schema.KeyVerification{
RequestID: uid.New(uid.RequestPrefix),
Time: now - int64(i*1000),
WorkspaceID: workspace.ID,
Expand Down
4 changes: 2 additions & 2 deletions go/apps/api/routes/v2_ratelimit_limit/200_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ func TestLimitSuccessfully(t *testing.T) {
res := testutil.CallRoute[handler.Request, handler.Response](h, route, headers, req)
require.Equal(t, 200, res.Status, "expected 200, received: %v", res.Body)

row := schema.RatelimitV2{}
row := schema.Ratelimit{}
require.Eventually(t, func() bool {

data, err := clickhouse.Select[schema.RatelimitV2](
data, err := clickhouse.Select[schema.Ratelimit](
ctx,
h.ClickHouse.Conn(),
"SELECT * FROM default.ratelimits_raw_v2 WHERE workspace_id = {workspace_id:String} AND namespace_id = {namespace_id:String}",
Expand Down
14 changes: 10 additions & 4 deletions go/apps/api/routes/v2_ratelimit_limit/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (h *Handler) Handle(ctx context.Context, s *zen.Session) error {
}

// Apply override if found, otherwise use request values
limit, duration, overrideId, err := getLimitAndDuration(req, namespace)
limit, duration, overrideID, err := getLimitAndDuration(req, namespace)
if err != nil {
return fault.Wrap(err,
fault.Code(codes.App.Internal.UnexpectedError.URN()),
Expand Down Expand Up @@ -277,22 +277,28 @@ func (h *Handler) Handle(ctx context.Context, s *zen.Session) error {
}
}

t0 := time.Now()
result, err := h.Ratelimit.Ratelimit(ctx, limitReq)
if err != nil {
return fault.Wrap(err,
fault.Internal("rate limit failed"),
fault.Public("We're unable to process the rate limit request."),
)
}

latency := time.Since(t0).Milliseconds()
if s.ShouldLogRequestToClickHouse() {
h.ClickHouse.BufferRatelimit(schema.RatelimitRequestV1{
h.ClickHouse.BufferRatelimit(schema.Ratelimit{
RequestID: s.RequestID(),
WorkspaceID: auth.AuthorizedWorkspaceID,
Time: time.Now().UnixMilli(),
NamespaceID: namespace.ID,
Identifier: req.Identifier,
Passed: result.Success,
Latency: float64(latency),
OverrideID: overrideID,
Limit: uint64(result.Limit),
Remaining: uint64(result.Remaining),
ResetAt: result.Reset.UnixMilli(),
})
}

Expand All @@ -305,7 +311,7 @@ func (h *Handler) Handle(ctx context.Context, s *zen.Session) error {
Limit: limit,
Remaining: result.Remaining,
Reset: result.Reset.UnixMilli(),
OverrideId: overrideId,
OverrideId: overrideID,
},
}

Expand Down
6 changes: 5 additions & 1 deletion go/apps/api/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,11 @@ func Run(ctx context.Context, cfg Config) error {
PprofPassword: cfg.PprofPassword,
UsageLimiter: ulSvc,
AnalyticsConnectionManager: analyticsConnMgr,
})
},
zen.InstanceInfo{
ID: cfg.InstanceID,
Region: cfg.Region,
})

if cfg.Listener == nil {
// Create listener from HttpPort (production)
Expand Down
4 changes: 2 additions & 2 deletions go/apps/gw/server/middleware_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

// EventBuffer defines the interface for buffering events to be sent to ClickHouse.
type EventBuffer interface {
BufferApiRequest(schema.ApiRequestV2)
BufferApiRequest(schema.ApiRequest)
}

// WithMetrics returns middleware that collects metrics about each request,
Expand Down Expand Up @@ -62,7 +62,7 @@ func WithMetrics(eventBuffer EventBuffer, region string) Middleware {
ipAddress = s.Location()
}

eventBuffer.BufferApiRequest(schema.ApiRequestV2{
eventBuffer.BufferApiRequest(schema.ApiRequest{
QueryString: "",
QueryParams: map[string][]string{},
WorkspaceID: s.WorkspaceID,
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/dev/seed/verifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func (s *Seeder) generateVerifications(ctx context.Context, workspaceID string,
}

// Use BufferKeyVerification to let the clickhouse client batch automatically
s.clickhouse.BufferKeyVerificationV2(schema.KeyVerificationV2{
s.clickhouse.BufferKeyVerification(schema.KeyVerification{
RequestID: uid.New("req"),
Time: timestamp.UnixMilli(),
WorkspaceID: workspaceID,
Expand Down
24 changes: 14 additions & 10 deletions go/internal/services/keys/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,20 @@ func (k *KeyVerifier) Verify(ctx context.Context, opts ...VerifyOption) error {
}

func (k *KeyVerifier) log() {
k.clickhouse.BufferKeyVerification(schema.KeyVerificationRequestV1{
RequestID: k.session.RequestID(),
WorkspaceID: k.Key.WorkspaceID,
Time: time.Now().UnixMilli(),
Outcome: string(k.Status),
KeySpaceID: k.Key.KeyAuthID,
KeyID: k.Key.ID,
IdentityID: k.Key.IdentityID.String,
Tags: k.tags,
Region: k.region,

k.clickhouse.BufferKeyVerification(schema.KeyVerification{
RequestID: k.session.RequestID(),
WorkspaceID: k.Key.WorkspaceID,
Time: time.Now().UnixMilli(),
Outcome: string(k.Status),
KeySpaceID: k.Key.KeyAuthID,
KeyID: k.Key.ID,
IdentityID: k.Key.IdentityID.String,
Tags: k.tags,
Region: k.region,
ExternalID: k.Key.ExternalID.String,
SpentCredits: 0, // TODO
Latency: 0.0, // TODO
})

keyType := "key"
Expand Down
Loading
Loading