Skip to content

Commit

Permalink
fix: calling otelgrpc.UnaryClientInterceptor repeatedly creates multi…
Browse files Browse the repository at this point in the history
…ple Int64Histogram resulting in a memory leak

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Oct 8, 2023
1 parent a3f2aae commit 6ee2289
Show file tree
Hide file tree
Showing 12 changed files with 70 additions and 39 deletions.
9 changes: 4 additions & 5 deletions pkg/rpc/cdnsystem/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"

Expand Down Expand Up @@ -60,7 +59,7 @@ func GetClientByAddr(ctx context.Context, netAddr dfnet.NetAddr, opts ...grpc.Di
append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.ConvertErrorUnaryClientInterceptor,
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -70,7 +69,7 @@ func GetClientByAddr(ctx context.Context, netAddr dfnet.NetAddr, opts ...grpc.Di
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.ConvertErrorStreamClientInterceptor,
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down Expand Up @@ -99,7 +98,7 @@ func GetClient(ctx context.Context, dynconfig config.DynconfigInterface, opts ..
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.ConvertErrorUnaryClientInterceptor,
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -110,7 +109,7 @@ func GetClient(ctx context.Context, dynconfig config.DynconfigInterface, opts ..
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.ConvertErrorStreamClientInterceptor,
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
rpc.RefresherStreamClientInterceptor(dynconfig),
Expand Down
5 changes: 2 additions & 3 deletions pkg/rpc/dfdaemon/client/client_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"
Expand All @@ -50,7 +49,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err
append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.ConvertErrorUnaryClientInterceptor,
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -60,7 +59,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.ConvertErrorStreamClientInterceptor,
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down
6 changes: 3 additions & 3 deletions pkg/rpc/dfdaemon/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"

commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc"
)

// GetV2 returns v2 version of the dfdaemon client.
Expand All @@ -41,7 +41,7 @@ func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, err
target,
append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -50,7 +50,7 @@ func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, err
),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down
6 changes: 3 additions & 3 deletions pkg/rpc/health/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc"
)

const (
Expand All @@ -45,12 +45,12 @@ func GetClient(ctx context.Context, target string, opts ...grpc.DialOption) (Cli
target,
append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down
6 changes: 3 additions & 3 deletions pkg/rpc/inference/client/client_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"

inferencev1 "d7y.io/api/v2/pkg/apis/inference/v1"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc"
)

const (
Expand All @@ -53,7 +53,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err
target,
append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -62,7 +62,7 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err
),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down
35 changes: 35 additions & 0 deletions pkg/rpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,50 @@ package rpc

import (
"context"
"sync"

"github.com/juju/ratelimit"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"d7y.io/dragonfly/v2/internal/dferrors"
)

var (
// otelUnaryInterceptor is the unary interceptor for tracing.
otelUnaryInterceptor grpc.UnaryClientInterceptor

// otelStreamInterceptor is the stream interceptor for tracing.
otelStreamInterceptor grpc.StreamClientInterceptor

// interceptorsInitialized is used to ensure that otel interceptors are initialized only once.
interceptorsInitialized = sync.Once{}
)

// OTEL interceptors must be created once to avoid memory leak,
// refer to https://github.com/open-telemetry/opentelemetry-go-contrib/issues/4226 and
// https://github.com/argoproj/argo-cd/pull/15174.
func ensureOTELInterceptorInitialized() {
interceptorsInitialized.Do(func() {
otelUnaryInterceptor = otelgrpc.UnaryClientInterceptor()
otelStreamInterceptor = otelgrpc.StreamClientInterceptor()
})
}

// OTELUnaryClientInterceptor returns a new unary client interceptor that traces gRPC requests.
func OTELUnaryClientInterceptor() grpc.UnaryClientInterceptor {
ensureOTELInterceptorInitialized()
return otelUnaryInterceptor
}

// OTELStreamClientInterceptor returns a new stream client interceptor that traces gRPC requests.
func OTELStreamClientInterceptor() grpc.StreamClientInterceptor {
ensureOTELInterceptorInitialized()
return otelStreamInterceptor
}

// Refresher is the interface for refreshing dynconfig.
type Refresher interface {
Refresh() error
Expand Down
6 changes: 3 additions & 3 deletions pkg/rpc/manager/client/client_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -37,6 +36,7 @@ import (

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/dfnet"
"d7y.io/dragonfly/v2/pkg/rpc"
healthclient "d7y.io/dragonfly/v2/pkg/rpc/health/client"
)

Expand All @@ -47,7 +47,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
target,
append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -56,7 +56,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down
6 changes: 3 additions & 3 deletions pkg/rpc/manager/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -37,6 +36,7 @@ import (

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/dfnet"
"d7y.io/dragonfly/v2/pkg/rpc"
healthclient "d7y.io/dragonfly/v2/pkg/rpc/health/client"
)

Expand All @@ -47,7 +47,7 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
target,
append([]grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -56,7 +56,7 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down
9 changes: 4 additions & 5 deletions pkg/rpc/scheduler/client/client_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
Expand Down Expand Up @@ -55,7 +54,7 @@ func GetV1(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.ConvertErrorUnaryClientInterceptor,
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -66,7 +65,7 @@ func GetV1(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.ConvertErrorStreamClientInterceptor,
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
rpc.RefresherStreamClientInterceptor(dynconfig),
Expand Down Expand Up @@ -95,7 +94,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.ConvertErrorUnaryClientInterceptor,
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -105,7 +104,7 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.ConvertErrorStreamClientInterceptor,
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down
9 changes: 4 additions & 5 deletions pkg/rpc/scheduler/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
Expand Down Expand Up @@ -53,7 +52,7 @@ func GetV2(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
append([]grpc.DialOption{
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -63,7 +62,7 @@ func GetV2(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
rpc.RefresherUnaryClientInterceptor(dynconfig),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
rpc.RefresherStreamClientInterceptor(dynconfig),
Expand Down Expand Up @@ -91,7 +90,7 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
append([]grpc.DialOption{
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(),
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
Expand All @@ -100,7 +99,7 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V
),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(),
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
Expand Down
Loading

0 comments on commit 6ee2289

Please sign in to comment.