diff --git a/Packages.md b/Packages.md index a04f36b..c9a5a3a 100644 --- a/Packages.md +++ b/Packages.md @@ -66,7 +66,7 @@ grpcpool is a pool of grpc.ClientConns that can be used to make requests to a gr Documentation can be found at [grpcpool-docs] ## [Workers] -Workers is a worker lifecycle library that manages background goroutines with automatic panic recovery, configurable restart with backoff, tracing, and structured shutdown. Built on [suture](https://github.com/thejerf/suture), it provides a builder pattern for defining workers, helpers for common patterns (periodic tasks, channel consumers, batch processors), and dynamic child worker management via `WorkerContext`. See the [Workers howto](/howto/workers/) for usage examples. +Workers is a worker lifecycle library that manages background goroutines with automatic panic recovery, configurable restart with backoff, and structured shutdown. Built on [suture](https://github.com/thejerf/suture), it provides a builder pattern for defining workers, helpers for common patterns (periodic tasks, channel consumers, batch processors), dynamic child worker management, composable middleware (recover, tracing, distributed locking, timing, per-cycle timeout, structured logging), and jitter support to desynchronize periodic workers. See the [Workers howto](/howto/workers/) for usage examples. Documentation can be found at [workers-docs] diff --git a/howto/workers.md b/howto/workers.md index db829e7..8ad82ea 100644 --- a/howto/workers.md +++ b/howto/workers.md @@ -3,7 +3,7 @@ layout: default title: "Workers" parent: "How To" nav_order: 15 -description: "How to use go-coldbrew/workers to manage background goroutines with panic recovery, restart, and structured shutdown." +description: "How to use go-coldbrew/workers to manage background goroutines with middleware, jitter, panic recovery, restart, and structured shutdown." --- ## Table of contents {: .no_toc .text-delta } @@ -13,127 +13,493 @@ description: "How to use go-coldbrew/workers to manage background goroutines wit ## Overview -[workers] is a worker lifecycle library that manages background goroutines with automatic panic recovery, configurable restart with backoff, tracing, and structured shutdown. It is built on top of [suture], an Erlang-inspired supervisor tree library for Go. +[workers] is a worker lifecycle library that manages background goroutines with automatic panic recovery, configurable restart with backoff, and structured shutdown. It is built on top of [suture], an Erlang-inspired supervisor tree library for Go. Every worker runs inside its own supervisor subtree: ```text Root Supervisor ├── Worker-A supervisor -│ ├── Worker-A run func +│ ├── Worker-A service (middleware → handler) │ ├── Child-A1 (added dynamically) │ └── Child-A2 └── Worker-B supervisor - └── Worker-B run func + └── Worker-B service (middleware → handler) ``` **Key properties:** -- **Scoped lifecycle** — when a parent stops, all its children stop -- **Independent restart** — each worker restarts independently with exponential backoff -- **Panic recovery** — panics are caught and converted to errors by suture -- **Tracing** — each worker execution gets an OTEL span via [tracing] -- **Dynamic children** — workers can spawn/remove child workers at runtime +- **Scoped lifecycle** — when a parent stops, all its children stop automatically. No manual cleanup or `sync.WaitGroup` needed. +- **Restart by default** — workers restart with exponential backoff on failure. 
One-shot workers opt out with `WithRestart(false)` or return `ErrDoNotRestart`. +- **Two-layer panic recovery** — suture catches panics at the supervisor level (restarts the worker). `middleware.Recover` catches panics per-cycle (converts to error without a full restart). Use both for defense in depth. +- **Composable middleware** — tracing, structured logging, distributed locking, per-cycle timeout, and duration metrics as gRPC-style interceptors. Write your own with a single function. +- **Jitter** — desynchronize periodic workers to prevent thundering herd. Per-worker or run-level default. +- **Dynamic children** — workers can spawn and remove child workers at runtime via `Add`/`Remove`. Children inherit middleware, metrics, and scoped lifecycle. +- **Pluggable metrics** — Prometheus out of the box, or implement the `Metrics` interface for Datadog, StatsD, etc. Per-attempt lifetime and per-cycle duration tracked separately. +- **Handler cleanup** — `CycleHandler.Close()` is called exactly once when the worker permanently stops, for resource cleanup (DB connections, leases). ## Quick Start ```go +package main + import ( "context" - "log" + "log/slog" "os" "os/signal" "time" "github.com/go-coldbrew/workers" + "github.com/go-coldbrew/workers/middleware" ) -ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) -defer cancel() +func main() { + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + + err := workers.Run(ctx, []*workers.Worker{ + // Long-running worker — blocks until ctx is cancelled + workers.NewWorker("kafka").HandlerFunc(consume), -if err := workers.Run(ctx, []*workers.Worker{ - workers.NewWorker("kafka", consume), - workers.NewWorker("cleanup", cleanup).Every(5 * time.Minute).WithRestart(true), -}); err != nil { - log.Fatal(err) + // Periodic worker — runs cleanup every 5 minutes with jitter + workers.NewWorker("cleanup").HandlerFunc(cleanup). + Every(5 * time.Minute).WithJitter(10), + }, + // Standard observability: panic recovery, log context, tracing, structured logging + workers.WithInterceptors(middleware.DefaultInterceptors()...), + ) + if err != nil { + slog.Error("workers failed", "error", err) + } } + +// consume and cleanup have signature: +// func(ctx context.Context, info *workers.WorkerInfo) error ``` `Run` blocks until `ctx` is cancelled and all workers have exited. +## Why Workers (vs Plain Goroutines) + +With plain goroutines, you manage lifecycle manually: + +```go +// Before: manual goroutine management +ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) +defer cancel() + +var wg sync.WaitGroup +wg.Add(1) +go func() { + defer wg.Done() + // no panic recovery — crashes the process + // no restart — dies permanently on error + // no structured shutdown — must coordinate ctx + wg manually + // no distributed locking — runs on every pod + consume(ctx) +}() +wg.Wait() +``` + +With workers, the framework handles all of that: + +```go +// After: workers handle lifecycle +workers.Run(ctx, []*workers.Worker{ + workers.NewWorker("kafka").HandlerFunc(consume). + Interceptors( + middleware.Recover(onPanic), + middleware.Tracing(), + middleware.DistributedLock(redisLocker), + ), +}) +``` + +**What you get for free:** panic recovery, configurable restart with exponential backoff, scoped lifecycle (children stop when parents stop), composable middleware (tracing, logging, distributed locking, per-cycle timeout), jitter for periodic workers, and pluggable metrics. 
Distributed locking ensures only one instance runs a job across pods — no manual coordination. + ## Creating Workers -Use `NewWorker` with a name and a run function. The run function receives a `WorkerContext` (which extends `context.Context`) and should block until the context is cancelled or an error occurs: +Use `NewWorker` with a name, then set a handler via `HandlerFunc` (for plain functions) or `Handler` (for structs with cleanup): ```go -w := workers.NewWorker("my-worker", func(ctx workers.WorkerContext) error { - log.Info(ctx, "msg", "started") - // Worker name and attempt are automatically added to the log context - // by the framework — all log calls using this ctx include them. +// Uses github.com/go-coldbrew/log for structured logging +w := workers.NewWorker("my-worker").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error { + log.Info(ctx, "msg", "started", "worker", info.GetName(), "attempt", info.GetAttempt()) <-ctx.Done() return ctx.Err() }) ``` +For handlers that need resource cleanup, implement the `CycleHandler` interface. `Close()` is called exactly once when the worker permanently stops: + +```go +type batchProcessor struct { + db *sql.DB + conn *sql.Conn // dedicated connection for this worker +} + +func NewBatchProcessor(db *sql.DB) (*batchProcessor, error) { + conn, err := db.Conn(context.Background()) + if err != nil { + return nil, err + } + return &batchProcessor{db: db, conn: conn}, nil +} + +func (b *batchProcessor) RunCycle(ctx context.Context, info *workers.WorkerInfo) error { + rows, err := b.conn.QueryContext(ctx, + "SELECT id, payload FROM jobs WHERE status = 'pending' LIMIT 100") + if err != nil { + return err + } + defer rows.Close() + return processBatch(ctx, rows) +} + +func (b *batchProcessor) Close() error { + // Called once on permanent stop — release the dedicated connection + return b.conn.Close() +} +``` + +The handler receives a `context.Context` for cancellation and a `*WorkerInfo` for worker metadata. + +## Handler Return Values + +| Return value | Long-running worker (no `Every`) | Periodic worker (with `Every`) | +|---|---|---| +| `return nil` | Worker stops permanently | Cycle succeeded — next tick fires | +| `return error` | Restarts with backoff (if restart enabled) | Restarts with backoff (if restart enabled) | +| `return ctx.Err()` | Clean shutdown | Clean shutdown | +| `return workers.ErrDoNotRestart` | Permanent stop | Permanent stop | + +**Long-running workers** should block on `<-ctx.Done()`, then return `ctx.Err()`. Returning nil without waiting for ctx cancellation stops the worker permanently. + +**Periodic workers** run the handler once per tick. Return nil for success (next tick fires normally). Return an error to trigger restart. The `Every` wrapper manages the tick loop — your handler just processes one cycle. + +### ErrDoNotRestart + +Return `workers.ErrDoNotRestart` from a handler to signal permanent completion — the supervisor will not restart the worker even though restart is enabled by default. `ChannelWorker` and `BatchChannelWorker` return this automatically when their channel is closed. 
+ +```go +func processQueue(ctx context.Context, info *workers.WorkerInfo) error { + item, ok := queue.Dequeue(ctx) + if !ok { + return workers.ErrDoNotRestart // queue exhausted + } + return process(ctx, item) +} +``` + ## Builder Methods | Method | Description | Default | |--------|-------------|---------| -| `WithRestart(true)` | Restart on failure with backoff | `false` (exit on error) | -| `Every(duration)` | Wrap in a periodic ticker loop | — | +| `HandlerFunc(fn)` | Set handler from a plain function | — | +| `Handler(h)` | Set handler from a `CycleHandler` struct | — | +| `WithRestart(false)` | Disable restart (one-shot worker) | `true` (restart with backoff) | +| `Every(duration)` | Run periodically on a fixed interval | — | +| `WithJitter(percent)` | Randomize tick interval by ±percent (requires `Every`) | inherit run-level | +| `WithInitialDelay(d)` | Delay first tick (requires `Every`) | — | +| `Interceptors(mw...)` | Replace worker-level middleware | — | +| `AddInterceptors(mw...)` | Append to worker-level middleware | — | | `WithFailureBackoff(d)` | Duration between restarts | 15s (suture default) | -| `WithFailureThreshold(n)` | Max failures before giving up | 5 (suture default) | -| `WithFailureDecay(r)` | Rate failures decay per second | 1.0 (suture default) | -| `WithBackoffJitter(j)` | Random jitter on backoff | — | +| `WithFailureThreshold(n float64)` | Max failures before supervisor gives up | 5.0 (suture default) | +| `WithFailureDecay(rate float64)` | Rate at which failure count decays (per second) | 1.0 (suture default) | +| `WithBackoffJitter(j)` | Random jitter on restart backoff | none | | `WithTimeout(d)` | Max time to wait for graceful stop | 10s (suture default) | +| `WithMetrics(m Metrics)` | Per-worker metrics override | inherit from parent/run | Example with full configuration: ```go -workers.NewWorker("resilient-consumer", consume). - WithRestart(true). +workers.NewWorker("resilient-consumer").HandlerFunc(consume). + Every(15 * time.Second). + WithJitter(10). + WithInitialDelay(5 * time.Second). + Interceptors( + middleware.Recover(onPanic), + middleware.Tracing(), + ). WithFailureBackoff(5 * time.Second). WithFailureThreshold(10). WithTimeout(30 * time.Second) ``` -## WorkerContext +## Jitter + +When many workers share the same base interval (e.g. 15s), they synchronize and spike downstream services — the [thundering herd](https://en.wikipedia.org/wiki/Thundering_herd_problem) problem. Jitter desynchronizes ticks by randomizing each interval within a configurable range. + +### Per-worker jitter + +```go +workers.NewWorker("poller").HandlerFunc(poll). + Every(15 * time.Second). + WithJitter(10) // each tick is within [13.5s, 16.5s) +``` + +### Run-level default + +Apply jitter to all periodic workers with `WithDefaultJitter`: + +```go +workers.Run(ctx, myWorkers, workers.WithDefaultJitter(10)) +``` + +Worker-level `WithJitter` takes precedence over the run-level default. Setting `WithJitter(0)` explicitly disables jitter for a specific worker even when a run-level default is set. + +### Formula + +On each tick: +```text +spread = base × percent ÷ 100 +jittered = base − spread + rand(2 × spread) +``` + +The effective interval is clamped to a minimum of 1ms (never zero or negative). Each tick recomputes independently — successive intervals differ. + +### Initial delay + +`WithInitialDelay` delays the first tick, preventing N workers from all firing at t=0 on process start: + +```go +workers.NewWorker("poller").HandlerFunc(poll). + Every(15 * time.Second). 
+ WithJitter(10). + WithInitialDelay(5 * time.Second) +``` + +## Middleware + +Middleware wraps each worker execution cycle with cross-cutting concerns like panic recovery, tracing, distributed locking, and timing. For periodic workers (`Every`), middleware runs on every tick, not once for the worker lifetime. + +### Types + +```go +// CycleHandler handles worker execution cycles. +// Implement as a struct for handlers that need cleanup. +type CycleHandler interface { + RunCycle(ctx context.Context, info *WorkerInfo) error + Close() error // called once when the worker stops +} + +// CycleFunc adapts a plain function into a CycleHandler. +// Close is a no-op — use this for simple, stateless handlers. +type CycleFunc func(ctx context.Context, info *WorkerInfo) error + +// Middleware intercepts each execution cycle. +// Call next to continue the chain. Matches gRPC interceptor convention. +type Middleware func(ctx context.Context, info *WorkerInfo, next CycleFunc) error +``` + +### Worker-level middleware + +```go +w := workers.NewWorker("solver").HandlerFunc(solve). + Every(15 * time.Second). + Interceptors( + middleware.Recover(onPanic), + middleware.Tracing(), + middleware.Duration(observeDuration), + ) +``` + +The first middleware in the list is the outermost wrapper (runs first on entry, last on exit), matching the gRPC interceptor convention. + +### Run-level middleware -Every worker's run function receives a `WorkerContext` that extends `context.Context`: +`WithInterceptors` replaces and `AddInterceptors` appends to the run-level middleware list. These are **run options** that apply to all workers in the `Run` call — distinct from the worker-level `(*Worker).Interceptors` and `(*Worker).AddInterceptors` which only affect a single worker. Run-level middleware wraps **outside** worker-level middleware, so shared concerns like tracing are always outermost: ```go -type WorkerContext interface { - context.Context - Name() string // worker name - Attempt() int // restart attempt (0 on first run) - Add(w *Worker) // add/replace child worker by name - Remove(name string) // stop child worker by name - Children() []string // names of running child workers +workers.Run(ctx, myWorkers, + workers.WithInterceptors(middleware.DefaultInterceptors()...), + workers.AddInterceptors(middleware.Duration(observe)), +) +``` + +Effective chain: `run-level middleware → worker-level middleware → handler` + +### Writing custom middleware + +Middleware is a flat function that calls `next` to continue the chain. The `*WorkerInfo` parameter gives you the worker name and attempt explicitly — no hidden context lookups: + +```go +// Uses github.com/go-coldbrew/log for structured logging +func myLogging(ctx context.Context, info *workers.WorkerInfo, next workers.CycleFunc) error { + log.Info(ctx, "msg", "cycle start", "worker", info.GetName()) + err := next(ctx, info) + log.Info(ctx, "msg", "cycle end", "worker", info.GetName(), "error", err) + return err } + +// Attach it +w.Interceptors(myLogging) +``` + +Same shape as gRPC interceptors — familiar to the target audience: +```go +// gRPC: func(ctx, req, info, handler) (resp, error) +// Workers: func(ctx, info, next) error +``` + +## Built-in Middleware + +The `middleware` sub-package ships optional middleware. None are applied by default. + +```go +import "github.com/go-coldbrew/workers/middleware" ``` -Since `WorkerContext` embeds `context.Context`, it passes anywhere a `ctx` is expected — logging, database calls, HTTP requests, gRPC calls all work transparently. 
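+
+Built-ins compose like custom middleware. For example, a hypothetical nightly job guarded by panic recovery, a distributed lock, and a per-cycle deadline (each is described below; `buildReport`, `onPanic`, and `redisLocker` are placeholder names, not part of the library):
+
+```go
+workers.NewWorker("nightly-report").HandlerFunc(buildReport).
+    Every(24 * time.Hour).
+    Interceptors(
+        middleware.Recover(onPanic),             // outermost: catches panics from everything below
+        middleware.DistributedLock(redisLocker), // only one pod runs the job
+        middleware.Timeout(10*time.Minute),      // per-cycle deadline
+    )
+```
+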
+| Middleware | Description | +|-----------|-------------| +| `Recover(onPanic)` | Catches panics, calls callback, returns error | +| `Tracing()` | Creates an OTEL span per cycle via go-coldbrew/tracing | +| `Duration(observe)` | Measures wall-clock time of each cycle | +| `DistributedLock(locker, opts...)` | Acquires a distributed lock before each cycle | +| `Timeout(d)` | Enforces a per-cycle deadline | +| `Slog()` | Structured log lines per cycle (start + end/error) via go-coldbrew/log | +| `LogContext()` | Injects worker name + attempt into log context | +| `DefaultInterceptors()` | Returns `[Recover, LogContext, Tracing, Slog]` | + +### Recover + +Catches panics in the worker cycle and converts them to errors. The panic does not propagate: + +```go +middleware.Recover(func(name string, v any) { + alerting.Send(fmt.Sprintf("worker %s panicked: %v", name, v)) +}) +``` + +### Tracing + +Creates an OTEL span named `worker::cycle` for each tick. Records errors on the span: + +```go +middleware.Tracing() +``` + +### Duration + +Measures wall-clock time of each cycle and calls a callback. This is **per-cycle** timing — distinct from the per-attempt lifetime captured by `Metrics.ObserveRunDuration` (the `worker_run_duration_seconds` Prometheus histogram). + +```go +middleware.Duration(func(name string, d time.Duration) { + metrics.RecordCycleDuration(name, d) +}) +``` + +### DistributedLock + +Acquires a distributed lock before each cycle. If the lock is held by another instance, the cycle is skipped: + +```go +middleware.DistributedLock(redisLocker, + middleware.WithKeyFunc(func(name string) string { + return "myapp:lock:" + name + }), + middleware.WithTTLFunc(func(_ string) time.Duration { + return time.Minute + }), + middleware.WithOnNotAcquired(func(ctx context.Context, name string) error { + log.Info(ctx, "msg", "lock held, skipping", "worker", name) // go-coldbrew/log + return nil + }), +) +``` + +The `Locker` interface: + +```go +type Locker interface { + Acquire(ctx context.Context, key string, ttl time.Duration) (bool, error) + Release(ctx context.Context, key string) error +} +``` + +Release uses `context.WithoutCancel` so that context cancellation does not prevent lock cleanup. + +### Timeout + +Enforces a per-cycle deadline. Distinct from `WithTimeout` (which controls graceful shutdown): + +```go +middleware.Timeout(30 * time.Second) +``` + +### Slog + +Structured log lines per cycle (start + end/error) via go-coldbrew/log. 
Pair with `LogContext()` to include worker name and attempt automatically: + +```go +middleware.Slog() +``` + +### LogContext + +Injects worker name and attempt into the log context so all log calls inside the worker automatically include them: + +```go +middleware.LogContext() +``` + +### DefaultInterceptors + +Convenience bundle for the standard observability stack: + +```go +// Zero-config observability — one line +workers.Run(ctx, myWorkers, + workers.WithInterceptors(middleware.DefaultInterceptors()...), +) + +// Defaults + extras +workers.Run(ctx, myWorkers, + workers.WithInterceptors(middleware.DefaultInterceptors()...), + workers.AddInterceptors(middleware.Duration(observe)), +) +``` + +## WorkerInfo + +Every handler receives a `*WorkerInfo` that carries worker metadata and child management: + +| Method | Description | +|--------|-------------| +| `GetName() string` | Worker name | +| `GetAttempt() int` | Restart attempt (0 on first run) | +| `Add(w *Worker) bool` | Add child worker — returns false if name already exists (no-op) | +| `Remove(name string)` | Stop child worker by name | +| `GetChildren() []string` | Names of running child workers | +| `GetChild(name string) (Worker, bool)` | Look up a child by name (returns a value copy) | + +Use `Worker.GetName()` and `Worker.GetHandler()` to inspect a child. + +To replace a running worker, call `Remove` then `Add`. This is not atomic — there is a brief window where the worker is not running. + +`context.Context` handles cancellation/deadlines/values. `*WorkerInfo` handles everything worker-specific. ## Helpers ### EveryInterval -Wraps a function in a ticker loop: +Use the `Every` builder method to run a handler periodically: ```go -workers.NewWorker("metrics-reporter", workers.EveryInterval( - 30*time.Second, - func(ctx workers.WorkerContext) error { - return reportMetrics(ctx) - }, -)).WithRestart(true) +workers.NewWorker("metrics-reporter").HandlerFunc(reportMetrics). + Every(30 * time.Second) ``` -Or use the builder shorthand: +For manual control, `EveryInterval` wraps a handler in a timer loop directly: ```go -workers.NewWorker("metrics-reporter", reportMetrics).Every(30 * time.Second).WithRestart(true) +workers.NewWorker("metrics-reporter").HandlerFunc(workers.EveryInterval( + 30*time.Second, reportMetrics, +)) ``` +Both are equivalent. The builder form is preferred — it also supports `WithJitter` and `WithInitialDelay`. 
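+
+For intuition, the loop that `Every`/`EveryInterval` run looks roughly like the sketch below (simplified: the real implementation also applies jitter, initial delay, and the middleware chain; `tickLoop` is an illustrative name, not a library function):
+
+```go
+func tickLoop(ctx context.Context, base time.Duration,
+    info *workers.WorkerInfo, next workers.CycleFunc) error {
+    timer := time.NewTimer(base)
+    defer timer.Stop()
+    for {
+        select {
+        case <-ctx.Done():
+            return ctx.Err() // clean shutdown on cancellation
+        case <-timer.C:
+            if err := next(ctx, info); err != nil {
+                return err // error ends this attempt; the supervisor restarts with backoff
+            }
+            timer.Reset(base) // with jitter configured, this interval is randomized per tick
+        }
+    }
+}
+```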
+ ### ChannelWorker Consumes items from a channel one at a time: @@ -141,8 +507,8 @@ Consumes items from a channel one at a time: ```go refreshChan := make(chan string, 100) -workers.NewWorker("refresher", workers.ChannelWorker(refreshChan, - func(ctx workers.WorkerContext, driverID string) error { +workers.NewWorker("refresher").HandlerFunc(workers.ChannelWorker(refreshChan, + func(ctx context.Context, info *workers.WorkerInfo, driverID string) error { return refreshDriverProfile(ctx, driverID) }, )) @@ -155,23 +521,23 @@ Collects items into batches, flushing when the batch reaches `maxSize` or `maxDe ```go eventChan := make(chan Event, 1000) -workers.NewWorker("event-batcher", workers.BatchChannelWorker(eventChan, +workers.NewWorker("event-batcher").HandlerFunc(workers.BatchChannelWorker(eventChan, 100, // max batch size 500*time.Millisecond, // max delay - func(ctx workers.WorkerContext, batch []Event) error { + func(ctx context.Context, info *workers.WorkerInfo, batch []Event) error { return store.BulkInsert(ctx, batch) }, -)).WithRestart(true) +)) ``` -Partial batches are flushed on context cancellation (graceful shutdown). +Partial batches are flushed on context cancellation (graceful shutdown). Both `ChannelWorker` and `BatchChannelWorker` return `ErrDoNotRestart` when the channel is closed, preventing restart loops on exhausted channels. ## Dynamic Workers -Workers can dynamically spawn and remove child workers using `WorkerContext.Add`, `Remove`, and `Children`. This is the pattern for config-driven worker pools (like database-driven solver workers): +Workers can dynamically spawn and remove child workers using `WorkerInfo.Add`, `Remove`, and `GetChildren`. This is the pattern for config-driven worker pools (like database-driven solver workers): ```go -workers.NewWorker("pool-manager", func(ctx workers.WorkerContext) error { +workers.NewWorker("pool-manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error { ticker := time.NewTicker(60 * time.Second) defer ticker.Stop() for { @@ -181,33 +547,47 @@ workers.NewWorker("pool-manager", func(ctx workers.WorkerContext) error { case <-ticker.C: desired := loadConfigsFromDB(ctx) + running := map[string]bool{} + for _, name := range info.GetChildren() { + running[name] = true + } + // Remove workers no longer in config - for _, name := range ctx.Children() { + for name := range running { if _, ok := desired[name]; !ok { - ctx.Remove(name) + info.Remove(name) } } - // Add/replace desired workers + // Add only workers that aren't already running for name, cfg := range desired { - ctx.Add(workers.NewWorker(name, makeSolver(cfg)).WithRestart(true)) + if !running[name] { + info.Add(workers.NewWorker(name).HandlerFunc(makeSolver(cfg))) + } } } } -}).WithRestart(true) +}) +``` + +**Add is a no-op if the name exists** — it returns `false` without restarting the running worker. To replace a worker (e.g., on config change), call `Remove` then `Add`: + +```go +info.Remove("solver") +info.Add(workers.NewWorker("solver").HandlerFunc(makeSolver(newCfg))) ``` -**Replace semantics:** calling `Add` with a name that already exists stops the old worker and starts the new one. This handles config updates naturally. +Note: `Remove` + `Add` is not atomic — there is a brief window where the worker is not running. 
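+
+If that gap matters, one mitigation is to replace a child only when its configuration actually changed. A sketch, reusing `makeSolver` from the example above (`SolverConfig` and `hashCfg` are hypothetical stand-ins, not part of the library):
+
+```go
+seen := map[string]string{} // child name → last applied config fingerprint
+
+applyChild := func(info *workers.WorkerInfo, name string, cfg SolverConfig) {
+    fp := hashCfg(cfg) // hypothetical fingerprint helper
+    if seen[name] == fp {
+        return // config unchanged: leave the running child alone
+    }
+    info.Remove(name) // brief gap until Add below completes
+    info.Add(workers.NewWorker(name).HandlerFunc(makeSolver(cfg)))
+    seen[name] = fp
+}
+```
+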
### Example: Fixed children on startup A worker that spawns N consumer goroutines when it starts: ```go -workers.NewWorker("consumer-pool", func(ctx workers.WorkerContext) error { - for i := 0; i < 5; i++ { +workers.NewWorker("consumer-pool").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error { + for i := range 5 { name := fmt.Sprintf("consumer-%d", i) - ctx.Add(workers.NewWorker(name, workers.ChannelWorker(eventChan, processEvent))) + info.Add(workers.NewWorker(name).HandlerFunc(workers.ChannelWorker(eventChan, processEvent))) } <-ctx.Done() return ctx.Err() // all 5 consumers stop with parent @@ -219,7 +599,7 @@ workers.NewWorker("consumer-pool", func(ctx workers.WorkerContext) error { Spawn a dedicated worker when a new tenant appears, remove it when the tenant is deactivated: ```go -workers.NewWorker("tenant-manager", func(ctx workers.WorkerContext) error { +workers.NewWorker("tenant-manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error { for { select { case <-ctx.Done(): @@ -227,14 +607,14 @@ workers.NewWorker("tenant-manager", func(ctx workers.WorkerContext) error { case event := <-tenantEvents: switch event.Type { case "activated": - ctx.Add(workers.NewWorker("tenant:"+event.ID, - makeTenantWorker(event.ID)).WithRestart(true)) + info.Add(workers.NewWorker("tenant:"+event.ID). + HandlerFunc(makeTenantWorker(event.ID))) case "deactivated": - ctx.Remove("tenant:" + event.ID) + info.Remove("tenant:" + event.ID) } } } -}).WithRestart(true) +}) ``` ### Example: Nested hierarchy @@ -242,18 +622,18 @@ workers.NewWorker("tenant-manager", func(ctx workers.WorkerContext) error { Children can spawn their own children — the supervisor tree goes as deep as needed: ```go -workers.NewWorker("region-manager", func(ctx workers.WorkerContext) error { +workers.NewWorker("region-manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error { for _, region := range []string{"us-east", "eu-west"} { - region := region - ctx.Add(workers.NewWorker("region:"+region, func(ctx workers.WorkerContext) error { - // Each region spawns its own zone workers - zones := fetchZones(ctx, region) - for _, zone := range zones { - ctx.Add(workers.NewWorker("zone:"+zone, makeZoneWorker(zone))) - } - <-ctx.Done() - return ctx.Err() - })) + info.Add(workers.NewWorker("region:"+region).HandlerFunc( + func(ctx context.Context, info *workers.WorkerInfo) error { + zones := fetchZones(ctx, region) + for _, zone := range zones { + info.Add(workers.NewWorker("zone:"+zone).HandlerFunc(makeZoneWorker(zone))) + } + <-ctx.Done() + return ctx.Err() + }, + )) } <-ctx.Done() return ctx.Err() @@ -280,19 +660,33 @@ err := workers.Run(ctx, []*workers.Worker{w1, w2, w3}) workers.RunWorker(ctx, w) ``` -`RunWorker` is a convenience for `workers.Run(ctx, []*workers.Worker{w})`. Useful for dynamic managers spawning children in goroutines. +`RunWorker` is a convenience for `workers.Run(ctx, []*workers.Worker{w})`. Unlike `Run`, it discards the error. Use `Run` if you need error handling. + +## Graceful Shutdown + +When the context passed to `Run` is cancelled: + +1. All worker contexts are cancelled — handlers should return `ctx.Err()` +2. `BatchChannelWorker` flushes any partial batch before returning +3. `handler.Close()` is called exactly once (for `CycleHandler` implementations) +4. Children stop when their parent stops (scoped lifecycle) +5. `Run` returns nil + +`WithTimeout(d)` controls how long suture waits for a worker to return after context cancellation. 
If a worker ignores cancellation and doesn't return within the timeout, suture logs a stop-timeout event and abandons the goroutine. ## Logging -Worker lifecycle events (panics, restarts, backoff, timeouts) are logged via [go-coldbrew/log][Log]: +Supervisor-level lifecycle events (panics, restarts, backoff, timeouts) are logged via stdlib `log/slog`. If your application configures `slog.SetDefault`, these events flow through your handler: ```json -{"level":"error","msg":"worker panicked","worker":"my-worker","event":"..."} -{"level":"warning","msg":"worker terminated","worker":"my-worker","event":"..."} -{"level":"warning","msg":"worker backoff","event":"..."} -{"level":"info","msg":"worker resumed","event":"..."} +{"level":"ERROR","msg":"worker panicked","worker":"my-worker","event":"..."} +{"level":"WARN","msg":"worker terminated","worker":"my-worker","event":"..."} +{"level":"WARN","msg":"worker backoff","event":"..."} +{"level":"INFO","msg":"worker resumed","event":"..."} ``` +Per-cycle logging is available via the `middleware.Slog()` and `middleware.LogContext()` interceptors, which use `go-coldbrew/log` (a wrapper around `slog`). Since `go-coldbrew/log` calls `slog` under the hood, `slog.SetDefault` affects both layers. + ## Metrics Workers support pluggable metrics via the `Metrics` interface. Pass metrics at the root level — all workers and their children inherit them automatically. @@ -301,7 +695,7 @@ Workers support pluggable metrics via the `Metrics` interface. Pass metrics at t ```go if err := workers.Run(ctx, myWorkers, workers.WithMetrics(workers.NewPrometheusMetrics("myapp"))); err != nil { - log.Fatal(err) + slog.Error("workers failed", "error", err) } ``` @@ -314,7 +708,7 @@ This registers the following metrics (auto-registered via `promauto`): | `myapp_worker_panicked_total{worker}` | Counter | Total worker panics | | `myapp_worker_failed_total{worker}` | Counter | Total worker failures | | `myapp_worker_restarted_total{worker}` | Counter | Total worker restarts | -| `myapp_worker_run_duration_seconds{worker}` | Histogram | Duration of worker run cycles | +| `myapp_worker_run_duration_seconds{worker}` | Histogram | Worker attempt lifetime (start to stop/failure) | | `myapp_worker_active_count` | Gauge | Currently active workers | `NewPrometheusMetrics` is safe to call multiple times with the same namespace — it returns the cached instance. @@ -349,18 +743,68 @@ func (m *myDatadogMetrics) WorkerFailed(name string, err error) { ### Per-worker override -Children inherit metrics from the root by default. Override for specific workers via the builder. Use `WorkerContext.Add` inside a manager worker: +Children inherit metrics from the root by default. Override for specific workers via the builder: ```go -workers.NewWorker("manager", func(ctx workers.WorkerContext) error { +workers.NewWorker("manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error { // This child uses custom metrics instead of the inherited root metrics. 
-    ctx.Add(workers.NewWorker("special", fn).WithMetrics(customMetrics))
+    info.Add(workers.NewWorker("special").HandlerFunc(processSpecial).WithMetrics(datadogMetrics))
     <-ctx.Done()
     return ctx.Err()
 })
 ```
 
-## ColdBrew Integration (Phase 2)
+## Testing
+
+### Testing middleware
+
+Use `NewWorkerInfo` to create a `*WorkerInfo` for unit-testing middleware without running the full supervisor:
+
+```go
+ctx := context.Background()
+info := workers.NewWorkerInfo("test-worker", 0)
+err := myMiddleware(ctx, info, func(ctx context.Context, info *workers.WorkerInfo) error {
+    // assert middleware behavior
+    return nil
+})
+```
+
+### Testing with dynamic children
+
+Use `WithTestChildren` to create a `WorkerInfo` that supports `Add`/`Remove`/`GetChildren`:
+
+```go
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()
+
+info := workers.NewWorkerInfo("manager", 0, workers.WithTestChildren(ctx))
+info.Add(workers.NewWorker("child").HandlerFunc(childFn))
+assert.Equal(t, []string{"child"}, info.GetChildren())
+```
+
+### Integration testing
+
+Use `RunWorker` with a short-lived context:
+
+```go
+ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+defer cancel()
+workers.RunWorker(ctx, myWorker)
+// assert side effects
+```
+
+## Best Practices
+
+- **Handler contract:** Long-running workers should block on `<-ctx.Done()`. Periodic workers should return quickly from each tick.
+- **`WithRestart(false)` vs `ErrDoNotRestart`:** Use `WithRestart(false)` when a worker is unconditionally one-shot (known at build time). Use `ErrDoNotRestart` when the decision is made at runtime (e.g., channel closed, work exhausted).
+- **Naming:** Use descriptive names. For hierarchical workers, use colons: `"region:us-east"`, `"tenant:abc123"`.
+- **Middleware ordering:** The first middleware in the list is the outermost. Put `Recover` first (so it catches panics from all inner middleware), `Tracing` next, then domain-specific middleware.
+- **Metrics inheritance:** Set metrics once at the `Run` level. Override per-worker only when you need separate dashboards.
+- **Distributed locking:** Use `DistributedLock` for periodic workers that should run on only one pod. The lock is acquired per cycle, not per worker lifetime.
+
+## ColdBrew Integration
+{: .d-inline-block }
+
+Planned
+{: .label .label-yellow }
 
 The workers package is standalone — any Go service can use it. ColdBrew integration via `CBServiceV2` is planned for a future core release, where workers will be started/stopped as part of the ColdBrew service lifecycle.
 
diff --git a/tests/content.spec.ts b/tests/content.spec.ts
index 6ab0398..79cc5a5 100644
--- a/tests/content.spec.ts
+++ b/tests/content.spec.ts
@@ -27,6 +27,20 @@ test.describe("Code Blocks", () => {
     await expect(mainContent).toContainText("JWT");
     await expect(mainContent).toContainText("API key");
   });
+
+  test("workers howto renders middleware and jitter code blocks", async ({
+    page,
+  }) => {
+    await page.goto("/howto/workers/");
+    const codeBlocks = page.locator("pre code");
+    expect(await codeBlocks.count()).toBeGreaterThanOrEqual(10);
+    const mainContent = page.locator("main, .main-content").first();
+    await expect(mainContent).toContainText("WithJitter");
+    await expect(mainContent).toContainText("Middleware");
+    await expect(mainContent).toContainText("CycleHandler");
+    await expect(mainContent).toContainText("CycleFunc");
+    await expect(mainContent).toContainText("DistributedLock");
+  });
 });
 
 test.describe("Tables", () => {