Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ build:

test:
go test -race ./...
cd examples && go test -race ./...

doc:
go tool gomarkdoc --output '{{.Dir}}/README.md' ./...
go tool gomarkdoc --exclude-dirs ./examples --output '{{.Dir}}/README.md' ./...
Comment thread
ankurs marked this conversation as resolved.

lint:
go tool golangci-lint run
Expand Down
92 changes: 62 additions & 30 deletions README.md

Large diffs are not rendered by default.

84 changes: 75 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,21 @@ func DefaultClientInterceptors(defaultOpts ...any) []grpc.UnaryClientInterceptor
ints = append(ints, defaultConfig.unaryClientInterceptors...)
}
if defaultConfig.useCBClientInterceptors {
hystrixOptions := make([]grpc.CallOption, 0)
callOptions := make([]grpc.CallOption, 0)
for _, opt := range defaultOpts {
if opt == nil {
continue
}
if o, ok := opt.(grpc.CallOption); ok {
hystrixOptions = append(hystrixOptions, o)
callOptions = append(callOptions, o)
}
}
if defaultConfig.defaultExecutor != nil {
ints = append(ints, ExecutorClientInterceptor(callOptions...))
} else {
ints = append(ints, HystrixClientInterceptor(callOptions...))
}
ints = append(ints,
HystrixClientInterceptor(hystrixOptions...),
grpc_retry.UnaryClientInterceptor(),
NewRelicClientInterceptor(),
getClientMetrics().UnaryClientInterceptor(),
Expand Down Expand Up @@ -97,14 +101,76 @@ func GRPCClientInterceptor(_ ...any) grpc.UnaryClientInterceptor {
}
}

// HystrixClientInterceptor returns a unary client interceptor that executes the RPC inside a Hystrix command.
// ExecutorClientInterceptor returns a unary client interceptor that wraps each
// RPC in an [Executor]. The executor provides resilience logic such as circuit
// breaking, retries, or bulkheading.
//
// If no executor is configured (neither via [SetDefaultExecutor] nor per-call
// [WithExecutor]), the RPC is invoked directly as a passthrough.
//
// Note: This interceptor wraps github.com/afex/hystrix-go which has been unmaintained since 2018.
// Consider migrating to github.com/failsafe-go/failsafe-go for circuit breaker functionality.
// Excluded errors and codes (set via [WithExcludedErrors] / [WithExcludedCodes])
// are reported as nil to the executor, preventing them from tripping circuit
// breakers or retry logic. The original error is still returned to the caller.
func ExecutorClientInterceptor(defaultOpts ...grpc.CallOption) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
o := clientOptions{}
for _, opt := range defaultOpts {
if opt != nil {
if co, ok := opt.(clientOption); ok {
co.process(&o)
}
}
}
Comment thread
ankurs marked this conversation as resolved.
for _, opt := range opts {
if opt != nil {
if co, ok := opt.(clientOption); ok {
co.process(&o)
}
}
}

// Resolve executor: per-call > global > nil (passthrough)
exec := defaultConfig.defaultExecutor
if o.hasExecutor {
exec = o.executor
}
if exec == nil {
return invoker(ctx, method, req, reply, cc, opts...)
}
Comment thread
ankurs marked this conversation as resolved.

var invokerErr error
executorErr := exec(ctx, method, func(execCtx context.Context) (err error) {
defer func() {
if r := recover(); r != nil {
err = errors.Wrap(fmt.Errorf("panic in executor method: %s", method), "Executor")
log.Error(execCtx, "panic", r, "method", method)
}
}()
defer notifier.NotifyOnPanic(execCtx, method)
invokerErr = invoker(execCtx, method, req, reply, cc, opts...)
for _, excludedErr := range o.excludedErrors {
if stdError.Is(invokerErr, excludedErr) {
return nil
}
}
if st, ok := status.FromError(invokerErr); ok {
if slices.Contains(o.excludedCodes, st.Code()) {
return nil
}
}
return invokerErr
})
if invokerErr != nil {
return invokerErr
}
return executorErr
}
}

// Deprecated: HystrixClientInterceptor wraps the unmaintained hystrix-go library.
// Use [SetDefaultExecutor] with a failsafe-go executor instead. Will be removed in v1.
//
// The interceptor applies provided default and per-call client options to configure Hystrix behavior (for example the command name, disabled flag, excluded errors, and excluded gRPC status codes).
// If Hystrix is disabled via options, the RPC is invoked directly. If the underlying RPC returns an error that matches any configured excluded error or whose gRPC status code matches any configured excluded code, Hystrix fallback is skipped and the RPC error is returned.
// Panics raised during the RPC invocation are captured and reported to the notifier before being converted into an error. If the RPC itself returns an error, that error is returned; otherwise any error produced by Hystrix is returned.
// See [ExecutorClientInterceptor] for the replacement.
func HystrixClientInterceptor(defaultOpts ...grpc.CallOption) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
options := clientOptions{
Expand Down
11 changes: 11 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type interceptorConfig struct {
rateLimiter ratelimit_middleware.Limiter
defaultRateLimit rate.Limit
defaultRateBurst int

// Executor for resilience (circuit breaking, etc.)
defaultExecutor Executor
}

var defaultConfig = newDefaultConfig()
Expand Down Expand Up @@ -203,3 +206,11 @@ func SetDefaultRateLimit(rps float64, burst int) {
}
defaultConfig.defaultRateBurst = burst
}

// SetDefaultExecutor sets the default [Executor] used by [ExecutorClientInterceptor]
// for all outbound unary RPCs. When set, ExecutorClientInterceptor replaces
// [HystrixClientInterceptor] in the default client interceptor chain.
// Must be called during initialization, before any RPCs are made. Not safe for concurrent use.
func SetDefaultExecutor(e Executor) {
defaultConfig.defaultExecutor = e
}
139 changes: 139 additions & 0 deletions examples/failsafe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package examples_test

import (
"context"
"fmt"
"sync"
"time"

"github.com/failsafe-go/failsafe-go"
"github.com/failsafe-go/failsafe-go/bulkhead"
"github.com/failsafe-go/failsafe-go/circuitbreaker"
"github.com/go-coldbrew/interceptors"
)

// ExampleSetDefaultExecutor demonstrates setting up a circuit breaker for
// specific gRPC methods using failsafe-go. The executor receives the method
// name, so you can filter which methods get circuit breaking.
func ExampleSetDefaultExecutor() {
cb := circuitbreaker.NewBuilder[any]().
WithFailureThreshold(5).
WithDelay(5 * time.Second).
WithSuccessThreshold(2).
Build()

// Only apply circuit breaking to specific methods
protected := map[string]bool{
"/payment.Service/Charge": true,
"/payment.Service/Refund": true,
}

interceptors.SetDefaultExecutor(func(ctx context.Context, method string, fn func(ctx context.Context) error) error {
if !protected[method] {
return fn(ctx) // passthrough for non-protected methods
}
return failsafe.With[any](cb).WithContext(ctx).Run(func() error {
return fn(ctx)
})
})

fmt.Println("method-filtered circuit breaker configured")
// Output: method-filtered circuit breaker configured
}

// ExampleSetDefaultExecutor_perMethod demonstrates per-method circuit breakers
// with different limits. Each method gets its own circuit breaker with
// tuning appropriate for that method's characteristics.
func ExampleSetDefaultExecutor_perMethod() {
type cbConfig struct {
failureThreshold uint
delay time.Duration
}

// Different limits per method
configs := map[string]cbConfig{
"/payment.Service/Charge": {failureThreshold: 3, delay: 10 * time.Second}, // sensitive — trip fast, recover slow
"/payment.Service/Refund": {failureThreshold: 3, delay: 10 * time.Second},
"/user.Service/GetUser": {failureThreshold: 10, delay: 5 * time.Second}, // tolerant — allow more failures
"/feed.Service/GetFeed": {failureThreshold: 10, delay: 5 * time.Second},
}

var (
mu sync.Mutex
breakers = make(map[string]circuitbreaker.CircuitBreaker[any])
)

interceptors.SetDefaultExecutor(func(ctx context.Context, method string, fn func(ctx context.Context) error) error {
cfg, ok := configs[method]
if !ok {
return fn(ctx) // no circuit breaker for unconfigured methods
}

mu.Lock()
cb, exists := breakers[method]
if !exists {
cb = circuitbreaker.NewBuilder[any]().
WithFailureThreshold(cfg.failureThreshold).
WithDelay(cfg.delay).
Build()
breakers[method] = cb
}
mu.Unlock()

return failsafe.With[any](cb).WithContext(ctx).Run(func() error {
return fn(ctx)
})
})

fmt.Println("per-method circuit breakers configured")
// Output: per-method circuit breakers configured
}

// ExampleSetDefaultExecutor_bulkhead demonstrates composing a circuit breaker
// with a bulkhead (concurrency limiter) using failsafe-go.
func ExampleSetDefaultExecutor_bulkhead() {
cb := circuitbreaker.NewBuilder[any]().
WithFailureThreshold(5).
WithDelay(5 * time.Second).
Build()

bh := bulkhead.New[any](200)

// Policies execute right-to-left: bulkhead limits concurrency,
// circuit breaker wraps the result.
interceptors.SetDefaultExecutor(func(ctx context.Context, method string, fn func(ctx context.Context) error) error {
return failsafe.With[any](cb, bh).WithContext(ctx).Run(func() error {
return fn(ctx)
})
})

fmt.Println("circuit breaker + bulkhead configured")
// Output: circuit breaker + bulkhead configured
}

// ExampleWithoutExecutor demonstrates disabling the executor for specific RPCs.
// This is useful for health checks or internal loopback connections that should
// not be circuit-broken.
func ExampleWithoutExecutor() {
_ = interceptors.WithoutExecutor()
fmt.Println("executor disabled for this call")
// Output: executor disabled for this call
}

// ExampleWithExecutor demonstrates setting a custom per-service executor
// with different circuit breaker tuning.
func ExampleWithExecutor() {
paymentCB := circuitbreaker.NewBuilder[any]().
WithFailureThreshold(3). // more sensitive
WithDelay(10 * time.Second). // longer recovery
Build()

_ = interceptors.WithExecutor(func(ctx context.Context, method string, fn func(ctx context.Context) error) error {
return failsafe.With[any](paymentCB).WithContext(ctx).Run(func() error {
return fn(ctx)
})
})

fmt.Println("per-service executor configured")
// Output: per-service executor configured
}
62 changes: 62 additions & 0 deletions examples/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
module github.com/go-coldbrew/interceptors/examples

go 1.25.9

require (
github.com/failsafe-go/failsafe-go v0.9.6
github.com/go-coldbrew/interceptors v0.1.25
)

require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20260209202127-80ab13bee0bf.1 // indirect
buf.build/go/protovalidate v1.1.3 // indirect
cel.dev/expr v0.25.1 // indirect
github.com/adhocore/gronx v1.19.6 // indirect
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 // indirect
github.com/airbrake/gobrake/v5 v5.6.2 // indirect
github.com/antlr4-go/antlr/v4 v4.13.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.24.4 // indirect
github.com/caio/go-tdigest/v4 v4.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/getsentry/sentry-go v0.43.0 // indirect
github.com/go-coldbrew/errors v0.2.14 // indirect
github.com/go-coldbrew/log v0.3.2 // indirect
github.com/go-coldbrew/options v0.3.0 // indirect
github.com/go-coldbrew/tracing v0.2.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/cel-go v0.27.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gopherjs/gopherjs v1.20.1 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect
github.com/jonboulle/clockwork v0.3.0 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/k2io/hookingo v1.0.6 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/newrelic/go-agent/v3 v3.42.0 // indirect
github.com/newrelic/go-agent/v3/integrations/nrgrpc v1.4.7 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.23.2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.67.5 // indirect
github.com/prometheus/procfs v0.20.1 // indirect
github.com/rollbar/rollbar-go v1.4.8 // indirect
github.com/smarty/assertions v1.16.0 // indirect
go.opentelemetry.io/otel v1.43.0 // indirect
go.opentelemetry.io/otel/trace v1.43.0 // indirect
go.yaml.in/yaml/v2 v2.4.4 // indirect
golang.org/x/exp v0.0.0-20250813145105-42675adae3e6 // indirect
golang.org/x/net v0.52.0 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/text v0.35.0 // indirect
golang.org/x/time v0.15.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260319201613-d00831a3d3e7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260319201613-d00831a3d3e7 // indirect
google.golang.org/grpc v1.79.3 // indirect
google.golang.org/protobuf v1.36.11 // indirect
)

replace github.com/go-coldbrew/interceptors => ../
Loading
Loading