Skip to content
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
* [CHANGE] Experimental Delete Series: `/api/v1/admin/tsdb/delete_series` and `/api/v1/admin/tsdb/cancel_delete_request` purger APIs to return status code `204` instead of `200` for success. #2946
* [ENHANCEMENT] Add support for Azure storage in China, Germany and US Government environments. #2988
* [ENHANCEMENT] Query-tee: added a small tolerance to floating point sample values comparison. #2994
* [ENHANCEMENT] Added metrics for tracking some gRPC stats (most importantly message size histograms): #3036
* `cortex_grpc_connected_clients`
* `cortex_grpc_inflight_requests`
* `cortex_grpc_method_errors_total`
* `cortex_grpc_received_payload_size_bytes`
* `cortex_grpc_sent_payload_size_bytes`
* [BUGFIX] Query-frontend: Fixed rounding for incoming query timestamps, to be 100% Prometheus compatible. #2990

## 1.3.0 in progress
Expand Down
4 changes: 4 additions & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/server"
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/alertmanager"
"github.com/cortexproject/cortex/pkg/api"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/storegateway"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/grpc/stats"
"github.com/cortexproject/cortex/pkg/util/modules"
"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -86,6 +88,8 @@ func (t *Cortex) initAPI() (services.Service, error) {
func (t *Cortex) initServer() (services.Service, error) {
// Cortex handles signals on its own.
DisableSignalHandling(&t.Cfg.Server)

t.Cfg.Server.GRPCOptions = append(t.Cfg.Server.GRPCOptions, grpc.StatsHandler(stats.NewStatsHandler(prometheus.DefaultRegisterer)))
serv, err := server.New(t.Cfg.Server)
if err != nil {
return nil, err
Expand Down
109 changes: 109 additions & 0 deletions pkg/util/grpc/stats/grpc_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package stats

import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc/stats"
)

// NewStatsHandler builds a gRPC stats.Handler backed by a grpcStatsHandler whose
// five collectors are created through promauto.With(r):
// connected clients, inflight requests, method errors, and received/sent
// payload size histograms. All but the connected-clients gauge carry a
// "method" label.
//
// NOTE(review): promauto presumably panics if a collector cannot be registered
// with r (e.g. duplicate registration) — confirm against the promauto docs.
func NewStatsHandler(r prometheus.Registerer) stats.Handler {
const MiB = 1024 * 1024
// Histogram bucket upper bounds in bytes, from 1 MiB to 250 MiB; shared by
// the received and sent payload size histograms below.
messageSizeBuckets := []float64{1 * MiB, 2.5 * MiB, 5 * MiB, 10 * MiB, 25 * MiB, 50 * MiB, 100 * MiB, 250 * MiB}

return &grpcStatsHandler{
connectedClients: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "cortex_grpc_connected_clients",
// NOTE(review): the lines from here down to the "Help:" field are review-thread
// text captured from the pull-request page, not Go source.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion(non-blocking): This struct has a large potential for reuse in other projects. Would you mind making the cortex prefix a configurable namespace?

Copy link
Contributor Author

@pstibrany pstibrany Aug 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for suggestion. I plan to drop this PR if/when weaveworks/common#196 gets merged. If weaveworks/common#196 doesn't get accepted, I will reconsider this feedback.

Help: "Number of clients connected to gRPC server.",
}),

// Gauge of calls currently in progress, per method.
inflightRPC: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_grpc_inflight_requests",
Help: "Number of inflight gRPC calls.",
}, []string{"method"}),

// Counter of calls that ended with an error, per method.
methodErrors: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_grpc_method_errors_total",
Help: "Number of errors returned by method.",
}, []string{"method"}),

// Histogram of incoming message sizes, per method.
receivedPayloadSize: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_grpc_received_payload_size_bytes",
Help: "Size of received gRPC messages as seen on the wire (eg. compressed, signed, encrypted).",
Buckets: messageSizeBuckets,
}, []string{"method"}),

// Histogram of outgoing message sizes, per method.
sentPayloadSize: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_grpc_sent_payload_size_bytes",
Help: "Size of sent gRPC messages as seen on the wire (eg. compressed, signed, encrypted).",
Buckets: messageSizeBuckets,
}, []string{"method"}),
}
}

// grpcStatsHandler holds the Prometheus collectors that are updated by the
// gRPC stats callbacks implemented on this type.
type grpcStatsHandler struct {
	// Connection-level gauge; incremented and decremented in HandleConn.
	connectedClients prometheus.Gauge

	// Per-method collectors (labelled with "method"); updated in HandleRPC.
	inflightRPC         *prometheus.GaugeVec
	methodErrors        *prometheus.CounterVec
	receivedPayloadSize *prometheus.HistogramVec
	sentPayloadSize     *prometheus.HistogramVec
}

// contextKey is an unexported type for context keys, so that values stored by
// this package cannot be read or overwritten through keys from other packages.
type contextKey int

// contextKeyMethodName is the context key under which TagRPC stores the full
// gRPC method name.
const contextKeyMethodName contextKey = 1

// TagRPC returns a context derived from ctx that carries the RPC's full method
// name under contextKeyMethodName, so that HandleRPC can read it back later.
func (g *grpcStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
	method := info.FullMethodName
	return context.WithValue(ctx, contextKeyMethodName, method)
}

// HandleRPC updates the per-method metrics for a single RPC stats event.
//
// The method label is taken from the context (stored there by TagRPC) rather
// than from the event, because not every RPCStats struct carries the method
// name. Events arriving with a context that has no method name are dropped.
func (g *grpcStatsHandler) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) {
	method, found := ctx.Value(contextKeyMethodName).(string)
	if !found {
		return
	}

	switch ev := rpcStats.(type) {
	case *stats.Begin:
		g.inflightRPC.WithLabelValues(method).Inc()

	case *stats.End:
		g.inflightRPC.WithLabelValues(method).Dec()
		if ev.Error == nil {
			return
		}
		g.methodErrors.WithLabelValues(method).Inc()

	case *stats.InPayload:
		size := float64(ev.WireLength)
		g.receivedPayloadSize.WithLabelValues(method).Observe(size)

	case *stats.OutPayload:
		size := float64(ev.WireLength)
		g.sentPayloadSize.WithLabelValues(method).Observe(size)

	case *stats.InHeader, *stats.InTrailer, *stats.OutHeader, *stats.OutTrailer:
		// Deliberately ignored; Cortex doesn't use headers or trailers.
		//  - InHeader: WireLength seems to be incorrect for large headers -- it
		//    uses the length of the last frame (16K) even for headers in megabytes.
		//  - OutHeader: has no WireLength, so the size could only be estimated.
		//  - OutTrailer: has no valid WireLength (there is a deprecated field,
		//    always set to 0).
	}
}

// TagConn returns ctx unchanged; nothing is attached to the connection context.
func (g *grpcStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
	return ctx
}

// HandleConn keeps the connected-clients gauge in step with connection
// lifecycle events: it is incremented on ConnBegin and decremented on ConnEnd.
// Every other connection event is ignored.
func (g *grpcStatsHandler) HandleConn(_ context.Context, connStats stats.ConnStats) {
	if _, began := connStats.(*stats.ConnBegin); began {
		g.connectedClients.Inc()
		return
	}
	if _, ended := connStats.(*stats.ConnEnd); ended {
		g.connectedClients.Dec()
	}
}
Loading