Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
* [CHANGE] Experimental Delete Series: `/api/v1/admin/tsdb/delete_series` and `/api/v1/admin/tsdb/cancel_delete_request` purger APIs to return status code `204` instead of `200` for success. #2946
* [ENHANCEMENT] Add support for azure storage in China, German and US Government environments. #2988
* [ENHANCEMENT] Query-tee: added a small tolerance to floating point sample values comparison. #2994
* [ENHANCEMENT] Added metrics for tracking some gRPC stats (most importantly message size histograms): #3036
* `cortex_grpc_connected_clients`
* `cortex_grpc_inflight_requests`
* `cortex_grpc_method_errors_total`
* `cortex_grpc_received_payload_size_bytes`
* `cortex_grpc_sent_payload_size_bytes`
* [ENHANCEMENT] Query-tee: add support for doing a passthrough of requests to preferred backend for unregistered routes #3018
* [ENHANCEMENT] Expose `storage.aws.dynamodb.backoff_config` configuration file field. #3026
* [BUGFIX] Query-frontend: Fixed rounding for incoming query timestamps, to be 100% Prometheus compatible. #2990
Expand Down
4 changes: 4 additions & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/server"
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/alertmanager"
"github.com/cortexproject/cortex/pkg/api"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/storegateway"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/grpc/stats"
"github.com/cortexproject/cortex/pkg/util/modules"
"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -86,6 +88,8 @@ func (t *Cortex) initAPI() (services.Service, error) {
func (t *Cortex) initServer() (services.Service, error) {
// Cortex handles signals on its own.
DisableSignalHandling(&t.Cfg.Server)

t.Cfg.Server.GRPCOptions = append(t.Cfg.Server.GRPCOptions, grpc.StatsHandler(stats.NewStatsHandler(prometheus.DefaultRegisterer)))
serv, err := server.New(t.Cfg.Server)
if err != nil {
return nil, err
Expand Down
109 changes: 109 additions & 0 deletions pkg/util/grpc/stats/grpc_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package stats

import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc/stats"
)

// NewStatsHandler returns a gRPC stats.Handler that tracks connected clients,
// inflight requests, method errors, and received/sent payload sizes as
// Prometheus metrics. All metrics are created through promauto.With(r).
func NewStatsHandler(r prometheus.Registerer) stats.Handler {
const MiB = 1024 * 1024
// Histogram bucket upper bounds for message sizes, ranging from 1 MiB to 250 MiB.
messageSizeBuckets := []float64{1 * MiB, 2.5 * MiB, 5 * MiB, 10 * MiB, 25 * MiB, 50 * MiB, 100 * MiB, 250 * MiB}

return &grpcStatsHandler{
connectedClients: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "cortex_grpc_connected_clients",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion(non-blocking): This struct has a large potential for reuse in other projects. Would you mind making the cortex prefix a configurable namespace?

Copy link
Contributor Author

@pstibrany pstibrany Aug 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for suggestion. I plan to drop this PR if/when weaveworks/common#196 gets merged. If weaveworks/common#196 doesn't get accepted, I will reconsider this feedback.

Help: "Number of clients connected to gRPC server.",
}),

// The remaining metrics are all partitioned by the "method" label.
inflightRPC: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_grpc_inflight_requests",
Help: "Number of inflight gRPC calls.",
}, []string{"method"}),

methodErrors: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_grpc_method_errors_total",
Help: "Number of errors returned by method.",
}, []string{"method"}),

receivedPayloadSize: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_grpc_received_payload_size_bytes",
Help: "Size of received gRPC messages as seen on the wire (eg. compressed, signed, encrypted).",
Buckets: messageSizeBuckets,
}, []string{"method"}),

sentPayloadSize: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_grpc_sent_payload_size_bytes",
Help: "Size of sent gRPC messages as seen on the wire (eg. compressed, signed, encrypted).",
Buckets: messageSizeBuckets,
}, []string{"method"}),
}
}

// grpcStatsHandler records gRPC connection and RPC events as Prometheus
// metrics. Its methods TagRPC, HandleRPC, TagConn and HandleConn are the
// hooks invoked for those events.
type grpcStatsHandler struct {
	// Gauge incremented on ConnBegin and decremented on ConnEnd by HandleConn.
	connectedClients prometheus.Gauge

	// Metrics updated by HandleRPC, each labelled with the full method name.
	inflightRPC         *prometheus.GaugeVec
	methodErrors        *prometheus.CounterVec
	receivedPayloadSize *prometheus.HistogramVec
	sentPayloadSize     *prometheus.HistogramVec
}

// contextKey is a package-private type for context keys, so that values stored
// by this package cannot be read or overwritten by other packages.
type contextKey int

// contextKeyMethodName is the context key under which TagRPC stores the full
// method name of the RPC.
const contextKeyMethodName contextKey = 1

// TagRPC returns a context derived from ctx that carries the RPC's full method
// name, so that HandleRPC can label its metrics with it.
func (g *grpcStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
	method := info.FullMethodName
	return context.WithValue(ctx, contextKeyMethodName, method)
}

// HandleRPC updates the per-method metrics for a single RPC event: the
// inflight gauge on Begin/End, the error counter on End with a non-nil Error,
// and the payload size histograms on InPayload/OutPayload. Header and trailer
// events are ignored.
func (g *grpcStatsHandler) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) {
	// Not all RPCStats structs carry the method name, so it is taken from the
	// context value stored by TagRPC. Without it there is nothing to label with.
	method, found := ctx.Value(contextKeyMethodName).(string)
	if !found {
		return
	}

	switch ev := rpcStats.(type) {
	case *stats.Begin:
		g.inflightRPC.WithLabelValues(method).Inc()

	case *stats.End:
		g.inflightRPC.WithLabelValues(method).Dec()
		if ev.Error != nil {
			g.methodErrors.WithLabelValues(method).Inc()
		}

	case *stats.InPayload:
		size := float64(ev.WireLength)
		g.receivedPayloadSize.WithLabelValues(method).Observe(size)

	case *stats.OutPayload:
		size := float64(ev.WireLength)
		g.sentPayloadSize.WithLabelValues(method).Observe(size)

	case *stats.InHeader, *stats.InTrailer, *stats.OutHeader, *stats.OutTrailer:
		// Ignored. Cortex doesn't use headers or trailers. Furthermore:
		//   - InHeader: WireLength seems to be incorrect for large headers -- it
		//     uses length of last frame (16K) even for headers in megabytes.
		//   - OutHeader: doesn't have WireLength, so we could only estimate it.
		//   - OutTrailer: doesn't have valid WireLength (there is a deprecated
		//     field, always set to 0).
	}
}

// TagConn returns ctx unchanged; the connection tag info is not used and no
// per-connection value is attached to the context.
func (g *grpcStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}

// HandleConn keeps the connected-clients gauge in step with connection
// lifecycle events: it is incremented on ConnBegin and decremented on ConnEnd.
// Any other connection event is ignored.
func (g *grpcStatsHandler) HandleConn(_ context.Context, connStats stats.ConnStats) {
	if _, began := connStats.(*stats.ConnBegin); began {
		g.connectedClients.Inc()
		return
	}
	if _, ended := connStats.(*stats.ConnEnd); ended {
		g.connectedClients.Dec()
	}
}
Loading