Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,18 @@ type customError struct {
basePath string // snapshot of basePath at capture time
cause error
wrapped error // immediate parent for Unwrap() chain; may differ from cause
shouldNotify bool
shouldNotify atomic.Bool
status *grpcstatus.Status
}

// ShouldNotify reports whether the error should be reported to notifiers.
// It returns the current value of the shouldNotify flag on the receiver.
// NOTE(review): this block comes from a diff view, so it carries two return
// statements: the plain read of c.shouldNotify and the atomic
// c.shouldNotify.Load() that appears to replace it. Only the first is
// reachable as written; confirm which one the merged file keeps.
func (c *customError) ShouldNotify() bool {
return c.shouldNotify
return c.shouldNotify.Load()
}

// Notified marks the error as having been notified (or not).
// The stored flag is the inverse of status: Notified(true) clears
// shouldNotify, and Notified(false) sets it again.
// NOTE(review): this block comes from a diff view, so it carries both the
// plain assignment and the atomic Store that appears to replace it; confirm
// which one the merged file keeps.
func (c *customError) Notified(status bool) {
c.shouldNotify = !status
c.shouldNotify.Store(!status)
}

// Error returns the error message.
Expand Down Expand Up @@ -233,30 +233,30 @@ func WrapWithSkipAndStatus(err error, msg string, skip int, status *grpcstatus.S
//if we have stack information reuse that
if e, ok := err.(ErrorExt); ok {
c := &customError{
Msg: msg + e.Error(),
cause: e.Cause(),
wrapped: err, // preserve full chain for errors.Is/errors.As
status: status,
shouldNotify: true,
Msg: msg + e.Error(),
cause: e.Cause(),
wrapped: err, // preserve full chain for errors.Is/errors.As
status: status,
}
c.shouldNotify.Store(true)

c.stack = e.Callers()
if ce, ok := e.(*customError); ok {
c.basePath = ce.basePath
}
if n, ok := e.(NotifyExt); ok {
c.shouldNotify = n.ShouldNotify()
c.shouldNotify.Store(n.ShouldNotify())
}
return c
}

c := &customError{
Msg: msg + err.Error(),
cause: err,
wrapped: err,
shouldNotify: true,
status: status,
Msg: msg + err.Error(),
cause: err,
wrapped: err,
status: status,
}
c.shouldNotify.Store(true)
c.captureStack(skip + 1)
return c

Expand Down
29 changes: 21 additions & 8 deletions notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

gobrake "github.com/airbrake/gobrake/v5"
Expand Down Expand Up @@ -35,12 +36,20 @@ var (
hostname string
traceHeader string = "x-trace-id"

// asyncSem is a semaphore that bounds the number of concurrent async
// notification goroutines. When full, new notifications are dropped
// to prevent goroutine explosion under sustained error bursts.
asyncSem = make(chan struct{}, 1000)
)

// asyncSem holds the semaphore channel that caps how many async notification
// goroutines may run at once. When the channel is full, new notifications are
// dropped rather than queued, which prevents goroutine explosion during
// sustained error bursts.
//
// The channel is kept behind an atomic.Pointer so that
// SetMaxAsyncNotifications can replace it while NotifyAsync goroutines read
// it, without a race on the variable itself.
var asyncSem atomic.Pointer[chan struct{}]

// init installs the default semaphore, which has room for 20 concurrent
// notifications.
func init() {
	defaultSem := make(chan struct{}, 20)
	asyncSem.Store(&defaultSem)
}

const (
tracerID = "tracerId"
)
Expand All @@ -50,11 +59,13 @@ var asyncSemOnce sync.Once
// SetMaxAsyncNotifications sets the maximum number of concurrent async
// notification goroutines. When the limit is reached, new async notifications
// are dropped to prevent goroutine explosion under sustained error bursts.
// Values of n <= 0 are ignored and do not use up the one-time setup.
// Default is 20. The first successful call wins; subsequent calls are no-ops.
// It is safe to call concurrently with NotifyAsync.
// NOTE(review): this block comes from a diff view. It also carried the
// superseded wording ("Default is 1000. Can only be called once; subsequent
// calls are no-ops.") and still carries the superseded direct assignment to
// asyncSem next to the atomic Store that appears to replace it.
func SetMaxAsyncNotifications(n int) {
if n > 0 {
// asyncSemOnce.Do runs the closure at most once, so only the first call
// with a positive n installs a semaphore.
asyncSemOnce.Do(func() {
asyncSem = make(chan struct{}, n)
// Allocate a channel of capacity n and publish it through the atomic pointer.
ch := make(chan struct{}, n)
asyncSem.Store(&ch)
})
}
}
Expand All @@ -67,7 +78,7 @@ func NotifyAsync(err error, rawData ...interface{}) error {
if err == nil {
return nil
}
sem := asyncSem
sem := *asyncSem.Load()
select {
case sem <- struct{}{}:
data := append([]interface{}(nil), rawData...)
Expand Down Expand Up @@ -553,7 +564,9 @@ func SetTraceId(ctx context.Context) context.Context {
func GetTraceId(ctx context.Context) string {
if o := options.FromContext(ctx); o != nil {
if data, found := o.Get(tracerID); found {
return data.(string)
if traceID, ok := data.(string); ok {
return traceID
}
}
}
if logCtx := loggers.FromContext(ctx); logCtx != nil {
Expand Down
101 changes: 101 additions & 0 deletions notifier/notifier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package notifier

import (
"context"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/go-coldbrew/errors"
"github.com/go-coldbrew/options"
)

// TestGetTraceId_NonStringValue is a regression test for GetTraceId: when the
// tracerID option carries something other than a string, the lookup must
// return an empty trace ID instead of panicking.
func TestGetTraceId_NonStringValue(t *testing.T) {
	// Store an int under the tracerID key. Before the fix this made
	// GetTraceId panic with
	// "interface conversion: interface {} is int, not string".
	ctx := options.AddToOptions(context.Background(), tracerID, 12345)

	if traceID := GetTraceId(ctx); traceID != "" {
		t.Errorf("expected empty string for non-string tracerID, got %q", traceID)
	}
}

// TestGetTraceId_StringValue covers the ordinary path: a string stored under
// the tracerID option is returned unchanged by GetTraceId.
func TestGetTraceId_StringValue(t *testing.T) {
	const want = "abc-123"
	ctx := options.AddToOptions(context.Background(), tracerID, want)

	if traceID := GetTraceId(ctx); traceID != want {
		t.Errorf("expected 'abc-123', got %q", traceID)
	}
}

func TestNotifyAsync_BoundedConcurrency(t *testing.T) {
// Set a tiny semaphore so we can observe drops.
ch := make(chan struct{}, 1)
asyncSem.Store(&ch)
t.Cleanup(func() {
// Drain any tokens left by test goroutines.
select {
case <-ch:
default:
}

Copilot AI Mar 31, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleanup drains a token from ch without ensuring all NotifyAsync goroutines have finished. If a goroutine is still running, draining its token can cause its deferred <-s release to block forever. Consider avoiding the drain entirely, or synchronizing on goroutine completion before manipulating the semaphore channel.

Suggested change
// Drain any tokens left by test goroutines.
select {
case <-ch:
default:
}

Copilot uses AI. Check for mistakes.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 31fc081 — cleanup now drains with a for-len loop, and since we pre-fill rather than spawning goroutines, there's no risk of blocking a deferred release.

// Restore default.
def := make(chan struct{}, 20)
asyncSem.Store(&def)
})

// Fill the single slot with a blocking goroutine.
block := make(chan struct{})
blockErr := errors.New("blocker")
NotifyAsync(blockErr) // takes the one slot
// Give the goroutine a moment to acquire the semaphore token.
time.Sleep(10 * time.Millisecond)

Copilot AI Mar 31, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "blocking goroutine" setup doesn’t actually block anything: block is never observed by the async notification goroutine, so the semaphore token is likely released immediately. Relying on time.Sleep here makes the test flaky; prefer a synchronization mechanism that guarantees the token is held (or just pre-fill the semaphore channel to simulate a full pool).

Copilot uses AI. Check for mistakes.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 31fc081 — removed the blocking goroutine approach entirely. Now pre-fills the channel directly, making the test deterministic with no time.Sleep.

// Now the semaphore is full. Additional calls should be dropped.
var dropped atomic.Int32
originalDebug := NotifyAsync(errors.New("should-drop"))
// NotifyAsync returns the error regardless of drop/send, so we can't
// check the return value. Instead, verify the semaphore is still full
// by checking we can't send another token.
select {
case ch <- struct{}{}:
// We could send — means the slot was free, which means the previous
// call was dropped (it didn't acquire). That's the expected path.
<-ch // put it back
dropped.Add(1)
default:
// Slot is full — the previous NotifyAsync got in, which shouldn't
// happen since we already filled it. This is also fine if timing
// allowed the blocker to finish.
}

Copilot AI Mar 31, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestNotifyAsync_BoundedConcurrency is currently non-asserting: the select treats both the "could send" and "default" branches as acceptable, so the test will pass regardless of whether NotifyAsync actually drops when the semaphore is full. Please make the test deterministic by forcing the semaphore to be full (e.g., pre-fill the channel) and asserting the drop path (e.g., channel remains full / no token acquired).

Copilot uses AI. Check for mistakes.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 31fc081 — rewrote the test to pre-fill the semaphore channel directly (no timing). Asserts len(ch) == cap(ch) after NotifyAsync to verify the drop path was taken.

_ = originalDebug

// Unblock the first goroutine so it releases the token.
close(block)
// Wait a bit for cleanup.

Copilot AI Apr 1, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestNotifyAsync_BoundedConcurrency doesn’t actually ensure the semaphore slot is held: block is never used by the notify path, and the test accepts both outcomes in the select without asserting that any calls were dropped. As written, it can pass even if NotifyAsync ignores the semaphore. Consider using a custom error implementing errors.NotifyExt where Notified(true) blocks to hold the token, then assert subsequent NotifyAsync calls hit the drop path (and wait for goroutines to exit to avoid leaks).

Suggested change
func TestNotifyAsync_BoundedConcurrency(t *testing.T) {
// Set a tiny semaphore so we can observe drops.
ch := make(chan struct{}, 1)
asyncSem.Store(&ch)
t.Cleanup(func() {
// Drain any tokens left by test goroutines.
select {
case <-ch:
default:
}
// Restore default.
def := make(chan struct{}, 20)
asyncSem.Store(&def)
})
// Fill the single slot with a blocking goroutine.
block := make(chan struct{})
blockErr := errors.New("blocker")
NotifyAsync(blockErr) // takes the one slot
// Give the goroutine a moment to acquire the semaphore token.
time.Sleep(10 * time.Millisecond)
// Now the semaphore is full. Additional calls should be dropped.
var dropped atomic.Int32
originalDebug := NotifyAsync(errors.New("should-drop"))
// NotifyAsync returns the error regardless of drop/send, so we can't
// check the return value. Instead, verify the semaphore is still full
// by checking we can't send another token.
select {
case ch <- struct{}{}:
// We could send — means the slot was free, which means the previous
// call was dropped (it didn't acquire). That's the expected path.
<-ch // put it back
dropped.Add(1)
default:
// Slot is full — the previous NotifyAsync got in, which shouldn't
// happen since we already filled it. This is also fine if timing
// allowed the blocker to finish.
}
_ = originalDebug
// Unblock the first goroutine so it releases the token.
close(block)
// Wait a bit for cleanup.
// testNotifyErr implements errors.NotifyExt and allows the test to
// distinguish between accepted and dropped async notifications.
type testNotifyErr struct {
msg string
block <-chan struct{}
dropped *atomic.Int32
}
func (e *testNotifyErr) Error() string {
return e.msg
}
// Notified is called by NotifyAsync. When async is true, the notification
// was accepted and we block to hold the semaphore token. When async is
// false, the notification was dropped and we record that fact.
func (e *testNotifyErr) Notified(async bool) {
if async {
// Hold the token until the test closes e.block.
<-e.block
return
}
if e.dropped != nil {
e.dropped.Add(1)
}
}
func TestNotifyAsync_BoundedConcurrency(t *testing.T) {
// Set a tiny semaphore so we can observe drops.
ch := make(chan struct{}, 1)
asyncSem.Store(&ch)
t.Cleanup(func() {
// Drain any tokens left by test goroutines.
for {
select {
case <-ch:
default:
goto drained
}
}
drained:
// Restore default.
def := make(chan struct{}, 20)
asyncSem.Store(&def)
})
// Fill the single slot with a blocking notification.
block := make(chan struct{})
var dropped atomic.Int32
holder := &testNotifyErr{
msg: "holder",
block: block,
dropped: &dropped,
}
NotifyAsync(holder) // should take the one slot and block in Notified(true)
// Give the goroutine a moment to acquire the semaphore token.
time.Sleep(10 * time.Millisecond)
// Now the semaphore is full. Additional calls should be dropped and
// reported via Notified(false).
for i := 0; i < 5; i++ {
NotifyAsync(&testNotifyErr{
msg: "should-drop",
block: block,
dropped: &dropped,
})
}
// Allow callbacks to run.
time.Sleep(50 * time.Millisecond)
if got := dropped.Load(); got == 0 {
t.Fatalf("expected some async notifications to be dropped while semaphore slot held, got %d drops", got)
}
// Unblock the goroutines so they can release the token and exit.
close(block)

Copilot uses AI. Check for mistakes.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 31fc081 — test now pre-fills the 1-slot semaphore and asserts len(ch) == cap(ch) after NotifyAsync.

time.Sleep(50 * time.Millisecond)
}
Comment thread
ankurs marked this conversation as resolved.

func TestSetMaxAsyncNotifications_ConcurrentAccess(t *testing.T) {
// Regression test: SetMaxAsyncNotifications and NotifyAsync must not
// race on the asyncSem variable. Run with -race to verify.
var wg sync.WaitGroup

wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
SetMaxAsyncNotifications(50)
}
}()
Comment thread
ankurs marked this conversation as resolved.
go func() {
defer wg.Done()
for i := 0; i < 20; i++ {
NotifyAsync(errors.New("race test"))
}
}()
wg.Wait()
}
Loading