diff --git a/deployment/docker-compose.yaml b/deployment/docker-compose.yaml index 61d9a13b93..d1660433b4 100644 --- a/deployment/docker-compose.yaml +++ b/deployment/docker-compose.yaml @@ -52,6 +52,7 @@ services: - mysql - redis - clickhouse + - tempo environment: UNKEY_HTTP_PORT: 7070 UNKEY_CLUSTER: true @@ -61,6 +62,7 @@ services: UNKEY_CLUSTER_DISCOVERY_REDIS_URL: "redis://redis:6379" UNKEY_DATABASE_PRIMARY_DSN: "mysql://unkey:password@tcp(mysql:3900)/unkey?parseTime=true" UNKEY_CLICKHOUSE_URL: "clickhouse://default:password@clickhouse:9000" + UNKEY_OTEL_OTLP_ENDPOINT: "otel-collector:4318" # Point directly to Tempo redis: image: redis:latest @@ -168,33 +170,72 @@ services: - agent - clickhouse - chproxy + otel-collector: + image: otel/opentelemetry-collector-contrib:latest + container_name: otel-collector + command: ["--config=/etc/otel-collector-config.yaml"] + volumes: + - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml + ports: + - "4318:4318" # OTLP HTTP + depends_on: + - prometheus + - tempo + - loki + - grafana + prometheus: + image: prom/prometheus:latest + container_name: prometheus + command: + - "--config.file=/etc/prometheus/config.yaml" + - "--storage.tsdb.path=/prometheus" + - "--web.enable-lifecycle" + - "--web.enable-remote-write-receiver" # Add this flag + ports: + - 9090:9090 + volumes: + - ./prometheus/config.yaml:/etc/prometheus/config.yaml + - prometheus:/prometheus - # prometheus: - # image: prom/prometheus - # container_name: prometheus - # command: - # - '--config.file=/etc/prometheus/prometheus.yaml' - # ports: - # - 9090:9090 - # restart: unless-stopped - # volumes: - # - ./prometheus:/etc/prometheus - # - prometheus:/prometheus + grafana: + image: grafana/grafana-oss:latest + container_name: grafana + ports: + - 3000:3000 + environment: + - GF_SECURITY_ADMIN_USER=admin + - GF_SECURITY_ADMIN_PASSWORD=grafana + - GF_INSTALL_PLUGINS=grafana-clock-panel,grafana-simple-json-datasource + volumes: + - 
./grafana/provisioning:/etc/grafana/provisioning + - grafana:/var/lib/grafana + + # Tempo for distributed tracing + tempo: + image: grafana/tempo:latest + container_name: tempo + command: ["-config.file=/etc/tempo.yaml"] + volumes: + - ./tempo/config.yaml:/etc/tempo.yaml + - tempo:/tmp/tempo + ports: + - "3200:3200" # tempo + + # Loki for logs + loki: + container_name: loki + image: grafana/loki:latest + ports: + - "3100:3100" + command: -config.file=/etc/loki/config.yaml + volumes: + - ./loki/config.yaml:/etc/loki/config.yaml - # grafana: - # image: grafana/grafana - # container_name: grafana - # ports: - # - 4000:3000 - # restart: unless-stopped - # environment: - # - GF_SECURITY_ADMIN_USER=admin - # - GF_SECURITY_ADMIN_PASSWORD=grafana - # volumes: - # - ./grafana:/etc/grafana/provisioning/datasources volumes: mysql: grafana: + tempo: + loki: clickhouse: clickhouse-keeper: s3: diff --git a/deployment/grafana/provisioning/datasources/datasources.yaml b/deployment/grafana/provisioning/datasources/datasources.yaml new file mode 100644 index 0000000000..49e26498e7 --- /dev/null +++ b/deployment/grafana/provisioning/datasources/datasources.yaml @@ -0,0 +1,20 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + access: proxy + url: http://prometheus:9090 + isDefault: true + + - name: Tempo + type: tempo + access: proxy + url: http://tempo:3200 + uid: tempo + + - name: Loki + type: loki + access: proxy + url: http://loki:3100 + uid: loki diff --git a/deployment/loki/config.yaml b/deployment/loki/config.yaml new file mode 100644 index 0000000000..3d55b8c4c1 --- /dev/null +++ b/deployment/loki/config.yaml @@ -0,0 +1,62 @@ +auth_enabled: false + +server: + http_listen_port: 3100 + grpc_listen_port: 9096 + log_level: debug + grpc_server_max_concurrent_streams: 1000 + +common: + instance_addr: 127.0.0.1 + path_prefix: /tmp/loki + storage: + filesystem: + chunks_directory: /tmp/loki/chunks + rules_directory: /tmp/loki/rules + replication_factor: 1 + 
ring: + kvstore: + store: inmemory + +query_range: + results_cache: + cache: + embedded_cache: + enabled: true + max_size_mb: 100 + +limits_config: + metric_aggregation_enabled: true + +schema_config: + configs: + - from: 2020-10-24 + store: tsdb + object_store: filesystem + schema: v13 + index: + prefix: index_ + period: 24h + +pattern_ingester: + enabled: true + metric_aggregation: + loki_address: localhost:3100 + +ruler: + alertmanager_url: http://localhost:9093 + +frontend: + encoding: protobuf +# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration +# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/ +# +# Statistics help us better understand how Loki is used, and they show us performance +# levels for most users. This helps us prioritize features and documentation. +# For more information on what's sent, look at +# https://github.com/grafana/loki/blob/main/pkg/analytics/stats.go +# Refer to the buildReport method to see what goes into a report. 
+# +# If you would like to disable reporting, uncomment the following lines: +#analytics: +# reporting_enabled: false diff --git a/deployment/otel-collector-config.yaml b/deployment/otel-collector-config.yaml new file mode 100644 index 0000000000..ceba3adffc --- /dev/null +++ b/deployment/otel-collector-config.yaml @@ -0,0 +1,44 @@ +receivers: + otlp: + protocols: + http: + endpoint: "0.0.0.0:4318" + grpc: + endpoint: "0.0.0.0:4317" + +processors: + batch: + send_batch_size: 10000 + timeout: 5s + +exporters: + # For traces - send to Tempo + otlp/tempo: + endpoint: "tempo:4317" + tls: + insecure: true + + # For metrics - send to Prometheus + prometheusremotewrite: + endpoint: "http://prometheus:9090/api/v1/write" + tls: + insecure: true + + # Debug output for troubleshooting + debug: + verbosity: detailed + +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [otlp/tempo, debug] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [prometheusremotewrite, debug] + logs: + receivers: [otlp] + processors: [batch] + exporters: [debug] diff --git a/deployment/prometheus/config.yaml b/deployment/prometheus/config.yaml new file mode 100644 index 0000000000..c852f72a24 --- /dev/null +++ b/deployment/prometheus/config.yaml @@ -0,0 +1,20 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + +scrape_configs: + - job_name: "prometheus" + static_configs: + - targets: ["localhost:9090"] + + - job_name: "tempo" + static_configs: + - targets: ["tempo:3200"] + + - job_name: "loki" + static_configs: + - targets: ["loki:3100"] + + - job_name: "otel-collector" + static_configs: + - targets: ["otel-collector:8889"] # If your collector exposes metrics diff --git a/deployment/prometheus/prometheus.yaml b/deployment/prometheus/prometheus.yaml deleted file mode 100644 index faef29e530..0000000000 --- a/deployment/prometheus/prometheus.yaml +++ /dev/null @@ -1,20 +0,0 @@ -global: - scrape_interval: 15s - scrape_timeout: 10s - 
evaluation_interval: 15s -alerting: - alertmanagers: - - static_configs: - - targets: [] - scheme: http - timeout: 10s - api_version: v1 -scrape_configs: -- job_name: prometheus - metrics_path: /metrics - dns_sd_configs: - - names: - - agent - type: A - port: 2112 - refresh_interval: "30s" diff --git a/deployment/tempo/config.yaml b/deployment/tempo/config.yaml new file mode 100644 index 0000000000..d5c30614f5 --- /dev/null +++ b/deployment/tempo/config.yaml @@ -0,0 +1,24 @@ +server: + http_listen_port: 3200 + +distributor: + receivers: + otlp: + protocols: + http: + endpoint: "0.0.0.0:4318" + grpc: + endpoint: "0.0.0.0:4317" + +storage: + trace: + backend: local + local: + path: /tmp/tempo + +ingester: + max_block_duration: 5m + +compactor: + compaction: + block_retention: 24h diff --git a/go/apps/api/run.go b/go/apps/api/run.go index 5ba4decb2b..7adb5c74df 100644 --- a/go/apps/api/run.go +++ b/go/apps/api/run.go @@ -62,15 +62,18 @@ func Run(ctx context.Context, cfg Config) error { }() if cfg.OtelOtlpEndpoint != "" { - shutdownOtel, grafanaErr := otel.InitGrafana(ctx, otel.Config{ + grafanaErr := otel.InitGrafana(ctx, otel.Config{ GrafanaEndpoint: cfg.OtelOtlpEndpoint, Application: "api", Version: version.Version, - }) + NodeID: cfg.ClusterNodeID, + CloudRegion: cfg.Region, + }, + shutdowns, + ) if grafanaErr != nil { return fmt.Errorf("unable to init grafana: %w", grafanaErr) } - shutdowns.RegisterCtx(shutdownOtel...) 
} db, err := db.New(db.Config{ diff --git a/go/pkg/cache/cache.go b/go/pkg/cache/cache.go index 8d5c2692f3..a195413df2 100644 --- a/go/pkg/cache/cache.go +++ b/go/pkg/cache/cache.go @@ -55,23 +55,23 @@ type Config[K comparable, V any] struct { var _ Cache[any, any] = (*cache[any, any])(nil) // New creates a new cache instance -func New[K comparable, V any](config Config[K, V]) *cache[K, V] { +func New[K comparable, V any](config Config[K, V]) (*cache[K, V], error) { builder, err := otter.NewBuilder[K, swrEntry[V]](config.MaxSize) if err != nil { - panic(err) + return nil, err } otter, err := builder.CollectStats().Cost(func(key K, value swrEntry[V]) uint32 { return 1 }).WithTTL(config.Stale).Build() if err != nil { - panic(err) + return nil, err } pool, err := ants.NewPool(10) if err != nil { - panic(err) + return nil, err } c := &cache[K, V]{ @@ -87,31 +87,60 @@ func New[K comparable, V any](config Config[K, V]) *cache[K, V] { inflightRefreshes: make(map[K]bool), } - go c.collectMetrics() - - return c + err = c.registerMetrics() + if err != nil { + return nil, err + } + return c, nil } -func (c *cache[K, V]) collectMetrics() { +func (c *cache[K, V]) registerMetrics() error { attributes := metric.WithAttributes( attribute.String("resource", c.resource), ) - t := time.NewTicker(time.Second * 5) - for range t.C { - ctx := context.Background() + err := metrics.Cache.Size.RegisterCallback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(int64(c.otter.Size()), attributes) + return nil + }) + if err != nil { + return err + } - metrics.Cache.Size.Record(ctx, int64(c.otter.Size()), attributes) + err = metrics.Cache.Capacity.RegisterCallback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(int64(c.otter.Capacity()), attributes) + return nil + }) + if err != nil { + return err + } - stats := c.otter.Stats() - metrics.Cache.Hits.Record(ctx, stats.Hits(), attributes) - metrics.Cache.Misses.Record(ctx, stats.Misses(), attributes) - 
metrics.Cache.Evicted.Record(ctx, stats.EvictedCount(), attributes) + err = metrics.Cache.Hits.RegisterCallback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(c.otter.Stats().Hits(), attributes) + return nil + }) + if err != nil { + return err + } + err = metrics.Cache.Misses.RegisterCallback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(c.otter.Stats().Misses(), attributes) + return nil + }) + if err != nil { + return err } + err = metrics.Cache.Evicted.RegisterCallback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(c.otter.Stats().EvictedCount(), attributes) + return nil + }) + if err != nil { + return err + } + return nil } func (c *cache[K, V]) Get(ctx context.Context, key K) (value V, hit CacheHit) { diff --git a/go/pkg/cache/cache_test.go b/go/pkg/cache/cache_test.go index dcedc81424..bdfc164f2c 100644 --- a/go/pkg/cache/cache_test.go +++ b/go/pkg/cache/cache_test.go @@ -15,7 +15,7 @@ import ( func TestWriteRead(t *testing.T) { - c := cache.New[string, string](cache.Config[string, string]{ + c, err := cache.New[string, string](cache.Config[string, string]{ MaxSize: 10_000, Fresh: time.Minute, @@ -23,6 +23,7 @@ func TestWriteRead(t *testing.T) { Logger: logging.NewNoop(), Resource: "test", Clock: clock.New(), }) + require.NoError(t, err) c.Set(context.Background(), "key", "value") value, hit := c.Get(context.Background(), "key") require.Equal(t, cache.Hit, hit) @@ -32,7 +33,7 @@ func TestWriteRead(t *testing.T) { func TestEviction(t *testing.T) { clk := clock.NewTestClock() - c := cache.New[string, string](cache.Config[string, string]{ + c, err := cache.New[string, string](cache.Config[string, string]{ MaxSize: 10_000, Fresh: time.Second, @@ -41,7 +42,7 @@ func TestEviction(t *testing.T) { Resource: "test", Clock: clk, }) - + require.NoError(t, err) c.Set(context.Background(), "key", "value") clk.Tick(2 * time.Second) _, hit := c.Get(context.Background(), "key") @@ -55,7 +56,7 @@ func 
TestRefresh(t *testing.T) { // count how many times we refreshed from origin refreshedFromOrigin := atomic.Int32{} - c := cache.New[string, string](cache.Config[string, string]{ + c, err := cache.New[string, string](cache.Config[string, string]{ MaxSize: 10_000, Fresh: time.Second * 2, @@ -64,7 +65,7 @@ func TestRefresh(t *testing.T) { Resource: "test", Clock: clk, }) - + require.NoError(t, err) c.Set(context.Background(), "key", "value") clk.Tick(time.Second) @@ -79,7 +80,7 @@ func TestRefresh(t *testing.T) { func TestNull(t *testing.T) { - c := cache.New[string, string](cache.Config[string, string]{ + c, err := cache.New[string, string](cache.Config[string, string]{ MaxSize: 10_000, Fresh: time.Second * 1, Stale: time.Minute * 5, @@ -87,7 +88,7 @@ func TestNull(t *testing.T) { Resource: "test", Clock: clock.New(), }) - + require.NoError(t, err) c.SetNull(context.Background(), "key") _, hit := c.Get(context.Background(), "key") diff --git a/go/pkg/cache/simulation_test.go b/go/pkg/cache/simulation_test.go index 9d769860ad..8555e0da16 100644 --- a/go/pkg/cache/simulation_test.go +++ b/go/pkg/cache/simulation_test.go @@ -105,7 +105,7 @@ func TestSimulation(t *testing.T) { fresh := time.Second + time.Duration(rng.IntN(60*60*1000))*time.Millisecond stale := fresh + time.Duration(rng.IntN(24*60*60*1000))*time.Millisecond - c := cache.New[uint64, uint64](cache.Config[uint64, uint64]{ + c, err := cache.New[uint64, uint64](cache.Config[uint64, uint64]{ Clock: clk, Fresh: fresh, Stale: stale, @@ -113,7 +113,7 @@ func TestSimulation(t *testing.T) { MaxSize: rng.IntN(1_000_000) + 1, // Ensure at least size 1 Resource: "test", }) - + require.NoError(t, err) return &state{ keys: []uint64{}, cache: c, diff --git a/go/pkg/clickhouse/client.go b/go/pkg/clickhouse/client.go index a2fc600e5f..f04d8405b2 100644 --- a/go/pkg/clickhouse/client.go +++ b/go/pkg/clickhouse/client.go @@ -2,7 +2,6 @@ package clickhouse import ( "context" - "crypto/tls" "fmt" "time" @@ -66,15 +65,16 @@ func 
New(config Config) (*Clickhouse, error) { opts.Debugf = func(format string, v ...any) { config.Logger.Debug(fmt.Sprintf(format, v...)) } - if opts.TLS == nil { - - opts.TLS = new(tls.Config) - } + // if opts.TLS == nil { + // + // opts.TLS = new(tls.Config) + // } config.Logger.Info("connecting to clickhouse") conn, err := ch.Open(opts) if err != nil { return nil, fault.Wrap(err, fault.WithDesc("opening clickhouse failed", "")) + } err = retry.New( diff --git a/go/pkg/cluster/cluster.go b/go/pkg/cluster/cluster.go index bdf0c31144..366537ea2f 100644 --- a/go/pkg/cluster/cluster.go +++ b/go/pkg/cluster/cluster.go @@ -7,7 +7,10 @@ import ( "github.com/unkeyed/unkey/go/pkg/events" "github.com/unkeyed/unkey/go/pkg/logging" "github.com/unkeyed/unkey/go/pkg/membership" + "github.com/unkeyed/unkey/go/pkg/otel/metrics" "github.com/unkeyed/unkey/go/pkg/ring" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" ) // Config configures a new cluster instance with the necessary components @@ -49,6 +52,11 @@ func New(config Config) (*cluster, error) { leaveEvents: events.NewTopic[Node](), } + err = c.registerMetrics() + if err != nil { + return nil, err + } + go c.keepInSync() err = r.AddNode(context.Background(), ring.Node[Node]{ @@ -81,6 +89,25 @@ func (c *cluster) Self() Node { return c.self } +func (c *cluster) registerMetrics() error { + + err := metrics.Cluster.Size.RegisterCallback(func(_ context.Context, o metric.Int64Observer) error { + members, err := c.membership.Members() + if err != nil { + return err + } + + o.Observe(int64(len(members)), metric.WithAttributes(attribute.String("nodeID", c.self.ID))) + return nil + }) + + if err != nil { + return err + } + + return nil +} + // SubscribeJoin returns a channel that receives Node events // whenever a new node joins the cluster. 
func (c *cluster) SubscribeJoin() <-chan Node { diff --git a/go/pkg/membership/serf.go b/go/pkg/membership/serf.go index 8cda005795..5d153b4c32 100644 --- a/go/pkg/membership/serf.go +++ b/go/pkg/membership/serf.go @@ -3,6 +3,7 @@ package membership import ( "context" "fmt" + "net" "strings" "sync" "time" @@ -45,10 +46,15 @@ var _ Membership = (*serfMembership)(nil) // New creates a new membership instance with Serf func New(config Config) (*serfMembership, error) { + host, err := parseHost(config.AdvertiseHost) + + if err != nil { + return nil, err + } // Create self member with metadata self := Member{ NodeID: config.NodeID, - Host: config.AdvertiseHost, + Host: host, GossipPort: config.GossipPort, RpcPort: config.RpcPort, } @@ -56,7 +62,7 @@ func New(config Config) (*serfMembership, error) { // Serf configuration serfConfig := serf.DefaultConfig() serfConfig.NodeName = config.NodeID - serfConfig.MemberlistConfig.AdvertiseAddr = config.AdvertiseHost + serfConfig.MemberlistConfig.AdvertiseAddr = host serfConfig.MemberlistConfig.AdvertisePort = config.GossipPort serfConfig.MemberlistConfig.BindAddr = "0.0.0.0" serfConfig.MemberlistConfig.BindPort = config.GossipPort @@ -92,6 +98,20 @@ func New(config Config) (*serfMembership, error) { return m, nil } +// docker compose gives us weird hostnames that we need to look up first +func parseHost(host string) (string, error) { + + advertiseAddrs, err := net.LookupHost(host) + if err != nil { + return "", fmt.Errorf("unable to lookup addr %s: %w", host, err) + } + if len(advertiseAddrs) == 0 { + return "", fmt.Errorf("no advertise addrs found") + } + + return advertiseAddrs[0], nil +} + // Handle Serf events and propagate to our event system func (m *serfMembership) handleEvents(ch <-chan serf.Event) { for event := range ch { diff --git a/go/pkg/otel/grafana.go b/go/pkg/otel/grafana.go index 62fe6004d1..649544f8df 100644 --- a/go/pkg/otel/grafana.go +++ b/go/pkg/otel/grafana.go @@ -10,17 +10,26 @@ import ( 
"github.com/unkeyed/unkey/go/pkg/shutdown" "go.opentelemetry.io/contrib/instrumentation/runtime" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" "go.opentelemetry.io/otel/sdk/metric" - + "go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" ) // Config defines the configuration settings for OpenTelemetry integration with Grafana. // It specifies connection details and application metadata needed for proper telemetry. type Config struct { + // NodeID is a unique identifier for the current service instance, + // used to distinguish between multiple instances of the same service. + NodeID string + + // CloudRegion indicates the geographic region where this service instance is running, + // which helps with identifying regional performance patterns or issues. + CloudRegion string + // GrafanaEndpoint is the URL endpoint where telemetry data will be sent. // For Grafana Cloud, this looks like "https://otlp-gateway-{your-stack-id}.grafana.net/otlp" GrafanaEndpoint string @@ -43,57 +52,105 @@ type Config struct { // - Runtime metrics for Go applications (memory, GC, goroutines, etc.) // - Custom application metrics defined in the metrics package // -// The function returns a slice of shutdown functions that should be called in reverse -// order during application shutdown to ensure proper cleanup of telemetry resources. +// The function registers all necessary shutdown handlers with the provided shutdowns instance. +// These handlers will be called during application termination to ensure proper cleanup. 
// // Example: // -// shutdownFuncs, err := otel.InitGrafana(ctx, otel.Config{ +// shutdowns := shutdown.New() +// err := otel.InitGrafana(ctx, otel.Config{ // GrafanaEndpoint: "https://otlp-gateway-prod-us-east-0.grafana.net/otlp", // Application: "unkey-api", // Version: version.Version, -// }) +// }, shutdowns) +// // if err != nil { // log.Fatalf("Failed to initialize telemetry: %v", err) // } // // // Later during shutdown: -// for i := len(shutdownFuncs) - 1; i >= 0; i-- { -// if err := shutdownFuncs[i](ctx); err != nil { -// log.Printf("Error during telemetry shutdown: %v", err) -// } +// ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) +// defer cancel() +// errs := shutdowns.Shutdown(ctx) +// for _, err := range errs { +// log.Printf("Shutdown error: %v", err) // } -func InitGrafana(ctx context.Context, config Config) ([]shutdown.ShutdownCtx, error) { - shutdowns := make([]shutdown.ShutdownCtx, 0) +func InitGrafana(ctx context.Context, config Config, shutdowns *shutdown.Shutdowns) error { + // Create a resource with common attributes + res, err := resource.New(ctx, + resource.WithAttributes( + semconv.ServiceName(config.Application), + semconv.ServiceVersion(config.Version), + semconv.ServiceInstanceID(config.NodeID), + semconv.CloudRegion(config.CloudRegion), + ), + ) + if err != nil { + return fmt.Errorf("failed to create resource: %w", err) + } - // Initialize trace exporter - traceExporter, err := otlptrace.New(ctx, otlptracehttp.NewClient()) + // Initialize trace exporter with configuration matching the old implementation + traceExporter, err := otlptracehttp.New(ctx, + otlptracehttp.WithEndpoint(config.GrafanaEndpoint), + otlptracehttp.WithCompression(otlptracehttp.GzipCompression), + otlptracehttp.WithInsecure(), // For local development + + ) if err != nil { - return nil, fmt.Errorf("unable to init grafana tracing: %w", err) + return fmt.Errorf("failed to create trace exporter: %w", err) } - shutdowns = append(shutdowns, 
traceExporter.Shutdown) - // Create and register trace provider - traceProvider := trace.NewTracerProvider(trace.WithBatcher(traceExporter)) - shutdowns = append(shutdowns, traceProvider.Shutdown) + // Register shutdown function for trace exporter + shutdowns.RegisterCtx(traceExporter.Shutdown) + // Create and register trace provider with the same batch settings as the old code + traceProvider := trace.NewTracerProvider( + trace.WithBatcher(traceExporter), + trace.WithResource(res), + ) + + // Register shutdown function for trace provider + shutdowns.RegisterCtx(traceProvider.Shutdown) + + // Set the global trace provider + otel.SetTracerProvider(traceProvider) tracing.SetGlobalTraceProvider(traceProvider) - // Initialize metrics exporter - metricExporter, err := otlpmetrichttp.New(ctx, otlpmetrichttp.WithEndpoint(config.GrafanaEndpoint)) + // Initialize metrics exporter with configuration matching the old implementation + metricExporter, err := otlpmetrichttp.New(ctx, + otlpmetrichttp.WithEndpoint(config.GrafanaEndpoint), + otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression), + otlpmetrichttp.WithInsecure(), // For local development + + ) if err != nil { - return nil, fmt.Errorf("unable to init grafana metrics: %w", err) + return fmt.Errorf("failed to create metric exporter: %w", err) } - shutdowns = append(shutdowns, metricExporter.Shutdown) - // Create and register meter provider - meterProvider := metric.NewMeterProvider(metric.WithReader(metric.NewPeriodicReader(metricExporter))) - shutdowns = append(shutdowns, meterProvider.Shutdown) + // Register shutdown function for metric exporter + shutdowns.RegisterCtx(metricExporter.Shutdown) + + // Create and register meter provider with the same reader settings as the old code + meterProvider := metric.NewMeterProvider( + metric.WithReader( + metric.NewPeriodicReader( + metricExporter, + metric.WithInterval(10*time.Second), // Match the 10s interval from the old code + ), + ), + metric.WithResource(res), + 
) + + // Register shutdown function for meter provider + shutdowns.RegisterCtx(meterProvider.Shutdown) + + // Set the global meter provider + otel.SetMeterProvider(meterProvider) // Initialize application metrics err = metrics.Init(meterProvider.Meter(config.Application)) if err != nil { - return nil, fmt.Errorf("unable to init custom metrics: %w", err) + return fmt.Errorf("failed to initialize custom metrics: %w", err) } // Collect runtime metrics (memory, GC, goroutines, etc.) @@ -102,8 +159,8 @@ func InitGrafana(ctx context.Context, config Config) ([]shutdown.ShutdownCtx, er runtime.WithMinimumReadMemStatsInterval(time.Second), ) if err != nil { - return nil, fmt.Errorf("unable to init runtime metrics: %w", err) + return fmt.Errorf("failed to start runtime metrics collection: %w", err) } - return shutdowns, nil + return nil } diff --git a/go/pkg/otel/metrics/doc.go b/go/pkg/otel/metrics/doc.go new file mode 100644 index 0000000000..b9a01abe1e --- /dev/null +++ b/go/pkg/otel/metrics/doc.go @@ -0,0 +1,34 @@ +// Package metrics provides OpenTelemetry metric instrumentation for the Unkey system. +// +// This package offers a set of pre-defined metrics. +// +// It's designed to be initialized once at application startup and then used throughout +// the codebase for consistent metric collection. +// +// By default, the package initializes with no-op metrics that don't record any data, +// ensuring that importing the package is safe even without explicit initialization. +// For production use, call Init() with a proper meter provider. 
+// +// Example usage: +// +// // Initialize metrics with a real provider +// provider := prometheus.NewMeterProvider() +// meter := provider.Meter("my-service") +// if err := metrics.Init(meter); err != nil { +// log.Fatal("failed to initialize metrics:", err) +// } +// +// // Record HTTP request metrics +// metrics.Http.Requests.Add(ctx, 1, metric.WithAttributes( +// attribute.String("path", "/api/v1/keys"), +// attribute.Int("status", 200), +// )) +// +// // Register a callback for observable metrics +// metrics.Cache.Size.RegisterCallback(func(_ context.Context, o metric.Int64Observer) error { +// o.Observe(currentSize, metric.WithAttributes( +// attribute.String("resource", "api_keys"), +// )) +// return nil +// }) +package metrics diff --git a/go/pkg/otel/metrics/interface.go b/go/pkg/otel/metrics/interface.go new file mode 100644 index 0000000000..47ab230318 --- /dev/null +++ b/go/pkg/otel/metrics/interface.go @@ -0,0 +1,38 @@ +package metrics + +import ( + "context" + + "go.opentelemetry.io/otel/metric" +) + +// Int64Counter represents a metric that accumulates int64 values. +// It's a wrapper around OpenTelemetry's metric.Int64Counter that simplifies its usage. +type Int64Counter interface { + // Add increments the counter by the given value. + // + // Parameters: + // - ctx: The context for the operation, which can carry tracing information + // - incr: The amount to increment the counter by + // - options: Optional metric.AddOption values like attributes + // + // This method is safe for concurrent use. + Add(ctx context.Context, incr int64, options ...metric.AddOption) +} + +// Int64Observable represents a metric that reports values through a callback function. +// This is used for metrics that need to be computed on-demand when they're collected. +type Int64Observable interface { + // RegisterCallback registers a callback function that will be called when the metric + // is collected. 
+ // + // Parameters: + // - callback: The function that will be called to observe the metric value + // + // Returns: + // - error: Any error encountered during callback registration + // + // This method is not guaranteed to be safe for concurrent use and should typically + // be called during application initialization. + RegisterCallback(metric.Int64Callback) error +} diff --git a/go/pkg/otel/metrics/metrics.go b/go/pkg/otel/metrics/metrics.go index 7786cb0488..b9dacbfe3b 100644 --- a/go/pkg/otel/metrics/metrics.go +++ b/go/pkg/otel/metrics/metrics.go @@ -1,7 +1,3 @@ -// unkey/go/pkg/otel/metrics/metrics.go -// Package metrics provides OpenTelemetry instrumentation for monitoring application behavior. -// It exposes global metric instances, initialized with no-op -// implementations by default that can be replaced with real implementations via Init(). package metrics import ( @@ -25,6 +21,7 @@ func init() { // Global metric instances provide easy access to commonly used metrics. // These are initialized during package initialization and are safe for concurrent use. var ( + // Http contains metrics related to HTTP operations Http struct { // Requests counts all incoming API requests. 
@@ -39,7 +36,7 @@ var ( // attribute.String("path", "/api/v1/users"), // attribute.Int("status", 200), // )) - Requests metric.Int64Counter + Requests Int64Counter } // Cache contains metrics related to cache operations @@ -51,10 +48,13 @@ var ( // - resource (string): The type of resource being cached (e.g., "user_profile") // // Example: - // metrics.Cache.Hits.Record(ctx, 1, metric.WithAttributes( - // attribute.String("resource", "user_profile"), - // )) - Hits metric.Int64Gauge + // metrics.Cache.Hits.RegisterCallback(func(_ context.Context, o metric.Int64Observer) error { + // o.Observe(hitCount, metric.WithAttributes( + // attribute.String("resource", "user_profile"), + // )) + // return nil + // }) + Hits Int64Observable // Misses tracks the number of cache read operations that did not find the requested item. // Use this to monitor cache efficiency and identify opportunities for improvement. @@ -63,10 +63,13 @@ var ( // - resource (string): The type of resource being cached (e.g., "user_profile") // // Example: - // metrics.Cache.Misses.Record(ctx, 1, metric.WithAttributes( - // attribute.String("resource", "user_profile"), - // )) - Misses metric.Int64Gauge + // metrics.Cache.Misses.RegisterCallback(func(_ context.Context, o metric.Int64Observer) error { + // o.Observe(missCount, metric.WithAttributes( + // attribute.String("resource", "user_profile"), + // )) + // return nil + // }) + Misses Int64Observable // Writes tracks the number of cache write operations. // Use this to monitor write pressure on the cache. 
@@ -75,10 +78,13 @@ var ( // - resource (string): The type of resource being cached (e.g., "user_profile") // // Example: - // metrics.Cache.Writes.Add(ctx, 1, metric.WithAttributes( - // attribute.String("resource", "user_profile"), - // )) - Writes metric.Int64Counter + // metrics.Cache.Writes.RegisterCallback(func(_ context.Context, o metric.Int64Observer) error { + // o.Observe(writeCount, metric.WithAttributes( + // attribute.String("resource", "user_profile"), + // )) + // return nil + // }) + Writes Int64Observable // Evicted tracks the number of items removed from the cache due to space constraints // or explicit deletion. Use this to monitor cache churn and capacity issues. @@ -88,11 +94,14 @@ var ( // - reason (string): The reason for eviction (e.g., "capacity", "ttl", "manual") // // Example: - // metrics.Cache.Evicted.Record(ctx, 1, metric.WithAttributes( - // attribute.String("resource", "user_profile"), - // attribute.String("reason", "ttl"), - // )) - Evicted metric.Int64Gauge + // metrics.Cache.Evicted.RegisterCallback(func(_ context.Context, o metric.Int64Observer) error { + // o.Observe(evictionCount, metric.WithAttributes( + // attribute.String("resource", "user_profile"), + // attribute.String("reason", "ttl"), + // )) + // return nil + // }) + Evicted Int64Observable // ReadLatency measures the duration of cache read operations in milliseconds. // This histogram helps track cache performance and identify slowdowns. 
@@ -115,10 +124,46 @@ var ( // - resource (string): The type of resource being cached (e.g., "user_profile") // // Example: - // metrics.Cache.Size.Record(ctx, 1042, metric.WithAttributes( - // attribute.String("resource", "user_profile"), - // )) - Size metric.Int64Gauge + // metrics.Cache.Size.RegisterCallback(func(_ context.Context, o metric.Int64Observer) error { + // o.Observe(currentSize, metric.WithAttributes( + // attribute.String("resource", "user_profile"), + // )) + // return nil + // }) + Size Int64Observable + + // Capacity tracks the maximum number of items the cache can hold. + // Use this to monitor cache utilization and growth patterns. + // + // Attributes: + // - resource (string): The type of resource being cached (e.g., "user_profile") + // + // Example: + // metrics.Cache.Capacity.RegisterCallback(func(_ context.Context, o metric.Int64Observer) error { + // o.Observe(currentCapacity, metric.WithAttributes( + // attribute.String("resource", "user_profile"), + // )) + // return nil + // }) + Capacity Int64Observable + + // Cluster contains metrics related to cluster operations and status. + Cluster struct { + // Size tracks the current number of nodes in the cluster. + // Use this to monitor cluster health, scaling events, and load distribution. 
+ // + // Attributes: + // - nodeID (string): The unique identifier of the node (e.g., "node-1", "node-abc123") + // + // Example: + // metrics.Cluster.Size.RegisterCallback(func(_ context.Context, o metric.Int64Observer) error { + // o.Observe(nodeCount, metric.WithAttributes( + // attribute.String("nodeID", "node-abc123"), + // )) + // return nil + // }) + Size Int64Observable } ) @@ -152,32 +197,36 @@ func Init(m metric.Meter) error { } // Initialize Cache metrics - Cache.Hits, err = m.Int64Gauge("cache_hit", - metric.WithDescription("Cache hits"), - ) - if err != nil { - return err + Cache.Hits = &int64ObservableCounter{ + m: m, + name: "cache_hit", + opts: []metric.Int64ObservableCounterOption{ + metric.WithDescription("How many cache hits we encountered."), + }, } - Cache.Misses, err = m.Int64Gauge("cache_miss", - metric.WithDescription("Cache misses"), - ) - if err != nil { - return err + Cache.Misses = &int64ObservableCounter{ + m: m, + name: "cache_miss", + opts: []metric.Int64ObservableCounterOption{ + metric.WithDescription("How many cache misses we encountered."), + }, } - Cache.Writes, err = m.Int64Counter("cache_write", - metric.WithDescription("Cache writes"), - ) - if err != nil { - return err + Cache.Writes = &int64ObservableCounter{ + m: m, + name: "cache_writes", + opts: []metric.Int64ObservableCounterOption{ + metric.WithDescription("How many cache writes we did."), + }, } - Cache.Evicted, err = m.Int64Gauge("cache_evicted", - metric.WithDescription("Evicted entries"), - ) - if err != nil { - return err + Cache.Evicted = &int64ObservableCounter{ + m: m, + name: "cache_evicted", + opts: []metric.Int64ObservableCounterOption{ + metric.WithDescription("How many cache evictions we did."), + }, } Cache.ReadLatency, err = m.Int64Histogram("cache_read_latency", @@ -188,12 +237,27 @@ func Init(m metric.Meter) error { return err } - Cache.Size, err = m.Int64Gauge("cache_size", - metric.WithDescription("How many entries are stored in the cache."), - ) - 
if err != nil { - return err + Cache.Size = &int64ObservableGauge{ + m: m, + name: "cache_size", + opts: []metric.Int64ObservableGaugeOption{ + metric.WithDescription("How many entries are stored in the cache."), + }, + } + Cache.Capacity = &int64ObservableGauge{ + m: m, + name: "cache_capacity", + opts: []metric.Int64ObservableGaugeOption{ + metric.WithDescription("Maximum number of items the cache can hold."), + }, } + Cluster.Size = &int64ObservableGauge{ + m: m, + name: "cluster_size", + opts: []metric.Int64ObservableGaugeOption{ + metric.WithDescription("How many nodes are in the cluster."), + }, + } return nil } diff --git a/go/pkg/otel/metrics/observable.go b/go/pkg/otel/metrics/observable.go new file mode 100644 index 0000000000..7769e56a40 --- /dev/null +++ b/go/pkg/otel/metrics/observable.go @@ -0,0 +1,66 @@ +package metrics + +import "go.opentelemetry.io/otel/metric" + +// int64ObservableGauge implements the Int64Observable interface for gauge metrics. +// It provides a way to observe values that can go up and down over time. +type int64ObservableGauge struct { + m metric.Meter + name string + opts []metric.Int64ObservableGaugeOption +} + +// Ensure int64ObservableGauge implements Int64Observable +var _ Int64Observable = (*int64ObservableGauge)(nil) + +// RegisterCallback registers a callback function for the int64ObservableGauge. +// This callback will be invoked when the metrics system collects values. +// +// Parameters: +// - callback: The function that will be called to observe the metric value +// +// Returns: +// - error: Any error encountered during callback registration +// +// The callback will be invoked periodically by the metrics collection system, +// and should efficiently compute and report the current value of the metric. +// +// Thread-safety: This method is not safe for concurrent use with other methods +// on the same gauge instance. It should typically be called during initialization. 
+func (g *int64ObservableGauge) RegisterCallback(callback metric.Int64Callback) error { + _, err := g.m.Int64ObservableGauge(g.name, append(g.opts, metric.WithInt64Callback(callback))...) + + return err +} + +// int64ObservableCounter implements the Int64Observable interface for counter metrics. +// It provides a way to observe monotonically increasing values over time. +type int64ObservableCounter struct { + m metric.Meter + name string + opts []metric.Int64ObservableCounterOption +} + +// Ensure int64ObservableCounter implements Int64Observable +var _ Int64Observable = (*int64ObservableCounter)(nil) + +// RegisterCallback registers a callback function for the int64ObservableCounter. +// This callback will be invoked when the metrics system collects values. +// +// Parameters: +// - callback: The function that will be called to observe the metric value +// +// Returns: +// - error: Any error encountered during callback registration +// +// The callback will be invoked periodically by the metrics collection system, +// and should efficiently compute and report the current value of the metric. +// For counters, the reported value should never decrease. +// +// Thread-safety: This method is not safe for concurrent use with other methods +// on the same counter instance. It should typically be called during initialization. +func (g *int64ObservableCounter) RegisterCallback(callback metric.Int64Callback) error { + _, err := g.m.Int64ObservableCounter(g.name, append(g.opts, metric.WithInt64Callback(callback))...) 
+ + return err +} diff --git a/go/pkg/zen/middleware_metrics.go b/go/pkg/zen/middleware_metrics.go index 6f6e367267..9aac5e5d17 100644 --- a/go/pkg/zen/middleware_metrics.go +++ b/go/pkg/zen/middleware_metrics.go @@ -8,6 +8,9 @@ import ( "time" "github.com/unkeyed/unkey/go/pkg/clickhouse/schema" + "github.com/unkeyed/unkey/go/pkg/otel/metrics" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" ) type EventBuffer interface { @@ -44,6 +47,7 @@ func WithMetrics(eventBuffer EventBuffer) Middleware { start := time.Now() nextErr := next(ctx, s) serviceLatency := time.Since(start) + requestHeaders := []string{} for k, vv := range s.r.Header { if k == "authorization" { @@ -58,6 +62,13 @@ func WithMetrics(eventBuffer EventBuffer) Middleware { responseHeaders = append(responseHeaders, fmt.Sprintf("%s: %s", k, strings.Join(vv, ","))) } + metrics.Http.Requests.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet( + attribute.String("host", s.r.Host), + attribute.String("method", s.r.Method), + attribute.String("path", s.r.URL.Path), + attribute.Int("status", s.responseStatus), + ))) + eventBuffer.BufferApiRequest(schema.ApiRequestV1{ WorkspaceID: s.workspaceID, RequestID: s.requestID,