Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/agent_build_publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ jobs:

- name: Get Version
run: |
VERSION=$(git describe --tags --always)
VERSION=${VERSION#agent/}
# Extract everything after "agent/"
# For example, from "agent/v1.2.3" we get "v1.2.3"
VERSION=${GITHUB_REF#refs/tags/agent/}
echo "VERSION=$VERSION" >> $GITHUB_ENV

- name: Get tags
Expand Down
12 changes: 12 additions & 0 deletions deployment/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ services:
- mysql
- redis
- clickhouse
- otel
environment:
UNKEY_HTTP_PORT: 7070
UNKEY_CLUSTER: true
Expand All @@ -61,6 +62,9 @@ services:
UNKEY_CLUSTER_DISCOVERY_REDIS_URL: "redis://redis:6379"
UNKEY_DATABASE_PRIMARY_DSN: "mysql://unkey:password@tcp(mysql:3900)/unkey?parseTime=true"
UNKEY_CLICKHOUSE_URL: "clickhouse://default:password@clickhouse:9000"
UNKEY_OTEL: true
OTEL_EXPORTER_OTLP_ENDPOINT: "http://otel:4318"
OTEL_EXPORTER_OTLP_PROTOCOL: "http/protobuf"

redis:
image: redis:latest
Expand Down Expand Up @@ -169,6 +173,14 @@ services:
- clickhouse
- chproxy

otel:
image: grafana/otel-lgtm:latest
container_name: otel
ports:
- 3000:3000
- 4317:4317
- 4318:4318

volumes:
mysql:
clickhouse:
Expand Down
14 changes: 11 additions & 3 deletions go/Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@ tasks:
cmds:
- go test -json -race -failfast ./... -timeout=30m | tparse -all -progress -smallscreen

test-e2e:
test-integration:
env:
SIMULATON_TEST: true
INTEGRATION_TEST: true
OTEL_EXPORTER_OTLP_ENDPOINT: http://localhost:4318
OTEL_EXPORTER_OTLP_PROTOCOL: http/protobuf
deps:
- otel
cmds:
- go test -v -failfast -timeout=30m ./apps/api/integration/...

otel:
cmds:
- task: test
- docker compose -f ../deployment/docker-compose.yaml up otel -d

build:
cmds:
Expand Down
2 changes: 1 addition & 1 deletion go/apps/api/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestContextCancellation(t *testing.T) {
Region: "test-region",
Clock: nil, // Will use real clock
ClusterEnabled: false, // Disable clustering for simpler test
ClusterNodeID: uid.New("node"),
ClusterInstanceID: uid.New(uid.InstancePrefix),
LogsColor: false,
ClickhouseURL: "",
DatabasePrimary: dbDsn,
Expand Down
8 changes: 4 additions & 4 deletions go/apps/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type Config struct {

ClusterEnabled bool

// ClusterNodeID is the unique identifier for this node within the cluster
ClusterNodeID string
// ClusterInstanceID is the unique identifier for this instance within the cluster
ClusterInstanceID string

// --- Advertise Address configuration ---

Expand Down Expand Up @@ -76,8 +76,8 @@ type Config struct {
func (c Config) Validate() error {

if c.ClusterEnabled {
err := assert.Multi(
assert.NotEmpty(c.ClusterNodeID, "node id must not be empty"),
err := assert.All(
assert.NotEmpty(c.ClusterInstanceID, "instance id must not be empty"),
assert.Greater(c.ClusterRpcPort, 0),
assert.Greater(c.ClusterGossipPort, 0),
assert.True(c.ClusterAdvertiseAddrStatic != "" || c.ClusterAdvertiseAddrAwsEcsMetadata),
Expand Down
172 changes: 172 additions & 0 deletions go/apps/api/integration/harness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package integration

import (
"context"
"fmt"
"net/http"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/unkeyed/unkey/go/apps/api"
"github.com/unkeyed/unkey/go/pkg/db"
"github.com/unkeyed/unkey/go/pkg/otel/logging"
"github.com/unkeyed/unkey/go/pkg/port"
"github.com/unkeyed/unkey/go/pkg/testutil/containers"
"github.com/unkeyed/unkey/go/pkg/testutil/seed"
)

// ClusterNode describes one API server instance started by the harness: the
// instance identifier and the local ports it was configured with.
type ClusterNode struct {
	// InstanceID is the cluster instance ID the node was configured with.
	InstanceID string
	// HttpPort is the port the node's HTTP API was configured with; the
	// harness sends liveness probes and test requests to it on localhost.
	HttpPort int
	// RPCPort is the port passed to the node as its cluster RPC port.
	RPCPort int
	// GossipPort is the port passed to the node as its cluster gossip port.
	// Later nodes receive "localhost:<GossipPort>" as a static discovery address.
	GossipPort int
}

// Harness is a test harness for creating and managing a cluster of API nodes.
// Construct it with New, which also registers a cleanup on the test that
// cancels the harness context.
type Harness struct {
	// t is the test that owns the harness; failures are reported through it.
	t *testing.T
	// ctx is passed to the seeder and to every node's api.Run call.
	ctx context.Context
	// cancel cancels ctx; it is invoked from the cleanup registered in New.
	cancel context.CancelFunc
	// nodes holds the started nodes in the order they were created.
	nodes []ClusterNode
	// ports hands out the HTTP, RPC and gossip ports used by each node.
	ports *port.FreePort
	// containerMgr is the container helper used to start MySQL.
	containerMgr *containers.Containers
	// Seed is the seeder created for DB; New calls Seed.Seed before starting nodes.
	Seed *seed.Seeder
	// dbDSN is the MySQL DSN returned by the container helper; every node
	// uses it as its primary database.
	dbDSN string
	// DB is the database handle opened against dbDSN.
	DB db.Database
}

// Config contains configuration options for the test harness.
type Config struct {
	// NumNodes is the number of API nodes to create in the cluster.
	// New fails the test unless it is greater than zero.
	NumNodes int
}

// New builds a Harness for t and boots config.NumNodes API nodes.
//
// It starts a MySQL container, opens a database handle against it, seeds the
// database and then starts the nodes one after another. Every node receives
// the gossip addresses of all previously started nodes as its static
// discovery addresses, so the first node starts with none. The harness
// context is cancelled from a t.Cleanup hook.
//
// The test fails immediately when config.NumNodes is not positive or when
// the database handle cannot be created.
func New(t *testing.T, config Config) *Harness {
	t.Helper()

	require.Greater(t, config.NumNodes, 0)
	ctx, cancel := context.WithCancel(context.Background())

	mgr := containers.New(t)
	dsn := mgr.RunMySQL()

	// Named "database" so the imported db package stays usable below.
	database, err := db.New(db.Config{
		Logger:      logging.NewNoop(),
		PrimaryDSN:  dsn,
		ReadOnlyDSN: "",
	})
	require.NoError(t, err)

	h := &Harness{
		t:            t,
		ctx:          ctx,
		cancel:       cancel,
		nodes:        []ClusterNode{},
		ports:        port.New(),
		containerMgr: mgr,
		Seed:         seed.New(t, database),
		dbDSN:        dsn,
		DB:           database,
	}

	// Cancelling the context is what tells the node goroutines to stop.
	t.Cleanup(func() {
		t.Log("Shutting down test cluster...")
		cancel()
	})

	h.Seed.Seed(ctx)

	// Gossip addresses of the nodes started so far. Intentionally left nil
	// for the first node, which has nobody to join.
	var gossipAddrs []string

	for idx := 0; idx < config.NumNodes; idx++ {
		started := h.createNode(idx, gossipAddrs)

		h.nodes = append(h.nodes, started)
		gossipAddrs = append(gossipAddrs, fmt.Sprintf("localhost:%d", started.GossipPort))
	}

	return h
}

// Resources returns the Resources value held by the harness's seeder.
func (h *Harness) Resources() seed.Resources {
	return h.Seed.Resources
}

// createNode configures and starts a single clustered API node and blocks
// until its liveness endpoint answers with 200 OK.
//
// index is only used to derive the instance ID ("i_<index>"). joinAddrs is
// passed to the node as its static cluster discovery addresses. Three ports
// (HTTP, RPC, gossip) are taken from the harness port allocator.
//
// The node runs in its own goroutine via api.Run with the harness context.
// An error returned by api.Run is reported on the test only while that
// context is still live, so a shutdown triggered by cancellation does not
// fail the test. If the node does not become healthy within 15 seconds the
// test fails immediately.
func (h *Harness) createNode(index int, joinAddrs []string) ClusterNode {
	h.t.Helper()

	instanceID := fmt.Sprintf("i_%d", index)
	httpPort := h.ports.Get()
	rpcPort := h.ports.Get()
	gossipPort := h.ports.Get()

	nodeConfig := api.Config{
		Platform:                           "test",
		Image:                              "test",
		HttpPort:                           httpPort,
		Region:                             "test-region",
		Clock:                              nil, // Will use real clock
		ClusterEnabled:                     true,
		ClusterInstanceID:                  instanceID,
		ClusterAdvertiseAddrStatic:         "localhost",
		ClusterRpcPort:                     rpcPort,
		ClusterGossipPort:                  gossipPort,
		ClusterDiscoveryStaticAddrs:        joinAddrs,
		ClusterDiscoveryRedisURL:           "",
		ClusterAdvertiseAddrAwsEcsMetadata: false,
		DatabasePrimary:                    h.dbDSN,
		DatabaseReadonlyReplica:            "",
		LogsColor:                          false,
		ClickhouseURL:                      "",
		// Telemetry is switched on only when an OTLP endpoint is configured
		// in the environment.
		OtelEnabled: os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") != "",
	}

	// Start the node in a separate goroutine; it ends when api.Run returns.
	go func() {
		if err := api.Run(h.ctx, nodeConfig); err != nil {
			// If this is a planned shutdown (context canceled), don't fail the test
			if h.ctx.Err() == nil {
				h.t.Errorf("Node %s failed to run: %v", instanceID, err)
			}
		}
	}()

	livenessURL := fmt.Sprintf("http://localhost:%d/v2/liveness", httpPort)

	// A dedicated client with a timeout bounds every probe. http.Get uses the
	// default client, which has no timeout, so a stalled connection would
	// leave the polling goroutine hanging indefinitely.
	probe := &http.Client{Timeout: 2 * time.Second}

	// Ensure the node is up and running
	require.Eventually(h.t, func() bool {
		res, err := probe.Get(livenessURL)
		if err != nil {
			return false
		}
		defer res.Body.Close()
		return res.StatusCode == http.StatusOK
	}, 15*time.Second, 100*time.Millisecond, "API node %s failed to start", instanceID)

	h.t.Logf("Node %s started and healthy", instanceID)

	return ClusterNode{
		InstanceID: instanceID,
		HttpPort:   httpPort,
		RPCPort:    rpcPort,
		GossipPort: gossipPort,
	}
}

// GetNodes returns all nodes in the cluster, in the order they were started.
// The returned slice is the harness's own slice, not a copy.
func (h *Harness) GetNodes() []ClusterNode {
	return h.nodes
}

// GetNode returns the node at position index, counted in start order.
//
// The test fails immediately (t.Fatalf) when index is negative or not less
// than the number of nodes. The method is marked as a test helper so that
// the failure is attributed to the calling test line rather than to the
// harness, matching the other helpers in this package.
func (h *Harness) GetNode(index int) ClusterNode {
	h.t.Helper()

	if index < 0 || index >= len(h.nodes) {
		h.t.Fatalf("Invalid node index: %d", index)
	}
	return h.nodes[index]
}
63 changes: 63 additions & 0 deletions go/apps/api/integration/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package integration

import (
"bytes"
"encoding/json"
"fmt"
"io"
"math/rand/v2"
"net/http"

"github.com/stretchr/testify/require"
)

// TestResponse is the result of an HTTP call made through CallNode or
// CallRandomNode, with the body decoded into TBody.
type TestResponse[TBody any] struct {
	// Status is the HTTP status code of the response.
	Status int
	// Headers are the response headers.
	Headers http.Header
	// Body is the response body decoded from JSON.
	Body TBody
	// RawBody is the response body exactly as received, as a string.
	RawBody string
}

// CallNode sends req as a JSON body to path on the given node and returns
// the decoded response.
//
// The request goes to http://localhost:<node.HttpPort><path> with the given
// method. headers is used as the request's header set as-is; nil means no
// extra headers. The request is bound to the harness context, so a call still
// in flight when the harness shuts down is aborted instead of lingering.
//
// The status code is not interpreted: any status is returned in
// TestResponse.Status for the caller to assert on. The test fails
// immediately if the request cannot be built or sent, if the body cannot be
// read, or if the body is not valid JSON for Res (which includes an empty
// body).
func CallNode[Req any, Res any](h *Harness, node ClusterNode, method string, path string, headers http.Header, req Req) TestResponse[Res] {
	h.t.Helper()

	target := fmt.Sprintf("http://localhost:%d%s", node.HttpPort, path)

	payload := new(bytes.Buffer)
	err := json.NewEncoder(payload).Encode(req)
	require.NoError(h.t, err)

	httpReq, err := http.NewRequestWithContext(h.ctx, method, target, payload)
	require.NoError(h.t, err)

	// The new request already carries an empty, non-nil header map; replace
	// it only when the caller supplied headers.
	if headers != nil {
		httpReq.Header = headers
	}

	httpRes, err := http.DefaultClient.Do(httpReq)
	require.NoError(h.t, err)
	defer httpRes.Body.Close()

	resBody, err := io.ReadAll(httpRes.Body)
	require.NoError(h.t, err)

	var res Res
	err = json.Unmarshal(resBody, &res)
	require.NoError(h.t, err, "failed to decode response body: %s", string(resBody))

	return TestResponse[Res]{
		Status:  httpRes.StatusCode,
		Headers: httpRes.Header,
		Body:    res,
		RawBody: string(resBody),
	}
}
Comment on lines +21 to +55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Handle non-200 status code responses.

Currently, the function decodes JSON and checks for errors only in I/O operations. If an endpoint returns an error status (e.g., HTTP 500), the JSON unmarshal still executes. Consider explicitly checking and handling unexpected status codes to provide clearer failure messages in your tests.

 if httpRes.StatusCode < 200 || httpRes.StatusCode >= 300 {
-   // currently unhandled
+   require.FailNowf(h.t, "unexpected status code %d", httpRes.StatusCode)
 }

Committable suggestion skipped: line range outside the PR's diff.


// CallRandomNode picks one of the harness's nodes at random and forwards the
// call to CallNode with the same method, path, headers and request body.
func CallRandomNode[Req any, Res any](h *Harness, method string, path string, headers http.Header, req Req) TestResponse[Res] {
	h.t.Helper()

	// nolint:gosec
	pick := rand.IntN(len(h.nodes))
	target := h.nodes[pick]

	return CallNode[Req, Res](h, target, method, path, headers, req)
}
Loading
Loading