diff --git a/go/Tiltfile b/go/Tiltfile index 67c001275a..f4b0b6294c 100644 --- a/go/Tiltfile +++ b/go/Tiltfile @@ -171,7 +171,7 @@ if start_api: resource_deps=api_deps, labels=['unkey'], auto_init=True, - trigger_mode=TRIGGER_MODE_AUTO if debug_mode else TRIGGER_MODE_MANUAL + trigger_mode=TRIGGER_MODE_MANUAL if debug_mode else TRIGGER_MODE_AUTO ) # Gateway service (1 replica) @@ -204,7 +204,7 @@ if start_gw: resource_deps=gw_deps, labels=['unkey'], auto_init=True, - trigger_mode=TRIGGER_MODE_AUTO if debug_mode else TRIGGER_MODE_MANUAL + trigger_mode=TRIGGER_MODE_MANUAL if debug_mode else TRIGGER_MODE_AUTO ) # Ctrl service (1 replica) @@ -238,7 +238,7 @@ if start_ctrl: resource_deps=ctrl_deps, labels=['unkey'], auto_init=True, - trigger_mode=TRIGGER_MODE_AUTO if debug_mode else TRIGGER_MODE_MANUAL + trigger_mode=TRIGGER_MODE_MANUAL if debug_mode else TRIGGER_MODE_AUTO ) # Metald service (1 replica) diff --git a/go/apps/api/routes/chproxy_metrics/handler.go b/go/apps/api/routes/chproxy_metrics/handler.go index b0bcc9ac7f..260f38b7df 100644 --- a/go/apps/api/routes/chproxy_metrics/handler.go +++ b/go/apps/api/routes/chproxy_metrics/handler.go @@ -59,7 +59,7 @@ func (h *Handler) Handle(ctx context.Context, s *zen.Session) error { // Buffer all events to ClickHouse for _, event := range events { - h.ClickHouse.BufferApiRequest(event) + h.ClickHouse.BufferRequest(event) } return s.JSON(http.StatusOK, map[string]string{"status": "OK"}) diff --git a/go/apps/api/routes/services.go b/go/apps/api/routes/services.go index 1a452b4b34..2de61bb143 100644 --- a/go/apps/api/routes/services.go +++ b/go/apps/api/routes/services.go @@ -7,17 +7,12 @@ import ( "github.com/unkeyed/unkey/go/internal/services/ratelimit" "github.com/unkeyed/unkey/go/internal/services/usagelimiter" "github.com/unkeyed/unkey/go/pkg/clickhouse" - "github.com/unkeyed/unkey/go/pkg/clickhouse/schema" "github.com/unkeyed/unkey/go/pkg/db" "github.com/unkeyed/unkey/go/pkg/otel/logging" "github.com/unkeyed/unkey/go/pkg/vault" "github.com/unkeyed/unkey/go/pkg/zen/validation" ) -type EventBuffer interface { - BufferApiRequest(schema.ApiRequestV1) -} - type Services struct { Logger logging.Logger Database db.Database diff --git a/go/apps/gw/router/gateway_proxy/handler.go b/go/apps/gw/router/gateway_proxy/handler.go index a0142c2f9e..547fc85b44 100644 --- a/go/apps/gw/router/gateway_proxy/handler.go +++ b/go/apps/gw/router/gateway_proxy/handler.go @@ -35,7 +35,7 @@ func (h *Handler) Handle(ctx context.Context, sess *server.Session) error { // Strip port from hostname for database lookup (Host header may include port) hostname := routing.ExtractHostname(req) - config, err := h.RoutingService.GetConfig(ctx, hostname) + configWithWorkspace, err := h.RoutingService.GetConfig(ctx, hostname) if err != nil { return fault.Wrap(err, fault.Code(codes.Gateway.Routing.ConfigNotFound.URN()), @@ -44,6 +44,10 @@ func (h *Handler) Handle(ctx context.Context, sess *server.Session) error { ) } + // Set workspace ID in session + sess.WorkspaceID = configWithWorkspace.WorkspaceID + config := configWithWorkspace.Config + // Handle request validation if configured if h.Validator != nil { err = h.Validator.Validate(ctx, sess, config) @@ -68,8 +72,13 @@ func (h *Handler) Handle(ctx context.Context, sess *server.Session) error { ) } - // Forward the request using the proxy service - err = h.Proxy.Forward(ctx, *targetURL, sess.ResponseWriter(), req) + // Forward the request using the proxy service with response capture + captureWriter, captureFunc := sess.CaptureResponseWriter() + err = h.Proxy.Forward(ctx, *targetURL, captureWriter, req) + + // Capture the response data back to session after forwarding + captureFunc() + if err != nil { return fault.Wrap(err, fault.Code(codes.Gateway.Proxy.ProxyForwardFailed.URN()), diff --git a/go/apps/gw/server/middleware_errors.go b/go/apps/gw/server/middleware_errors.go index 132195198a..434710f92d 100644 --- a/go/apps/gw/server/middleware_errors.go +++ b/go/apps/gw/server/middleware_errors.go @@ -38,6 +38,9 @@ func WithErrorHandling(logger logging.Logger) Middleware { return nil } + // Store the original error for metrics logging + s.SetError(err) + // Get the error URN from the error urn, ok := fault.GetCode(err) if !ok { diff --git a/go/apps/gw/server/middleware_metrics.go b/go/apps/gw/server/middleware_metrics.go index f1c2cd2853..030177e928 100644 --- a/go/apps/gw/server/middleware_metrics.go +++ b/go/apps/gw/server/middleware_metrics.go @@ -6,12 +6,11 @@ import ( "strings" "github.com/unkeyed/unkey/go/pkg/clickhouse/schema" - "github.com/unkeyed/unkey/go/pkg/fault" ) // EventBuffer defines the interface for buffering events to be sent to ClickHouse. type EventBuffer interface { - BufferApiRequest(schema.ApiRequestV1) + BufferApiRequest(schema.ApiRequestV2) } // WithMetrics returns middleware that collects metrics about each request, @@ -52,49 +51,48 @@ func WithMetrics(eventBuffer EventBuffer, region string) Middleware { // Buffer to ClickHouse if enabled // We don't need this ATM - // if eventBuffer != nil && s.r.Header.Get("X-Unkey-Metrics") != "disabled" { - // // Extract IP address from headers - // ips := strings.Split(s.r.Header.Get("X-Forwarded-For"), ",") - // ipAddress := "" - // if len(ips) > 0 { - // ipAddress = strings.TrimSpace(ips[0]) - // } - // if ipAddress == "" { - // ipAddress = s.Location() - // } + if eventBuffer != nil { + // Extract IP address from headers + ips := strings.Split(s.r.Header.Get("X-Forwarded-For"), ",") + ipAddress := "" + if len(ips) > 0 { + ipAddress = strings.TrimSpace(ips[0]) + } + if ipAddress == "" { + ipAddress = s.Location() + } - // eventBuffer.BufferApiRequest(schema.ApiRequestV1{ - // WorkspaceID: s.WorkspaceID, - // RequestID: s.RequestID(), - // Time: s.startTime.UnixMilli(), - // Host: s.r.Host, - // Method: s.r.Method, - // Path: s.r.URL.Path, - // RequestHeaders: requestHeaders, - // RequestBody: string(s.requestBody), - // ResponseStatus: s.responseStatus, - // ResponseHeaders: responseHeaders, - // ResponseBody: string(s.responseBody), - // Error: getErrorMessage(nextErr), - // ServiceLatency: s.Latency().Milliseconds(), - // UserAgent: s.UserAgent(), - // IpAddress: ipAddress, - // Country: "", - // City: "", - // Colo: "", - // Continent: "", - // }) - // } + eventBuffer.BufferApiRequest(schema.ApiRequestV2{ + WorkspaceID: s.WorkspaceID, + RequestID: s.RequestID(), + Time: s.startTime.UnixMilli(), + Host: s.r.Host, + Method: s.r.Method, + Path: s.r.URL.Path, + RequestHeaders: requestHeaders, + RequestBody: string(s.requestBody), + ResponseStatus: int32(s.responseStatus), + ResponseHeaders: responseHeaders, + ResponseBody: string(s.responseBody), + Error: getErrorMessage(s.error), + ServiceLatency: s.Latency().Milliseconds(), + UserAgent: s.UserAgent(), + IpAddress: ipAddress, + Region: region, + }) + } return nextErr } } } -// getErrorMessage extracts the user-facing error message if available. +// getErrorMessage extracts the internal error message for logging. func getErrorMessage(err error) string { if err == nil { return "" } - return fault.UserFacingMessage(err) + + // Fallback for non-fault errors + return err.Error() } diff --git a/go/apps/gw/server/server.go b/go/apps/gw/server/server.go index ae37b95d85..46d3dce81e 100644 --- a/go/apps/gw/server/server.go +++ b/go/apps/gw/server/server.go @@ -202,9 +202,24 @@ func (s *Server) WrapHandler(handler HandleFunc, middlewares []Middleware) http. s.returnSession(sess) }() - sess.init(w, r) + err := sess.init(w, r) + if err != nil { + // Apply default middleware chain for session initialization errors + handleFn := func(ctx context.Context, session *Session) error { + return err // Return the session init error + } + + // Apply the same middleware chain + var wrappedHandler HandleFunc = handleFn + for i := len(middlewares) - 1; i >= 0; i-- { + wrappedHandler = middlewares[i](wrappedHandler) + } + + _ = wrappedHandler(r.Context(), sess) + return + } - err := handle(r.Context(), sess) + err = handle(r.Context(), sess) if err != nil { // Error should have been handled by error middleware // If we get here, something went wrong diff --git a/go/apps/gw/server/session.go b/go/apps/gw/server/session.go index 6fdbdb77bd..29f32b7984 100644 --- a/go/apps/gw/server/session.go +++ b/go/apps/gw/server/session.go @@ -1,6 +1,7 @@ package server import ( + "bytes" "encoding/json" "io" "log" @@ -28,15 +29,43 @@ type Session struct { requestBody []byte responseStatus int responseBody []byte + error error } // init initializes the session with a new request and response writer. -func (s *Session) init(w http.ResponseWriter, r *http.Request) { +func (s *Session) init(w http.ResponseWriter, r *http.Request) error { s.requestID = uid.New(uid.RequestPrefix) s.startTime = time.Now() s.w = w s.r = r s.WorkspaceID = "" + + // Read and cache the request body so metrics middleware can access it even on early errors. + // We need to replace r.Body with a fresh reader afterwards so other middleware + // can still read the body if necessary. + var err error + s.requestBody, err = io.ReadAll(s.r.Body) + closeErr := s.r.Body.Close() + + // Handle read errors + if err != nil { + return fault.Wrap(err, + fault.Internal("unable to read request body"), + fault.Public("The request body could not be read."), + ) + } + + // Handle close error + if closeErr != nil { + return fault.Wrap(closeErr, + fault.Internal("failed to close request body"), + fault.Public("An error occurred processing the request."), + ) + } + + // Replace body with a fresh reader for subsequent middleware + s.r.Body = io.NopCloser(bytes.NewReader(s.requestBody)) + return nil } // RequestID returns the unique request ID for this session. @@ -59,6 +88,30 @@ func (s *Session) ResponseWriter() http.ResponseWriter { return s.w } +// CaptureResponseWriter returns a ResponseWriter that captures the response body. +// It returns the wrapper and a function to retrieve the captured data. +func (s *Session) CaptureResponseWriter() (http.ResponseWriter, func()) { + wrapper := &captureResponseWriter{ + ResponseWriter: s.w, + statusCode: http.StatusOK, // Default to 200 if not set + } + + // Return a function to store captured data back in session + capture := func() { + s.responseStatus = wrapper.statusCode + s.responseBody = wrapper.body + } + + return wrapper, capture +} + +// SetError stores the error for logging purposes. +func (s *Session) SetError(err error) { + if s.error == nil { + s.error = err + } +} + // UserAgent returns the User-Agent header from the request. func (s *Session) UserAgent() string { return s.r.UserAgent() @@ -154,6 +207,7 @@ func (s *Session) reset() { s.requestBody = nil s.responseStatus = 0 s.responseBody = nil + s.error = nil } // wrapResponseWriter wraps http.ResponseWriter to capture the status code. @@ -180,3 +234,32 @@ func (w *wrapResponseWriter) Write(b []byte) (int, error) { return w.ResponseWriter.Write(b) } + +// captureResponseWriter wraps http.ResponseWriter to capture the status code and response body. +type captureResponseWriter struct { + http.ResponseWriter + statusCode int + body []byte + written bool +} + +func (w *captureResponseWriter) WriteHeader(code int) { + if w.written { + return // Already written, don't write again + } + + w.statusCode = code + w.written = true + w.ResponseWriter.WriteHeader(code) +} + +func (w *captureResponseWriter) Write(b []byte) (int, error) { + if !w.written { + w.WriteHeader(http.StatusOK) + } + + // Capture the body + w.body = append(w.body, b...) + + return w.ResponseWriter.Write(b) +} diff --git a/go/apps/gw/services/caches/caches.go b/go/apps/gw/services/caches/caches.go index 88040c644d..4abed672f6 100644 --- a/go/apps/gw/services/caches/caches.go +++ b/go/apps/gw/services/caches/caches.go @@ -6,7 +6,7 @@ import ( "time" validator "github.com/pb33f/libopenapi-validator" - partitionv1 "github.com/unkeyed/unkey/go/gen/proto/partition/v1" + "github.com/unkeyed/unkey/go/apps/gw/services/routing" "github.com/unkeyed/unkey/go/pkg/cache" "github.com/unkeyed/unkey/go/pkg/cache/middleware" "github.com/unkeyed/unkey/go/pkg/clock" @@ -19,7 +19,7 @@ import ( // Each field represents a specialized cache for a specific data entity. type Caches struct { // HostName -> Config - GatewayConfig cache.Cache[string, *partitionv1.GatewayConfig] + GatewayConfig cache.Cache[string, routing.ConfigWithWorkspace] // DeploymentID -> OpenAPI Spec Validator OpenAPISpec cache.Cache[string, validator.Validator] @@ -72,7 +72,7 @@ type Config struct { // // Use the caches // key, err := caches.KeyByHash.Get(ctx, "some-hash") func New(config Config) (Caches, error) { - gatewayConfig, err := cache.New(cache.Config[string, *partitionv1.GatewayConfig]{ + gatewayConfig, err := cache.New(cache.Config[string, routing.ConfigWithWorkspace]{ Fresh: time.Second * 5, Stale: time.Second * 30, Logger: config.Logger, diff --git a/go/apps/gw/services/routing/interface.go b/go/apps/gw/services/routing/interface.go index 5aaaa7572f..d7437d5618 100644 --- a/go/apps/gw/services/routing/interface.go +++ b/go/apps/gw/services/routing/interface.go @@ -14,8 +14,8 @@ import ( // Service handles gateway configuration lookup and VM selection. type Service interface { - // GetTargetByHost finds gateway configuration based on the request host - GetConfig(ctx context.Context, host string) (*partitionv1.GatewayConfig, error) + // GetConfig finds gateway configuration and workspace ID based on the request host + GetConfig(ctx context.Context, host string) (*ConfigWithWorkspace, error) // SelectVM picks an available VM from the gateway's VM list SelectVM(ctx context.Context, config *partitionv1.GatewayConfig) (*url.URL, error) @@ -27,6 +27,6 @@ type Config struct { Logger logging.Logger Clock clock.Clock - GatewayConfigCache cache.Cache[string, *partitionv1.GatewayConfig] + GatewayConfigCache cache.Cache[string, ConfigWithWorkspace] VMCache cache.Cache[string, pdb.Vm] } diff --git a/go/apps/gw/services/routing/service.go b/go/apps/gw/services/routing/service.go index 590cbfa7a6..96617a1caa 100644 --- a/go/apps/gw/services/routing/service.go +++ b/go/apps/gw/services/routing/service.go @@ -24,7 +24,7 @@ type service struct { db db.Database logger logging.Logger - gatewayConfigCache cache.Cache[string, *partitionv1.GatewayConfig] + gatewayConfigCache cache.Cache[string, ConfigWithWorkspace] vmCache cache.Cache[string, pdb.Vm] } @@ -49,32 +49,27 @@ func New(config Config) (*service, error) { }, nil } -// GetTarget retrieves target configuration by ID. -func (s *service) GetConfig(ctx context.Context, host string) (*partitionv1.GatewayConfig, error) { - config, hit, err := s.gatewayConfigCache.SWR(ctx, host, func(ctx context.Context) (*partitionv1.GatewayConfig, error) { +// GetConfig retrieves gateway configuration and workspace ID by hostname. +func (s *service) GetConfig(ctx context.Context, host string) (*ConfigWithWorkspace, error) { + config, hit, err := s.gatewayConfigCache.SWR(ctx, host, func(ctx context.Context) (ConfigWithWorkspace, error) { gatewayRow, err := pdb.Query.FindGatewayByHostname(ctx, s.db.RO(), host) if err != nil { - return nil, err + return ConfigWithWorkspace{}, err } // Unmarshal the protobuf blob from the database var gatewayConfig partitionv1.GatewayConfig if err := protojson.Unmarshal(gatewayRow.Config, &gatewayConfig); err != nil { - return nil, fmt.Errorf("failed to unmarshal gateway config: %w", err) + return ConfigWithWorkspace{}, fmt.Errorf("failed to unmarshal gateway config: %w", err) } - return &gatewayConfig, nil + return ConfigWithWorkspace{ + Config: &gatewayConfig, + WorkspaceID: gatewayRow.WorkspaceID, + }, nil }, caches.DefaultFindFirstOp) - if err != nil { - if db.IsNotFound(err) { - return nil, fault.Wrap(err, - fault.Code(codes.Gateway.Routing.ConfigNotFound.URN()), - fault.Internal("no gateway configuration found for hostname"), - fault.Public("No configuration found for this domain"), - ) - } - + if err != nil && !db.IsNotFound(err) { return nil, fault.Wrap(err, fault.Code(codes.App.Internal.ServiceUnavailable.URN()), fault.Internal("error loading gateway configuration"), @@ -82,14 +77,15 @@ func (s *service) GetConfig(ctx context.Context, host string) (*partitionv1.Gate ) } - if hit == cache.Null { - return nil, fault.New("gateway config null", + if db.IsNotFound(err) || hit == cache.Null { + return nil, fault.Wrap(err, fault.Code(codes.Gateway.Routing.ConfigNotFound.URN()), + fault.Internal("no gateway configuration found for hostname"), fault.Public("No configuration found for this domain"), ) } - return config, nil + return &config, nil } // SelectVM picks an available VM from the gateway's VM list using random selection. diff --git a/go/apps/gw/services/routing/types.go b/go/apps/gw/services/routing/types.go new file mode 100644 index 0000000000..d97dcd3089 --- /dev/null +++ b/go/apps/gw/services/routing/types.go @@ -0,0 +1,9 @@ +package routing + +import partitionv1 "github.com/unkeyed/unkey/go/gen/proto/partition/v1" + +// ConfigWithWorkspace holds gateway configuration and workspace ID. +type ConfigWithWorkspace struct { + Config *partitionv1.GatewayConfig + WorkspaceID string +} diff --git a/go/gen/proto/metald/v1/metald.pb.go b/go/gen/proto/metald/v1/metald.pb.go index 1fd9b87bb1..c93e23fdf6 100644 --- a/go/gen/proto/metald/v1/metald.pb.go +++ b/go/gen/proto/metald/v1/metald.pb.go @@ -24,7 +24,7 @@ var File_metald_v1_metald_proto protoreflect.FileDescriptor const file_metald_v1_metald_proto_rawDesc = "" + "\n" + - "\x16metald/v1/metald.proto\x12\tmetald.v1\x1a\x12metald/v1/vm.proto\x1a\x1ametald/v1/deployment.proto2\xe0\a\n" + + "\x16metald/v1/metald.proto\x12\tmetald.v1\x1a\x1ametald/v1/deployment.proto\x1a\x12metald/v1/vm.proto2\xe0\a\n" + "\tVmService\x12[\n" + "\x10CreateDeployment\x12\".metald.v1.CreateDeploymentRequest\x1a#.metald.v1.CreateDeploymentResponse\x12[\n" + "\x10UpdateDeployment\x12\".metald.v1.UpdateDeploymentRequest\x1a#.metald.v1.UpdateDeploymentResponse\x12[\n" + @@ -108,8 +108,8 @@ func file_metald_v1_metald_proto_init() { if File_metald_v1_metald_proto != nil { return } - file_metald_v1_vm_proto_init() file_metald_v1_deployment_proto_init() + file_metald_v1_vm_proto_init() type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ diff --git a/go/pkg/clickhouse/client.go b/go/pkg/clickhouse/client.go index 258b2d6ce0..c021519af6 100644 --- a/go/pkg/clickhouse/client.go +++ b/go/pkg/clickhouse/client.go @@ -22,6 +22,7 @@ type clickhouse struct { // Batched processors for different event types requests *batch.BatchProcessor[schema.ApiRequestV1] + apiRequests *batch.BatchProcessor[schema.ApiRequestV2] keyVerifications *batch.BatchProcessor[schema.KeyVerificationRequestV1] ratelimits *batch.BatchProcessor[schema.RatelimitRequestV1] } @@ -96,7 +97,7 @@ func New(config Config) (*clickhouse, error) { logger: config.Logger, requests: batch.New(batch.Config[schema.ApiRequestV1]{ - Name: "api_requests", + Name: "requests", Drop: true, BatchSize: 50_000, BufferSize: 200_000, @@ -113,6 +114,24 @@ func New(config Config) (*clickhouse, error) { } }, }), + apiRequests: batch.New(batch.Config[schema.ApiRequestV2]{ + Name: "api_requests", + Drop: true, + BatchSize: 50_000, + BufferSize: 200_000, + FlushInterval: 5 * time.Second, + Consumers: 2, + Flush: func(ctx context.Context, rows []schema.ApiRequestV2) { + table := "default.api_requests_raw_v2" + err := flush(ctx, conn, table, rows) + if err != nil { + config.Logger.Error("failed to flush batch", + "table", table, + "err", err.Error(), + ) + } + }, + }), keyVerifications: batch.New[schema.KeyVerificationRequestV1]( batch.Config[schema.KeyVerificationRequestV1]{ Name: "key_verifications", @@ -178,7 +197,7 @@ func (c *clickhouse) Shutdown(ctx context.Context) error { return nil } -// BufferApiRequest adds an API request event to the buffer for batch processing. +// BufferRequest adds an API request event to the buffer for batch processing. // The event will be flushed to ClickHouse automatically based on the configured // batch size and flush interval. // @@ -188,7 +207,7 @@ func (c *clickhouse) Shutdown(ctx context.Context) error { // // Example: // -// ch.BufferApiRequest(schema.ApiRequestV1{ +// ch.BufferRequest(schema.ApiRequestV1{ // RequestID: requestID, // Time: time.Now().UnixMilli(), // WorkspaceID: workspaceID, @@ -197,10 +216,33 @@ func (c *clickhouse) Shutdown(ctx context.Context) error { // Path: r.URL.Path, // ResponseStatus: status, // }) -func (c *clickhouse) BufferApiRequest(req schema.ApiRequestV1) { +func (c *clickhouse) BufferRequest(req schema.ApiRequestV1) { c.requests.Buffer(req) } +// BufferApiRequest adds an API request event to the buffer for batch processing. +// The event will be flushed to ClickHouse automatically based on the configured +// batch size and flush interval. +// +// This method is non-blocking if the buffer has available capacity. If the buffer +// is full and the Drop option is enabled (which is the default), the event will +// be silently dropped. +// +// Example: +// +// ch.BufferApiRequest(schema.ApiRequestV2{ +// RequestID: requestID, +// Time: time.Now().UnixMilli(), +// WorkspaceID: workspaceID, +// Host: r.Host, +// Method: r.Method, +// Path: r.URL.Path, +// ResponseStatus: status, +// }) +func (c *clickhouse) BufferApiRequest(req schema.ApiRequestV2) { + c.apiRequests.Buffer(req) +} + // BufferKeyVerification adds a key verification event to the buffer for batch processing. // The event will be flushed to ClickHouse automatically based on the configured // batch size and flush interval. diff --git a/go/pkg/clickhouse/doc.go b/go/pkg/clickhouse/doc.go index b59aa89833..aa15fb9a93 100644 --- a/go/pkg/clickhouse/doc.go +++ b/go/pkg/clickhouse/doc.go @@ -25,7 +25,7 @@ // } // // // Buffer events for batch processing -// ch.BufferApiRequest(schema.ApiRequestV1{ +// ch.BufferRequest(schema.ApiRequestV1{ // RequestID: "req_123", // Time: time.Now().UnixMilli(), // WorkspaceID: "ws_abc", diff --git a/go/pkg/clickhouse/interface.go b/go/pkg/clickhouse/interface.go index 62dc3b7be6..0133a4185b 100644 --- a/go/pkg/clickhouse/interface.go +++ b/go/pkg/clickhouse/interface.go @@ -14,9 +14,13 @@ import ( // This interface allows for different implementations, such as a real // ClickHouse client or a no-op implementation for testing or development. type Bufferer interface { + // BufferRequest adds an API request event to the buffer. + // These are typically HTTP requests to the API with request and response details. + BufferRequest(schema.ApiRequestV1) + // BufferApiRequest adds an API request event to the buffer. // These are typically HTTP requests to the API with request and response details. - BufferApiRequest(schema.ApiRequestV1) + BufferApiRequest(schema.ApiRequestV2) // BufferKeyVerification adds a key verification event to the buffer. // These represent API key validation operations with their outcomes. diff --git a/go/pkg/clickhouse/noop.go b/go/pkg/clickhouse/noop.go index 5b929c1b54..55f9b022ba 100644 --- a/go/pkg/clickhouse/noop.go +++ b/go/pkg/clickhouse/noop.go @@ -15,8 +15,13 @@ type noop struct{} var _ Bufferer = (*noop)(nil) var _ Bufferer = (*noop)(nil) +// BufferRequest implements the Bufferer interface but discards the event. +func (n *noop) BufferRequest(schema.ApiRequestV1) { + // Intentionally empty - discards the event +} + // BufferApiRequest implements the Bufferer interface but discards the event. -func (n *noop) BufferApiRequest(schema.ApiRequestV1) { +func (n *noop) BufferApiRequest(schema.ApiRequestV2) { // Intentionally empty - discards the event } diff --git a/go/pkg/partition/db/gateway_find_config_by_hostname.sql_generated.go b/go/pkg/partition/db/gateway_find_config_by_hostname.sql_generated.go index 5c129edc4b..7fded06acf 100644 --- a/go/pkg/partition/db/gateway_find_config_by_hostname.sql_generated.go +++ b/go/pkg/partition/db/gateway_find_config_by_hostname.sql_generated.go @@ -10,24 +10,25 @@ import ( ) const findGatewayByHostname = `-- name: FindGatewayByHostname :one -SELECT hostname, config +SELECT hostname, config, workspace_id FROM gateways WHERE hostname = ? ` type FindGatewayByHostnameRow struct { - Hostname string `db:"hostname"` - Config []byte `db:"config"` + Hostname string `db:"hostname"` + Config []byte `db:"config"` + WorkspaceID string `db:"workspace_id"` } // FindGatewayByHostname // -// SELECT hostname, config +// SELECT hostname, config, workspace_id // FROM gateways // WHERE hostname = ? func (q *Queries) FindGatewayByHostname(ctx context.Context, db DBTX, hostname string) (FindGatewayByHostnameRow, error) { row := db.QueryRowContext(ctx, findGatewayByHostname, hostname) var i FindGatewayByHostnameRow - err := row.Scan(&i.Hostname, &i.Config) + err := row.Scan(&i.Hostname, &i.Config, &i.WorkspaceID) return i, err } diff --git a/go/pkg/partition/db/querier_generated.go b/go/pkg/partition/db/querier_generated.go index 9f4d1f70ef..9327b3a0d3 100644 --- a/go/pkg/partition/db/querier_generated.go +++ b/go/pkg/partition/db/querier_generated.go @@ -19,7 +19,7 @@ type Querier interface { FindCertificateByHostname(ctx context.Context, db DBTX, hostname string) (Certificate, error) //FindGatewayByHostname // - // SELECT hostname, config + // SELECT hostname, config, workspace_id // FROM gateways // WHERE hostname = ? FindGatewayByHostname(ctx context.Context, db DBTX, hostname string) (FindGatewayByHostnameRow, error) diff --git a/go/pkg/partition/db/queries/gateway_find_config_by_hostname.sql b/go/pkg/partition/db/queries/gateway_find_config_by_hostname.sql index 2d7f6a0607..ab07a834b7 100644 --- a/go/pkg/partition/db/queries/gateway_find_config_by_hostname.sql +++ b/go/pkg/partition/db/queries/gateway_find_config_by_hostname.sql @@ -1,4 +1,4 @@ -- name: FindGatewayByHostname :one -SELECT hostname, config +SELECT hostname, config, workspace_id FROM gateways WHERE hostname = ?; diff --git a/go/pkg/zen/middleware_metrics.go b/go/pkg/zen/middleware_metrics.go index a177fad598..9b23e74c23 100644 --- a/go/pkg/zen/middleware_metrics.go +++ b/go/pkg/zen/middleware_metrics.go @@ -14,7 +14,7 @@ import ( ) type EventBuffer interface { - BufferApiRequest(schema.ApiRequestV1) + BufferRequest(schema.ApiRequestV1) } type redactionRule struct { @@ -93,7 +93,7 @@ func WithMetrics(eventBuffer EventBuffer) Middleware { ipAddress = ips[0] } - eventBuffer.BufferApiRequest(schema.ApiRequestV1{ + eventBuffer.BufferRequest(schema.ApiRequestV1{ WorkspaceID: s.WorkspaceID, RequestID: s.RequestID(), Time: start.UnixMilli(), diff --git a/go/proto/assetmanagerd/v1/asset.proto b/go/proto/assetmanagerd/v1/asset.proto index 5d18a640e7..0e1ed8b043 100644 --- a/go/proto/assetmanagerd/v1/asset.proto +++ b/go/proto/assetmanagerd/v1/asset.proto @@ -78,7 +78,7 @@ message Asset { // Metadata map labels = 9; string created_by = 10; // e.g., "builderd", "manual" - int64 created_at = 11; // Unix timestamp + int64 created_at = 11; // Unix timestamp int64 last_accessed_at = 12; // Reference counting for GC @@ -129,7 +129,9 @@ message RegisterAssetRequest { string id = 11; } -message RegisterAssetResponse { Asset asset = 1; } +message RegisterAssetResponse { + Asset asset = 1; +} message GetAssetRequest { string id = 1; @@ -168,7 +170,7 @@ message ListAssetsResponse { message AcquireAssetRequest { string asset_id = 1; string acquired_by = 2; // e.g., "vm-123" - int64 ttl_seconds = 3; // Optional auto-release after TTL + int64 ttl_seconds = 3; // Optional auto-release after TTL } message AcquireAssetResponse { @@ -176,9 +178,13 @@ message AcquireAssetResponse { string lease_id = 2; // Use this for release } -message ReleaseAssetRequest { string lease_id = 1; } +message ReleaseAssetRequest { + string lease_id = 1; +} -message ReleaseAssetResponse { Asset asset = 1; } +message ReleaseAssetResponse { + Asset asset = 1; +} message DeleteAssetRequest { string id = 1; @@ -208,7 +214,7 @@ message GarbageCollectResponse { message PrepareAssetsRequest { repeated string asset_ids = 1; - string target_path = 2; // e.g., jailer chroot path + string target_path = 2; // e.g., jailer chroot path string prepared_for = 3; // e.g., "vm-123" } diff --git a/go/proto/builderd/v1/builder.proto b/go/proto/builderd/v1/builder.proto index c05fc55a87..ff4efd07f8 100644 --- a/go/proto/builderd/v1/builder.proto +++ b/go/proto/builderd/v1/builder.proto @@ -291,7 +291,6 @@ message StreamBuildLogsResponse { map metadata = 5; } - // Request/Response messages message CreateBuildRequest { BuildConfig config = 1; @@ -348,7 +347,6 @@ message StreamBuildLogsRequest { bool follow = 2; // Continue streaming new logs } - message GetBuildStatsRequest { string tenant_id = 1; google.protobuf.Timestamp start_time = 2; diff --git a/go/proto/ctrl/v1/acme.proto b/go/proto/ctrl/v1/acme.proto index aaf437b1b0..bd64abfa83 100644 --- a/go/proto/ctrl/v1/acme.proto +++ b/go/proto/ctrl/v1/acme.proto @@ -2,17 +2,17 @@ syntax = "proto3"; package ctrl.v1; -option go_package = "github.com/unkeyed/unkey/go/gen/proto/ctrl/v1;ctrlv1"; - import "google/protobuf/timestamp.proto"; +option go_package = "github.com/unkeyed/unkey/go/gen/proto/ctrl/v1;ctrlv1"; + message HandleCertificateVerificationRequest { - string domain = 1; - string token = 2; + string domain = 1; + string token = 2; } message HandleCertificateVerificationResponse { - string token = 1; + string token = 1; } service AcmeService { diff --git a/go/proto/ctrl/v1/build.proto b/go/proto/ctrl/v1/build.proto index 83dfe66e06..d75f83838d 100644 --- a/go/proto/ctrl/v1/build.proto +++ b/go/proto/ctrl/v1/build.proto @@ -2,10 +2,10 @@ syntax = "proto3"; package ctrl.v1; -option go_package = "github.com/unkeyed/unkey/go/gen/proto/ctrl/v1;ctrlv1"; - import "google/protobuf/timestamp.proto"; +option go_package = "github.com/unkeyed/unkey/go/gen/proto/ctrl/v1;ctrlv1"; + // Build status enum enum BuildStatus { BUILD_STATUS_UNSPECIFIED = 0; @@ -40,25 +40,25 @@ message Build { string workspace_id = 2; string project_id = 3; string version_id = 4; - + // Build details BuildStatus status = 5; - string error_message = 6; // For failed builds - + string error_message = 6; // For failed builds + // Timestamps google.protobuf.Timestamp created_at = 7; google.protobuf.Timestamp started_at = 8; google.protobuf.Timestamp completed_at = 9; google.protobuf.Timestamp updated_at = 10; - + // Build metadata - string rootfs_image_id = 11; // Output rootfs image + string rootfs_image_id = 11; // Output rootfs image } service BuildService { // Create a new build rpc CreateBuild(CreateBuildRequest) returns (CreateBuildResponse) {} - + // Get build details rpc GetBuild(GetBuildRequest) returns (GetBuildResponse) {} -} \ No newline at end of file +} diff --git a/go/proto/ctrl/v1/deployment.proto b/go/proto/ctrl/v1/deployment.proto index 45e19eab8b..42a2083b4b 100644 --- a/go/proto/ctrl/v1/deployment.proto +++ b/go/proto/ctrl/v1/deployment.proto @@ -14,6 +14,7 @@ enum DeploymentStatus { DEPLOYMENT_STATUS_READY = 5; DEPLOYMENT_STATUS_FAILED = 6; } + // Source type for deployment creation enum SourceType { SOURCE_TYPE_UNSPECIFIED = 0; @@ -28,8 +29,6 @@ message CreateDeploymentRequest { // Source information string environment_slug = 4; - - SourceType source_type = 5; string docker_image = 6; @@ -45,7 +44,6 @@ message CreateDeploymentRequest { // Keyspace ID for authentication optional string keyspace_id = 13; - } message CreateDeploymentResponse { diff --git a/go/proto/ctrl/v1/service.proto b/go/proto/ctrl/v1/service.proto index 1987c52616..c292927984 100644 --- a/go/proto/ctrl/v1/service.proto +++ b/go/proto/ctrl/v1/service.proto @@ -14,4 +14,4 @@ message LivenessResponse { service CtrlService { rpc Liveness(LivenessRequest) returns (LivenessResponse) {} -} \ No newline at end of file +} diff --git a/go/proto/metald/v1/deployment.proto b/go/proto/metald/v1/deployment.proto index d1b3194ce3..62c0dfc8e7 100644 --- a/go/proto/metald/v1/deployment.proto +++ b/go/proto/metald/v1/deployment.proto @@ -7,49 +7,49 @@ import "metald/v1/vm.proto"; option go_package = "github.com/unkeyed/unkey/go/gen/proto/metald/v1;metaldv1"; message DeploymentRequest { - string deployment_id = 1; + string deployment_id = 1; - string image = 2; + string image = 2; - uint32 vm_count = 3; - uint32 cpu = 4; - uint64 memory_size_mib = 5; + uint32 vm_count = 3; + uint32 cpu = 4; + uint64 memory_size_mib = 5; } message CreateDeploymentRequest { - DeploymentRequest deployment = 1; + DeploymentRequest deployment = 1; } message CreateDeploymentResponse { - repeated string vm_ids = 1; + repeated string vm_ids = 1; } message UpdateDeploymentRequest { - DeploymentRequest deployment = 1; + DeploymentRequest deployment = 1; } message UpdateDeploymentResponse { - repeated string vm_ids = 1; + repeated string vm_ids = 1; } message DeleteDeploymentRequest { - string deployment_id = 1; + string deployment_id = 1; } message DeleteDeploymentResponse {} message GetDeploymentRequest { - string deployment_id = 1; + string deployment_id = 1; } message GetDeploymentResponse { - string deployment_id = 1; + string deployment_id = 1; - message Vm { - string id = 1; - string host = 2; - VmState state = 3; - uint32 port = 4; - } + message Vm { + string id = 1; + string host = 2; + VmState state = 3; + uint32 port = 4; + } - repeated Vm vms = 2; + repeated Vm vms = 2; } diff --git a/go/proto/metald/v1/metald.proto b/go/proto/metald/v1/metald.proto index 2b641134ec..22f41434cd 100644 --- a/go/proto/metald/v1/metald.proto +++ b/go/proto/metald/v1/metald.proto @@ -2,8 +2,8 @@ syntax = "proto3"; package metald.v1; -import "metald/v1/vm.proto"; import "metald/v1/deployment.proto"; +import "metald/v1/vm.proto"; option go_package = "github.com/unkeyed/unkey/go/gen/proto/metald/v1;metaldv1"; diff --git a/go/proto/metald/v1/network.proto b/go/proto/metald/v1/network.proto index dd1ba9ee8a..4b49e7888e 100644 --- a/go/proto/metald/v1/network.proto +++ b/go/proto/metald/v1/network.proto @@ -43,35 +43,35 @@ message NetworkInterface { // IPv4 network configuration message IPv4Config { - string address = 1; // IPv4 address (e.g., "10.100.1.2") - string netmask = 2; // Network mask (e.g., "255.255.255.0") - string gateway = 3; // Default gateway + string address = 1; // IPv4 address (e.g., "10.100.1.2") + string netmask = 2; // Network mask (e.g., "255.255.255.0") + string gateway = 3; // Default gateway repeated string dns_servers = 4; // DNS servers - bool dhcp = 5; // Use DHCP instead of static config + bool dhcp = 5; // Use DHCP instead of static config } // IPv6 network configuration message IPv6Config { - string address = 1; // IPv6 address (e.g., "fd00::1:2") - int32 prefix_length = 2; // Prefix length (e.g., 64) - string gateway = 3; // Default gateway + string address = 1; // IPv6 address (e.g., "fd00::1:2") + int32 prefix_length = 2; // Prefix length (e.g., 64) + string gateway = 3; // Default gateway repeated string dns_servers = 4; // DNS servers (IPv6 addresses) bool slaac = 5; // Use SLAAC (Stateless Address Autoconfiguration) bool privacy_extensions = 6; // Enable privacy extensions - string link_local = 7; // Link-local address (auto-generated if empty) + string link_local = 7; // Link-local address (auto-generated if empty) } // Network mode for the interface enum NetworkMode { NETWORK_MODE_UNSPECIFIED = 0; NETWORK_MODE_DUAL_STACK = 1; // Both IPv4 and IPv6 - NETWORK_MODE_IPV4_ONLY = 2; // IPv4 only - NETWORK_MODE_IPV6_ONLY = 3; // IPv6 only + NETWORK_MODE_IPV4_ONLY = 2; // IPv4 only + NETWORK_MODE_IPV6_ONLY = 3; // IPv6 only } // Rate limiting configuration message RateLimit { - int64 bandwidth = 1; // Bandwidth in bytes/second + int64 bandwidth = 1; // Bandwidth in bytes/second int64 refill_time = 2; // Token bucket refill time in milliseconds - int64 burst = 3; // Burst size in bytes + int64 burst = 3; // Burst size in bytes } diff --git a/go/proto/metald/v1/vm.proto b/go/proto/metald/v1/vm.proto index 6f3aba89a4..bd9d14e34c 100644 --- a/go/proto/metald/v1/vm.proto +++ b/go/proto/metald/v1/vm.proto @@ -2,12 +2,12 @@ syntax = "proto3"; package metald.v1; -option go_package = "github.com/unkeyed/unkey/go/gen/proto/metald/v1;metaldv1"; - import "google/protobuf/timestamp.proto"; import "metald/v1/network.proto"; import "metald/v1/storage.proto"; +option go_package = "github.com/unkeyed/unkey/go/gen/proto/metald/v1;metaldv1"; + // VM lifecycle states enum VmState { VM_STATE_UNSPECIFIED = 0; @@ -69,8 +69,8 @@ message CreateVmRequest { } message Endpoint { - string host = 1; - uint32 port = 2; + string host = 1; + uint32 port = 2; } message CreateVmResponse { @@ -88,9 +88,13 @@ message DeleteVmRequest { bool force = 2; } -message DeleteVmResponse { bool success = 1; } +message DeleteVmResponse { + bool success = 1; +} -message BootVmRequest { string vm_id = 1; } +message BootVmRequest { + string vm_id = 1; +} message BootVmResponse { VmState state = 2; @@ -110,13 +114,17 @@ message ShutdownVmResponse { VmState state = 2; } -message PauseVmRequest { string vm_id = 1; } +message PauseVmRequest { + string vm_id = 1; +} message PauseVmResponse { VmState state = 2; } -message ResumeVmRequest { string vm_id = 1; } +message ResumeVmRequest { + string vm_id = 1; +} message ResumeVmResponse { VmState state = 2; @@ -133,7 +141,9 @@ message RebootVmResponse { VmState state = 2; } -message GetVmInfoRequest { string vm_id = 1; } +message GetVmInfoRequest { + string vm_id = 1; +} message GetVmInfoResponse { string vm_id = 1; @@ -223,7 +233,6 @@ message BootConfig { map boot_options = 4; } - message ConsoleConfig { // Whether console is enabled bool enabled = 1; diff --git a/go/proto/partition/v1/gateway.proto b/go/proto/partition/v1/gateway.proto index 1711bc5bb9..aeae5af50d 100644 --- a/go/proto/partition/v1/gateway.proto +++ b/go/proto/partition/v1/gateway.proto @@ -7,8 +7,8 @@ option go_package = "github.com/unkeyed/unkey/go/gen/proto/partition/v1;partitio // GatewayConfig contains all configuration needed for a hostname // including deployment metadata and middleware configurations message GatewayConfig { + Project project = 1; - Project project= 1; // Deployment information Deployment deployment = 2; @@ -17,7 +17,6 @@ message GatewayConfig { // Middleware configurations optional AuthConfig auth_config = 4; optional ValidationConfig validation_config = 5; - } message Deployment {