Skip to content
Merged
296 changes: 251 additions & 45 deletions README.md

Large diffs are not rendered by default.

80 changes: 80 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,19 @@ import (
"github.com/go-coldbrew/workers"
)

// solverHandler is used in Example_reconcilerWithChangeDetection to
// demonstrate the handler-as-metadata pattern.
type solverHandler struct {
version int
}

func (h *solverHandler) RunCycle(ctx context.Context, _ *workers.WorkerInfo) error {
<-ctx.Done()
return ctx.Err()
}

func (h *solverHandler) Close() error { return nil }

// A simple worker that runs until cancelled.
func ExampleNewWorker() {
w := workers.NewWorker("greeter").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
Expand Down Expand Up @@ -338,6 +351,73 @@ func Example_dynamicWorkerPool() {
// pool shut down
}

// Demonstrates config-driven reconciliation with change detection using
// the handler-as-metadata pattern. The handler struct carries a config
// version that the reconciler inspects via GetChild().GetHandler() type
// assertion, eliminating the need for a parallel tracking map.
func Example_reconcilerWithChangeDetection() {
type solverConfig struct {
version int
}

// Simulate config that changes over 3 ticks.
configs := []map[string]solverConfig{
{"a": {version: 1}},
{"a": {version: 1}, "b": {version: 1}},
{"a": {version: 2}, "b": {version: 1}}, // a gets new version
}

tick := 0
manager := workers.NewWorker("reconciler").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
ticker := time.NewTicker(40 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if tick >= len(configs) {
continue
}
desired := configs[tick]
tick++

// Remove workers no longer desired.
for _, name := range info.GetChildren() {
if _, ok := desired[name]; !ok {
info.Remove(name)
}
}

// Add new or replace changed workers.
for key, cfg := range desired {
child, exists := info.GetChild(key)
if exists {
// Check if config changed via handler type assertion.
if h, ok := child.GetHandler().(*solverHandler); ok && h.version == cfg.version {
continue // unchanged, skip
}
info.Remove(key) // config changed, replace
time.Sleep(10 * time.Millisecond) // let old worker stop
}
info.Add(workers.NewWorker(key).Handler(&solverHandler{version: cfg.version}))
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
time.Sleep(10 * time.Millisecond)
fmt.Printf("tick %d: children=%v count=%d\n", tick, info.GetChildren(), len(info.GetChildren()))
}
}
})

ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
defer cancel()

workers.Run(ctx, []*workers.Worker{manager})
// Output:
// tick 1: children=[a] count=1
// tick 2: children=[a b] count=2
// tick 3: children=[a b] count=2
}

// Per-worker middleware using the interceptor pattern.
func ExampleWorker_Interceptors() {
loggingMW := func(ctx context.Context, info *workers.WorkerInfo, next workers.CycleFunc) error {
Expand Down
5 changes: 4 additions & 1 deletion helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package workers

import (
"context"
"errors"
"math/rand/v2"
"time"
)
Expand Down Expand Up @@ -51,7 +52,9 @@ func everyIntervalWithJitter(base time.Duration, jitterPercent int, initialDelay
return ctx.Err()
case <-timer.C:
if err := fn(ctx, info); err != nil {
return err
if !errors.Is(err, ErrSkipTick) {
return err
}
}
timer.Reset(computeInterval())
}
Expand Down
39 changes: 39 additions & 0 deletions helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package workers
import (
"context"
"errors"
"fmt"
"math"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -136,6 +137,44 @@ func TestEveryInterval_Jitter_VariableIntervals(t *testing.T) {
assert.Greater(t, stddev, 0.0, "intervals should vary with jitter enabled")
}

func TestEveryIntervalWithJitter_ErrSkipTick(t *testing.T) {
var count atomic.Int32
fn := everyIntervalWithJitter(10*time.Millisecond, 0, 0, func(_ context.Context, _ *WorkerInfo) error {
n := count.Add(1)
if n == 1 {
return ErrSkipTick // skip first tick
}
return nil
})

ctx, cancel := context.WithTimeout(context.Background(), 80*time.Millisecond)
defer cancel()

info := &WorkerInfo{name: "skiptick", attempt: 0}
err := fn(ctx, info)
assert.ErrorIs(t, err, context.DeadlineExceeded, "should not exit from ErrSkipTick")
assert.GreaterOrEqual(t, int(count.Load()), 2, "should continue ticking after ErrSkipTick")
}

func TestEveryIntervalWithJitter_ErrSkipTick_Wrapped(t *testing.T) {
var count atomic.Int32
fn := everyIntervalWithJitter(10*time.Millisecond, 0, 0, func(_ context.Context, _ *WorkerInfo) error {
n := count.Add(1)
if n == 1 {
return fmt.Errorf("db timeout: %w", ErrSkipTick)
}
return nil
})

ctx, cancel := context.WithTimeout(context.Background(), 80*time.Millisecond)
defer cancel()

info := &WorkerInfo{name: "skiptick-wrapped", attempt: 0}
err := fn(ctx, info)
assert.ErrorIs(t, err, context.DeadlineExceeded, "wrapped ErrSkipTick should also be caught")
assert.GreaterOrEqual(t, int(count.Load()), 2)
}

func TestChannelWorker(t *testing.T) {
ch := make(chan string, 3)
ch <- "a"
Expand Down
32 changes: 24 additions & 8 deletions middleware/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Package middleware provides optional interceptors for [go\\\-coldbrew/workers](<
- [type LockOption](<#LockOption>)
- [func WithKeyFunc\(fn func\(name string\) string\) LockOption](<#WithKeyFunc>)
- [func WithOnNotAcquired\(fn func\(ctx context.Context, name string\) error\) LockOption](<#WithOnNotAcquired>)
- [func WithSkipOnNotAcquired\(logFn func\(ctx context.Context, name string\)\) LockOption](<#WithSkipOnNotAcquired>)
- [func WithTTLFunc\(fn func\(name string\) time.Duration\) LockOption](<#WithTTLFunc>)
- [type Locker](<#Locker>)

Expand All @@ -60,7 +61,7 @@ workers.Run(ctx, myWorkers,
```

<a name="DistributedLock"></a>
## func [DistributedLock](<https://github.com/go-coldbrew/workers/blob/main/middleware/lock.go#L52>)
## func [DistributedLock](<https://github.com/go-coldbrew/workers/blob/main/middleware/lock.go#L73>)

```go
func DistributedLock(locker Locker, opts ...LockOption) workers.Middleware
Expand Down Expand Up @@ -114,16 +115,20 @@ func Timeout(d time.Duration) workers.Middleware
Timeout enforces a per\-cycle deadline. Distinct from \[workers.Worker.WithTimeout\] which controls graceful shutdown.

<a name="Tracing"></a>
## func [Tracing](<https://github.com/go-coldbrew/workers/blob/main/middleware/tracing.go#L12>)
## func [Tracing](<https://github.com/go-coldbrew/workers/blob/main/middleware/tracing.go#L24>)

```go
func Tracing() workers.Middleware
```

Tracing creates an OTEL span per cycle via go\-coldbrew/tracing. The span is named "worker:\<name\>:cycle" and records errors.

Worker spans are always sampled regardless of the global TracerProvider's sampler. This prevents silent span drops when using ParentBased\(TraceIDRatioBased\(ratio\)\), where worker root spans \(which have no parent\) would otherwise be probabilistically dropped.

The OTEL trace ID is injected into the log context as "trace" for correlation with the tracing backend.

<a name="LockOption"></a>
## type [LockOption](<https://github.com/go-coldbrew/workers/blob/main/middleware/lock.go#L22>)
## type [LockOption](<https://github.com/go-coldbrew/workers/blob/main/middleware/lock.go#L25>)

LockOption configures [DistributedLock](<#DistributedLock>) behavior.

Expand All @@ -132,7 +137,7 @@ type LockOption func(*lockConfig)
```

<a name="WithKeyFunc"></a>
### func [WithKeyFunc](<https://github.com/go-coldbrew/workers/blob/main/middleware/lock.go#L32>)
### func [WithKeyFunc](<https://github.com/go-coldbrew/workers/blob/main/middleware/lock.go#L35>)

```go
func WithKeyFunc(fn func(name string) string) LockOption
Expand All @@ -141,16 +146,27 @@ func WithKeyFunc(fn func(name string) string) LockOption
WithKeyFunc sets a custom function to derive the lock key from the worker name. Default: "worker\-lock:\<name\>".

<a name="WithOnNotAcquired"></a>
### func [WithOnNotAcquired](<https://github.com/go-coldbrew/workers/blob/main/middleware/lock.go#L44>)
### func [WithOnNotAcquired](<https://github.com/go-coldbrew/workers/blob/main/middleware/lock.go#L52>)

```go
func WithOnNotAcquired(fn func(ctx context.Context, name string) error) LockOption
```

WithOnNotAcquired sets a callback invoked when the lock is held by another instance. The cycle is skipped. Default: skip silently \(return nil\).

Caution: returning a non\-nil error from this callback triggers the framework's normal error handling — for periodic workers, this means restart with backoff. If you want to log and skip, return nil from this callback or use [WithSkipOnNotAcquired](<#WithSkipOnNotAcquired>).

<a name="WithSkipOnNotAcquired"></a>
### func [WithSkipOnNotAcquired](<https://github.com/go-coldbrew/workers/blob/main/middleware/lock.go#L60>)

```go
func WithSkipOnNotAcquired(logFn func(ctx context.Context, name string)) LockOption
```

WithSkipOnNotAcquired is a convenience [LockOption](<#LockOption>) that calls logFn when the lock is held and skips the cycle \(returns nil, no restart\). If logFn is nil, the cycle is skipped silently \(same as the default but explicit in intent\).

<a name="WithTTLFunc"></a>
### func [WithTTLFunc](<https://github.com/go-coldbrew/workers/blob/main/middleware/lock.go#L38>)
### func [WithTTLFunc](<https://github.com/go-coldbrew/workers/blob/main/middleware/lock.go#L41>)

```go
func WithTTLFunc(fn func(name string) time.Duration) LockOption
Expand All @@ -159,9 +175,9 @@ func WithTTLFunc(fn func(name string) time.Duration) LockOption
WithTTLFunc sets a custom function to derive the lock TTL from the worker name. Default: 30s.

<a name="Locker"></a>
## type [Locker](<https://github.com/go-coldbrew/workers/blob/main/middleware/lock.go#L13-L19>)
## type [Locker](<https://github.com/go-coldbrew/workers/blob/main/middleware/lock.go#L16-L22>)

Locker abstracts a distributed lock backend \(e.g., Redis, etcd, Consul\).
Locker abstracts a distributed lock backend \(e.g., Redis, etcd, Consul\). If your lock implementation already has Acquire\(ctx, key, ttl\) \(bool, error\) and Release\(ctx, key\) error methods, it satisfies this interface directly — no adapter needed.

```go
type Locker interface {
Expand Down
21 changes: 21 additions & 0 deletions middleware/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
)

// Locker abstracts a distributed lock backend (e.g., Redis, etcd, Consul).
// If your lock implementation already has Acquire(ctx, key, ttl) (bool, error)
// and Release(ctx, key) error methods, it satisfies this interface directly —
// no adapter needed.
type Locker interface {
// Acquire attempts to acquire a lock for the given key with a TTL.
// Returns true if the lock was acquired, false if held by another instance.
Expand Down Expand Up @@ -41,10 +44,28 @@ func WithTTLFunc(fn func(name string) time.Duration) LockOption {

// WithOnNotAcquired sets a callback invoked when the lock is held by another
// instance. The cycle is skipped. Default: skip silently (return nil).
//
// Caution: returning a non-nil error from this callback triggers the
// framework's normal error handling — for periodic workers, this means
// restart with backoff. If you want to log and skip, return nil from
// this callback or use [WithSkipOnNotAcquired].
func WithOnNotAcquired(fn func(ctx context.Context, name string) error) LockOption {
return func(c *lockConfig) { c.onNotAcquired = fn }
}

// WithSkipOnNotAcquired is a convenience [LockOption] that calls logFn
// when the lock is held and skips the cycle (returns nil, no restart).
// If logFn is nil, the cycle is skipped silently (same as the default
// but explicit in intent).
func WithSkipOnNotAcquired(logFn func(ctx context.Context, name string)) LockOption {
return WithOnNotAcquired(func(ctx context.Context, name string) error {
if logFn != nil {
logFn(ctx, name)
}
return nil
})
}

// DistributedLock acquires a distributed lock before each cycle. If the lock
// is held by another instance, the cycle is skipped (or the onNotAcquired
// callback is invoked). Release uses [context.WithoutCancel] so that context
Expand Down
31 changes: 31 additions & 0 deletions middleware/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,37 @@ func TestDistributedLock_AcquireError(t *testing.T) {
assert.EqualError(t, err, "redis down")
}

func TestDistributedLock_WithSkipOnNotAcquired(t *testing.T) {
locker := &mockLocker{acquired: false}
var gotName string
mw := DistributedLock(locker, WithSkipOnNotAcquired(func(_ context.Context, name string) {
gotName = name
}))

called := false
info := workers.NewWorkerInfo("skipped", 0)
err := mw(context.Background(), info, func(_ context.Context, _ *workers.WorkerInfo) error {
called = true
return nil
})

assert.NoError(t, err, "WithSkipOnNotAcquired should return nil")
assert.False(t, called, "next should not be called")
assert.Equal(t, "skipped", gotName, "logFn should receive worker name")
}

func TestDistributedLock_WithSkipOnNotAcquired_NilLogFn(t *testing.T) {
locker := &mockLocker{acquired: false}
mw := DistributedLock(locker, WithSkipOnNotAcquired(nil))

info := workers.NewWorkerInfo("skipped", 0)
err := mw(context.Background(), info, func(_ context.Context, _ *workers.WorkerInfo) error {
return nil
})

assert.NoError(t, err, "nil logFn should still skip silently")
}

func TestDistributedLock_CustomKeyAndTTL(t *testing.T) {
locker := &mockLocker{acquired: true}
mw := DistributedLock(locker,
Expand Down
38 changes: 38 additions & 0 deletions middleware/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,59 @@ package middleware

import (
"context"
"crypto/rand"

"github.com/go-coldbrew/log"
"github.com/go-coldbrew/tracing"
"github.com/go-coldbrew/workers"
oteltrace "go.opentelemetry.io/otel/trace"
)

// Tracing creates an OTEL span per cycle via go-coldbrew/tracing.
// The span is named "worker:<name>:cycle" and records errors.
//
// Worker spans are always sampled regardless of the global
// TracerProvider's sampler. This prevents silent span drops when
// using ParentBased(TraceIDRatioBased(ratio)), where worker root
// spans (which have no parent) would otherwise be probabilistically
// dropped.
//
// The OTEL trace ID is injected into the log context as "trace"
// for correlation with the tracing backend.
func Tracing() workers.Middleware {
return func(ctx context.Context, info *workers.WorkerInfo, next workers.CycleFunc) error {
ctx = ensureSampled(ctx)
span, ctx := tracing.NewInternalSpan(ctx, "worker:"+info.GetName()+":cycle")
defer span.Finish()

if spanCtx := oteltrace.SpanFromContext(ctx).SpanContext(); spanCtx.HasTraceID() {
ctx = log.AddToContext(ctx, "trace", spanCtx.TraceID().String())
}

err := next(ctx, info)
if err != nil {
_ = span.SetError(err)
}
return err
}
}

// ensureSampled injects a sampled remote span context so that
// ParentBased samplers always sample the next span created from
// this context. If the context already has a sampled span, it is
// returned unchanged.
func ensureSampled(ctx context.Context) context.Context {
if oteltrace.SpanFromContext(ctx).SpanContext().IsSampled() {
return ctx
}
var traceID oteltrace.TraceID
var spanID oteltrace.SpanID
_, _ = rand.Read(traceID[:])
_, _ = rand.Read(spanID[:])
return oteltrace.ContextWithRemoteSpanContext(ctx, oteltrace.NewSpanContext(oteltrace.SpanContextConfig{
TraceID: traceID,
SpanID: spanID,
TraceFlags: oteltrace.FlagsSampled,
Remote: true,
}))
}
Loading
Loading