diff --git a/go/cmd/api/config.go b/go/cmd/api/config.go index 7a24b66564..7ef69ff56c 100644 --- a/go/cmd/api/config.go +++ b/go/cmd/api/config.go @@ -1,46 +1,47 @@ package api type nodeConfig struct { - Platform string `json:"platform,omitempty" description:"The platform this agent is running on"` - Image string `json:"image,omitempty" description:"The image this agent is running"` - HttpPort int `json:"httpPort" default:"7070" description:"Port to listen on"` - Schema string `json:"$schema,omitempty" description:"Make jsonschema happy"` - Region string `json:"region,omitempty" description:"The region this agent is running in"` + Platform string `json:"platform,omitempty" description:"Cloud platform identifier (e.g., aws, gcp, hetzner)"` + Image string `json:"image,omitempty" description:"Container image identifier including repository and tag"` + HttpPort int `json:"httpPort" default:"7070" description:"HTTP port for the API server to listen on"` + Schema string `json:"$schema,omitempty" description:"JSON Schema URI for configuration validation"` + Region string `json:"region,omitempty" description:"Geographic region identifier where this node is deployed"` Heartbeat *struct { - URL string `json:"url" minLength:"1" description:"URL to send heartbeat to"` - Interval int `json:"interval" min:"1" description:"Interval in seconds to send heartbeat"` - } `json:"heartbeat,omitempty" description:"Send heartbeat to a URL"` + URL string `json:"url" minLength:"1" description:"Complete URL endpoint where heartbeat signals will be sent"` + Interval int `json:"interval" min:"1" description:"Time between heartbeat signals in seconds"` + } `json:"heartbeat,omitempty" description:"Configuration for health check heartbeat mechanism"` Cluster *struct { - NodeID string `json:"nodeId,omitempty" description:"A unique node id"` + NodeID string `json:"nodeId,omitempty" description:"Unique identifier for this node within the cluster"` AdvertiseAddr struct { - Static *string `json:"static,omitempty" description:"The address to advertise to other nodes"` - AwsEcsMetadata *bool `json:"awsEcsMetadata,omitempty" description:"Use AWS ECS metadata to retrieve the address of the current node"` - } `json:"advertiseAddr" description:"A mechanism of retrieving the address of the current node."` - RpcPort string `json:"rpcPort" default:"7071" description:"The port used for RPC"` - GossipPort string `json:"gossipPort" default:"7072" description:"The port used for gossip"` + Static *string `json:"static,omitempty" description:"Static IP address or hostname for node discovery"` + AwsEcsMetadata *bool `json:"awsEcsMetadata,omitempty" description:"Enable automatic address discovery using AWS ECS container metadata"` + } `json:"advertiseAddr" description:"Node address advertisement configuration for cluster communication"` + RpcPort string `json:"rpcPort" default:"7071" description:"Port used for internal RPC communication between nodes"` + GossipPort string `json:"gossipPort" default:"7072" description:"Port used for cluster membership and failure detection"` Discovery *struct { Static *struct { - Addrs []string `json:"addrs" minLength:"1" description:"List of node addresses"` - } `json:"static,omitempty" description:"Static cluster discovery configuration"` + Addrs []string `json:"addrs" minLength:"1" description:"List of seed node addresses for static cluster configuration"` + } `json:"static,omitempty" description:"Static cluster membership configuration"` Redis *struct { - URL string `json:"url" minLength:"1" description:"Redis URL"` - } `json:"redis,omitempty" description:"Redis cluster discovery configuration"` - } `json:"discovery,omitempty" description:"Cluster discovery configuration, only one supported: static, cloudmap"` - } `json:"cluster,omitempty" description:"Cluster configuration"` + URL string `json:"url" minLength:"1" description:"Redis connection string for dynamic cluster discovery"` + } `json:"redis,omitempty" description:"Redis-based cluster discovery configuration"` + } `json:"discovery,omitempty" description:"Cluster node discovery mechanism configuration"` + } `json:"cluster,omitempty" description:"Distributed cluster configuration settings"` Logs *struct { - Color bool `json:"color" description:"Display color in logs"` + Color bool `json:"color" description:"Enable ANSI color codes in log output"` } `json:"logs,omitempty"` Clickhouse *struct { - Url string `json:"url" minLength:"1"` + Url string `json:"url" minLength:"1" description:"ClickHouse database connection string"` } `json:"clickhouse,omitempty"` Database struct { - // DSN of the primary database for reads and writes. - Primary string `json:"primary"` - - // An optional read replica DSN. - ReadonlyReplica string `json:"readonlyReplica,omitempty"` + Primary string `json:"primary" description:"Primary database connection string for read and write operations"` + ReadonlyReplica string `json:"readonlyReplica,omitempty" description:"Optional read-replica database connection string for read operations"` } `json:"database"` + + Otel *struct { + OtlpEndpoint string `json:"otlpEndpoint" description:"OpenTelemetry collector endpoint for metrics, traces, and logs"` + } `json:"otel,omitempty" description:"OpenTelemetry observability configuration"` } diff --git a/go/cmd/api/main.go b/go/cmd/api/main.go index 1a4f538a2e..b39f3ca0b3 100644 --- a/go/cmd/api/main.go +++ b/go/cmd/api/main.go @@ -25,6 +25,8 @@ import ( "github.com/unkeyed/unkey/go/pkg/discovery" "github.com/unkeyed/unkey/go/pkg/logging" "github.com/unkeyed/unkey/go/pkg/membership" + "github.com/unkeyed/unkey/go/pkg/otel" + "github.com/unkeyed/unkey/go/pkg/shutdown" "github.com/unkeyed/unkey/go/pkg/uid" "github.com/unkeyed/unkey/go/pkg/version" "github.com/unkeyed/unkey/go/pkg/zen" @@ -54,7 +56,7 @@ var Cmd = &cli.Command{ // nolint:gocognit func run(cliC *cli.Context) error { - shutdowns := []func(ctx context.Context) error{} + shutdowns := []shutdown.ShutdownFn{} if cliC.Bool("generate-config-schema") { // nolint:exhaustruct @@ -111,6 +113,18 @@ func run(cliC *cli.Context) error { logger.Info(ctx, "configration loaded", slog.String("file", configFile)) + if cfg.Otel != nil { + shutdownOtel, grafanaErr := otel.InitGrafana(ctx, otel.Config{ + GrafanaEndpoint: cfg.Otel.OtlpEndpoint, + Application: "api", + Version: version.Version, + }) + if grafanaErr != nil { + return fmt.Errorf("unable to init grafana: %w", grafanaErr) + } + shutdowns = append(shutdowns, shutdownOtel...) + } + db, err := database.New(database.Config{ PrimaryDSN: cfg.Database.Primary, ReadOnlyDSN: cfg.Database.ReadonlyReplica, @@ -189,7 +203,7 @@ func run(cliC *cli.Context) error { return gracefulShutdown(ctx, logger, shutdowns) } -func gracefulShutdown(ctx context.Context, logger logging.Logger, shutdowns []func(ctx context.Context) error) error { +func gracefulShutdown(ctx context.Context, logger logging.Logger, shutdowns []shutdown.ShutdownFn) error { cShutdown := make(chan os.Signal, 1) signal.Notify(cShutdown, os.Interrupt, syscall.SIGTERM) @@ -212,13 +226,12 @@ func gracefulShutdown(ctx context.Context, logger logging.Logger, shutdowns []fu return nil } -func setupCluster(cfg nodeConfig, logger logging.Logger) (cluster.Cluster, []func(ctx context.Context) error, error) { +func setupCluster(cfg nodeConfig, logger logging.Logger) (cluster.Cluster, []shutdown.ShutdownFn, error) { + shutdowns := []shutdown.ShutdownFn{} if cfg.Cluster == nil { - return cluster.NewNoop("", "127.0.0.1"), []func(ctx context.Context) error{}, nil + return cluster.NewNoop("", "127.0.0.1"), shutdowns, nil } - shutdowns := []func(ctx context.Context) error{} - var advertiseAddr string { switch { diff --git a/go/cmd/healthcheck/main.go b/go/cmd/healthcheck/main.go index 2214dcd6f6..36dc0eabf7 100644 --- a/go/cmd/healthcheck/main.go +++ b/go/cmd/healthcheck/main.go @@ -26,6 +26,7 @@ func run(cliC *cli.Context) error { return fmt.Errorf("You must provide a url like so: 'unkey healthcheck '") } + // nolint:gosec res, err := http.Get(url) if err != nil { return fmt.Errorf("failed to perform healthcheck: %w", err) diff --git a/go/go.mod b/go/go.mod index 86618d8c54..79f6a535b3 100644 --- a/go/go.mod +++ b/go/go.mod @@ -6,13 +6,10 @@ require ( connectrpc.com/connect v1.16.2 connectrpc.com/otelconnect v0.7.1 github.com/ClickHouse/clickhouse-go/v2 v2.31.0 - github.com/Southclaws/fault v0.8.1 - github.com/axiomhq/axiom-go v0.22.0 github.com/btcsuite/btcutil v1.0.2 github.com/danielgtaylor/huma v1.14.3 github.com/go-sql-driver/mysql v1.8.1 github.com/gonum/stat v0.0.0-20181125101827-41a0da705a5b - github.com/google/uuid v1.6.0 github.com/hashicorp/memberlist v0.5.3 github.com/lmittmann/tint v1.0.7 github.com/maypok86/otter v1.2.4 @@ -28,7 +25,14 @@ require ( github.com/unkeyed/unkey/apps/agent v0.0.0-20250211105155-776bdbccce47 github.com/urfave/cli/v2 v2.27.5 github.com/xeipuuv/gojsonschema v1.2.0 + go.opentelemetry.io/contrib/instrumentation/runtime v0.59.0 go.opentelemetry.io/otel v1.34.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.29.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0 + go.opentelemetry.io/otel/metric v1.34.0 + go.opentelemetry.io/otel/sdk v1.34.0 + go.opentelemetry.io/otel/sdk/metric v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 google.golang.org/protobuf v1.36.5 ) @@ -44,6 +48,7 @@ require ( github.com/andybalholm/brotli v1.1.1 // indirect github.com/antlr4-go/antlr/v4 v4.13.1 // indirect github.com/armon/go-metrics v0.4.1 // indirect + github.com/axiomhq/axiom-go v0.20.2 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect @@ -63,7 +68,6 @@ require ( github.com/dprotaso/go-yit v0.0.0-20240618133044-5a0af90af097 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/fatih/structtag v1.2.0 // indirect - github.com/felixge/httpsnoop v1.0.4 // indirect github.com/gammazero/deque v1.0.0 // indirect github.com/getkin/kin-openapi v0.127.0 // indirect github.com/go-faster/city v1.0.1 // indirect @@ -81,19 +85,17 @@ require ( github.com/gonum/matrix v0.0.0-20181209220409-c518dec07be9 // indirect github.com/google/btree v1.1.3 // indirect github.com/google/cel-go v0.22.1 // indirect - github.com/google/go-querystring v1.1.0 // indirect github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-metrics v0.5.4 // indirect - github.com/hashicorp/go-msgpack v0.5.5 // indirect github.com/hashicorp/go-msgpack/v2 v2.1.2 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-sockaddr v1.0.7 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect - github.com/hashicorp/serf v0.10.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/invopop/yaml v0.3.1 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect @@ -104,7 +106,6 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/mailru/easyjson v0.9.0 // indirect - github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/miekg/dns v1.1.63 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect @@ -128,7 +129,6 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/riza-io/grpc-go v0.2.0 // indirect - github.com/rs/zerolog v1.33.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect @@ -150,11 +150,6 @@ require ( github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0 // indirect - go.opentelemetry.io/otel/metric v1.34.0 // indirect - go.opentelemetry.io/otel/sdk v1.34.0 // indirect go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/go/go.sum b/go/go.sum index b37c3ceaea..af35ec5ced 100644 --- a/go/go.sum +++ b/go/go.sum @@ -73,8 +73,6 @@ github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/Southclaws/fault v0.8.1 h1:mgqqdC6kUBQ6ExMALZ0nNaDfNJD5h2+wq3se5mAyX+8= -github.com/Southclaws/fault v0.8.1/go.mod h1:VUVkAWutC59SL16s6FTqf3I6I2z77RmnaW5XRz4bLOE= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -94,8 +92,8 @@ github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJ github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= -github.com/axiomhq/axiom-go v0.22.0 h1:QFC09ugLrwc8DNaq8QgF4Q2R/B2V5xYTjZzvUfe72s8= -github.com/axiomhq/axiom-go v0.22.0/go.mod h1:ybDThTO73XgRNQjTRxXqUiZh3QM7Wf5/exaFbp9VgLY= +github.com/axiomhq/axiom-go v0.20.2 h1:RKelFJr8Pei0xIoBaVteGGvn2pkaaMLrWiHLWu8d0Mc= +github.com/axiomhq/axiom-go v0.20.2/go.mod h1:TWHIoBDv/IJNKgyo2EQeOwH4svi+cTePSihPVWZC1/8= github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= @@ -149,7 +147,6 @@ github.com/containerd/continuity v0.4.3 h1:6HVkalIp+2u1ZLH1J/pYX2oBVXlJZvh1X1A7b github.com/containerd/continuity v0.4.3/go.mod h1:F6PTNCKepoxEaXLQp3wDAjygEnImnZ/7o4JzpodfroQ= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= -github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.6 h1:XJtiaUW6dEEqVuZiMTn1ldk455QWwEIsMIJlo5vtkx0= @@ -384,8 +381,6 @@ github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh github.com/hashicorp/go-metrics v0.5.4 h1:8mmPiIJkTPPEbAiV97IxdAGNdRdaWwVap1BU6elejKY= github.com/hashicorp/go-metrics v0.5.4/go.mod h1:CG5yz4NZ/AI/aQt9Ucm/vdBnbh7fvmv4lxZ350i+QQI= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= -github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI= -github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-msgpack/v2 v2.1.2 h1:4Ee8FTp834e+ewB71RDrQ0VKpyFdrKOjvYtnQ/ltVj0= github.com/hashicorp/go-msgpack/v2 v2.1.2/go.mod h1:upybraOAblm4S7rx0+jeNy+CWWhzywQsSRV5033mMu4= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= @@ -410,12 +405,9 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc= github.com/hashicorp/memberlist v0.3.0/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= -github.com/hashicorp/memberlist v0.5.0/go.mod h1:yvyXLpo0QaGE59Y7hDTsTzDD25JYBZ4mHgHUZ8lrOI0= github.com/hashicorp/memberlist v0.5.3 h1:tQ1jOCypD0WvMemw/ZhhtH+PWpzcftQvgCorLu0hndk= github.com/hashicorp/memberlist v0.5.3/go.mod h1:h60o12SZn/ua/j0B6iKAZezA4eDaGsIuPO70eOaJ6WE= github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpTwn9UV4= -github.com/hashicorp/serf v0.10.1 h1:Z1H2J60yRKvfDYAOZLd2MU0ND4AH/WDz7xYHDWQsIPY= -github.com/hashicorp/serf v0.10.1/go.mod h1:yL2t6BqATOLGc5HF7qbFkTfXoPIY0WZdWHfEvMqbG+4= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -485,16 +477,12 @@ github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= -github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= -github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= -github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= @@ -623,9 +611,6 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= -github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= -github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= -github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= @@ -740,10 +725,14 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 h1:ZIg3ZT/aQ7AfKqdwp7ECpOK6vHqquXXuyTjIO8ZdmPs= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0/go.mod h1:DQAwmETtZV00skUwgD6+0U89g80NKsJE3DCKeLLPQMI= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+nBOA/+LUkobKGW1ydGcn+G3vRw9+g5HwCphpk= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8= +go.opentelemetry.io/contrib/instrumentation/runtime v0.59.0 h1:rfi2MMujBc4yowE0iHckZX4o4jg6SA67EnFVL8ldVvU= +go.opentelemetry.io/contrib/instrumentation/runtime v0.59.0/go.mod h1:IO/gfPEcQYpOpPxn1OXFp1DvRY0viP8ONMedXLjjHIU= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.29.0 h1:xvhQxJ/C9+RTnAj5DpTg7LSM1vbbMTiXt7e9hsfqHNw= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.29.0/go.mod h1:Fcvs2Bz1jkDM+Wf5/ozBGmi3tQ/c9zPKLnsipnfhGAo= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 h1:OeNbIYk/2C15ckl7glBlOBp5+WlYsOElzTNmiPW/x60= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0/go.mod h1:7Bept48yIeqxP2OZ9/AqIpYS94h2or0aB4FypJTc8ZM= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0 h1:BEj3SPM81McUZHYjRS5pEgNgnmzGJ5tRpU5krWnV8Bs= @@ -752,8 +741,8 @@ go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= -go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU= -go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= +go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= +go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= @@ -990,11 +979,8 @@ golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/go/internal/services/ratelimit/peers.go b/go/internal/services/ratelimit/peers.go index 46b85a7079..45ab342e19 100644 --- a/go/internal/services/ratelimit/peers.go +++ b/go/internal/services/ratelimit/peers.go @@ -8,10 +8,10 @@ import ( "connectrpc.com/connect" "connectrpc.com/otelconnect" + "github.com/unkeyed/unkey/apps/agent/pkg/tracing" "github.com/unkeyed/unkey/go/gen/proto/ratelimit/v1/ratelimitv1connect" "github.com/unkeyed/unkey/go/pkg/cluster" "github.com/unkeyed/unkey/go/pkg/fault" - "github.com/unkeyed/unkey/go/pkg/tracing" ) type peer struct { diff --git a/go/internal/services/ratelimit/replay.go b/go/internal/services/ratelimit/replay.go index 621d6e1a3e..99a28a24a8 100644 --- a/go/internal/services/ratelimit/replay.go +++ b/go/internal/services/ratelimit/replay.go @@ -7,7 +7,7 @@ import ( "connectrpc.com/connect" ratelimitv1 "github.com/unkeyed/unkey/go/gen/proto/ratelimit/v1" - "github.com/unkeyed/unkey/go/pkg/tracing" + "github.com/unkeyed/unkey/go/pkg/otel/tracing" ) // consumes the replay buffer and sends out replay requests to peers diff --git a/go/internal/services/ratelimit/sliding_window.go b/go/internal/services/ratelimit/sliding_window.go index df140fc093..d58c58d632 100644 --- a/go/internal/services/ratelimit/sliding_window.go +++ b/go/internal/services/ratelimit/sliding_window.go @@ -13,7 +13,7 @@ import ( "github.com/unkeyed/unkey/go/pkg/clock" "github.com/unkeyed/unkey/go/pkg/cluster" "github.com/unkeyed/unkey/go/pkg/logging" - "github.com/unkeyed/unkey/go/pkg/tracing" + "github.com/unkeyed/unkey/go/pkg/otel/tracing" "go.opentelemetry.io/otel/attribute" ) diff --git a/go/pkg/cache/cache.go b/go/pkg/cache/cache.go index 7a7a05288a..f36f8882f4 100644 --- a/go/pkg/cache/cache.go +++ b/go/pkg/cache/cache.go @@ -13,6 +13,9 @@ import ( "github.com/unkeyed/unkey/go/pkg/clock" "github.com/unkeyed/unkey/go/pkg/fault" "github.com/unkeyed/unkey/go/pkg/logging" + "github.com/unkeyed/unkey/go/pkg/otel/metrics" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" ) type cache[K comparable, V any] struct { @@ -62,7 +65,7 @@ func New[K comparable, V any](config Config[K, V]) *cache[K, V] { otter, err := builder.CollectStats().Cost(func(key K, value swrEntry[V]) uint32 { return 1 - }).WithTTL(time.Hour).Build() + }).WithTTL(config.Stale).Build() if err != nil { panic(err) } @@ -85,22 +88,47 @@ func New[K comparable, V any](config Config[K, V]) *cache[K, V] { inflightRefreshes: make(map[K]bool), } + go c.collectMetrics() + return c } +func (c *cache[K, V]) collectMetrics() { + + attributes := metric.WithAttributes( + attribute.String("resource", c.resource), + ) + + t := time.NewTicker(time.Second * 5) + for range t.C { + ctx := context.Background() + + metrics.Cache.Size.Record(ctx, int64(c.otter.Size()), attributes) + + stats := c.otter.Stats() + metrics.Cache.Hits.Record(ctx, stats.Hits(), attributes) + metrics.Cache.Misses.Record(ctx, stats.Misses(), attributes) + metrics.Cache.Evicted.Record(ctx, stats.EvictedCount(), attributes) + + } + +} + func (c *cache[K, V]) Get(ctx context.Context, key K) (value V, hit CacheHit) { - e, ok := c.otter.Get(key) + e, ok := c.get(ctx, key) if !ok { // This hack is necessary because you can not return nil as V var v V + return v, Miss } now := c.clock.Now() if now.Before(e.Stale) { + return e.Value, e.Hit } @@ -118,9 +146,21 @@ func (c *cache[K, V]) SetNull(ctx context.Context, key K) { func (c *cache[K, V]) Set(ctx context.Context, key K, value V) { c.set(ctx, key, value) } + +func (c *cache[K, V]) get(ctx context.Context, key K) (swrEntry[V], bool) { + t0 := c.clock.Now() + v, ok := c.otter.Get(key) + t1 := c.clock.Now() + + metrics.Cache.ReadLatency.Record(ctx, t1.UnixMilli()-t0.UnixMilli(), metric.WithAttributes( + attribute.String("resource", c.resource), + )) + + return v, ok +} + func (c *cache[K, V]) set(_ context.Context, key K, value ...V) { now := c.clock.Now() - e := swrEntry[V]{ Value: value[0], Fresh: now.Add(c.fresh), @@ -223,7 +263,7 @@ func (c *cache[K, V]) SWR( translateError func(error) CacheHit, ) (V, error) { now := c.clock.Now() - e, ok := c.otter.Get(key) + e, ok := c.get(ctx, key) if ok { // Cache Hit diff --git a/go/pkg/cache/middleware/tracing.go b/go/pkg/cache/middleware/tracing.go index 3ba79b9e89..04287de88c 100644 --- a/go/pkg/cache/middleware/tracing.go +++ b/go/pkg/cache/middleware/tracing.go @@ -5,7 +5,7 @@ import ( "fmt" "github.com/unkeyed/unkey/go/pkg/cache" - "github.com/unkeyed/unkey/go/pkg/tracing" + "github.com/unkeyed/unkey/go/pkg/otel/tracing" "go.opentelemetry.io/otel/attribute" ) diff --git a/go/pkg/circuitbreaker/lib.go b/go/pkg/circuitbreaker/lib.go index e464eb076c..ffbde5df8f 100644 --- a/go/pkg/circuitbreaker/lib.go +++ b/go/pkg/circuitbreaker/lib.go @@ -9,7 +9,7 @@ import ( "github.com/unkeyed/unkey/go/pkg/clock" "github.com/unkeyed/unkey/go/pkg/logging" - "github.com/unkeyed/unkey/go/pkg/tracing" + "github.com/unkeyed/unkey/go/pkg/otel/tracing" ) type CB[Res any] struct { @@ -148,7 +148,7 @@ func New[Res any](name string, applyConfigs ...applyConfig) *CB[Res] { var _ CircuitBreaker[any] = (*CB[any])(nil) func (cb *CB[Res]) Do(ctx context.Context, fn func(context.Context) (Res, error)) (res Res, err error) { - ctx, span := tracing.Start(ctx, tracing.NewSpanName(fmt.Sprintf("circuitbreaker.%s", cb.config.name), "Do")) + ctx, span := tracing.Start(ctx, fmt.Sprintf("circuitbreaker.%s.Do", cb.config.name)) defer span.End() err = cb.preflight(ctx) @@ -156,7 +156,7 @@ func (cb *CB[Res]) Do(ctx context.Context, fn func(context.Context) (Res, error) return res, err } - ctx, fnSpan := tracing.Start(ctx, tracing.NewSpanName(fmt.Sprintf("circuitbreaker.%s", cb.config.name), "fn")) + ctx, fnSpan := tracing.Start(ctx, fmt.Sprintf("circuitbreaker.%s.fn", cb.config.name)) res, err = fn(ctx) fnSpan.End() @@ -168,7 +168,7 @@ func (cb *CB[Res]) Do(ctx context.Context, fn func(context.Context) (Res, error) // preflight checks if the circuit is ready to accept a request func (cb *CB[Res]) preflight(ctx context.Context) error { - ctx, span := tracing.Start(ctx, tracing.NewSpanName(fmt.Sprintf("circuitbreaker.%s", cb.config.name), "preflight")) + ctx, span := tracing.Start(ctx, fmt.Sprintf("circuitbreaker.%s.preflight", cb.config.name)) defer span.End() cb.Lock() defer cb.Unlock() @@ -205,7 +205,7 @@ func (cb *CB[Res]) preflight(ctx context.Context) error { // postflight updates the circuit breaker state based on the result of the request func (cb *CB[Res]) postflight(ctx context.Context, err error) { - _, span := tracing.Start(ctx, tracing.NewSpanName(fmt.Sprintf("circuitbreaker.%s", cb.config.name), "postflight")) + _, span := tracing.Start(ctx, fmt.Sprintf("circuitbreaker.%s.postflight", cb.config.name)) defer span.End() cb.Lock() defer cb.Unlock() diff --git a/go/pkg/config/json_test.go b/go/pkg/config/json_test.go index 6dfb0c0e34..d7fdfeff4b 100644 --- a/go/pkg/config/json_test.go +++ b/go/pkg/config/json_test.go @@ -6,7 +6,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/unkeyed/unkey/apps/agent/pkg/config" + "github.com/unkeyed/unkey/go/pkg/config" ) func TestLoadFile_WithMissingRequired(t *testing.T) { diff --git a/go/pkg/discovery/static.go b/go/pkg/discovery/static.go index 86a7c4b2de..a325b0bb8c 100644 --- a/go/pkg/discovery/static.go +++ b/go/pkg/discovery/static.go @@ -6,6 +6,6 @@ type Static struct { var _ Discoverer = (*Static)(nil) -func (s *Static) Discover() ([]string, error) { +func (s Static) Discover() ([]string, error) { return s.Addrs, nil } diff --git a/go/pkg/env/env.go b/go/pkg/env/env.go deleted file mode 100644 index 4c411f9ed4..0000000000 --- a/go/pkg/env/env.go +++ /dev/null @@ -1,109 +0,0 @@ -package env - -import ( - "fmt" - "os" - "strconv" - "strings" - "time" -) - -type Env struct { - ErrorHandler func(error) -} - -func (e *Env) String(name string, fallback ...string) string { - value := os.Getenv(name) - if value != "" { - return value - } - if len(fallback) > 0 { - return fallback[0] - } - e.ErrorHandler(fmt.Errorf("%s is not set and no fallback provided", name)) - return "" -} - -// Strings parses a comma-separated list of strings. -func (e *Env) Strings(name string, fallback ...[]string) []string { - value := os.Getenv(name) - if value != "" { - return strings.Split(value, ",") - } - if len(fallback) > 0 { - return fallback[0] - } - e.ErrorHandler(fmt.Errorf("%s is not set and no fallback provided", name)) - return []string{} - -} - -// Strings parses a comma-separated list of strings and appends it to the default values -func (e *Env) StringsAppend(name string, defaultValues ...[]string) []string { - all := []string{} - if len(defaultValues) > 0 { - all = defaultValues[0] - } - - value := os.Getenv(name) - if value != "" { - all = append(all, strings.Split(value, ",")...) - } - if len(all) == 0 { - e.ErrorHandler(fmt.Errorf("%s is not set and no fallback provided", name)) - return []string{} - } - return all - -} - -func (e *Env) Int(name string, fallback ...int) int { - value := os.Getenv(name) - if value != "" { - i, err := strconv.Atoi(value) - if err != nil { - e.ErrorHandler(err) - return 0 - } - return i - } - if len(fallback) > 0 { - return fallback[0] - } - e.ErrorHandler(fmt.Errorf("%s is not set and no fallback provided", name)) - return 0 -} - -func (e *Env) Bool(name string, fallback ...bool) bool { - value := os.Getenv(name) - if value != "" { - b, err := strconv.ParseBool(value) - if err != nil { - e.ErrorHandler(err) - return false - } - return b - } - if len(fallback) > 0 { - return fallback[0] - } - e.ErrorHandler(fmt.Errorf("%s is not set and no fallback provided", name)) - return false -} - -func (e *Env) Duration(name string, fallback ...time.Duration) time.Duration { - value := os.Getenv(name) - if value != "" { - d, err := time.ParseDuration(value) - if err != nil { - e.ErrorHandler(err) - return 0 - } - return d - } - if len(fallback) > 0 { - return fallback[0] - } - e.ErrorHandler(fmt.Errorf("%s is not set and no fallback provided", name)) - return 0 -} diff --git a/go/pkg/env/env_test.go b/go/pkg/env/env_test.go deleted file mode 100644 index 3f0117d5b6..0000000000 --- a/go/pkg/env/env_test.go +++ /dev/null @@ -1,182 +0,0 @@ -package env_test - -import ( - "fmt" - "math/rand" - "strings" - "testing" - "time" - - "github.com/google/uuid" - "github.com/stretchr/testify/require" - "github.com/unkeyed/unkey/apps/agent/pkg/env" -) - -func TestString_WhenSet(t *testing.T) { - e := env.Env{ - ErrorHandler: func(err error) { require.NoError(t, err) }, - } - - key := uuid.NewString() - value := uuid.NewString() - - t.Setenv(key, value) - - got := e.String(key) - require.Equal(t, got, value) -} - -func TestString_WhenNotSet(t *testing.T) { - e := env.Env{ - ErrorHandler: func(err error) { require.Error(t, err) }, - } - - key := uuid.NewString() - - got := e.String(key) - require.Equal(t, "", got) -} - -func TestString_WhenNotSetFallback(t *testing.T) { - e := env.Env{ - ErrorHandler: func(err error) { require.NoError(t, err) }, - } - - key := uuid.NewString() - fallback := uuid.NewString() - - got := e.String(key, fallback) - require.Equal(t, fallback, got) -} - -func TestStringsAppend_WhenSet(t *testing.T) { - e := env.Env{ - ErrorHandler: func(err error) { require.NoError(t, err) }, - } - - key := uuid.NewString() - values := []string{uuid.NewString(), uuid.NewString()} - - t.Setenv(key, strings.Join(values, ",")) - - got := e.StringsAppend(key) - require.Equal(t, got, values) -} - -func TestStringsAppend_WhenSetWithDefaults(t *testing.T) { - e := env.Env{ - ErrorHandler: func(err error) { require.NoError(t, err) }, - } - - key := uuid.NewString() - values := []string{uuid.NewString(), uuid.NewString()} - defaults := []string{uuid.NewString(), uuid.NewString()} - - t.Setenv(key, strings.Join(values, ",")) - - got := e.StringsAppend(key, defaults) - require.Equal(t, 4, len(got)) - require.Contains(t, got, values[0]) - require.Contains(t, got, values[1]) - require.Contains(t, got, defaults[0]) - require.Contains(t, got, defaults[1]) -} - -func TestStringsAppend_WhenNotSet(t *testing.T) { - e := env.Env{ - ErrorHandler: func(err error) { require.Error(t, err) }, - } - - key := uuid.NewString() - - got := e.StringsAppend(key) - require.Equal(t, []string{}, got) -} - -func TestStringsAppend_WhenNotSetFallback(t *testing.T) { - e := env.Env{ - ErrorHandler: func(err error) { require.NoError(t, err) }, - } - - key := uuid.NewString() - fallback := []string{uuid.NewString()} - - got := e.StringsAppend(key, fallback) - require.Equal(t, fallback, got) -} - -func TestInt_WhenSet(t *testing.T) { - - e := env.Env{ - ErrorHandler: func(err error) { require.NoError(t, err) }, - } - - key := uuid.NewString() - value := int(rand.NewSource(time.Now().UnixNano()).Int63()) - - t.Setenv(key, fmt.Sprintf("%d", value)) - - got := e.Int(key) - require.Equal(t, got, value) -} - -func TestInt_WhenNotSet(t *testing.T) { - e := env.Env{ - ErrorHandler: func(err error) { require.Error(t, err) }, - } - - key := uuid.NewString() - - got := e.Int(key) - require.Equal(t, 0, got) -} - -func TestInt_WhenNotSetFallback(t *testing.T) { - e := env.Env{ - ErrorHandler: func(err error) { require.NoError(t, err) }, - } - - key := uuid.NewString() - fallback := int(rand.NewSource(time.Now().UnixNano()).Int63()) - - got := e.Int(key, fallback) - require.Equal(t, fallback, got) -} - -func TestBool_WhenSet(t *testing.T) { - - e := env.Env{ - ErrorHandler: func(err error) { require.NoError(t, err) }, - } - - key := uuid.NewString() - value := true - - t.Setenv(key, fmt.Sprintf("%t", value)) - - got := e.Bool(key) - require.Equal(t, got, value) -} - -func TestBool_WhenNotSet(t *testing.T) { - e := env.Env{ - ErrorHandler: func(err error) { require.Error(t, err) }, - } - - key := uuid.NewString() - - got := e.Bool(key) - require.Equal(t, false, got) -} - -func TestBool_WhenNotSetFallback(t *testing.T) { - e := env.Env{ - ErrorHandler: func(err error) { require.NoError(t, err) }, - } - - key := uuid.NewString() - fallback := true - - got := e.Bool(key, fallback) - require.Equal(t, fallback, got) -} diff --git a/go/pkg/events/topic.go b/go/pkg/events/topic.go index aa788c9e6d..2c0097d181 100644 --- a/go/pkg/events/topic.go +++ b/go/pkg/events/topic.go @@ -5,9 +5,8 @@ import ( "fmt" "sync" - "github.com/unkeyed/unkey/apps/agent/pkg/tracing" + "github.com/unkeyed/unkey/go/pkg/otel/tracing" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) type EventEmitter[E any] interface { @@ -53,8 +52,7 @@ func (t *topic[E]) Emit(ctx context.Context, event E) { t.mu.Lock() defer t.mu.Unlock() for _, l := range t.listeners { - var span trace.Span - _, span = tracing.Start(ctx, fmt.Sprintf("topic.Emit:%s", l.id)) + _, span := tracing.Start(ctx, fmt.Sprintf("topic.Emit:%s", l.id)) span.SetAttributes(attribute.Int("channelSize", len(l.ch))) l.ch <- event span.End() diff --git a/go/pkg/membership/bus.go b/go/pkg/membership/bus.go index 862c0d5849..569b678063 100644 --- a/go/pkg/membership/bus.go +++ b/go/pkg/membership/bus.go @@ -7,13 +7,16 @@ import ( "github.com/unkeyed/unkey/go/pkg/events" ) +// bus implements the memberlist.EventDelegate interface to handle and broadcast +// cluster membership events through typed channels. type bus struct { onJoin events.Topic[Member] onLeave events.Topic[Member] onUpdate events.Topic[Member] } -// NotifyJoin is invoked when a node is detected to have joined. +// NotifyJoin is called when a node joins the cluster. +// It broadcasts a join event with the new member's information to all subscribers. // The Node argument must not be modified. func (b *bus) NotifyJoin(node *memberlist.Node) { @@ -23,19 +26,19 @@ func (b *bus) NotifyJoin(node *memberlist.Node) { }) } -// NotifyLeave is invoked when a node is detected to have left. +// NotifyLeave is called when a node leaves the cluster. +// It broadcasts a leave event with the departing member's information to all subscribers. // The Node argument must not be modified. func (b *bus) NotifyLeave(node *memberlist.Node) { - b.onLeave.Emit(context.Background(), Member{ NodeID: node.Name, Addr: node.Addr.String(), }) } -// NotifyUpdate is invoked when a node is detected to have -// updated, usually involving the meta data. The Node argument -// must not be modified. +// NotifyUpdate is called when a node's metadata is updated. +// It broadcasts an update event with the member's updated information to all subscribers. +// The Node argument must not be modified. func (b *bus) NotifyUpdate(node *memberlist.Node) { b.onUpdate.Emit(context.Background(), Member{ diff --git a/go/pkg/membership/interface.go b/go/pkg/membership/interface.go index 560f6a0ae3..d3ad62ae18 100644 --- a/go/pkg/membership/interface.go +++ b/go/pkg/membership/interface.go @@ -3,8 +3,9 @@ package membership import "github.com/unkeyed/unkey/go/pkg/discovery" // Membership defines the interface for cluster membership management. -// It handles node discovery, joining/leaving clusters, member listing, -// and membership event subscriptions. +// It provides functionality for node discovery, cluster joining/leaving, +// member listing, and membership event subscriptions. The implementation +// uses a gossip protocol to maintain eventually consistent cluster state. type Membership interface { // Start initializes the membership system and joins the cluster using // the provided discovery mechanism. It should only be called once. @@ -42,4 +43,7 @@ type Membership interface { // // The returned channel will be closed when the membership system is shut down. SubscribeLeaveEvents() <-chan Member + + // Addr returns the node's advertised address. + Addr() string } diff --git a/go/pkg/membership/logger.go b/go/pkg/membership/logger.go index faa4f35f69..91f59175c0 100644 --- a/go/pkg/membership/logger.go +++ b/go/pkg/membership/logger.go @@ -8,8 +8,8 @@ import ( "github.com/unkeyed/unkey/go/pkg/logging" ) -// logger implements io.Writer interface to integrate with memberlist's logging system -// and routes logs to the application's structured logging system. +// logger implements io.Writer interface to integrate memberlist's logging system +// with the application's structured logging system. type logger struct { logger logging.Logger } diff --git a/go/pkg/membership/member.go b/go/pkg/membership/member.go index e196b688fc..f2e299c2d3 100644 --- a/go/pkg/membership/member.go +++ b/go/pkg/membership/member.go @@ -4,16 +4,23 @@ import ( "encoding/json" ) +// Member represents a node in the cluster with its identifying information +// and network address. type Member struct { - // Global unique identifier for the node + // NodeID is a globally unique identifier for the node NodeID string `json:"nodeId"` - Addr string `json:"addr"` + // Addr is the network address of the node + Addr string `json:"addr"` } +// Marshal encodes the Member into a JSON byte slice. +// Returns the encoded bytes or an error if marshaling fails. func (m Member) Marshal() ([]byte, error) { return json.Marshal(m) } +// Unmarshal decodes a JSON byte slice into the Member. +// Returns an error if unmarshaling fails. func (m *Member) Unmarshal(b []byte) error { return json.Unmarshal(b, m) } diff --git a/go/pkg/membership/memberlist.go b/go/pkg/membership/memberlist.go index 704c9b3b46..16c1d1db0c 100644 --- a/go/pkg/membership/memberlist.go +++ b/go/pkg/membership/memberlist.go @@ -16,11 +16,16 @@ import ( "github.com/unkeyed/unkey/go/pkg/retry" ) +// Config specifies the configuration options for creating a new membership instance. type Config struct { - NodeID string - Addr string + // NodeID is the unique identifier for this node + NodeID string + // Addr is the network address this node will listen on + Addr string + // GossipPort is the port used for cluster membership gossip protocol GossipPort int - Logger logging.Logger + // Logger is the logging interface used for membership-related logs + Logger logging.Logger } type membership struct { @@ -37,6 +42,9 @@ type membership struct { var _ Membership = (*membership)(nil) +// New creates a new membership instance with the provided configuration. +// It initializes the memberlist with default LAN configuration and sets up event buses. +// Returns the new membership instance and any error encountered during creation. func New(config Config) (*membership, error) { b := &bus{ @@ -144,3 +152,7 @@ func (m *membership) Members() ([]Member, error) { } return members, nil } + +func (m *membership) Addr() string { + return m.self.Addr +} diff --git a/go/pkg/membership/membership_test.go b/go/pkg/membership/membership_test.go index 757a920b69..5c6cb8e454 100644 --- a/go/pkg/membership/membership_test.go +++ b/go/pkg/membership/membership_test.go @@ -7,9 +7,10 @@ import ( "time" "github.com/stretchr/testify/require" - "github.com/unkeyed/unkey/apps/agent/pkg/logging" - "github.com/unkeyed/unkey/apps/agent/pkg/membership" - "github.com/unkeyed/unkey/apps/agent/pkg/port" + "github.com/unkeyed/unkey/go/pkg/discovery" + "github.com/unkeyed/unkey/go/pkg/logging" + "github.com/unkeyed/unkey/go/pkg/membership" + "github.com/unkeyed/unkey/go/pkg/port" ) var CLUSTER_SIZES = []int{3, 9, 36} @@ -19,31 +20,44 @@ func TestJoin2Nodes(t *testing.T) { freePort := port.New() m1, err := membership.New(membership.Config{ - NodeId: "node_1", - SerfAddr: fmt.Sprintf("localhost:%d", freePort.Get()), - RpcAddr: fmt.Sprintf("http://localhost:%d", freePort.Get()), - Logger: logging.New(nil), + NodeID: "node_1", + Addr: fmt.Sprintf("localhost:%d", freePort.Get()), + GossipPort: freePort.Get(), + Logger: logging.NewNoop(), }) require.NoError(t, err) m2, err := membership.New(membership.Config{ - NodeId: "node_2", - SerfAddr: fmt.Sprintf("localhost:%d", freePort.Get()), - RpcAddr: fmt.Sprintf("http://localhost:%d", freePort.Get()), - Logger: logging.New(nil), + NodeID: "node_2", + Addr: fmt.Sprintf("localhost:%d", freePort.Get()), + GossipPort: freePort.Get(), + Logger: logging.NewNoop(), }) require.NoError(t, err) - members, err := m1.Join() + err = m1.Start(&discovery.Static{Addrs: []string{}}) + require.NoError(t, err) + m1Members, err := m1.Members() + require.NoError(t, err) + require.Len(t, m1Members, 1) + + err = m2.Start(&discovery.Static{Addrs: []string{m1.Addr()}}) require.NoError(t, err) - require.Equal(t, 1, members) - _, err = m2.Join(m1.SerfAddr()) require.NoError(t, err) require.Eventually(t, func() bool { - members, err := m2.Members() + + m1Members, err := m1.Members() + require.NoError(t, err) + return len(m1Members) == 2 + + }, 10*time.Second, 100*time.Millisecond) + + require.Eventually(t, func() bool { + + m2Members, err := m2.Members() require.NoError(t, err) - return len(members) == 2 + return len(m2Members) == 2 }, 10*time.Second, 100*time.Millisecond) @@ -79,10 +93,10 @@ func TestJoin_emits_join_event(t *testing.T) { var err error for i := 0; i < clusterSize; i++ { members[i], err = membership.New(membership.Config{ - NodeId: fmt.Sprintf("node_%d", i), - SerfAddr: fmt.Sprintf("localhost:%d", freePort.Get()), - RpcAddr: fmt.Sprintf("http://localhost:%d", freePort.Get()), - Logger: logging.New(nil), + NodeID: fmt.Sprintf("node_%d", i), + Addr: fmt.Sprintf("localhost:%d", freePort.Get()), + GossipPort: freePort.Get(), + Logger: logging.NewNoop(), }) require.NoError(t, err) @@ -95,22 +109,22 @@ func TestJoin_emits_join_event(t *testing.T) { go func() { for event := range joinEvents { joinMu.Lock() - join[event.NodeId] = true + join[event.Addr] = true joinMu.Unlock() } }() - serfAddrs := make([]string, 0) + peerAddrs := make([]string, 0) for _, m := range members { - _, err = m.Join(serfAddrs...) + err = m.Start(&discovery.Static{Addrs: peerAddrs}) require.NoError(t, err) - serfAddrs = append(serfAddrs, m.SerfAddr()) + peerAddrs = append(peerAddrs, m.Addr()) } for _, n := range members[1:] { require.Eventually(t, func() bool { joinMu.RLock() - ok := join[n.NodeId()] + ok := join[n.Addr()] joinMu.RUnlock() return ok }, 30*time.Second, 100*time.Millisecond) @@ -135,7 +149,7 @@ func TestLeave_emits_leave_event(t *testing.T) { go func() { for event := range leaveEvents { leftMu.Lock() - left[event.NodeId] = true + left[event.Addr] = true leftMu.Unlock() } }() @@ -148,7 +162,7 @@ func TestLeave_emits_leave_event(t *testing.T) { for _, n := range nodes[1:] { require.Eventually(t, func() bool { leftMu.RLock() - l := left[n.NodeId()] + l := left[n.Addr()] leftMu.RUnlock() return l }, 30*time.Second, 100*time.Millisecond) @@ -165,20 +179,20 @@ func runMany(t *testing.T, n int) []membership.Membership { var err error for i := 0; i < n; i++ { members[i], err = membership.New(membership.Config{ - NodeId: fmt.Sprintf("node_%d", i), - SerfAddr: fmt.Sprintf("localhost:%d", freePort.Get()), - RpcAddr: fmt.Sprintf("http://localhost:%d", freePort.Get()), - Logger: logging.New(nil), + NodeID: fmt.Sprintf("node_%d", i), + GossipPort: freePort.Get(), + Addr: fmt.Sprintf("localhost:%d", freePort.Get()), + Logger: logging.NewNoop(), }) require.NoError(t, err) } - serfAddrs := make([]string, 0) + peerAddrs := make([]string, 0) for _, m := range members { - _, err = m.Join(serfAddrs...) + err = m.Start(discovery.Static{Addrs: peerAddrs}) require.NoError(t, err) - serfAddrs = append(serfAddrs, m.SerfAddr()) + peerAddrs = append(peerAddrs, m.Addr()) } for _, m := range members { diff --git a/go/pkg/otel/grafana.go b/go/pkg/otel/grafana.go new file mode 100644 index 0000000000..fb1b1c22dc --- /dev/null +++ b/go/pkg/otel/grafana.go @@ -0,0 +1,72 @@ +package otel + +import ( + "context" + "fmt" + "time" + + "github.com/unkeyed/unkey/go/pkg/otel/metrics" + "github.com/unkeyed/unkey/go/pkg/otel/tracing" + "github.com/unkeyed/unkey/go/pkg/shutdown" + "go.opentelemetry.io/contrib/instrumentation/runtime" + + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/sdk/metric" + + "go.opentelemetry.io/otel/sdk/trace" +) + +type Config struct { + + // WithEndpoint sets the target endpoint the Exporter will connect to. This + // endpoint is specified as a host and optional port, no path or scheme should + // be included. + GrafanaEndpoint string + + Application string + Version string +} + +// InitGrafana initializes the global tracer and metric providers. +// It returns a slice of ShutdownFuncs that should be called when the +// application is shutting down. +func InitGrafana(ctx context.Context, config Config) ([]shutdown.ShutdownFn, error) { + shutdowns := make([]shutdown.ShutdownFn, 0) + + traceExporter, err := otlptrace.New(ctx, otlptracehttp.NewClient()) + if err != nil { + return nil, fmt.Errorf("unable to init grafana tracing: %w", err) + } + shutdowns = append(shutdowns, traceExporter.Shutdown) + + traceProvider := trace.NewTracerProvider(trace.WithBatcher(traceExporter)) + shutdowns = append(shutdowns, traceProvider.Shutdown) + + tracing.SetGlobalTraceProvider(traceProvider) + + metricExporter, err := otlpmetrichttp.New(ctx, otlpmetrichttp.WithEndpoint(config.GrafanaEndpoint)) + if err != nil { + return nil, fmt.Errorf("unable to init grafana metrics: %w", err) + } + shutdowns = append(shutdowns, metricExporter.Shutdown) + + meterProvider := metric.NewMeterProvider(metric.WithReader(metric.NewPeriodicReader(metricExporter))) + shutdowns = append(shutdowns, meterProvider.Shutdown) + + err = metrics.Init(meterProvider.Meter(config.Application)) + if err != nil { + return nil, fmt.Errorf("unable to init custom metrics: %w", err) + } + // Collect runtime metrics as well + err = runtime.Start( + runtime.WithMeterProvider(meterProvider), + runtime.WithMinimumReadMemStatsInterval(time.Second), + ) + if err != nil { + return nil, fmt.Errorf("unable to init runtime metrics: %w", err) + } + + return shutdowns, nil +} diff --git a/go/pkg/otel/metrics/metrics.go b/go/pkg/otel/metrics/metrics.go new file mode 100644 index 0000000000..b67170f01c --- /dev/null +++ b/go/pkg/otel/metrics/metrics.go @@ -0,0 +1,181 @@ +// Package metrics provides OpenTelemetry instrumentation for monitoring application behavior. +// It exposes global metric instances, initialized with no-op +// implementations by default that can be replaced with real implementations via Init(). +package metrics + +import ( + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" +) + +// init initializes the metrics package with a no-op meter provider. +// This ensures the package can be safely imported even if not explicitly initialized. +// For production use, call Init() with a proper meter provider. +func init() { + p := noop.NewMeterProvider() + m := p.Meter("noop") + + err := Init(m) + if err != nil { + panic(err) + } +} + +// Global metric instances provide easy access to commonly used metrics. +// These are initialized during package initialization and are safe for concurrent use. +var ( + // Http contains metrics related to HTTP operations + Http struct { + // Requests counts all incoming API requests. + // Use this counter to monitor API traffic patterns. + // + // Attributes: + // - path (string): The HTTP path of the request (e.g., "/api/v1/users") + // - status (int): The HTTP status code of the response (e.g., 200, 404, 500) + // + // Example: + // metrics.Http.Requests.Add(ctx, 1, metric.WithAttributes( + // attribute.String("path", "/api/v1/users"), + // attribute.Int("status", 200), + // )) + Requests metric.Int64Counter + } + + // Cache contains metrics related to cache operations + Cache struct { + // Hits tracks the number of cache read operations. + // Use this to monitor cache hit rates and usage patterns. + // + // Attributes: + // - resource (string): The type of resource being cached (e.g., "user_profile") + // + // Example: + // metrics.Cache.Hits.Record(ctx, 1, metric.WithAttributes( + // attribute.String("resource", "user_profile"), + // )) + Hits metric.Int64Gauge + + // Misses tracks the number of cache read operations. + // Use this to monitor cache hit rates and usage patterns. + // + // Attributes: + // - resource (string): The type of resource being cached (e.g., "user_profile") + // + // Example: + // metrics.Cache.Misses.Record(ctx, 1, metric.WithAttributes( + // attribute.String("resource", "user_profile"), + // )) + Misses metric.Int64Gauge + + // Writes tracks the number of cache write operations. + // Use this to monitor cache write rates and usage patterns. + // + // Attributes: + // - resource (string): The type of resource being cached (e.g., "user_profile") + // + // Example: + // metrics.Cache.Writes.Add(ctx, 1, metric.WithAttributes( + // attribute.String("resource", "user_profile"), + // )) + Writes metric.Int64Counter + + // Evicted tracks the number of cache delete operations. + // Use this to monitor cache delete rates and usage patterns. + // + // Attributes: + // - resource (string): The type of resource being cached (e.g., "user_profile") + // + // Example: + // metrics.Cache.Evicted.Record(ctx, 1, metric.WithAttributes( + // attribute.String("resource", "user_profile"), + // )) + Evicted metric.Int64Gauge + + // ReadLatency measures the duration of cache read operations in milliseconds. + // This histogram helps track cache performance and identify slowdowns. + // + // Attributes: + // - latency (int64): The duration of the operation in milliseconds + // - resource (string): The type of resource being read (e.g., "user_profile") + // + // Example: + // metrics.Cache.ReadLatency.Record(ctx, 42, metric.WithAttributes( + // attribute.String("resource", "user_profile"), + // )) + ReadLatency metric.Int64Histogram + + Size metric.Int64Gauge + } +) + +// Init initializes the metrics with the provided meter. +// This function must be called before using any metrics in production, typically +// during application startup. It replaces the default no-op implementations with +// real metrics that will be reported to your metrics backend. +// +// Parameters: +// - m: The meter instance to use for creating metrics +// +// Returns: +// - error: Any error encountered during metric initialization +// +// Example: +// +// provider := prometheus.NewMeterProvider() +// meter := provider.Meter("my-service") +// if err := metrics.Init(meter); err != nil { +// log.Fatal("failed to initialize metrics:", err) +// } +func Init(m metric.Meter) error { + var err error + Http.Requests, err = m.Int64Counter("http_request", + metric.WithDescription("How many api requests we handle."), + ) + if err != nil { + return err + } + + Cache.Hits, err = m.Int64Gauge("cache_hit", + metric.WithDescription("Cache hits"), + ) + if err != nil { + return err + } + Cache.Misses, err = m.Int64Gauge("cache_miss", + metric.WithDescription("Cache misses"), + ) + if err != nil { + return err + } + + Cache.Writes, err = m.Int64Counter("cache_write", + metric.WithDescription("Cache writes"), + ) + if err != nil { + return err + } + + Cache.Evicted, err = m.Int64Gauge("cache_evicted", + metric.WithDescription("Evicted entries"), + ) + if err != nil { + return err + } + + Cache.ReadLatency, err = m.Int64Histogram("cache_read_latency", + metric.WithDescription("The latency of read operations"), + metric.WithUnit("ms"), + ) + if err != nil { + return err + } + + Cache.Size, err = m.Int64Gauge("cache_size", + metric.WithDescription("How many entries are stored in the cache."), + ) + if err != nil { + return err + } + + return nil +} diff --git a/go/pkg/tracing/schema.go b/go/pkg/otel/schema.go similarity index 87% rename from go/pkg/tracing/schema.go rename to go/pkg/otel/schema.go index 5d0674f53b..a52a749428 100644 --- a/go/pkg/tracing/schema.go +++ b/go/pkg/otel/schema.go @@ -1,4 +1,4 @@ -package tracing +package otel import "fmt" diff --git a/go/pkg/tracing/trace.go b/go/pkg/otel/tracing/trace.go similarity index 69% rename from go/pkg/tracing/trace.go rename to go/pkg/otel/tracing/trace.go index 4148105ff0..fba2701c61 100644 --- a/go/pkg/tracing/trace.go +++ b/go/pkg/otel/tracing/trace.go @@ -3,6 +3,7 @@ package tracing import ( "context" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" ) @@ -13,12 +14,23 @@ func init() { globalTracer = noop.NewTracerProvider() } +func SetGlobalTraceProvider(t trace.TracerProvider) { + globalTracer = t +} + func Start(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { // nolint:spancheck // the caller will end the span return globalTracer.Tracer("main").Start(ctx, name, opts...) } +func RecordError(span trace.Span, err error) { + if err == nil { + return + } + span.SetStatus(codes.Error, err.Error()) +} + func GetGlobalTraceProvider() trace.TracerProvider { return globalTracer } diff --git a/go/pkg/tracing/util.go b/go/pkg/otel/util.go similarity index 94% rename from go/pkg/tracing/util.go rename to go/pkg/otel/util.go index 484e14c67e..1f7f3661fb 100644 --- a/go/pkg/tracing/util.go +++ b/go/pkg/otel/util.go @@ -1,4 +1,4 @@ -package tracing +package otel import ( "go.opentelemetry.io/otel/codes" diff --git a/go/pkg/port/free.go b/go/pkg/port/free.go new file mode 100644 index 0000000000..395fd2b4d5 --- /dev/null +++ b/go/pkg/port/free.go @@ -0,0 +1,66 @@ +package port + +import ( + "fmt" + "math/rand/v2" + "net" + "sync" +) + +// FreePort is a utility to find a free port. +type FreePort struct { + mu sync.RWMutex + min int + max int + attempts int + + // The caller may request multiple ports without binding them immediately + // so we need to keep track of which ports are assigned. + assigned map[int]bool +} + +func New() *FreePort { + return &FreePort{ + min: 10000, + max: 65535, + attempts: 10, + assigned: map[int]bool{}, + mu: sync.RWMutex{}, + } +} +func (f *FreePort) Get() int { + port, err := f.GetWithError() + if err != nil { + panic(err) + } + + return port +} + +// Get returns a free port. +func (f *FreePort) GetWithError() (int, error) { + f.mu.Lock() + defer f.mu.Unlock() + + for i := 0; i < f.attempts; i++ { + + // nolint:gosec + // This isn't cryptography + port := rand.IntN(f.max-f.min) + f.min + if f.assigned[port] { + continue + } + + ln, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: port, Zone: ""}) + if err != nil { + continue + } + err = ln.Close() + if err != nil { + return -1, err + } + f.assigned[port] = true + return port, nil + } + return -1, fmt.Errorf("could not find a free port, maybe increase attempts?") +} diff --git a/go/pkg/shutdown/shutdown.go b/go/pkg/shutdown/shutdown.go new file mode 100644 index 0000000000..db114e14f8 --- /dev/null +++ b/go/pkg/shutdown/shutdown.go @@ -0,0 +1,5 @@ +package shutdown + +import "context" + +type ShutdownFn func(ctx context.Context) error diff --git a/go/pkg/tracing/axiom.go b/go/pkg/tracing/axiom.go deleted file mode 100644 index c4e7ca4ed7..0000000000 --- a/go/pkg/tracing/axiom.go +++ /dev/null @@ -1,30 +0,0 @@ -package tracing - -import ( - "context" - "fmt" - - axiom "github.com/axiomhq/axiom-go/axiom/otel" -) - -type Config struct { - Dataset string - Application string - Version string - AxiomToken string -} - -// Closer is a function that closes the global tracer. -type Closer func() error - -func Init(ctx context.Context, config Config) (Closer, error) { - tp, err := axiom.TracerProvider(ctx, config.Dataset, config.Application, config.Version, axiom.SetNoEnv(), axiom.SetToken(config.AxiomToken)) - if err != nil { - return nil, fmt.Errorf("unable to init tracing: %w", err) - } - globalTracer = tp - - return func() error { - return tp.Shutdown(context.Background()) - }, nil -} diff --git a/go/pkg/zen/middleware_tracing.go b/go/pkg/zen/middleware_tracing.go index 12aaeed68d..8b162624ab 100644 --- a/go/pkg/zen/middleware_tracing.go +++ b/go/pkg/zen/middleware_tracing.go @@ -1,8 +1,6 @@ package zen -import ( - "github.com/unkeyed/unkey/go/pkg/tracing" -) +import "github.com/unkeyed/unkey/go/pkg/otel/tracing" func WithTracing() Middleware {