Merged
28 commits
20fbfc1
feat: add jitter support and composable middleware chain
ankurs Apr 23, 2026
bcbeb7b
fix: use govulncheck -scan=module to skip tool dependencies
ankurs Apr 23, 2026
4f9f6b6
fix: address PR review comments
ankurs Apr 23, 2026
b924a4e
feat: add Child() lookup and Worker getters (GetName, GetHandler)
ankurs Apr 23, 2026
6bf8371
fix: Child() returns a shallow copy to prevent mutation of live workers
ankurs Apr 23, 2026
6e1148c
fix: Child() returns (Worker, bool) by value instead of *Worker
ankurs Apr 23, 2026
d65ade2
fix: address second-round PR review comments
ankurs Apr 23, 2026
e8d2919
fix: use govulncheck symbol-level scan to avoid false positives on un…
ankurs Apr 23, 2026
a322e8b
fix: bump go directive to 1.26.2 to resolve stdlib vulns
ankurs Apr 23, 2026
db65b21
fix: log handler.Close errors in closerService, fix jitter doc comment
ankurs Apr 23, 2026
e1ca449
fix: restore ObserveRunDuration in Serve() for attempt lifetime metrics
ankurs Apr 23, 2026
8ea3bf3
docs: clarify attempt lifetime vs per-cycle duration metrics
ankurs Apr 23, 2026
313fd31
feat: make restart the default, add ErrDoNotRestart sentinel
ankurs Apr 23, 2026
a3768c0
fix: unwrap ErrDoNotRestart before returning to suture so wrapped err…
ankurs Apr 23, 2026
7424e3a
refactor: simplify round 2
ankurs Apr 23, 2026
098f59a
refactor: consistent getter naming — all getters use Get prefix
ankurs Apr 23, 2026
30bfaa4
fix: address remaining PR review comments
ankurs Apr 23, 2026
fcb0647
fix: propagate flush errors on ctx cancel, fix restart suppression logic
ankurs Apr 23, 2026
025307e
fix: propagate flush error on channel close in BatchChannelWorker
ankurs Apr 23, 2026
c8757f0
fix: prevent Close/RunCycle race with WaitGroup coordination
ankurs Apr 23, 2026
4a28abf
fix: ensure handler.Close() fires when suture exhausts failure threshold
ankurs Apr 24, 2026
6ce494c
fix: update stale RestartOnFail and Children references in source doc…
ankurs Apr 24, 2026
b833785
feat: add WithTestChildren for testable WorkerInfo + improve godoc
ankurs Apr 24, 2026
9517c94
fix: scope onPermanentStop to worker's own service, not child termina…
ankurs Apr 24, 2026
5054529
fix: replace event hook close with closingSupervisor wrapper
ankurs Apr 24, 2026
ceb0599
test: comprehensive test coverage (82% → 92%)
ankurs Apr 24, 2026
9f86a9a
feat: Add is now a no-op if name exists, returns bool
ankurs Apr 24, 2026
fde2813
feat: WithBackoffJitter accepts func(time.Duration) time.Duration
ankurs Apr 24, 2026
39 changes: 27 additions & 12 deletions AGENTS.md
@@ -2,7 +2,7 @@

## Package Overview

-`go-coldbrew/workers` is a worker lifecycle library built on [thejerf/suture](https://github.com/thejerf/suture). It manages background goroutines with panic recovery, configurable restart, tracing, and structured shutdown.
+`go-coldbrew/workers` is a worker lifecycle library built on [thejerf/suture](https://github.com/thejerf/suture). It manages background goroutines with panic recovery, configurable restart, composable middleware, jitter, and structured shutdown.

## Build & Test

@@ -21,32 +21,47 @@ Every worker runs inside its own suture supervisor subtree:
```
 Root Supervisor (created by Run)
 ├── Worker-A supervisor
-│   ├── Worker-A run func
-│   ├── Child-A1 supervisor (added via ctx.Add)
-│   │   └── Child-A1 run func
+│   ├── Worker-A service (middleware chain → handler)
+│   ├── Child-A1 supervisor (added via info.Add)
+│   │   └── Child-A1 service
 │   └── Child-A2 supervisor
-│       └── Child-A2 run func
+│       └── Child-A2 service
 └── Worker-B supervisor
-    └── Worker-B run func
+    └── Worker-B service
```

Key properties:
- **Scoped lifecycle**: when a parent stops, all children stop
- **Independent restart**: each worker restarts independently via suture
- **Panic recovery**: suture catches panics and converts to errors
- **Backoff**: configurable exponential backoff with jitter on restart
-- **Tracing**: each worker execution gets an OTEL span via `go-coldbrew/tracing`
+- **Middleware chain**: run-level → worker-level → handler (gRPC interceptor convention)

## Key Types

-- `Worker` — struct with builder pattern (`NewWorker().WithRestart().Every()`)
-- `WorkerContext` — extends `context.Context` with `Name()`, `Attempt()`, `Add()`, `Remove()`, `Children()`
-- `Run(ctx, []*Worker) error` — starts all workers, blocks until ctx cancelled
-- `RunWorker(ctx, *Worker)` — runs a single worker
+- `WorkerInfo` — struct with `GetName()`, `GetAttempt()` getters and child management (`Add`, `Remove`, `GetChildren`)
+- `CycleHandler` — interface: `RunCycle(ctx, *WorkerInfo) error` + `Close() error`
+- `CycleFunc` — function adapter for `CycleHandler` (Close is no-op)
+- `Middleware` — `func(ctx, *WorkerInfo, next CycleFunc) error` (interceptor pattern)
+- `Worker` — builder pattern: `NewWorker(name).HandlerFunc(fn).Every(d).WithJitter(10).Interceptors(mw...)`
+- `Run(ctx, []*Worker, ...RunOption) error` — starts all workers, blocks until ctx cancelled
+- `RunWorker(ctx, *Worker, ...RunOption)` — runs a single worker
+
+## Middleware Sub-Package
+
+`workers/middleware` provides optional built-in interceptors:
+- `Recover(onPanic)` — catch panics per-cycle
+- `Tracing()` — OTEL span per cycle
+- `Duration(observe)` — wall-clock timing
+- `Timeout(d)` — per-cycle deadline
+- `Slog()` — structured log per cycle
+- `LogContext()` — inject worker name/attempt into log context
+- `DistributedLock(locker, opts...)` — distributed lock before each cycle
+- `DefaultInterceptors()` — [Recover, LogContext, Tracing, Slog]

## Helpers

-- `EveryInterval(d, fn)` — periodic ticker loop
+- `EveryInterval(d, fn)` — periodic timer loop
- `ChannelWorker[T](ch, fn)` — consume from channel
- `BatchChannelWorker[T](ch, maxSize, maxDelay, fn)` — batch with size/timer flush

2 changes: 1 addition & 1 deletion Makefile
@@ -3,7 +3,7 @@ build:
 	go build ./...

 test:
-	go test -race ./...
+	go test -race -cover ./...

 doc:
 	go tool gomarkdoc --output '{{.Dir}}/README.md' ./...