Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/metrics/metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7198,7 +7198,7 @@ layers:
derivative: NONE
- name: rpc.server.request.duration.nanos
exported_name: rpc_server_request_duration_nanos
description: Duration of an grpc request in nanoseconds.
description: Duration of an RPC request in nanoseconds.
y_axis_label: Duration
type: HISTOGRAM
unit: NANOSECONDS
Expand Down
6 changes: 6 additions & 0 deletions pkg/rpc/drpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ func NewDRPCServer(_ context.Context, rpcCtx *Context, opts ...ServerOption) (DR
// Recover from any uncaught panics caused by DB Console requests.
unaryInterceptors = append(unaryInterceptors, DRPCGatewayRequestRecoveryInterceptor)

// If the metrics interceptor is set, it should be registered second so
// that all other interceptors are included in the response time durations.
if o.drpcRequestMetricsInterceptor != nil {
unaryInterceptors = append(unaryInterceptors, drpcmux.UnaryServerInterceptor(o.drpcRequestMetricsInterceptor))
}

if !rpcCtx.ContextOptions.Insecure {
a := kvAuth{
sv: &rpcCtx.Settings.SV,
Expand Down
38 changes: 37 additions & 1 deletion pkg/rpc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ over this connection.
}
metaRequestDuration = metric.Metadata{
Name: "rpc.server.request.duration.nanos",
Help: "Duration of an grpc request in nanoseconds.",
Help: "Duration of an RPC request in nanoseconds.",
Measurement: "Duration",
Unit: metric.Unit_NANOSECONDS,
MetricType: prometheusgo.MetricType_HISTOGRAM,
Expand Down Expand Up @@ -449,6 +449,7 @@ func NewRequestMetrics() *RequestMetrics {
}

type RequestMetricsInterceptor grpc.UnaryServerInterceptor
type DRPCRequestMetricsInterceptor drpcmux.UnaryServerInterceptor

// NewRequestMetricsInterceptor creates a new gRPC server interceptor that records
// the duration of each RPC. The metric is labeled by the method name and the
Expand Down Expand Up @@ -485,6 +486,41 @@ func NewRequestMetricsInterceptor(
}
}

// NewDRPCRequestMetricsInterceptor creates a new DRPC server interceptor that records
// the duration of each RPC. The metric is labeled by the method name and the
// status code of the RPC. The interceptor will only record durations if
// shouldRecord returns true. Otherwise, this interceptor will be a no-op.
func NewDRPCRequestMetricsInterceptor(
requestMetrics *RequestMetrics, shouldRecord func(rpc string) bool,
) DRPCRequestMetricsInterceptor {
return func(
ctx context.Context,
req any,
rpc string,
handler drpcmux.UnaryHandler,
) (any, error) {
if !shouldRecord(rpc) {
return handler(ctx, req)
}
startTime := timeutil.Now()
resp, err := handler(ctx, req)
duration := timeutil.Since(startTime)
var code codes.Code
if err != nil {
// TODO(server): use drpc status code
code = status.Code(err)
} else {
code = codes.OK
}

requestMetrics.Duration.Observe(map[string]string{
RpcMethodLabel: rpc,
RpcStatusCodeLabel: code.String(),
}, float64(duration.Nanoseconds()))
return resp, err
}
}

// MarkGatewayRequest returns a grpc metadata object that contains the
// gwRequestKey field. This is used by the gRPC gateway that forwards HTTP
// requests to their respective gRPC handlers. See gatewayRequestRecoveryInterceptor below.
Expand Down
4 changes: 2 additions & 2 deletions pkg/rpc/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestServerRequestInstrumentInterceptor(t *testing.T) {
if tc.shouldRecord {
expectedCount = 1
}
assertGrpcMetrics(t, requestMetrics.Duration.ToPrometheusMetrics(), map[string]uint64{
assertRpcMetrics(t, requestMetrics.Duration.ToPrometheusMetrics(), map[string]uint64{
fmt.Sprintf("%s %s", tc.methodName, tc.statusCode): expectedCount,
})
})
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestGatewayRequestRecoveryInterceptor(t *testing.T) {
})
}

func assertGrpcMetrics(
func assertRpcMetrics(
t *testing.T, metrics []*io_prometheus_client.Metric, expected map[string]uint64,
) {
t.Helper()
Expand Down
12 changes: 10 additions & 2 deletions pkg/rpc/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ var sourceAddr = func() net.Addr {
}()

type serverOpts struct {
interceptor func(fullMethod string) error
metricsInterceptor RequestMetricsInterceptor
interceptor func(fullMethod string) error
metricsInterceptor RequestMetricsInterceptor
drpcRequestMetricsInterceptor DRPCRequestMetricsInterceptor
}

// ServerOption is a configuration option passed to NewServer.
Expand Down Expand Up @@ -153,3 +154,10 @@ func WithMetricsServerInterceptor(interceptor RequestMetricsInterceptor) ServerO
opts.metricsInterceptor = interceptor
}
}

// WithDRPCMetricsServerInterceptor adds a DRPCRequestMetricsInterceptor to the DRPC server.
func WithDRPCMetricsServerInterceptor(interceptor DRPCRequestMetricsInterceptor) ServerOption {
return func(opts *serverOpts) {
opts.drpcRequestMetricsInterceptor = interceptor
}
}
10 changes: 8 additions & 2 deletions pkg/server/drpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ type drpcServer struct {

// newDRPCServer creates and configures a new drpcServer instance. It enables
// DRPC if the experimental setting is on, otherwise returns a dummy server.
func newDRPCServer(ctx context.Context, rpcCtx *rpc.Context) (*drpcServer, error) {
func newDRPCServer(
ctx context.Context, rpcCtx *rpc.Context, requestMetrics *rpc.RequestMetrics,
) (*drpcServer, error) {
drpcServer := &drpcServer{}
drpcServer.setMode(modeInitializing)

Expand All @@ -41,7 +43,11 @@ func newDRPCServer(ctx context.Context, rpcCtx *rpc.Context) (*drpcServer, error
func(path string) error {
return drpcServer.intercept(path)
}),
)
rpc.WithDRPCMetricsServerInterceptor(
rpc.NewDRPCRequestMetricsInterceptor(requestMetrics, func(method string) bool {
return shouldRecordRequestDuration(rpcCtx.Settings, method)
}),
))
if err != nil {
return nil, err
}
Expand Down
37 changes: 1 addition & 36 deletions pkg/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,9 @@ package server

import (
"context"
"strings"

"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server/srverrors"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand All @@ -30,12 +26,10 @@ type grpcServer struct {
}

func newGRPCServer(
ctx context.Context, rpcCtx *rpc.Context, metricsRegistry *metric.Registry,
ctx context.Context, rpcCtx *rpc.Context, requestMetrics *rpc.RequestMetrics,
) (*grpcServer, error) {
s := &grpcServer{}
s.mode.set(modeInitializing)
requestMetrics := rpc.NewRequestMetrics()
metricsRegistry.AddMetricStruct(requestMetrics)
srv, interceptorInfo, err := rpc.NewServerEx(
ctx, rpcCtx, rpc.WithInterceptor(func(path string) error {
return s.intercept(path)
Expand Down Expand Up @@ -67,32 +61,3 @@ func (s *grpcServer) health(ctx context.Context) error {
return srverrors.ServerError(ctx, errors.Newf("unknown mode: %v", sm))
}
}

// NewWaitingForInitError creates an error indicating that the server cannot run
// the specified method until the node has been initialized.
func NewWaitingForInitError(methodName string) error {
// NB: this error string is sadly matched in grpcutil.IsWaitingForInit().
return grpcstatus.Errorf(codes.Unavailable, "node waiting for init; %s not available", methodName)
}

const (
serverPrefix = "/cockroach.server"
tsdbPrefix = "/cockroach.ts"
)

// serverGRPCRequestMetricsEnabled is a cluster setting that enables the
// collection of gRPC request duration metrics. This uses export only
// metrics so the metrics are only exported to external sources such as
// /_status/vars and DataDog.
var serverGRPCRequestMetricsEnabled = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"server.grpc.request_metrics.enabled",
"enables the collection of grpc metrics",
false,
)

func shouldRecordRequestDuration(settings *cluster.Settings, method string) bool {
return serverGRPCRequestMetricsEnabled.Get(&settings.SV) &&
(strings.HasPrefix(method, serverPrefix) ||
strings.HasPrefix(method, tsdbPrefix))
}
4 changes: 2 additions & 2 deletions pkg/server/grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestRequestMetricRegistered(t *testing.T) {

_, _ = ts.GetAdminClient(t).Settings(ctx, &serverpb.SettingsRequest{})
require.Len(t, histogramVec.ToPrometheusMetrics(), 0, "Should not have recorded any metrics yet")
serverGRPCRequestMetricsEnabled.Override(context.Background(), &ts.ClusterSettings().SV, true)
serverRPCRequestMetricsEnabled.Override(context.Background(), &ts.ClusterSettings().SV, true)
_, _ = ts.GetAdminClient(t).Settings(ctx, &serverpb.SettingsRequest{})
require.Len(t, histogramVec.ToPrometheusMetrics(), 1, "Should have recorded metrics for request")
}
Expand All @@ -67,7 +67,7 @@ func TestShouldRecordRequestDuration(t *testing.T) {
settings := cluster.MakeTestingClusterSettings()
for _, tt := range tests {
t.Run(tt.methodName, func(t *testing.T) {
serverGRPCRequestMetricsEnabled.Override(context.Background(), &settings.SV, tt.metricsEnabled)
serverRPCRequestMetricsEnabled.Override(context.Background(), &settings.SV, tt.metricsEnabled)
require.Equal(t, tt.expected, shouldRecordRequestDuration(settings, tt.methodName))
})
}
Expand Down
40 changes: 39 additions & 1 deletion pkg/server/serve_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,15 @@

package server

import "sync/atomic"
import (
"strings"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
)

type serveModeHandler struct {
mode serveMode
Expand Down Expand Up @@ -61,3 +69,33 @@ func (s *serveMode) set(mode serveMode) {
func (s *serveMode) get() serveMode {
return serveMode(atomic.LoadInt32((*int32)(s)))
}

const (
serverPrefix = "/cockroach.server"
tsdbPrefix = "/cockroach.ts"
)

// serverRPCRequestMetricsEnabled is a cluster setting that enables the
// collection of gRPC and DRPC request duration metrics. This uses export only
// metrics so the metrics are only exported to external sources such as
// /_status/vars and DataDog.
var serverRPCRequestMetricsEnabled = settings.RegisterBoolSetting(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

server.rpc.request_metrics.enabled is not a public setting. This change looks good to me. There is some discussion around cluster setting deprecation here. Can you followup, add the guidance for deprecating and renaming cluster settings and publish it please?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok will update this doc accordingly.

settings.ApplicationLevel,
"server.rpc.request_metrics.enabled",
"enables the collection of rpc metrics",
false, /* defaultValue */
settings.WithRetiredName("server.grpc.request_metrics.enabled"),
)

func shouldRecordRequestDuration(settings *cluster.Settings, method string) bool {
return serverRPCRequestMetricsEnabled.Get(&settings.SV) &&
(strings.HasPrefix(method, serverPrefix) ||
strings.HasPrefix(method, tsdbPrefix))
}

// NewWaitingForInitError creates an error indicating that the server cannot run
// the specified method until the node has been initialized.
func NewWaitingForInitError(methodName string) error {
// NB: this error string is sadly matched in grpcutil.IsWaitingForInit().
return grpcstatus.Errorf(codes.Unavailable, "node waiting for init; %s not available", methodName)
}
7 changes: 5 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,12 +406,15 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
// and after ValidateAddrs().
rpcContext.CheckCertificateAddrs(ctx)

grpcServer, err := newGRPCServer(ctx, rpcContext, appRegistry)
requestMetrics := rpc.NewRequestMetrics()
appRegistry.AddMetricStruct(requestMetrics)

grpcServer, err := newGRPCServer(ctx, rpcContext, requestMetrics)
if err != nil {
return nil, err
}

drpcServer, err := newDRPCServer(ctx, rpcContext)
drpcServer, err := newDRPCServer(ctx, rpcContext, requestMetrics)
if err != nil {
return nil, err
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/server/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -1329,11 +1329,14 @@ func makeTenantSQLServerArgs(
externalStorage := esb.makeExternalStorage
externalStorageFromURI := esb.makeExternalStorageFromURI

grpcServer, err := newGRPCServer(startupCtx, rpcContext, registry)
requestMetrics := rpc.NewRequestMetrics()
registry.AddMetricStruct(requestMetrics)

grpcServer, err := newGRPCServer(startupCtx, rpcContext, requestMetrics)
if err != nil {
return sqlServerArgs{}, err
}
drpcServer, err := newDRPCServer(startupCtx, rpcContext)
drpcServer, err := newDRPCServer(startupCtx, rpcContext, requestMetrics)
if err != nil {
return sqlServerArgs{}, err
}
Expand Down