From c561d2199cdb53f2cdbd7840d73a798eb1e51cf3 Mon Sep 17 00:00:00 2001 From: Nukitt Date: Wed, 17 Sep 2025 14:46:10 +0530 Subject: [PATCH] rpc, server: add DRPC metrics server interceptor Previously, DRPC had no support for a metrics interceptor, so DRPC request metrics could not be captured. In this PR, we add metrics server interceptor support to DRPC and reuse the `rpc_server_request_duration_nanos` metric for both gRPC and DRPC, since the metric reflects server-side duration regardless of transport, and the interceptor chains are not expected to differ materially in practice. When differentiation is needed, the metric can be correlated with other metadata. This way we can observe both gRPC and DRPC latency through interceptors. Since this metric is behind a cluster setting, we also rename the setting's key to `server.rpc.request_metrics.enabled` so that it applies to both RPC frameworks, and keep the old key `server.grpc.request_metrics.enabled` as its retired name (alias). Epic: CRDB-49359 Fixes: #144373 Release note: None --- docs/generated/metrics/metrics.yaml | 2 +- pkg/rpc/drpc.go | 6 +++++ pkg/rpc/metrics.go | 38 ++++++++++++++++++++++++++- pkg/rpc/metrics_test.go | 4 +-- pkg/rpc/settings.go | 12 +++++++-- pkg/server/drpc_server.go | 10 ++++++-- pkg/server/grpc_server.go | 37 +------------------------- pkg/server/grpc_server_test.go | 4 +-- pkg/server/serve_mode.go | 40 ++++++++++++++++++++++++++++- pkg/server/server.go | 7 +++-- pkg/server/tenant.go | 7 +++-- 11 files changed, 116 insertions(+), 51 deletions(-) diff --git a/docs/generated/metrics/metrics.yaml b/docs/generated/metrics/metrics.yaml index 832f9c6392f4..0e5c8bb54966 100644 --- a/docs/generated/metrics/metrics.yaml +++ b/docs/generated/metrics/metrics.yaml @@ -7198,7 +7198,7 @@ layers: derivative: NONE - name: rpc.server.request.duration.nanos exported_name: rpc_server_request_duration_nanos - description: Duration of an grpc request in nanoseconds. 
+ description: Duration of an RPC request in nanoseconds. y_axis_label: Duration type: HISTOGRAM unit: NANOSECONDS diff --git a/pkg/rpc/drpc.go b/pkg/rpc/drpc.go index 48e15ca622cf..f9181e8f65cd 100644 --- a/pkg/rpc/drpc.go +++ b/pkg/rpc/drpc.go @@ -222,6 +222,12 @@ func NewDRPCServer(_ context.Context, rpcCtx *Context, opts ...ServerOption) (DR // Recover from any uncaught panics caused by DB Console requests. unaryInterceptors = append(unaryInterceptors, DRPCGatewayRequestRecoveryInterceptor) + // If the metrics interceptor is set, it should be registered second so + // that all other interceptors are included in the response time durations. + if o.drpcRequestMetricsInterceptor != nil { + unaryInterceptors = append(unaryInterceptors, drpcmux.UnaryServerInterceptor(o.drpcRequestMetricsInterceptor)) + } + if !rpcCtx.ContextOptions.Insecure { a := kvAuth{ sv: &rpcCtx.Settings.SV, diff --git a/pkg/rpc/metrics.go b/pkg/rpc/metrics.go index 39ed4256ef90..52ae53bb7bb0 100644 --- a/pkg/rpc/metrics.go +++ b/pkg/rpc/metrics.go @@ -207,7 +207,7 @@ over this connection. } metaRequestDuration = metric.Metadata{ Name: "rpc.server.request.duration.nanos", - Help: "Duration of an grpc request in nanoseconds.", + Help: "Duration of an RPC request in nanoseconds.", Measurement: "Duration", Unit: metric.Unit_NANOSECONDS, MetricType: prometheusgo.MetricType_HISTOGRAM, @@ -449,6 +449,7 @@ func NewRequestMetrics() *RequestMetrics { } type RequestMetricsInterceptor grpc.UnaryServerInterceptor +type DRPCRequestMetricsInterceptor drpcmux.UnaryServerInterceptor // NewRequestMetricsInterceptor creates a new gRPC server interceptor that records // the duration of each RPC. The metric is labeled by the method name and the @@ -485,6 +486,41 @@ func NewRequestMetricsInterceptor( } } +// NewDRPCRequestMetricsInterceptor creates a new DRPC server interceptor that records +// the duration of each RPC. The metric is labeled by the method name and the +// status code of the RPC. 
The interceptor will only record durations if +// shouldRecord returns true. Otherwise, this interceptor will be a no-op. +func NewDRPCRequestMetricsInterceptor( + requestMetrics *RequestMetrics, shouldRecord func(rpc string) bool, +) DRPCRequestMetricsInterceptor { + return func( + ctx context.Context, + req any, + rpc string, + handler drpcmux.UnaryHandler, + ) (any, error) { + if !shouldRecord(rpc) { + return handler(ctx, req) + } + startTime := timeutil.Now() + resp, err := handler(ctx, req) + duration := timeutil.Since(startTime) + var code codes.Code + if err != nil { + // TODO(server): use drpc status code + code = status.Code(err) + } else { + code = codes.OK + } + + requestMetrics.Duration.Observe(map[string]string{ + RpcMethodLabel: rpc, + RpcStatusCodeLabel: code.String(), + }, float64(duration.Nanoseconds())) + return resp, err + } +} + // MarkGatewayRequest returns a grpc metadata object that contains the // gwRequestKey field. This is used by the gRPC gateway that forwards HTTP // requests to their respective gRPC handlers. See gatewayRequestRecoveryInterceptor below. 
diff --git a/pkg/rpc/metrics_test.go b/pkg/rpc/metrics_test.go index 5383f6fd0595..2c201f031f2a 100644 --- a/pkg/rpc/metrics_test.go +++ b/pkg/rpc/metrics_test.go @@ -134,7 +134,7 @@ func TestServerRequestInstrumentInterceptor(t *testing.T) { if tc.shouldRecord { expectedCount = 1 } - assertGrpcMetrics(t, requestMetrics.Duration.ToPrometheusMetrics(), map[string]uint64{ + assertRpcMetrics(t, requestMetrics.Duration.ToPrometheusMetrics(), map[string]uint64{ fmt.Sprintf("%s %s", tc.methodName, tc.statusCode): expectedCount, }) }) @@ -209,7 +209,7 @@ func TestGatewayRequestRecoveryInterceptor(t *testing.T) { }) } -func assertGrpcMetrics( +func assertRpcMetrics( t *testing.T, metrics []*io_prometheus_client.Metric, expected map[string]uint64, ) { t.Helper() diff --git a/pkg/rpc/settings.go b/pkg/rpc/settings.go index 0ddd1c2900f6..3370f71b29f0 100644 --- a/pkg/rpc/settings.go +++ b/pkg/rpc/settings.go @@ -122,8 +122,9 @@ var sourceAddr = func() net.Addr { }() type serverOpts struct { - interceptor func(fullMethod string) error - metricsInterceptor RequestMetricsInterceptor + interceptor func(fullMethod string) error + metricsInterceptor RequestMetricsInterceptor + drpcRequestMetricsInterceptor DRPCRequestMetricsInterceptor } // ServerOption is a configuration option passed to NewServer. @@ -153,3 +154,10 @@ func WithMetricsServerInterceptor(interceptor RequestMetricsInterceptor) ServerO opts.metricsInterceptor = interceptor } } + +// WithDRPCMetricsServerInterceptor adds a DRPCRequestMetricsInterceptor to the DRPC server. 
+func WithDRPCMetricsServerInterceptor(interceptor DRPCRequestMetricsInterceptor) ServerOption { + return func(opts *serverOpts) { + opts.drpcRequestMetricsInterceptor = interceptor + } +} diff --git a/pkg/server/drpc_server.go b/pkg/server/drpc_server.go index c3e812cc9989..d0680f6ad08a 100644 --- a/pkg/server/drpc_server.go +++ b/pkg/server/drpc_server.go @@ -30,7 +30,9 @@ type drpcServer struct { // newDRPCServer creates and configures a new drpcServer instance. It enables // DRPC if the experimental setting is on, otherwise returns a dummy server. -func newDRPCServer(ctx context.Context, rpcCtx *rpc.Context) (*drpcServer, error) { +func newDRPCServer( + ctx context.Context, rpcCtx *rpc.Context, requestMetrics *rpc.RequestMetrics, +) (*drpcServer, error) { drpcServer := &drpcServer{} drpcServer.setMode(modeInitializing) @@ -41,7 +43,11 @@ func newDRPCServer(ctx context.Context, rpcCtx *rpc.Context) (*drpcServer, error func(path string) error { return drpcServer.intercept(path) }), - ) + rpc.WithDRPCMetricsServerInterceptor( + rpc.NewDRPCRequestMetricsInterceptor(requestMetrics, func(method string) bool { + return shouldRecordRequestDuration(rpcCtx.Settings, method) + }), + )) if err != nil { return nil, err } diff --git a/pkg/server/grpc_server.go b/pkg/server/grpc_server.go index b182bfa47be6..7d3916b4e722 100644 --- a/pkg/server/grpc_server.go +++ b/pkg/server/grpc_server.go @@ -7,13 +7,9 @@ package server import ( "context" - "strings" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server/srverrors" - "github.com/cockroachdb/cockroach/pkg/settings" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/errors" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -30,12 +26,10 @@ type grpcServer struct { } func newGRPCServer( - ctx context.Context, rpcCtx *rpc.Context, metricsRegistry *metric.Registry, + ctx context.Context, rpcCtx 
*rpc.Context, requestMetrics *rpc.RequestMetrics, ) (*grpcServer, error) { s := &grpcServer{} s.mode.set(modeInitializing) - requestMetrics := rpc.NewRequestMetrics() - metricsRegistry.AddMetricStruct(requestMetrics) srv, interceptorInfo, err := rpc.NewServerEx( ctx, rpcCtx, rpc.WithInterceptor(func(path string) error { return s.intercept(path) @@ -67,32 +61,3 @@ func (s *grpcServer) health(ctx context.Context) error { return srverrors.ServerError(ctx, errors.Newf("unknown mode: %v", sm)) } } - -// NewWaitingForInitError creates an error indicating that the server cannot run -// the specified method until the node has been initialized. -func NewWaitingForInitError(methodName string) error { - // NB: this error string is sadly matched in grpcutil.IsWaitingForInit(). - return grpcstatus.Errorf(codes.Unavailable, "node waiting for init; %s not available", methodName) -} - -const ( - serverPrefix = "/cockroach.server" - tsdbPrefix = "/cockroach.ts" -) - -// serverGRPCRequestMetricsEnabled is a cluster setting that enables the -// collection of gRPC request duration metrics. This uses export only -// metrics so the metrics are only exported to external sources such as -// /_status/vars and DataDog. 
-var serverGRPCRequestMetricsEnabled = settings.RegisterBoolSetting( - settings.ApplicationLevel, - "server.grpc.request_metrics.enabled", - "enables the collection of grpc metrics", - false, -) - -func shouldRecordRequestDuration(settings *cluster.Settings, method string) bool { - return serverGRPCRequestMetricsEnabled.Get(&settings.SV) && - (strings.HasPrefix(method, serverPrefix) || - strings.HasPrefix(method, tsdbPrefix)) -} diff --git a/pkg/server/grpc_server_test.go b/pkg/server/grpc_server_test.go index cf171de02daf..9f82e764b0cd 100644 --- a/pkg/server/grpc_server_test.go +++ b/pkg/server/grpc_server_test.go @@ -43,7 +43,7 @@ func TestRequestMetricRegistered(t *testing.T) { _, _ = ts.GetAdminClient(t).Settings(ctx, &serverpb.SettingsRequest{}) require.Len(t, histogramVec.ToPrometheusMetrics(), 0, "Should not have recorded any metrics yet") - serverGRPCRequestMetricsEnabled.Override(context.Background(), &ts.ClusterSettings().SV, true) + serverRPCRequestMetricsEnabled.Override(context.Background(), &ts.ClusterSettings().SV, true) _, _ = ts.GetAdminClient(t).Settings(ctx, &serverpb.SettingsRequest{}) require.Len(t, histogramVec.ToPrometheusMetrics(), 1, "Should have recorded metrics for request") } @@ -67,7 +67,7 @@ func TestShouldRecordRequestDuration(t *testing.T) { settings := cluster.MakeTestingClusterSettings() for _, tt := range tests { t.Run(tt.methodName, func(t *testing.T) { - serverGRPCRequestMetricsEnabled.Override(context.Background(), &settings.SV, tt.metricsEnabled) + serverRPCRequestMetricsEnabled.Override(context.Background(), &settings.SV, tt.metricsEnabled) require.Equal(t, tt.expected, shouldRecordRequestDuration(settings, tt.methodName)) }) } diff --git a/pkg/server/serve_mode.go b/pkg/server/serve_mode.go index 61fbf1d8667b..ae0141ff6408 100644 --- a/pkg/server/serve_mode.go +++ b/pkg/server/serve_mode.go @@ -5,7 +5,15 @@ package server -import "sync/atomic" +import ( + "strings" + "sync/atomic" + + 
"github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" +) type serveModeHandler struct { mode serveMode @@ -61,3 +69,33 @@ func (s *serveMode) set(mode serveMode) { func (s *serveMode) get() serveMode { return serveMode(atomic.LoadInt32((*int32)(s))) } + +const ( + serverPrefix = "/cockroach.server" + tsdbPrefix = "/cockroach.ts" +) + +// serverRPCRequestMetricsEnabled is a cluster setting that enables the +// collection of gRPC and DRPC request duration metrics. This uses export only +// metrics so the metrics are only exported to external sources such as +// /_status/vars and DataDog. +var serverRPCRequestMetricsEnabled = settings.RegisterBoolSetting( + settings.ApplicationLevel, + "server.rpc.request_metrics.enabled", + "enables the collection of rpc metrics", + false, /* defaultValue */ + settings.WithRetiredName("server.grpc.request_metrics.enabled"), +) + +func shouldRecordRequestDuration(settings *cluster.Settings, method string) bool { + return serverRPCRequestMetricsEnabled.Get(&settings.SV) && + (strings.HasPrefix(method, serverPrefix) || + strings.HasPrefix(method, tsdbPrefix)) +} + +// NewWaitingForInitError creates an error indicating that the server cannot run +// the specified method until the node has been initialized. +func NewWaitingForInitError(methodName string) error { + // NB: this error string is sadly matched in grpcutil.IsWaitingForInit(). + return grpcstatus.Errorf(codes.Unavailable, "node waiting for init; %s not available", methodName) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 5a5137ca0f4c..99e7986f492b 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -406,12 +406,15 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf // and after ValidateAddrs(). 
rpcContext.CheckCertificateAddrs(ctx) - grpcServer, err := newGRPCServer(ctx, rpcContext, appRegistry) + requestMetrics := rpc.NewRequestMetrics() + appRegistry.AddMetricStruct(requestMetrics) + + grpcServer, err := newGRPCServer(ctx, rpcContext, requestMetrics) if err != nil { return nil, err } - drpcServer, err := newDRPCServer(ctx, rpcContext) + drpcServer, err := newDRPCServer(ctx, rpcContext, requestMetrics) if err != nil { return nil, err } diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index cdfb15e9d715..86596d97c36c 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -1329,11 +1329,14 @@ func makeTenantSQLServerArgs( externalStorage := esb.makeExternalStorage externalStorageFromURI := esb.makeExternalStorageFromURI - grpcServer, err := newGRPCServer(startupCtx, rpcContext, registry) + requestMetrics := rpc.NewRequestMetrics() + registry.AddMetricStruct(requestMetrics) + + grpcServer, err := newGRPCServer(startupCtx, rpcContext, requestMetrics) if err != nil { return sqlServerArgs{}, err } - drpcServer, err := newDRPCServer(startupCtx, rpcContext) + drpcServer, err := newDRPCServer(startupCtx, rpcContext, requestMetrics) if err != nil { return sqlServerArgs{}, err }