Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 74 additions & 2 deletions howto/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ The handler receives a `context.Context` for cancellation and a `*WorkerInfo` fo
| Return value | Long-running worker (no `Every`) | Periodic worker (with `Every`) |
|---|---|---|
| `return nil` | Worker stops permanently | Cycle succeeded — next tick fires |
| `return workers.ErrSkipTick` | No effect (not meaningful) | Tick skipped — next tick fires normally |

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — changed to "Treated like return error (not meaningful)" to clarify it's not a no-op for long-running workers.

| `return error` | Restarts with backoff (if restart enabled) | Restarts with backoff (if restart enabled) |
| `return ctx.Err()` | Clean shutdown | Clean shutdown |
| `return workers.ErrDoNotRestart` | Permanent stop | Permanent stop |
Expand All @@ -177,6 +178,22 @@ The handler receives a `context.Context` for cancellation and a `*WorkerInfo` fo

**Periodic workers** run the handler once per tick. Return nil for success (next tick fires normally). Return an error to trigger restart. The `Every` wrapper manages the tick loop — your handler just processes one cycle.

### ErrSkipTick

Return `workers.ErrSkipTick` from a periodic handler when a tick fails transiently (DB timeout, network blip) and you want to skip it without triggering a full restart. The timer continues and the next tick fires normally:

```go
func pollDatabase(ctx context.Context, info *workers.WorkerInfo) error {
rows, err := db.QueryContext(ctx, "SELECT ...")
if err != nil {
Comment thread
ankurs marked this conversation as resolved.
return workers.ErrSkipTick // skip this tick, try again next interval
}
Comment thread
ankurs marked this conversation as resolved.
Comment thread
ankurs marked this conversation as resolved.
return processRows(rows)
}
```

Without `ErrSkipTick`, you'd have to swallow errors by returning nil and track them internally. `ErrSkipTick` gives the framework visibility into skipped ticks while keeping the timer going.

### ErrDoNotRestart

Return `workers.ErrDoNotRestart` from a handler to signal permanent completion — the supervisor will not restart the worker even though restart is enabled by default. `ChannelWorker` and `BatchChannelWorker` return this automatically when their channel is closed.
Expand Down Expand Up @@ -408,6 +425,18 @@ middleware.DistributedLock(redisLocker,
)
```

For the common case of logging and skipping, use `WithSkipOnNotAcquired`:

```go
middleware.DistributedLock(redisLocker,
middleware.WithSkipOnNotAcquired(func(ctx context.Context, name string) {
log.Info(ctx, "msg", "lock held, skipping", "worker", name)
}),
)
```

**Caution:** `WithOnNotAcquired` returns an error. A non-nil return triggers restart for periodic workers. Use `WithSkipOnNotAcquired` or return nil if you want to skip without restart.
Comment thread
ankurs marked this conversation as resolved.
Outdated
Comment thread
ankurs marked this conversation as resolved.
Outdated

The `Locker` interface:

```go
Expand All @@ -417,6 +446,8 @@ type Locker interface {
}
```

If your lock implementation already has these two methods with matching signatures, it satisfies `Locker` directly — no adapter needed.

Release uses `context.WithoutCancel` so that context cancellation does not prevent lock cleanup.

### Timeout
Expand Down Expand Up @@ -468,12 +499,14 @@ Every handler receives a `*WorkerInfo` that carries worker metadata and child ma
|--------|-------------|
| `GetName() string` | Worker name |
| `GetAttempt() int` | Restart attempt (0 on first run) |
| `GetHandler() CycleHandler` | The worker's handler — use type assertion for handler-specific state |
| `Add(w *Worker) bool` | Add child worker — returns false if name already exists (no-op) |
| `Remove(name string)` | Stop child worker by name |
| `GetChildren() []string` | Names of running child workers |
| `GetChildren() []string` | Names of running child workers (stopped children auto-pruned) |
| `GetChild(name string) (Worker, bool)` | Look up a child by name (returns a value copy) |
Comment thread
ankurs marked this conversation as resolved.
| `GetChildCount() int` | Number of running children (cheaper than `len(GetChildren())`) |

Use `Worker.GetName()` and `Worker.GetHandler()` to inspect a child.
Use `Worker.GetName()`, `Worker.GetHandler()`, `Worker.GetInterval()`, and `Worker.GetRestartOnFail()` to inspect a child.

To replace a running worker, call `Remove` then `Add`. This is not atomic — there is a brief window where the worker is not running.

Expand Down Expand Up @@ -579,6 +612,45 @@ info.Add(workers.NewWorker("solver").HandlerFunc(makeSolver(newCfg)))

Note: `Remove` + `Add` is not atomic — there is a brief window where the worker is not running.

**Automatic cleanup:** When a child permanently stops (returns nil with `WithRestart(false)`, returns `ErrDoNotRestart`, or exhausts restart attempts), it is automatically pruned from the children map on the next call to `GetChildren`, `GetChild`, or `GetChildCount`. No manual cleanup needed.
Comment thread
ankurs marked this conversation as resolved.
Outdated

### Example: Config change detection via handler

Instead of maintaining a parallel map to track per-worker state (e.g., config versions), store metadata on your `CycleHandler` implementation and inspect it via `GetChild().GetHandler()` type assertion:

```go
type solverHandler struct {
version int64
cfg SolverConfig
}

func (h *solverHandler) RunCycle(ctx context.Context, info *workers.WorkerInfo) error {
return solve(ctx, h.cfg)
}

func (h *solverHandler) Close() error { return nil }
```

In the reconciler, detect config changes without a parallel tracking map:

```go
for key, desired := range desiredConfigs {
child, exists := info.GetChild(key)
if exists {
if h, ok := child.GetHandler().(*solverHandler); ok && h.version == desired.version {
continue // config unchanged, skip
}
info.Remove(key) // config changed, replace
}
info.Add(workers.NewWorker(key).Handler(&solverHandler{
version: desired.version,
cfg: desired.cfg,
}))
}
```

The handler is an interface value — `GetChild()` returns a copy of the `Worker` struct, but the handler reference is shared. Type assertion gives read access to the original handler's fields.
Comment thread
ankurs marked this conversation as resolved.
Outdated

### Example: Fixed children on startup

A worker that spawns N consumer goroutines when it starts:
Expand Down
Loading