diff --git a/apps/agent/cmd/agent/agent.go b/apps/agent/cmd/agent/agent.go index 8b2780c77e..b7cb435cda 100644 --- a/apps/agent/cmd/agent/agent.go +++ b/apps/agent/cmd/agent/agent.go @@ -9,7 +9,6 @@ import ( "runtime/debug" "strings" "syscall" - "time" "github.com/Southclaws/fault" "github.com/Southclaws/fault/fmsg" @@ -22,11 +21,9 @@ import ( "github.com/unkeyed/unkey/apps/agent/pkg/metrics" "github.com/unkeyed/unkey/apps/agent/pkg/profiling" "github.com/unkeyed/unkey/apps/agent/pkg/prometheus" - "github.com/unkeyed/unkey/apps/agent/pkg/tinybird" "github.com/unkeyed/unkey/apps/agent/pkg/tracing" "github.com/unkeyed/unkey/apps/agent/pkg/uid" "github.com/unkeyed/unkey/apps/agent/pkg/version" - "github.com/unkeyed/unkey/apps/agent/services/eventrouter" "github.com/unkeyed/unkey/apps/agent/services/ratelimit" "github.com/unkeyed/unkey/apps/agent/services/vault" "github.com/unkeyed/unkey/apps/agent/services/vault/storage" @@ -244,25 +241,6 @@ func run(c *cli.Context) error { return err } - if cfg.Services.EventRouter != nil { - var er *eventrouter.Service - er, err = eventrouter.New(eventrouter.Config{ - Logger: logger, - Metrics: m, - BatchSize: cfg.Services.EventRouter.Tinybird.BatchSize, - BufferSize: cfg.Services.EventRouter.Tinybird.BufferSize, - FlushInterval: time.Duration(cfg.Services.EventRouter.Tinybird.FlushInterval) * time.Second, - Tinybird: tinybird.New("https://api.tinybird.co", cfg.Services.EventRouter.Tinybird.Token), - Clickhouse: ch, - AuthToken: cfg.AuthToken, - }) - if err != nil { - return err - } - srv.WithEventRouter(er) - - } - connectSrv, err := connect.New(connect.Config{Logger: logger, Image: cfg.Image, Metrics: m}) if err != nil { return err diff --git a/apps/agent/config.docker.json b/apps/agent/config.docker.json index 9f1655fb0b..7972eb6b9e 100644 --- a/apps/agent/config.docker.json +++ b/apps/agent/config.docker.json @@ -20,14 +20,6 @@ } }, "services": { - "eventRouter": { - "tinybird": { - "token": "${TINYBIRD_TOKEN}", - "batchSize": 1000, - "flushInterval": 1, - "bufferSize": 10000 - } - }, "vault": { "s3Url": "${VAULT_S3_URL}", "s3Bucket": "${VAULT_S3_BUCKET}", diff --git a/apps/agent/config.production.json b/apps/agent/config.production.json index ea4aade134..ee8d7083d5 100644 --- a/apps/agent/config.production.json +++ b/apps/agent/config.production.json @@ -30,14 +30,6 @@ } }, "services": { - "eventRouter": { - "tinybird": { - "token": "${TINYBIRD_TOKEN}", - "batchSize": 1000, - "flushInterval": 1, - "bufferSize": 10000 - } - }, "vault": { "s3Url": "${VAULT_S3_URL}", "s3Bucket": "${VAULT_S3_BUCKET}", diff --git a/apps/agent/config.staging.json b/apps/agent/config.staging.json index 15b0683710..7fba639265 100644 --- a/apps/agent/config.staging.json +++ b/apps/agent/config.staging.json @@ -12,14 +12,6 @@ "region": "fly::${FLY_REGION}", "authToken": "${AUTH_TOKEN}", "services": { - "eventRouter": { - "tinybird": { - "token": "${TINYBIRD_TOKEN}", - "batchSize": 1000, - "flushInterval": 1, - "bufferSize": 10000 - } - }, "vault": { "s3Url": "${VAULT_S3_URL}", "s3Bucket": "${VAULT_S3_BUCKET}", diff --git a/apps/agent/pkg/api/server.go b/apps/agent/pkg/api/server.go index d113431aab..6aaf5fe7c0 100644 --- a/apps/agent/pkg/api/server.go +++ b/apps/agent/pkg/api/server.go @@ -8,7 +8,6 @@ import ( "github.com/unkeyed/unkey/apps/agent/pkg/api/validation" "github.com/unkeyed/unkey/apps/agent/pkg/logging" "github.com/unkeyed/unkey/apps/agent/pkg/metrics" - "github.com/unkeyed/unkey/apps/agent/services/eventrouter" "github.com/unkeyed/unkey/apps/agent/services/ratelimit" "github.com/unkeyed/unkey/apps/agent/services/vault" ) @@ -94,15 +93,6 @@ func New(config Config) (*Server, error) { return s, nil } -func (s *Server) WithEventRouter(svc *eventrouter.Service) { - s.Lock() - defer s.Unlock() - - pattern, handlerFunc := svc.CreateHandler() - - s.mux.HandleFunc(pattern, handlerFunc) -} - // Calling this function multiple times will have no effect. func (s *Server) Listen(addr string) error { s.Lock() diff --git a/apps/agent/pkg/config/agent.go b/apps/agent/pkg/config/agent.go index e5ee09c23e..5924449d74 100644 --- a/apps/agent/pkg/config/agent.go +++ b/apps/agent/pkg/config/agent.go @@ -36,14 +36,6 @@ type Agent struct { } `json:"heartbeat,omitempty" description:"Send heartbeat to a URL"` Services struct { - EventRouter *struct { - Tinybird *struct { - Token string `json:"token" minLength:"1" description:"The token to use for tinybird authentication"` - FlushInterval int `json:"flushInterval" min:"1" description:"Interval in seconds to flush events"` - BufferSize int `json:"bufferSize" min:"1" description:"Size of the buffer"` - BatchSize int `json:"batchSize" min:"1" description:"Size of the batch"` - } `json:"tinybird,omitempty" description:"Send events to tinybird"` - } `json:"eventRouter,omitempty" description:"Route events"` Vault struct { S3Bucket string `json:"s3Bucket" minLength:"1" description:"The bucket to store secrets in"` S3Url string `json:"s3Url" minLength:"1" description:"The url to store secrets in"` diff --git a/apps/agent/pkg/prometheus/metrics.go b/apps/agent/pkg/prometheus/metrics.go index ca382c6213..1f1f97dc30 100644 --- a/apps/agent/pkg/prometheus/metrics.go +++ b/apps/agent/pkg/prometheus/metrics.go @@ -57,11 +57,6 @@ var ( Subsystem: "cache", Name: "rejected", }, []string{"resource"}) - EventRouterFlushedRows = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "agent", - Subsystem: "event_router", - Name: "flushed_rows", - }, []string{"datasource"}) RatelimitPushPullEvents = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "agent", Subsystem: "ratelimit", diff --git a/apps/agent/pkg/testutils/containers/agent.go b/apps/agent/pkg/testutils/containers/agent.go index 5540fe3e98..8596a50d04 100644 --- a/apps/agent/pkg/testutils/containers/agent.go +++ b/apps/agent/pkg/testutils/containers/agent.go @@ -64,7 +64,6 @@ func NewAgent(t *testing.T, clusterSize int) []Agent { "VAULT_S3_ACCESS_KEY_ID": s3.AccessKeyId, "VAULT_S3_ACCESS_KEY_SECRET": s3.AccessKeySecret, "VAULT_MASTER_KEYS": "Ch9rZWtfMmdqMFBJdVhac1NSa0ZhNE5mOWlLSnBHenFPENTt7an5MRogENt9Si6wms4pQ2XIvqNSIgNpaBenJmXgcInhu6Nfv2U=", - "TINYBIRD_TOKEN": "I can't wait until we use clickhouse for local development", }, WaitingFor: wait.ForHTTP("/v1/liveness"), }, diff --git a/apps/agent/pkg/tinybird/tinybird.go b/apps/agent/pkg/tinybird/tinybird.go deleted file mode 100644 index c6ffa82463..0000000000 --- a/apps/agent/pkg/tinybird/tinybird.go +++ /dev/null @@ -1,71 +0,0 @@ -package tinybird - -import ( - "encoding/json" - "errors" - "fmt" - "io" - "net/http" - "strings" - - "github.com/unkeyed/unkey/apps/agent/pkg/openapi" -) - -type Client struct { - baseUrl string - token string - httpClient *http.Client -} - -func New(baseUrl string, token string) *Client { - return &Client{ - baseUrl: baseUrl, - token: token, - httpClient: http.DefaultClient, - } -} - -func (c *Client) Ingest(datasource string, rows []any) error { - - body := "" - for _, row := range rows { - str, err := json.Marshal(row) - if err != nil { - return err - } - body += string(str) + "\n" - } - - req, err := http.NewRequest(http.MethodPost, c.baseUrl+"/v0/events?name="+datasource, strings.NewReader(body)) - if err != nil { - return err - } - req.Header.Set("Authorization", "Bearer "+c.token) - - resp, err := c.httpClient.Do(req) - if err != nil { - return fmt.Errorf("error performing POST request: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return errors.New("error ingesting rows, status code: " + resp.Status) - - } - - res := openapi.V0EventsResponseBody{} - resBody, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("error reading response body: %w", err) - - } - err = json.Unmarshal(resBody, &res) - if err != nil { - return fmt.Errorf("error decoding response: %w", err) - } - if res.SuccessfulRows != len(rows) { - return errors.New("error ingesting all rows") - } - - return nil -} diff --git a/apps/agent/schema.json b/apps/agent/schema.json index c5a6c5b831..ee3e30d4d0 100644 --- a/apps/agent/schema.json +++ b/apps/agent/schema.json @@ -207,41 +207,6 @@ "services": { "type": "object", "properties": { - "eventRouter": { - "type": "object", - "description": "Route events", - "properties": { - "tinybird": { - "type": "object", - "description": "Send events to tinybird", - "properties": { - "batchSize": { - "type": "integer", - "description": "Size of the batch", - "format": "int32" - }, - "bufferSize": { - "type": "integer", - "description": "Size of the buffer", - "format": "int32" - }, - "flushInterval": { - "type": "integer", - "description": "Interval in seconds to flush events", - "format": "int32" - }, - "token": { - "type": "string", - "description": "The token to use for tinybird authentication", - "minLength": 1 - } - }, - "additionalProperties": false, - "required": ["token", "flushInterval", "bufferSize", "batchSize"] - } - }, - "additionalProperties": false - }, "vault": { "type": "object", "description": "Store secrets", diff --git a/apps/agent/services/eventrouter/service.go b/apps/agent/services/eventrouter/service.go deleted file mode 100644 index d8cd4bf88d..0000000000 --- a/apps/agent/services/eventrouter/service.go +++ /dev/null @@ -1,248 +0,0 @@ -package eventrouter - -import ( - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "time" - - "github.com/unkeyed/unkey/apps/agent/pkg/auth" - "github.com/unkeyed/unkey/apps/agent/pkg/batch" - "github.com/unkeyed/unkey/apps/agent/pkg/clickhouse" - "github.com/unkeyed/unkey/apps/agent/pkg/clickhouse/schema" - "github.com/unkeyed/unkey/apps/agent/pkg/logging" - "github.com/unkeyed/unkey/apps/agent/pkg/metrics" - "github.com/unkeyed/unkey/apps/agent/pkg/openapi" - "github.com/unkeyed/unkey/apps/agent/pkg/prometheus" - "github.com/unkeyed/unkey/apps/agent/pkg/tinybird" - "github.com/unkeyed/unkey/apps/agent/pkg/tracing" -) - -type event struct { - datasource string - row any -} - -type Config struct { - BatchSize int - BufferSize int - FlushInterval time.Duration - - Tinybird *tinybird.Client - Logger logging.Logger - Metrics metrics.Metrics - Clickhouse clickhouse.Bufferer - AuthToken string -} - -type Service struct { - logger logging.Logger - metrics metrics.Metrics - batcher batch.BatchProcessor[event] - tb *tinybird.Client - authToken string - clickhouse clickhouse.Bufferer -} - -func New(config Config) (*Service, error) { - - flush := func(ctx context.Context, events []event) { - if len(events) == 0 { - return - } - // config.Metrics.RecordFlush() - eventsByDatasource := map[string][]any{} - for _, e := range events { - if _, ok := eventsByDatasource[e.datasource]; !ok { - eventsByDatasource[e.datasource] = []any{} - } - eventsByDatasource[e.datasource] = append(eventsByDatasource[e.datasource], e.row) - } - for datasource, rows := range eventsByDatasource { - err := config.Tinybird.Ingest(datasource, rows) - if err != nil { - config.Logger.Err(err).Str("datasource", datasource).Int("rows", len(rows)).Msg("Error ingesting") - } - prometheus.EventRouterFlushedRows.With(map[string]string{ - "datasource": datasource, - }).Add(float64(len(rows))) - - if datasource == "key_verifications__v2" { - - for _, row := range rows { - e, ok := row.(tinybirdKeyVerification) - if !ok { - config.Logger.Error().Str("e", fmt.Sprintf("%T: %+v", row, row)).Msg("Error casting key verification") - continue - } - // dual write to clickhouse - outcome := "VALID" - if e.DeniedReason != "" { - outcome = e.DeniedReason - } - config.Clickhouse.BufferKeyVerification(schema.KeyVerificationRequestV1{ - RequestID: e.RequestID, - Time: e.Time, - WorkspaceID: e.WorkspaceId, - KeySpaceID: e.KeySpaceId, - KeyID: e.KeyId, - Region: e.Region, - Outcome: outcome, - IdentityID: e.OwnerId, - }) - } - } - - } - } - - batcher := batch.New(batch.Config[event]{ - BatchSize: config.BatchSize, - BufferSize: config.BufferSize, - FlushInterval: config.FlushInterval, - Flush: flush, - }) - return &Service{ - logger: config.Logger, - metrics: config.Metrics, - batcher: *batcher, - tb: config.Tinybird, - authToken: config.AuthToken, - }, nil -} - -// this is what we currently send to tinybird -// we need to parse it and transform it into a clickhouse event, then dual write to both stores -type tinybirdKeyVerification struct { - ApiId string `json:"apiId"` - EdgeRegion string `json:"edgeRegion"` - IpAddress string `json:"ipAddress"` - KeyId string `json:"keyId"` - Ratelimited bool `json:"ratelimited"` - Region string `json:"region"` - RequestedResource string `json:"requestedResource"` - Time int64 `json:"time"` - UsageExceeded bool `json:"usageExceeded"` - UserAgent string `json:"userAgent"` - WorkspaceId string `json:"workspaceId"` - DeniedReason string `json:"deniedReason,omitempty"` - OwnerId string `json:"ownerId,omitempty"` - KeySpaceId string `json:"keySpaceId,omitempty"` - RequestID string `json:"requestId,omitempty"` - RequestBody string `json:"requestBody,omitempty"` - ResponeBody string `json:"responseBody,omitempty"` -} - -func (s *Service) CreateHandler() (string, http.HandlerFunc) { - return "POST /v0/events", func(w http.ResponseWriter, r *http.Request) { - defer r.Body.Close() - s.logger.Info().Msg("Received events") - - ctx, span := tracing.Start(r.Context(), tracing.NewSpanName("eventrouter", "v0/events")) - defer span.End() - - err := auth.Authorize(ctx, s.authToken, r.Header.Get("Authorization")) - if err != nil { - s.logger.Warn().Err(err).Msg("failed to authorize request") - w.WriteHeader(403) - _, err = w.Write([]byte("Unauthorized")) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - return - } - - datasource := r.URL.Query().Get("name") - if datasource == "" { - w.WriteHeader(400) - _, err = w.Write([]byte("missing ?name= parameter")) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - return - } - - successfulRows := 0 - switch datasource { - case "key_verifications__v2": - events, decodeErr := decode[tinybirdKeyVerification](r.Body) - if decodeErr != nil { - s.logger.Err(decodeErr).Msg("Error decoding request") - w.WriteHeader(400) - _, err = w.Write([]byte("Error decoding request")) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - return - } - for _, e := range events { - s.batcher.Buffer(event{datasource, e}) - } - successfulRows = len(events) - default: - events, decodeErr := decode[any](r.Body) - if decodeErr != nil { - s.logger.Err(decodeErr).Msg("Error decoding request") - w.WriteHeader(400) - _, err = w.Write([]byte("Error decoding request")) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - return - } - for _, e := range events { - s.batcher.Buffer(event{datasource, e}) - } - successfulRows = len(events) - } - - response := openapi.V0EventsResponseBody{ - SuccessfulRows: successfulRows, - QuarantinedRows: 0, - } - - b, err := json.Marshal(response) - if err != nil { - s.logger.Err(err).Msg("Error marshalling response") - w.WriteHeader(500) - _, err = w.Write([]byte("Error marshalling response")) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - return - } - w.Header().Add("Content-Type", "application/json") - _, err = w.Write(b) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - - } -} - -// decode reads the body of the request and decodes it into a slice of T -// the reader will be closed automatically -func decode[T any](body io.ReadCloser) ([]T, error) { - defer body.Close() - - dec := json.NewDecoder(body) - - rows := []T{} - - for { - var v T - err := dec.Decode(&v) - if err != nil { - if err == io.EOF { - break - } - return nil, err - } - rows = append(rows, v) - } - - return rows, nil - -} diff --git a/apps/api/src/routes/v1_apis_listKeys.ts b/apps/api/src/routes/v1_apis_listKeys.ts index 5f7f2f89c4..2e1c80a1d3 100644 --- a/apps/api/src/routes/v1_apis_listKeys.ts +++ b/apps/api/src/routes/v1_apis_listKeys.ts @@ -211,7 +211,7 @@ export const registerV1ApisListKeys = (app: App) => } if (keySpace.sizeLastUpdatedAt < Date.now() - 60_000) { - const count = await db.primary + const count = await db.readonly .select({ count: sql`count(*)` }) .from(schema.keys) .where(and(eq(schema.keys.keyAuthId, keySpace.id), isNull(schema.keys.deletedAt))); diff --git a/apps/docs/contributing/services/agent/configuration.mdx b/apps/docs/contributing/services/agent/configuration.mdx index 058943fb7a..8d2618f2de 100644 --- a/apps/docs/contributing/services/agent/configuration.mdx +++ b/apps/docs/contributing/services/agent/configuration.mdx @@ -9,7 +9,7 @@ Inside the config file, you may use `${ENV_NAME}` to reference environment varia ## $schema -As the agent is in active development, the configuration spec might change frequently and this document could be outdated. +As the agent is in active development, the configuration spec might change frequently and this document could be outdated. The generated and up to date json schema can be found [on GitHub](https://github.com/unkeyed/unkey/blob/main/apps/agent/schema.json). @@ -50,7 +50,7 @@ The generated and up to date json schema can be found [on GitHub](https://github Enable tracing to an external sink. - + Enable axiom.co The dataset to send traces to. @@ -106,30 +106,6 @@ The generated and up to date json schema can be found [on GitHub](https://github - - - - The token to use for http authentication - - - - - The token to use for tinybird authentication - - - Interval in seconds to flush events - - - Size of the buffer, if the buffer is full, new events will not be dropped but - the agent will block until there is space in the buffer. - - - Size of the batch to send to tinybird. - - - - - @@ -225,15 +201,6 @@ The generated and up to date json schema can be found [on GitHub](https://github "ratelimit": { "authToken": "${AUTH_TOKEN}" }, - "eventRouter": { - "authToken": "${AUTH_TOKEN}", - "tinybird": { - "token": "${TINYBIRD_TOKEN}", - "batchSize": 1000, - "flushInterval": 1, - "bufferSize": 10000 - } - }, "vault": { "s3Url": "${VAULT_S3_URL}", "s3Bucket": "${VAULT_S3_BUCKET}", diff --git a/apps/workflows/src/index.ts b/apps/workflows/src/index.ts index 5eef281d89..34f3ad3069 100644 --- a/apps/workflows/src/index.ts +++ b/apps/workflows/src/index.ts @@ -3,6 +3,6 @@ import type { Env } from "./lib/env"; export default { async scheduled(event: ScheduledEvent, env: Env, _ctx: ExecutionContext) { const instance = await env.REFILL_REMAINING.create(); - console.log(JSON.stringify({ event, instance })); + console.info(JSON.stringify({ event, instance })); }, }; diff --git a/deployment/docker-compose.yaml b/deployment/docker-compose.yaml index 49c9e2d1a1..1d0e65ebd3 100644 --- a/deployment/docker-compose.yaml +++ b/deployment/docker-compose.yaml @@ -53,7 +53,6 @@ services: VAULT_S3_ACCESS_KEY_ID: "minio_root_user" VAULT_S3_ACCESS_KEY_SECRET: "minio_root_password" VAULT_MASTER_KEYS: "Ch9rZWtfMmdqMFBJdVhac1NSa0ZhNE5mOWlLSnBHenFPENTt7an5MRogENt9Si6wms4pQ2XIvqNSIgNpaBenJmXgcInhu6Nfv2U=" - TINYBIRD_TOKEN: "I can't wait until we use clickhouse for local development" CLICKHOUSE_URL: "clickhouse://default:password@clickhouse:9000"