diff --git a/README.md b/README.md index 48644fa..58a23c5 100644 --- a/README.md +++ b/README.md @@ -156,7 +156,7 @@ func DebugLoggingInterceptor() grpc.UnaryServerInterceptor DebugLoggingInterceptor is the interceptor that logs all request/response from a handler -## func [DefaultClientInterceptor]() +## func [DefaultClientInterceptor]() ```go func DefaultClientInterceptor(defaultOpts ...any) grpc.UnaryClientInterceptor @@ -174,7 +174,7 @@ func DefaultClientInterceptors(defaultOpts ...any) []grpc.UnaryClientInterceptor DefaultClientInterceptors are the set of default interceptors that should be applied to all client calls -## func [DefaultClientStreamInterceptor]() +## func [DefaultClientStreamInterceptor]() ```go func DefaultClientStreamInterceptor(defaultOpts ...any) grpc.StreamClientInterceptor @@ -183,7 +183,7 @@ func DefaultClientStreamInterceptor(defaultOpts ...any) grpc.StreamClientInterce DefaultClientStreamInterceptor are the set of default interceptors that should be applied to all stream client calls -## func [DefaultClientStreamInterceptors]() +## func [DefaultClientStreamInterceptors]() ```go func DefaultClientStreamInterceptors(defaultOpts ...any) []grpc.StreamClientInterceptor @@ -245,7 +245,7 @@ func (s *svc) echo(ctx context.Context, req *proto.EchoRequest) (*proto.EchoResp ``` -## func [ExecutorClientInterceptor]() +## func [ExecutorClientInterceptor]() ```go func ExecutorClientInterceptor(defaultOpts ...grpc.CallOption) grpc.UnaryClientInterceptor @@ -253,7 +253,7 @@ func ExecutorClientInterceptor(defaultOpts ...grpc.CallOption) grpc.UnaryClientI ExecutorClientInterceptor returns a unary client interceptor that wraps each RPC in an [Executor](<#Executor>). The executor provides resilience logic such as circuit breaking, retries, or bulkheading. -If no executor is configured \(neither via [SetDefaultExecutor](<#SetDefaultExecutor>) nor per\-call \[WithExecutor\]\), the RPC is invoked directly as a passthrough. +Executor resolution order: per\-call \[WithExecutor\] \> global [SetDefaultExecutor](<#SetDefaultExecutor>). When no executor is configured, the interceptor falls back to [HystrixClientInterceptor](<#HystrixClientInterceptor>) for backward compatibility. When the caller explicitly opts out via \[WithoutExecutor\] or \[WithoutHystrix\], the RPC is invoked directly as a passthrough. Excluded errors and codes \(set via \[WithExcludedErrors\] / \[WithExcludedCodes\]\) are reported as nil to the executor, preventing them from tripping circuit breakers or retry logic. The original error is still returned to the caller. @@ -267,7 +267,7 @@ func FilterMethodsFunc(ctx context.Context, fullMethodName string) bool FilterMethodsFunc is the default implementation of Filter function -## func [GRPCClientInterceptor]() +## func [GRPCClientInterceptor]() ```go func GRPCClientInterceptor(_ ...any) grpc.UnaryClientInterceptor @@ -285,7 +285,7 @@ func GetDebugLogHeaderName() string GetDebugLogHeaderName returns the current debug log header name. -## func [HystrixClientInterceptor]() +## func [HystrixClientInterceptor]() ```go func HystrixClientInterceptor(defaultOpts ...grpc.CallOption) grpc.UnaryClientInterceptor @@ -305,7 +305,7 @@ func NRHttpTracer(pattern string, h http.HandlerFunc) (string, http.HandlerFunc) NRHttpTracer wraps an HTTP handler with New Relic tracing. The configured filterFunc \(see SetFilterFunc\) is consulted on every request: paths it rejects run the underlying handler without starting a New Relic transaction. When pattern is non\-empty, newrelic.WrapHandleFunc is used so its route\-level instrumentation stays intact for non\-filtered paths. -## func [NewRelicClientInterceptor]() +## func [NewRelicClientInterceptor]() ```go func NewRelicClientInterceptor() grpc.UnaryClientInterceptor @@ -422,13 +422,13 @@ func SetDebugLogHeaderName(name string) SetDebugLogHeaderName sets the gRPC metadata header name that triggers per\-request log level override. Default is "x\-debug\-log\-level". The header value should be a valid log level \(e.g., "debug"\). Empty names are ignored. Must be called during initialization. -## func [SetDefaultExecutor]() +## func [SetDefaultExecutor]() ```go func SetDefaultExecutor(e Executor) ``` -SetDefaultExecutor sets the default [Executor](<#Executor>) used by [ExecutorClientInterceptor](<#ExecutorClientInterceptor>) for all outbound unary RPCs. When set, ExecutorClientInterceptor replaces [HystrixClientInterceptor](<#HystrixClientInterceptor>) in the default client interceptor chain. Must be called during initialization, before any RPCs are made. Not safe for concurrent use. +SetDefaultExecutor sets the default [Executor](<#Executor>) used by [ExecutorClientInterceptor](<#ExecutorClientInterceptor>) for outbound unary RPCs when ColdBrew client interceptors are enabled \(the default\). In that configuration, when no executor is configured \(neither global via SetDefaultExecutor nor per\-call via \[WithExecutor\]\), [ExecutorClientInterceptor](<#ExecutorClientInterceptor>) falls back to [HystrixClientInterceptor](<#HystrixClientInterceptor>) for backward compatibility. Must be called during initialization, before any RPCs are made. Not safe for concurrent use. ## func [SetDefaultRateLimit]() diff --git a/client.go b/client.go index cc49cdb..b4dfa22 100644 --- a/client.go +++ b/client.go @@ -33,11 +33,7 @@ func DefaultClientInterceptors(defaultOpts ...any) []grpc.UnaryClientInterceptor callOptions = append(callOptions, o) } } - if defaultConfig.defaultExecutor != nil { - ints = append(ints, ExecutorClientInterceptor(callOptions...)) - } else { - ints = append(ints, HystrixClientInterceptor(callOptions...)) - } + ints = append(ints, ExecutorClientInterceptor(callOptions...)) ints = append(ints, grpc_retry.UnaryClientInterceptor(), NewRelicClientInterceptor(), @@ -105,8 +101,11 @@ func GRPCClientInterceptor(_ ...any) grpc.UnaryClientInterceptor { // RPC in an [Executor]. The executor provides resilience logic such as circuit // breaking, retries, or bulkheading. // -// If no executor is configured (neither via [SetDefaultExecutor] nor per-call -// [WithExecutor]), the RPC is invoked directly as a passthrough. +// Executor resolution order: per-call [WithExecutor] > global [SetDefaultExecutor]. +// When no executor is configured, the interceptor falls back to +// [HystrixClientInterceptor] for backward compatibility. When the caller +// explicitly opts out via [WithoutExecutor] or [WithoutHystrix], the RPC is +// invoked directly as a passthrough. // // Excluded errors and codes (set via [WithExcludedErrors] / [WithExcludedCodes]) // are reported as nil to the executor, preventing them from tripping circuit @@ -129,13 +128,18 @@ func ExecutorClientInterceptor(defaultOpts ...grpc.CallOption) grpc.UnaryClientI } } - // Resolve executor: per-call > global > nil (passthrough) + // Resolve executor: per-call > global > hystrix fallback exec := defaultConfig.defaultExecutor if o.hasExecutor { exec = o.executor } if exec == nil { - return invoker(ctx, method, req, reply, cc, opts...) + if o.hasExecutor { + // Caller explicitly opted out (WithoutExecutor / WithoutHystrix). + return invoker(ctx, method, req, reply, cc, opts...) + } + // No executor configured; fall back to Hystrix for backward compat. + return hystrixFallback(ctx, method, req, reply, cc, invoker, opts, o) } var invokerErr error @@ -167,61 +171,71 @@ func ExecutorClientInterceptor(defaultOpts ...grpc.CallOption) grpc.UnaryClientI } } +// hystrixFallback runs the RPC through the deprecated Hystrix circuit breaker. +// It is the shared Hystrix implementation used both as the executor fallback +// when no executor is configured (neither global nor per-call) and by +// [HystrixClientInterceptor], preserving backward compatibility for services +// that have not migrated. +func hystrixFallback(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts []grpc.CallOption, o clientOptions) error { + if o.disableHystrix { + return invoker(ctx, method, req, reply, cc, opts...) + } + hystrixCmd := o.hystrixName + if hystrixCmd == "" { + hystrixCmd = method + } + newCtx, cancel := context.WithCancel(ctx) + defer cancel() + + var invokerErr error + hystrixErr := hystrix.Do(hystrixCmd, func() (err error) { + defer func() { + if r := recover(); r != nil { + err = errors.Wrap(fmt.Errorf("panic inside hystrix method: %s, req: %v, reply: %v", method, req, reply), "Hystrix") + log.Error(ctx, "panic", r, "method", method, "req", req, "reply", reply) + } + }() + defer notifier.NotifyOnPanic(newCtx, method) + invokerErr = invoker(newCtx, method, req, reply, cc, opts...) + for _, excludedErr := range o.excludedErrors { + if stdError.Is(invokerErr, excludedErr) { + return nil + } + } + if st, ok := status.FromError(invokerErr); ok { + if slices.Contains(o.excludedCodes, st.Code()) { + return nil + } + } + return invokerErr + }, nil) + if invokerErr != nil { + return invokerErr + } + return hystrixErr +} + // Deprecated: HystrixClientInterceptor wraps the unmaintained hystrix-go library. // Use [SetDefaultExecutor] with a failsafe-go executor instead. Will be removed in v1. // // See [ExecutorClientInterceptor] for the replacement. func HystrixClientInterceptor(defaultOpts ...grpc.CallOption) grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - options := clientOptions{ - hystrixName: method, - } + o := clientOptions{} for _, opt := range defaultOpts { if opt != nil { - if o, ok := opt.(clientOption); ok { - o.process(&options) + if co, ok := opt.(clientOption); ok { + co.process(&o) } } } for _, opt := range opts { if opt != nil { - if o, ok := opt.(clientOption); ok { - o.process(&options) - } - } - } - if options.disableHystrix { - // short circuit if hystrix is disabled - return invoker(ctx, method, req, reply, cc, opts...) - } - newCtx, cancel := context.WithCancel(ctx) - defer cancel() - - var invokerErr error - hystrixErr := hystrix.Do(options.hystrixName, func() (err error) { - defer func() { - if r := recover(); r != nil { - err = errors.Wrap(fmt.Errorf("panic inside hystrix method: %s, req: %v, reply: %v", method, req, reply), "Hystrix") - log.Error(ctx, "panic", r, "method", method, "req", req, "reply", reply) - } - }() - defer notifier.NotifyOnPanic(newCtx, method) - invokerErr = invoker(newCtx, method, req, reply, cc, opts...) - for _, excludedErr := range options.excludedErrors { - if stdError.Is(invokerErr, excludedErr) { - return nil - } - } - if st, ok := status.FromError(invokerErr); ok { - if slices.Contains(options.excludedCodes, st.Code()) { - return nil + if co, ok := opt.(clientOption); ok { + co.process(&o) } } - return invokerErr - }, nil) - if invokerErr != nil { - return invokerErr } - return hystrixErr + return hystrixFallback(ctx, method, req, reply, cc, invoker, opts, o) } } diff --git a/config.go b/config.go index 935dfe0..5deaf29 100644 --- a/config.go +++ b/config.go @@ -237,8 +237,10 @@ func SetDefaultRateLimit(rps float64, burst int) { } // SetDefaultExecutor sets the default [Executor] used by [ExecutorClientInterceptor] -// for all outbound unary RPCs. When set, ExecutorClientInterceptor replaces -// [HystrixClientInterceptor] in the default client interceptor chain. +// for outbound unary RPCs when ColdBrew client interceptors are enabled (the +// default). In that configuration, when no executor is configured (neither global +// via SetDefaultExecutor nor per-call via [WithExecutor]), [ExecutorClientInterceptor] +// falls back to [HystrixClientInterceptor] for backward compatibility. // Must be called during initialization, before any RPCs are made. Not safe for concurrent use. func SetDefaultExecutor(e Executor) { defaultConfig.defaultExecutor = e diff --git a/interceptors_test.go b/interceptors_test.go index c92c61e..b560e0b 100644 --- a/interceptors_test.go +++ b/interceptors_test.go @@ -434,7 +434,7 @@ func TestHystrixClientInterceptor(t *testing.T) { func TestExecutorClientInterceptor(t *testing.T) { ctx := context.Background() - t.Run("no executor passthrough", func(t *testing.T) { + t.Run("no executor falls back to hystrix", func(t *testing.T) { resetGlobals() interceptor := ExecutorClientInterceptor() invokerCalled := false @@ -512,6 +512,35 @@ func TestExecutorClientInterceptor(t *testing.T) { } }) + t.Run("per-call executor without global executor", func(t *testing.T) { + resetGlobals() + defer resetGlobals() + + perCallCalled := false + perCallExec := func(ctx context.Context, method string, fn func(ctx context.Context) error) error { + perCallCalled = true + return fn(ctx) + } + + interceptor := ExecutorClientInterceptor() + invokerCalled := false + invoker := func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + invokerCalled = true + return nil + } + + err := interceptor(ctx, "/test/Method", nil, nil, nil, invoker, WithExecutor(perCallExec)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !perCallCalled { + t.Error("per-call executor should be used even without global executor") + } + if !invokerCalled { + t.Error("invoker was not called") + } + }) + t.Run("WithoutExecutor disables global", func(t *testing.T) { resetGlobals() executorCalled := false @@ -718,6 +747,102 @@ func TestDefaultClientInterceptors_ExecutorBranching(t *testing.T) { t.Error("executor interceptor should have been used, not hystrix") } }) + + t.Run("per-call WithExecutor works without global executor", func(t *testing.T) { + resetGlobals() + defer resetGlobals() + + perCallCalled := false + perCallExec := func(ctx context.Context, method string, fn func(ctx context.Context) error) error { + perCallCalled = true + return fn(ctx) + } + + ints := DefaultClientInterceptors() + if len(ints) == 0 { + t.Fatal("expected interceptors in default chain") + } + invoker := func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + return nil + } + err := ints[0](context.Background(), "/test/Method", nil, nil, nil, invoker, WithExecutor(perCallExec)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !perCallCalled { + t.Error("per-call executor should be used even without global executor") + } + }) + + t.Run("per-connection WithExecutor works without global executor", func(t *testing.T) { + resetGlobals() + defer resetGlobals() + + connExecCalled := false + connExec := func(ctx context.Context, method string, fn func(ctx context.Context) error) error { + connExecCalled = true + return fn(ctx) + } + + ints := DefaultClientInterceptors(WithExecutor(connExec)) + if len(ints) == 0 { + t.Fatal("expected interceptors in default chain") + } + invoker := func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + return nil + } + err := ints[0](context.Background(), "/test/Method", nil, nil, nil, invoker) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !connExecCalled { + t.Error("per-connection executor should be used even without global executor") + } + }) + + t.Run("WithoutHystrix passthrough without global executor", func(t *testing.T) { + resetGlobals() + defer resetGlobals() + + ints := DefaultClientInterceptors() + if len(ints) == 0 { + t.Fatal("expected interceptors in default chain") + } + invokerCalled := false + invoker := func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + invokerCalled = true + return nil + } + err := ints[0](context.Background(), "/test/Method", nil, nil, nil, invoker, WithoutHystrix()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !invokerCalled { + t.Error("invoker should be called directly when WithoutHystrix is set") + } + }) + + t.Run("WithoutExecutor passthrough without global executor", func(t *testing.T) { + resetGlobals() + defer resetGlobals() + + ints := DefaultClientInterceptors() + if len(ints) == 0 { + t.Fatal("expected interceptors in default chain") + } + invokerCalled := false + invoker := func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + invokerCalled = true + return nil + } + err := ints[0](context.Background(), "/test/Method", nil, nil, nil, invoker, WithoutExecutor()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !invokerCalled { + t.Error("invoker should be called directly when WithoutExecutor is set") + } + }) } func TestChainUnaryServer(t *testing.T) {