### Handler return values

The handler receives a `context.Context` for cancellation and a `*WorkerInfo` for worker metadata and child management.

| Return value | Long-running worker (no `Every`) | Periodic worker (with `Every`) |
|---|---|---|
| `return nil` | Worker stops permanently | Cycle succeeded — next tick fires |
| `return workers.ErrSkipTick` | Treated like `return error` (not meaningful) | Tick skipped — next tick fires normally |
| `return error` | Restarts with backoff (if restart enabled) | Restarts with backoff (if restart enabled) |
| `return ctx.Err()` | Clean shutdown | Clean shutdown |
| `return workers.ErrDoNotRestart` | Permanent stop | Permanent stop |

**Periodic workers** run the handler once per tick. Return nil for success (next tick fires normally). Return an error to trigger restart. The `Every` wrapper manages the tick loop — your handler just processes one cycle.
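
Wiring a periodic worker together might look like this (a sketch; builder chaining is inferred from the options table below, and `pollDatabase` is the handler from the next section):

```go
w := workers.NewWorker("db-poller").
	Every(30 * time.Second).
	HandlerFunc(pollDatabase)
```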

### ErrSkipTick

Return `workers.ErrSkipTick` from a periodic handler when a tick fails transiently (DB timeout, network blip) and you want to skip it without triggering a full restart. The timer continues and the next tick fires normally:

```go
func pollDatabase(ctx context.Context, info *workers.WorkerInfo) error {
	rows, err := db.QueryContext(ctx, "SELECT ...")
	if err != nil {
		if ctx.Err() != nil {
			return ctx.Err() // context cancelled — clean shutdown
		}
		return workers.ErrSkipTick // transient failure, try again next interval
	}
	defer rows.Close()
	return processRows(rows)
}
```

Without `ErrSkipTick`, you'd have to swallow errors by returning nil and track them internally. `ErrSkipTick` gives the framework visibility into skipped ticks while keeping the timer going.

### ErrDoNotRestart

Return `workers.ErrDoNotRestart` from a handler to signal permanent completion — the supervisor will not restart the worker even though restart is enabled by default. `ChannelWorker` and `BatchChannelWorker` return this automatically when their channel is closed.
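
For example, a queue drainer that stops for good once its work is exhausted (a sketch; the `processQueue` signature appears elsewhere in this guide, while `dequeue` and `process` are assumed helpers):

```go
func processQueue(ctx context.Context, info *workers.WorkerInfo) error {
	for {
		if ctx.Err() != nil {
			return ctx.Err() // shutdown requested; clean stop
		}
		item, ok := dequeue()
		if !ok {
			return workers.ErrDoNotRestart // queue drained; stop permanently
		}
		if err := process(ctx, item); err != nil {
			return err // unexpected failure; restart with backoff
		}
	}
}
```
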
| Option | Description | Default |
|--------|-------------|---------|
| `HandlerFunc(fn)` | Set handler from a plain function | — |
| `Handler(h)` | Set handler from a `CycleHandler` struct | — |
| `WithRestart(false)` | Disable restart (one-shot worker). Periodic workers should generally keep the default; use `ErrSkipTick`/`ErrDoNotRestart` instead. | `true` (restart with backoff) |
| `Every(duration)` | Run periodically on a fixed interval | — |
| `WithJitter(percent)` | Randomize tick interval by ±percent (requires `Every`) | inherit run-level |
| `WithInitialDelay(d)` | Delay first tick (requires `Every`) | — |
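
Combined, the options read as a builder chain (a sketch; the values are arbitrary and `refreshCache` is an assumed handler):

```go
w := workers.NewWorker("cache-refresher").
	Every(5 * time.Minute).
	WithJitter(10).                     // spread ticks by ±10%
	WithInitialDelay(30 * time.Second). // let dependencies come up first
	HandlerFunc(refreshCache)
```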

### Tracing

Creates an OTEL span named `worker:<name>:cycle` for each tick. Records errors on the span. Worker spans are always sampled regardless of the global sampler — this prevents silent span drops when using `ParentBased(TraceIDRatioBased(...))`, where worker root spans (which have no incoming parent) would otherwise be probabilistically dropped.

The OTEL trace ID is automatically injected into the log context as `trace` for correlation with your tracing backend.

```go
middleware.Tracing()
```

### DistributedLock

Runs each cycle under a distributed lock so that only one instance executes at a time. A sketch of the general form, assuming a `WithOnNotAcquired` option whose callback returns an error (see the caution below):

```go
middleware.DistributedLock(redisLocker,
	middleware.WithOnNotAcquired(func(ctx context.Context, name string) error {
		return nil // lock held elsewhere; returning nil skips this cycle
	}),
)
```

For the common case of logging and skipping, use `WithSkipOnNotAcquired`:

```go
middleware.DistributedLock(redisLocker,
	middleware.WithSkipOnNotAcquired(func(ctx context.Context, name string) {
		log.Info(ctx, "msg", "lock held, skipping", "worker", name)
	}),
)
```

**Caution:** If the `WithOnNotAcquired` callback returns a non-nil error, the framework treats it as a cycle failure — for periodic workers, this triggers restart with backoff. Use `WithSkipOnNotAcquired` or return nil from the callback if you want to skip without restart.

The `Locker` interface:

```go
type Locker interface {
	// NOTE: method names and signatures here are assumptions. The prose
	// below implies two methods: one that acquires the lock (and can
	// report "not acquired" without an error) and one that releases it.
	Acquire(ctx context.Context, name string) (bool, error)
	Release(ctx context.Context, name string) error
}
```

If your lock implementation already has these two methods with matching signatures, it satisfies `Locker` directly — no adapter needed.
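
For instance, a minimal in-memory locker for tests, written against the assumed signatures above:

```go
type memLocker struct {
	mu   sync.Mutex
	held map[string]bool // lock name -> currently held?
}

func (m *memLocker) Acquire(ctx context.Context, name string) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.held == nil {
		m.held = make(map[string]bool)
	}
	if m.held[name] {
		return false, nil // already held; not an error
	}
	m.held[name] = true
	return true, nil
}

func (m *memLocker) Release(ctx context.Context, name string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.held, name)
	return nil
}
```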

Release uses `context.WithoutCancel` so that context cancellation does not prevent lock cleanup.
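
In effect, the release step is roughly (a sketch, using the assumed `Release` signature from above):

```go
// Even if the cycle's ctx was cancelled mid-run, the release proceeds
// on a context detached from cancellation, so the lock is not leaked.
_ = locker.Release(context.WithoutCancel(ctx), info.GetName())
```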

### Timeout
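
Presumably bounds each cycle with a deadline; the constructor name and argument below are assumptions based on the naming pattern of the other middleware:

```go
middleware.Timeout(30 * time.Second) // assumed API: fail the cycle if it runs past the deadline
```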
### WorkerInfo

Every handler receives a `*WorkerInfo` that carries worker metadata and child management:

| Method | Description |
|--------|-------------|
| `GetName() string` | Worker name |
| `GetAttempt() int` | Restart attempt (0 on first run) |
| `GetHandler() CycleHandler` | The worker's handler — use type assertion for handler-specific state |
| `Add(w *Worker) bool` | Add child worker — returns false if name already exists (no-op) |
| `Remove(name string)` | Stop child worker by name |
| `GetChildren() []string` | Names of running child workers (stopped children auto-pruned) |
| `GetChild(name string) (Worker, bool)` | Look up a child by name (returns a value copy) |
| `GetChildCount() int` | Number of running children (cheaper than `len(GetChildren())`) |

Use `Worker.GetName()`, `Worker.GetHandler()`, `Worker.GetInterval()`, and `Worker.GetRestartOnFail()` to inspect a child.
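
For example, inspecting a child from inside the parent's handler (a sketch; assumes a child named "solver" exists):

```go
if child, ok := info.GetChild("solver"); ok {
	log.Info(ctx, "msg", "child state",
		"name", child.GetName(),
		"interval", child.GetInterval(),
		"restart", child.GetRestartOnFail(),
	)
}
```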

To replace a running worker, call `Remove` then `Add`. This is not atomic — there is a brief window where the worker is not running.

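For example, swapping in a handler built from new configuration (a sketch; `makeSolver` is an assumed helper that builds a handler from the new config):

```go
info.Remove("solver") // stop the old instance first
info.Add(workers.NewWorker("solver").HandlerFunc(makeSolver(newCfg)))
```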

Note: `Remove` + `Add` is not atomic — there is a brief window where the worker is not running.

**Automatic cleanup:** When a child permanently stops (see the [return value table](#handler-return-values) for what triggers permanent stop), it is automatically excluded from `GetChildren` and `GetChild`. The underlying [suture] supervisor is the source of truth — no manual cleanup needed. Note that there may be a brief delay between the child stopping and the change being visible, as stop events are processed asynchronously.

### Example: Config change detection via handler

Instead of maintaining a parallel map to track per-worker state (e.g., config versions), store metadata on your `CycleHandler` implementation and inspect it via `GetChild().GetHandler()` type assertion:

```go
type solverHandler struct {
	version int64
	cfg     SolverConfig
}

func (h *solverHandler) RunCycle(ctx context.Context, info *workers.WorkerInfo) error {
	return solve(ctx, h.cfg)
}

func (h *solverHandler) Close() error { return nil }
```

In the reconciler, detect config changes without a parallel tracking map:

```go
for key, desired := range desiredConfigs {
	child, exists := info.GetChild(key)
	if exists {
		if h, ok := child.GetHandler().(*solverHandler); ok && h.version == desired.version {
			continue // config unchanged, skip
		}
		info.Remove(key) // config changed, replace
	}
	info.Add(workers.NewWorker(key).Handler(&solverHandler{
		version: desired.version,
		cfg:     desired.cfg,
	}))
}
```

`GetChild()` returns a copy of the `Worker` struct, but the handler is stored as a `CycleHandler` interface — use type assertion to access handler-specific fields for change detection or metadata inspection.
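
The same reconciler can also prune children whose desired config disappeared (a sketch reusing the `desiredConfigs` map; permanently stopped children are already excluded from `GetChildren`):

```go
for _, name := range info.GetChildren() {
	if _, ok := desiredConfigs[name]; !ok {
		info.Remove(name) // no longer desired; stop the child
	}
}
```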

### Example: Fixed children on startup

A worker that spawns N consumer goroutines when it starts:
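
A sketch of the shape this can take, spawning the consumers as supervised children (the count and the `consume` handler are assumptions):

```go
func startConsumers(ctx context.Context, info *workers.WorkerInfo) error {
	// Add is a no-op when the name already exists, so a restart of this
	// worker will not duplicate its children.
	for i := 0; i < 4; i++ {
		info.Add(workers.NewWorker(fmt.Sprintf("consumer-%d", i)).HandlerFunc(consume))
	}
	<-ctx.Done() // block until shutdown; children are supervised independently
	return ctx.Err()
}
```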