Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: otelgrpc.UnaryClientInterceptor memory leak #2772

Merged
merged 1 commit into from
Oct 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions pkg/rpc/cdnsystem/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"

Expand Down Expand Up @@ -60,7 +59,7 @@ func GetClientByAddr(ctx context.Context, netAddr dfnet.NetAddr, opts ...grpc.Di
append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.ConvertErrorUnaryClientInterceptor,
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -70,7 +69,7 @@ func GetClientByAddr(ctx context.Context, netAddr dfnet.NetAddr, opts ...grpc.Di
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.ConvertErrorStreamClientInterceptor,
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down Expand Up @@ -99,7 +98,7 @@ func GetClient(ctx context.Context, dynconfig config.DynconfigInterface, opts ..
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.ConvertErrorUnaryClientInterceptor,
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -110,7 +109,7 @@ func GetClient(ctx context.Context, dynconfig config.DynconfigInterface, opts ..
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.ConvertErrorStreamClientInterceptor,
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
rpc.RefresherStreamClientInterceptor(dynconfig),
Expand Down
5 changes: 2 additions & 3 deletions pkg/rpc/dfdaemon/client/client_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"
Expand All @@ -50,7 +49,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err
append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.ConvertErrorUnaryClientInterceptor,
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -60,7 +59,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.ConvertErrorStreamClientInterceptor,
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down
6 changes: 3 additions & 3 deletions pkg/rpc/dfdaemon/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"

commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc"
)

// GetV2 returns v2 version of the dfdaemon client.
Expand All @@ -41,7 +41,7 @@ func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, err
target,
append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -50,7 +50,7 @@ func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, err
),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down
6 changes: 3 additions & 3 deletions pkg/rpc/health/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc"
)

const (
Expand All @@ -45,12 +45,12 @@ func GetClient(ctx context.Context, target string, opts ...grpc.DialOption) (Cli
target,
append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down
6 changes: 3 additions & 3 deletions pkg/rpc/inference/client/client_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"

inferencev1 "d7y.io/api/v2/pkg/apis/inference/v1"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc"
)

const (
Expand All @@ -53,7 +53,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err
target,
append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -62,7 +62,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err
),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down
35 changes: 35 additions & 0 deletions pkg/rpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,50 @@ package rpc

import (
"context"
"sync"

"github.com/juju/ratelimit"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"d7y.io/dragonfly/v2/internal/dferrors"
)

var (
// otelUnaryInterceptor is the unary interceptor for tracing.
otelUnaryInterceptor grpc.UnaryClientInterceptor

// otelStreamInterceptor is the stream interceptor for tracing.
otelStreamInterceptor grpc.StreamClientInterceptor

// interceptorsInitialized is used to ensure that otel interceptors are initialized only once.
interceptorsInitialized = sync.Once{}
)

// OTEL interceptors must be created once to avoid memory leak,
// refer to https://github.com/open-telemetry/opentelemetry-go-contrib/issues/4226 and
// https://github.com/argoproj/argo-cd/pull/15174.
func ensureOTELInterceptorInitialized() {
interceptorsInitialized.Do(func() {
otelUnaryInterceptor = otelgrpc.UnaryClientInterceptor()
otelStreamInterceptor = otelgrpc.StreamClientInterceptor()
})
}

// OTELUnaryClientInterceptor returns a new unary client interceptor that traces gRPC requests.
func OTELUnaryClientInterceptor() grpc.UnaryClientInterceptor {
ensureOTELInterceptorInitialized()
return otelUnaryInterceptor
}

// OTELStreamClientInterceptor returns a new stream client interceptor that traces gRPC requests.
func OTELStreamClientInterceptor() grpc.StreamClientInterceptor {
ensureOTELInterceptorInitialized()
return otelStreamInterceptor
}

// Refresher is the interface for refreshing dynconfig.
type Refresher interface {
Refresh() error
Expand Down
6 changes: 3 additions & 3 deletions pkg/rpc/manager/client/client_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -37,6 +36,7 @@ import (

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/dfnet"
"d7y.io/dragonfly/v2/pkg/rpc"
healthclient "d7y.io/dragonfly/v2/pkg/rpc/health/client"
)

Expand All @@ -47,7 +47,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
target,
append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -56,7 +56,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down
6 changes: 3 additions & 3 deletions pkg/rpc/manager/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -37,6 +36,7 @@ import (

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/dfnet"
"d7y.io/dragonfly/v2/pkg/rpc"
healthclient "d7y.io/dragonfly/v2/pkg/rpc/health/client"
)

Expand All @@ -47,7 +47,7 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
target,
append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -56,7 +56,7 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down
9 changes: 4 additions & 5 deletions pkg/rpc/scheduler/client/client_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
Expand Down Expand Up @@ -55,7 +54,7 @@ func GetV1(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.ConvertErrorUnaryClientInterceptor,
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -66,7 +65,7 @@ func GetV1(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.ConvertErrorStreamClientInterceptor,
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
rpc.RefresherStreamClientInterceptor(dynconfig),
Expand Down Expand Up @@ -95,7 +94,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.ConvertErrorUnaryClientInterceptor,
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -105,7 +104,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.ConvertErrorStreamClientInterceptor,
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down
9 changes: 4 additions & 5 deletions pkg/rpc/scheduler/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
Expand Down Expand Up @@ -53,7 +52,7 @@ func GetV2(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
append([]grpc.DialOption{
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -63,7 +62,7 @@ func GetV2(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
rpc.RefresherUnaryClientInterceptor(dynconfig),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
rpc.RefresherStreamClientInterceptor(dynconfig),
Expand Down Expand Up @@ -91,7 +90,7 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
append([]grpc.DialOption{
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -100,7 +99,7 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down
Loading