diff --git a/docs/generated/metrics/metrics.yaml b/docs/generated/metrics/metrics.yaml index 832f9c6392f4..0e5c8bb54966 100644 --- a/docs/generated/metrics/metrics.yaml +++ b/docs/generated/metrics/metrics.yaml @@ -7198,7 +7198,7 @@ layers: derivative: NONE - name: rpc.server.request.duration.nanos exported_name: rpc_server_request_duration_nanos - description: Duration of an grpc request in nanoseconds. + description: Duration of an RPC request in nanoseconds. y_axis_label: Duration type: HISTOGRAM unit: NANOSECONDS diff --git a/pkg/rpc/drpc.go b/pkg/rpc/drpc.go index 48e15ca622cf..f9181e8f65cd 100644 --- a/pkg/rpc/drpc.go +++ b/pkg/rpc/drpc.go @@ -222,6 +222,12 @@ func NewDRPCServer(_ context.Context, rpcCtx *Context, opts ...ServerOption) (DR // Recover from any uncaught panics caused by DB Console requests. unaryInterceptors = append(unaryInterceptors, DRPCGatewayRequestRecoveryInterceptor) + // If the metrics interceptor is set, it should be registered second so + // that all other interceptors are included in the response time durations. + if o.drpcRequestMetricsInterceptor != nil { + unaryInterceptors = append(unaryInterceptors, drpcmux.UnaryServerInterceptor(o.drpcRequestMetricsInterceptor)) + } + if !rpcCtx.ContextOptions.Insecure { a := kvAuth{ sv: &rpcCtx.Settings.SV, diff --git a/pkg/rpc/metrics.go b/pkg/rpc/metrics.go index 39ed4256ef90..52ae53bb7bb0 100644 --- a/pkg/rpc/metrics.go +++ b/pkg/rpc/metrics.go @@ -207,7 +207,7 @@ over this connection. } metaRequestDuration = metric.Metadata{ Name: "rpc.server.request.duration.nanos", - Help: "Duration of an grpc request in nanoseconds.", + Help: "Duration of an RPC request in nanoseconds.", Measurement: "Duration", Unit: metric.Unit_NANOSECONDS, MetricType: prometheusgo.MetricType_HISTOGRAM, @@ -449,6 +449,7 @@ func NewRequestMetrics() *RequestMetrics { } type RequestMetricsInterceptor grpc.UnaryServerInterceptor +type DRPCRequestMetricsInterceptor drpcmux.UnaryServerInterceptor // NewRequestMetricsInterceptor creates a new gRPC server interceptor that records // the duration of each RPC. The metric is labeled by the method name and the @@ -485,6 +486,41 @@ func NewRequestMetricsInterceptor( } } +// NewDRPCRequestMetricsInterceptor creates a new DRPC server interceptor that records +// the duration of each RPC. The metric is labeled by the method name and the +// status code of the RPC. The interceptor will only record durations if +// shouldRecord returns true. Otherwise, this interceptor will be a no-op. +func NewDRPCRequestMetricsInterceptor( + requestMetrics *RequestMetrics, shouldRecord func(rpc string) bool, +) DRPCRequestMetricsInterceptor { + return func( + ctx context.Context, + req any, + rpc string, + handler drpcmux.UnaryHandler, + ) (any, error) { + if !shouldRecord(rpc) { + return handler(ctx, req) + } + startTime := timeutil.Now() + resp, err := handler(ctx, req) + duration := timeutil.Since(startTime) + var code codes.Code + if err != nil { + // TODO(server): use drpc status code + code = status.Code(err) + } else { + code = codes.OK + } + + requestMetrics.Duration.Observe(map[string]string{ + RpcMethodLabel: rpc, + RpcStatusCodeLabel: code.String(), + }, float64(duration.Nanoseconds())) + return resp, err + } +} + // MarkGatewayRequest returns a grpc metadata object that contains the // gwRequestKey field. This is used by the gRPC gateway that forwards HTTP // requests to their respective gRPC handlers. See gatewayRequestRecoveryInterceptor below. diff --git a/pkg/rpc/metrics_test.go b/pkg/rpc/metrics_test.go index 5383f6fd0595..2c201f031f2a 100644 --- a/pkg/rpc/metrics_test.go +++ b/pkg/rpc/metrics_test.go @@ -134,7 +134,7 @@ func TestServerRequestInstrumentInterceptor(t *testing.T) { if tc.shouldRecord { expectedCount = 1 } - assertGrpcMetrics(t, requestMetrics.Duration.ToPrometheusMetrics(), map[string]uint64{ + assertRpcMetrics(t, requestMetrics.Duration.ToPrometheusMetrics(), map[string]uint64{ fmt.Sprintf("%s %s", tc.methodName, tc.statusCode): expectedCount, }) }) @@ -209,7 +209,7 @@ func TestGatewayRequestRecoveryInterceptor(t *testing.T) { }) } -func assertGrpcMetrics( +func assertRpcMetrics( t *testing.T, metrics []*io_prometheus_client.Metric, expected map[string]uint64, ) { t.Helper() diff --git a/pkg/rpc/settings.go b/pkg/rpc/settings.go index 0ddd1c2900f6..3370f71b29f0 100644 --- a/pkg/rpc/settings.go +++ b/pkg/rpc/settings.go @@ -122,8 +122,9 @@ var sourceAddr = func() net.Addr { }() type serverOpts struct { - interceptor func(fullMethod string) error - metricsInterceptor RequestMetricsInterceptor + interceptor func(fullMethod string) error + metricsInterceptor RequestMetricsInterceptor + drpcRequestMetricsInterceptor DRPCRequestMetricsInterceptor } // ServerOption is a configuration option passed to NewServer. @@ -153,3 +154,10 @@ func WithMetricsServerInterceptor(interceptor RequestMetricsInterceptor) ServerO opts.metricsInterceptor = interceptor } } + +// WithDRPCMetricsServerInterceptor adds a DRPCRequestMetricsInterceptor to the DRPC server. +func WithDRPCMetricsServerInterceptor(interceptor DRPCRequestMetricsInterceptor) ServerOption { + return func(opts *serverOpts) { + opts.drpcRequestMetricsInterceptor = interceptor + } +} diff --git a/pkg/server/drpc_server.go b/pkg/server/drpc_server.go index c3e812cc9989..d0680f6ad08a 100644 --- a/pkg/server/drpc_server.go +++ b/pkg/server/drpc_server.go @@ -30,7 +30,9 @@ type drpcServer struct { // newDRPCServer creates and configures a new drpcServer instance. It enables // DRPC if the experimental setting is on, otherwise returns a dummy server. -func newDRPCServer(ctx context.Context, rpcCtx *rpc.Context) (*drpcServer, error) { +func newDRPCServer( + ctx context.Context, rpcCtx *rpc.Context, requestMetrics *rpc.RequestMetrics, +) (*drpcServer, error) { drpcServer := &drpcServer{} drpcServer.setMode(modeInitializing) @@ -41,7 +43,11 @@ func newDRPCServer(ctx context.Context, rpcCtx *rpc.Context) (*drpcServer, error func(path string) error { return drpcServer.intercept(path) }), - ) + rpc.WithDRPCMetricsServerInterceptor( + rpc.NewDRPCRequestMetricsInterceptor(requestMetrics, func(method string) bool { + return shouldRecordRequestDuration(rpcCtx.Settings, method) + }), + )) if err != nil { return nil, err } diff --git a/pkg/server/grpc_server.go b/pkg/server/grpc_server.go index b182bfa47be6..7d3916b4e722 100644 --- a/pkg/server/grpc_server.go +++ b/pkg/server/grpc_server.go @@ -7,13 +7,9 @@ package server import ( "context" - "strings" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server/srverrors" - "github.com/cockroachdb/cockroach/pkg/settings" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/errors" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -30,12 +26,10 @@ type grpcServer struct { } func newGRPCServer( - ctx context.Context, rpcCtx *rpc.Context, metricsRegistry *metric.Registry, + ctx context.Context, rpcCtx *rpc.Context, requestMetrics *rpc.RequestMetrics, ) (*grpcServer, error) { s := &grpcServer{} s.mode.set(modeInitializing) - requestMetrics := rpc.NewRequestMetrics() - metricsRegistry.AddMetricStruct(requestMetrics) srv, interceptorInfo, err := rpc.NewServerEx( ctx, rpcCtx, rpc.WithInterceptor(func(path string) error { return s.intercept(path) @@ -67,32 +61,3 @@ func (s *grpcServer) health(ctx context.Context) error { return srverrors.ServerError(ctx, errors.Newf("unknown mode: %v", sm)) } } - -// NewWaitingForInitError creates an error indicating that the server cannot run -// the specified method until the node has been initialized. -func NewWaitingForInitError(methodName string) error { - // NB: this error string is sadly matched in grpcutil.IsWaitingForInit(). - return grpcstatus.Errorf(codes.Unavailable, "node waiting for init; %s not available", methodName) -} - -const ( - serverPrefix = "/cockroach.server" - tsdbPrefix = "/cockroach.ts" -) - -// serverGRPCRequestMetricsEnabled is a cluster setting that enables the -// collection of gRPC request duration metrics. This uses export only -// metrics so the metrics are only exported to external sources such as -// /_status/vars and DataDog. -var serverGRPCRequestMetricsEnabled = settings.RegisterBoolSetting( - settings.ApplicationLevel, - "server.grpc.request_metrics.enabled", - "enables the collection of grpc metrics", - false, -) - -func shouldRecordRequestDuration(settings *cluster.Settings, method string) bool { - return serverGRPCRequestMetricsEnabled.Get(&settings.SV) && - (strings.HasPrefix(method, serverPrefix) || - strings.HasPrefix(method, tsdbPrefix)) -} diff --git a/pkg/server/grpc_server_test.go b/pkg/server/grpc_server_test.go index cf171de02daf..9f82e764b0cd 100644 --- a/pkg/server/grpc_server_test.go +++ b/pkg/server/grpc_server_test.go @@ -43,7 +43,7 @@ func TestRequestMetricRegistered(t *testing.T) { _, _ = ts.GetAdminClient(t).Settings(ctx, &serverpb.SettingsRequest{}) require.Len(t, histogramVec.ToPrometheusMetrics(), 0, "Should not have recorded any metrics yet") - serverGRPCRequestMetricsEnabled.Override(context.Background(), &ts.ClusterSettings().SV, true) + serverRPCRequestMetricsEnabled.Override(context.Background(), &ts.ClusterSettings().SV, true) _, _ = ts.GetAdminClient(t).Settings(ctx, &serverpb.SettingsRequest{}) require.Len(t, histogramVec.ToPrometheusMetrics(), 1, "Should have recorded metrics for request") } @@ -67,7 +67,7 @@ func TestShouldRecordRequestDuration(t *testing.T) { settings := cluster.MakeTestingClusterSettings() for _, tt := range tests { t.Run(tt.methodName, func(t *testing.T) { - serverGRPCRequestMetricsEnabled.Override(context.Background(), &settings.SV, tt.metricsEnabled) + serverRPCRequestMetricsEnabled.Override(context.Background(), &settings.SV, tt.metricsEnabled) require.Equal(t, tt.expected, shouldRecordRequestDuration(settings, tt.methodName)) }) } diff --git a/pkg/server/serve_mode.go b/pkg/server/serve_mode.go index 61fbf1d8667b..ae0141ff6408 100644 --- a/pkg/server/serve_mode.go +++ b/pkg/server/serve_mode.go @@ -5,7 +5,15 @@ package server -import "sync/atomic" +import ( + "strings" + "sync/atomic" + + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" +) type serveModeHandler struct { mode serveMode @@ -61,3 +69,33 @@ func (s *serveMode) set(mode serveMode) { func (s *serveMode) get() serveMode { return serveMode(atomic.LoadInt32((*int32)(s))) } + +const ( + serverPrefix = "/cockroach.server" + tsdbPrefix = "/cockroach.ts" +) + +// serverRPCRequestMetricsEnabled is a cluster setting that enables the +// collection of gRPC and DRPC request duration metrics. This uses export only +// metrics so the metrics are only exported to external sources such as +// /_status/vars and DataDog. +var serverRPCRequestMetricsEnabled = settings.RegisterBoolSetting( + settings.ApplicationLevel, + "server.rpc.request_metrics.enabled", + "enables the collection of rpc metrics", + false, /* defaultValue */ + settings.WithRetiredName("server.grpc.request_metrics.enabled"), +) + +func shouldRecordRequestDuration(settings *cluster.Settings, method string) bool { + return serverRPCRequestMetricsEnabled.Get(&settings.SV) && + (strings.HasPrefix(method, serverPrefix) || + strings.HasPrefix(method, tsdbPrefix)) +} + +// NewWaitingForInitError creates an error indicating that the server cannot run +// the specified method until the node has been initialized. +func NewWaitingForInitError(methodName string) error { + // NB: this error string is sadly matched in grpcutil.IsWaitingForInit(). + return grpcstatus.Errorf(codes.Unavailable, "node waiting for init; %s not available", methodName) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 5a5137ca0f4c..99e7986f492b 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -406,12 +406,15 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf // and after ValidateAddrs(). rpcContext.CheckCertificateAddrs(ctx) - grpcServer, err := newGRPCServer(ctx, rpcContext, appRegistry) + requestMetrics := rpc.NewRequestMetrics() + appRegistry.AddMetricStruct(requestMetrics) + + grpcServer, err := newGRPCServer(ctx, rpcContext, requestMetrics) if err != nil { return nil, err } - drpcServer, err := newDRPCServer(ctx, rpcContext) + drpcServer, err := newDRPCServer(ctx, rpcContext, requestMetrics) if err != nil { return nil, err } diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index cdfb15e9d715..86596d97c36c 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -1329,11 +1329,14 @@ func makeTenantSQLServerArgs( externalStorage := esb.makeExternalStorage externalStorageFromURI := esb.makeExternalStorageFromURI - grpcServer, err := newGRPCServer(startupCtx, rpcContext, registry) + requestMetrics := rpc.NewRequestMetrics() + registry.AddMetricStruct(requestMetrics) + + grpcServer, err := newGRPCServer(startupCtx, rpcContext, requestMetrics) if err != nil { return sqlServerArgs{}, err } - drpcServer, err := newDRPCServer(startupCtx, rpcContext) + drpcServer, err := newDRPCServer(startupCtx, rpcContext, requestMetrics) if err != nil { return sqlServerArgs{}, err }