diff --git a/pkg/rpc/cdnsystem/client/client.go b/pkg/rpc/cdnsystem/client/client.go index 5435a4c39cc..265da5531c1 100644 --- a/pkg/rpc/cdnsystem/client/client.go +++ b/pkg/rpc/cdnsystem/client/client.go @@ -26,7 +26,6 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/balancer" @@ -60,7 +59,7 @@ func GetClientByAddr(ctx context.Context, netAddr dfnet.NetAddr, opts ...grpc.Di append([]grpc.DialOption{ grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( rpc.ConvertErrorUnaryClientInterceptor, - otelgrpc.UnaryClientInterceptor(), + rpc.OTELUnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_retry.UnaryClientInterceptor( @@ -70,7 +69,7 @@ func GetClientByAddr(ctx context.Context, netAddr dfnet.NetAddr, opts ...grpc.Di )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( rpc.ConvertErrorStreamClientInterceptor, - otelgrpc.StreamClientInterceptor(), + rpc.OTELStreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), )), @@ -99,7 +98,7 @@ func GetClient(ctx context.Context, dynconfig config.DynconfigInterface, opts .. grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( rpc.ConvertErrorUnaryClientInterceptor, - otelgrpc.UnaryClientInterceptor(), + rpc.OTELUnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_retry.UnaryClientInterceptor( @@ -110,7 +109,7 @@ func GetClient(ctx context.Context, dynconfig config.DynconfigInterface, opts .. )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( rpc.ConvertErrorStreamClientInterceptor, - otelgrpc.StreamClientInterceptor(), + rpc.OTELStreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), rpc.RefresherStreamClientInterceptor(dynconfig), diff --git a/pkg/rpc/dfdaemon/client/client_v1.go b/pkg/rpc/dfdaemon/client/client_v1.go index 67a328e851b..8f39494ab6d 100644 --- a/pkg/rpc/dfdaemon/client/client_v1.go +++ b/pkg/rpc/dfdaemon/client/client_v1.go @@ -26,7 +26,6 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/types/known/emptypb" @@ -50,7 +49,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err append([]grpc.DialOption{ grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( rpc.ConvertErrorUnaryClientInterceptor, - otelgrpc.UnaryClientInterceptor(), + rpc.OTELUnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_retry.UnaryClientInterceptor( @@ -60,7 +59,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( rpc.ConvertErrorStreamClientInterceptor, - otelgrpc.StreamClientInterceptor(), + rpc.OTELStreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), )), diff --git a/pkg/rpc/dfdaemon/client/client_v2.go b/pkg/rpc/dfdaemon/client/client_v2.go index 7d2c6d00e06..1037ca3bf22 100644 --- a/pkg/rpc/dfdaemon/client/client_v2.go +++ b/pkg/rpc/dfdaemon/client/client_v2.go @@ -25,13 +25,13 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" commonv2 "d7y.io/api/v2/pkg/apis/common/v2" dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/rpc" ) // GetV2 returns v2 version of the dfdaemon client. @@ -41,7 +41,7 @@ func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, err target, append([]grpc.DialOption{ grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otelgrpc.UnaryClientInterceptor(), + rpc.OTELUnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_retry.UnaryClientInterceptor( @@ -50,7 +50,7 @@ func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, err ), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - otelgrpc.StreamClientInterceptor(), + rpc.OTELStreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), )), diff --git a/pkg/rpc/health/client/client.go b/pkg/rpc/health/client/client.go index 686415da452..5a53b68f142 100644 --- a/pkg/rpc/health/client/client.go +++ b/pkg/rpc/health/client/client.go @@ -26,11 +26,11 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" healthpb "google.golang.org/grpc/health/grpc_health_v1" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/rpc" ) const ( @@ -45,12 +45,12 @@ func GetClient(ctx context.Context, target string, opts ...grpc.DialOption) (Cli target, append([]grpc.DialOption{ grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otelgrpc.UnaryClientInterceptor(), + rpc.OTELUnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - otelgrpc.StreamClientInterceptor(), + rpc.OTELStreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), )), diff --git a/pkg/rpc/inference/client/client_v1.go b/pkg/rpc/inference/client/client_v1.go index ac3008da626..6351162e048 100644 --- a/pkg/rpc/inference/client/client_v1.go +++ b/pkg/rpc/inference/client/client_v1.go @@ -26,12 +26,12 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" inferencev1 "d7y.io/api/v2/pkg/apis/inference/v1" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/rpc" ) const ( @@ -53,7 +53,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err target, append([]grpc.DialOption{ grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otelgrpc.UnaryClientInterceptor(), + rpc.OTELUnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_retry.UnaryClientInterceptor( @@ -62,7 +62,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err ), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - otelgrpc.StreamClientInterceptor(), + rpc.OTELStreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), )), diff --git a/pkg/rpc/interceptor.go b/pkg/rpc/interceptor.go index b72aa18f2bd..4b6531bad6d 100644 --- a/pkg/rpc/interceptor.go +++ b/pkg/rpc/interceptor.go @@ -18,8 +18,10 @@ package rpc import ( "context" + "sync" "github.com/juju/ratelimit" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -27,6 +29,39 @@ import ( "d7y.io/dragonfly/v2/internal/dferrors" ) +var ( + // otelUnaryInterceptor is the unary interceptor for tracing. + otelUnaryInterceptor grpc.UnaryClientInterceptor + + // otelStreamInterceptor is the stream interceptor for tracing. + otelStreamInterceptor grpc.StreamClientInterceptor + + // interceptorsInitialized is used to ensure that otel interceptors are initialized only once. + interceptorsInitialized = sync.Once{} +) + +// OTEL interceptors must be created once to avoid memory leak, +// refer to https://github.com/open-telemetry/opentelemetry-go-contrib/issues/4226 and +// https://github.com/argoproj/argo-cd/pull/15174. +func ensureOTELInterceptorInitialized() { + interceptorsInitialized.Do(func() { + otelUnaryInterceptor = otelgrpc.UnaryClientInterceptor() + otelStreamInterceptor = otelgrpc.StreamClientInterceptor() + }) +} + +// OTELUnaryClientInterceptor returns a new unary client interceptor that traces gRPC requests. +func OTELUnaryClientInterceptor() grpc.UnaryClientInterceptor { + ensureOTELInterceptorInitialized() + return otelUnaryInterceptor +} + +// OTELStreamClientInterceptor returns a new stream client interceptor that traces gRPC requests. +func OTELStreamClientInterceptor() grpc.StreamClientInterceptor { + ensureOTELInterceptorInitialized() + return otelStreamInterceptor +} + // Refresher is the interface for refreshing dynconfig. type Refresher interface { Refresh() error diff --git a/pkg/rpc/manager/client/client_v1.go b/pkg/rpc/manager/client/client_v1.go index 0fdfeeb7aa4..fed67ac3165 100644 --- a/pkg/rpc/manager/client/client_v1.go +++ b/pkg/rpc/manager/client/client_v1.go @@ -27,7 +27,6 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -37,6 +36,7 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/dfnet" + "d7y.io/dragonfly/v2/pkg/rpc" healthclient "d7y.io/dragonfly/v2/pkg/rpc/health/client" ) @@ -47,7 +47,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V target, append([]grpc.DialOption{ grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otelgrpc.UnaryClientInterceptor(), + rpc.OTELUnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_retry.UnaryClientInterceptor( @@ -56,7 +56,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V ), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - otelgrpc.StreamClientInterceptor(), + rpc.OTELStreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), )), diff --git a/pkg/rpc/manager/client/client_v2.go b/pkg/rpc/manager/client/client_v2.go index 355963c0326..c840d9f3bf3 100644 --- a/pkg/rpc/manager/client/client_v2.go +++ b/pkg/rpc/manager/client/client_v2.go @@ -27,7 +27,6 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -37,6 +36,7 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/dfnet" + "d7y.io/dragonfly/v2/pkg/rpc" healthclient "d7y.io/dragonfly/v2/pkg/rpc/health/client" ) @@ -47,7 +47,7 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V target, append([]grpc.DialOption{ grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otelgrpc.UnaryClientInterceptor(), + rpc.OTELUnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_retry.UnaryClientInterceptor( @@ -56,7 +56,7 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V ), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - otelgrpc.StreamClientInterceptor(), + rpc.OTELStreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), )), diff --git a/pkg/rpc/scheduler/client/client_v1.go b/pkg/rpc/scheduler/client/client_v1.go index bea284cd9d6..7216c18db5b 100644 --- a/pkg/rpc/scheduler/client/client_v1.go +++ b/pkg/rpc/scheduler/client/client_v1.go @@ -25,7 +25,6 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/balancer" @@ -55,7 +54,7 @@ func GetV1(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( rpc.ConvertErrorUnaryClientInterceptor, - otelgrpc.UnaryClientInterceptor(), + rpc.OTELUnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_retry.UnaryClientInterceptor( @@ -66,7 +65,7 @@ func GetV1(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( rpc.ConvertErrorStreamClientInterceptor, - otelgrpc.StreamClientInterceptor(), + rpc.OTELStreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), rpc.RefresherStreamClientInterceptor(dynconfig), @@ -95,7 +94,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( rpc.ConvertErrorUnaryClientInterceptor, - otelgrpc.UnaryClientInterceptor(), + rpc.OTELUnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_retry.UnaryClientInterceptor( @@ -105,7 +104,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( rpc.ConvertErrorStreamClientInterceptor, - otelgrpc.StreamClientInterceptor(), + rpc.OTELStreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), )), diff --git a/pkg/rpc/scheduler/client/client_v2.go b/pkg/rpc/scheduler/client/client_v2.go index 634635b5131..7def46e2206 100644 --- a/pkg/rpc/scheduler/client/client_v2.go +++ b/pkg/rpc/scheduler/client/client_v2.go @@ -25,7 +25,6 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/balancer" @@ -53,7 +52,7 @@ func GetV2(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt append([]grpc.DialOption{ grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otelgrpc.UnaryClientInterceptor(), + rpc.OTELUnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_retry.UnaryClientInterceptor( @@ -63,7 +62,7 @@ func GetV2(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt rpc.RefresherUnaryClientInterceptor(dynconfig), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - otelgrpc.StreamClientInterceptor(), + rpc.OTELStreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), rpc.RefresherStreamClientInterceptor(dynconfig), @@ -91,7 +90,7 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V append([]grpc.DialOption{ grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otelgrpc.UnaryClientInterceptor(), + rpc.OTELUnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_retry.UnaryClientInterceptor( @@ -100,7 +99,7 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V ), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - otelgrpc.StreamClientInterceptor(), + rpc.OTELStreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), )), diff --git a/pkg/rpc/security/client/client_v1.go b/pkg/rpc/security/client/client_v1.go index 2ceba0030d3..43475059e34 100644 --- a/pkg/rpc/security/client/client_v1.go +++ b/pkg/rpc/security/client/client_v1.go @@ -27,13 +27,13 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" securityv1 "d7y.io/api/v2/pkg/apis/security/v1" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/dfnet" + "d7y.io/dragonfly/v2/pkg/rpc" healthclient "d7y.io/dragonfly/v2/pkg/rpc/health/client" ) @@ -56,7 +56,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err target, append([]grpc.DialOption{ grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otelgrpc.UnaryClientInterceptor(), + rpc.OTELUnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_retry.UnaryClientInterceptor( @@ -65,7 +65,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err ), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - otelgrpc.StreamClientInterceptor(), + rpc.OTELStreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), )), diff --git a/pkg/rpc/trainer/client/client_v1.go b/pkg/rpc/trainer/client/client_v1.go index 562470ca859..576243e0033 100644 --- a/pkg/rpc/trainer/client/client_v1.go +++ b/pkg/rpc/trainer/client/client_v1.go @@ -26,12 +26,12 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" trainerv1 "d7y.io/api/v2/pkg/apis/trainer/v1" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/rpc" ) const ( @@ -50,7 +50,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V target, append([]grpc.DialOption{ grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otelgrpc.UnaryClientInterceptor(), + rpc.OTELUnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), grpc_retry.UnaryClientInterceptor( @@ -59,7 +59,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V ), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - otelgrpc.StreamClientInterceptor(), + rpc.OTELStreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), )),