Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
* [ENHANCEMENT] Logger: added JSON logging support, configured via the `-log.format=json` CLI flag or its respective YAML config option. #2386
* [ENHANCEMENT] Added new flags `-bigtable.grpc-compression`, `-ingester.client.grpc-compression`, `-querier.frontend-client.grpc-compression` to configure compression used by gRPC. Valid values are `gzip`, `snappy`, or empty string (no compression, default). #2940
* [ENHANCEMENT] Clarify limitations of the `/api/v1/series`, `/api/v1/labels` and `/api/v1/label/{name}/values` endpoints. #2953
* [ENHANCEMENT] Added metrics for tracking some gRPC stats (most importantly message size histograms). #3036
* [BUGFIX] Fixed a bug with `api/v1/query_range` where empty responses would return a null value for `result` and an empty value for `resultType`. #2962
* [BUGFIX] Fixed a bug in the index intersect code causing storage to return more chunks/series than required. #2796
* [BUGFIX] Fixed the number of reported keys in the background cache queue. #2764
Expand Down
4 changes: 4 additions & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/server"
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/alertmanager"
"github.com/cortexproject/cortex/pkg/api"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/storegateway"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/grpc/stats"
"github.com/cortexproject/cortex/pkg/util/modules"
"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -86,6 +88,8 @@ func (t *Cortex) initAPI() (services.Service, error) {
func (t *Cortex) initServer() (services.Service, error) {
// Cortex handles signals on its own.
DisableSignalHandling(&t.Cfg.Server)

t.Cfg.Server.GRPCOptions = append(t.Cfg.Server.GRPCOptions, grpc.StatsHandler(stats.NewStatsHandler(prometheus.DefaultRegisterer)))
serv, err := server.New(t.Cfg.Server)
if err != nil {
return nil, err
Expand Down
109 changes: 109 additions & 0 deletions pkg/util/grpc/stats/grpc_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package stats

import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc/stats"
)

// NewStatsHandler returns a gRPC stats.Handler that exposes connection and RPC
// statistics as Prometheus metrics. All metrics are created through
// promauto.With(r), i.e. against the supplied registerer r.
//
// Metrics created here:
//   - cortex_grpc_connected_clients (gauge)
//   - cortex_grpc_inflight_requests (gauge, label "method")
//   - cortex_grpc_method_errors_total (counter, label "method")
//   - cortex_grpc_request_size_bytes (histogram, label "method")
//   - cortex_grpc_response_size_bytes (histogram, label "method")
func NewStatsHandler(r prometheus.Registerer) stats.Handler {
const MiB = 1024 * 1024
// Both message-size histograms share these buckets, ranging from 1 MiB to 250 MiB.
messageSizeBuckets := []float64{1 * MiB, 2.5 * MiB, 5 * MiB, 10 * MiB, 25 * MiB, 50 * MiB, 100 * MiB, 250 * MiB}

return &grpcStatsHandler{
connectedClients: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "cortex_grpc_connected_clients",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion(non-blocking): This struct has a large potential for reuse in other projects. Would you mind making the cortex prefix a configurable namespace?

Copy link
Contributor Author

@pstibrany pstibrany Aug 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for suggestion. I plan to drop this PR if/when weaveworks/common#196 gets merged. If weaveworks/common#196 doesn't get accepted, I will reconsider this feedback.

Help: "Number of clients connected to gRPC server",
}),

inflightRPC: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_grpc_inflight_requests",
Help: "Number of inflight RPC calls",
}, []string{"method"}),

methodErrors: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_grpc_method_errors_total",
// NOTE(review): this Help text is identical to the one used for
// cortex_grpc_connected_clients and looks like a copy-paste; it presumably
// should describe failed RPC calls. TestGrpcStats compares this exact text,
// so the expected output there has to change together with it.
Help: "Number of clients connected to gRPC server",
}, []string{"method"}),

receivedMessageSize: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_grpc_request_size_bytes",
Help: "Size of gRPC requests.",
Buckets: messageSizeBuckets,
}, []string{"method"}),

sentMessageSize: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_grpc_response_size_bytes",
Help: "Size of gRPC responses.",
Buckets: messageSizeBuckets,
}, []string{"method"}),
}
}

// grpcStatsHandler is the stats.Handler implementation returned by
// NewStatsHandler. It only holds the Prometheus collectors that the handler
// methods update.
type grpcStatsHandler struct {
	// Connection-level metric: incremented on ConnBegin, decremented on ConnEnd.
	connectedClients prometheus.Gauge

	// RPC-level metrics, all labelled by the full method name.
	inflightRPC  *prometheus.GaugeVec
	methodErrors *prometheus.CounterVec

	// Wire sizes of received (request) and sent (response) payloads.
	receivedMessageSize *prometheus.HistogramVec
	sentMessageSize     *prometheus.HistogramVec
}

// contextKey is an unexported type used for the context keys of this package,
// so that values stored under them are not reachable from other packages.
type contextKey int

// contextKeyMethodName is the context key under which TagRPC stores the full
// method name of the RPC.
const contextKeyMethodName contextKey = 1

// TagRPC stores the full method name of the RPC in the returned context, so
// that HandleRPC can later use it as the "method" label value.
func (g *grpcStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
	method := info.FullMethodName
	return context.WithValue(ctx, contextKeyMethodName, method)
}

// HandleRPC updates the per-method metrics for a single RPC event:
//   - Begin / End adjust the inflight gauge; End with a non-nil Error also
//     increments the method error counter.
//   - InPayload / OutPayload record the message wire length in the request /
//     response size histograms.
//
// Events arriving on a context without a method name are dropped.
func (g *grpcStatsHandler) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) {
	// The method name is read from the context (stored there by TagRPC),
	// because not every RPCStats struct carries it.
	method, found := ctx.Value(contextKeyMethodName).(string)
	if !found {
		return
	}

	switch ev := rpcStats.(type) {
	case *stats.Begin:
		g.inflightRPC.WithLabelValues(method).Inc()

	case *stats.End:
		g.inflightRPC.WithLabelValues(method).Dec()
		if ev.Error != nil {
			g.methodErrors.WithLabelValues(method).Inc()
		}

	case *stats.InPayload:
		size := float64(ev.WireLength)
		g.receivedMessageSize.WithLabelValues(method).Observe(size)

	case *stats.OutPayload:
		size := float64(ev.WireLength)
		g.sentMessageSize.WithLabelValues(method).Observe(size)

	case *stats.InHeader, *stats.InTrailer, *stats.OutHeader, *stats.OutTrailer:
		// Intentionally not measured. Cortex doesn't use headers or trailers,
		// and their sizes are not reliable anyway:
		//  - InHeader.WireLength seems to be incorrect for large headers: it
		//    reports the length of the last frame (16K) even for headers in
		//    megabytes.
		//  - OutHeader has no WireLength, so its size could only be estimated.
		//  - OutTrailer has only a deprecated WireLength field, always set to 0.
	}
}

// TagConn is part of the stats.Handler interface. No per-connection data is
// attached, so the incoming context is handed back unchanged.
func (g *grpcStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
	return ctx
}

// HandleConn keeps the connected-clients gauge in sync with connection
// lifecycle events: +1 on ConnBegin, -1 on ConnEnd. Any other event is ignored.
func (g *grpcStatsHandler) HandleConn(_ context.Context, connStats stats.ConnStats) {
	if _, opened := connStats.(*stats.ConnBegin); opened {
		g.connectedClients.Inc()
		return
	}

	if _, closed := connStats.(*stats.ConnEnd); closed {
		g.connectedClients.Dec()
	}
}
255 changes: 255 additions & 0 deletions pkg/util/grpc/stats/grpc_stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
package stats

import (
"bytes"
"context"
"crypto/rand"
"fmt"
"net"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/cortexproject/cortex/pkg/querier/frontend"
)

// TestGrpcStats runs two unary health-check calls (one tiny, one with an 8 MiB
// service name that fails with NotFound) against a real gRPC server using the
// stats handler, and verifies the connected-clients gauge, the error counter
// and both message-size histograms. It then closes the client connection and
// checks that the connected-clients gauge drops back to zero.
func TestGrpcStats(t *testing.T) {
	reg := prometheus.NewRegistry()
	stats := NewStatsHandler(reg)

	serv := grpc.NewServer(grpc.StatsHandler(stats), grpc.MaxRecvMsgSize(10e6))
	defer serv.GracefulStop()

	// Services must be registered before Serve is invoked; registering after
	// the serving goroutine has started is a race that can abort the process.
	grpc_health_v1.RegisterHealthServer(serv, health.NewServer())

	listener, err := net.Listen("tcp", "localhost:0")
	require.NoError(t, err)

	go func() {
		// Use t.Errorf here: FailNow (used by require) must only be called
		// from the goroutine running the test function.
		if err := serv.Serve(listener); err != nil {
			t.Errorf("gRPC server stopped with error: %v", err)
		}
	}()

	// The connection is closed explicitly later in the test; the deferred
	// close only runs if the test bails out before that point.
	closed := false
	conn, err := grpc.Dial(listener.Addr().String(), grpc.WithInsecure())
	require.NoError(t, err)
	defer func() {
		if !closed {
			require.NoError(t, conn.Close())
		}
	}()

	hc := grpc_health_v1.NewHealthClient(conn)

	// First request (empty).
	resp, err := hc.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
	require.NoError(t, err)
	require.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING, resp.Status)

	// Second request, with large service name. This returns error, which doesn't count as "payload".
	_, err = hc.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{
		Service: generateString(8 * 1024 * 1024),
	})
	require.EqualError(t, err, "rpc error: code = NotFound desc = unknown service")

	err = testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_grpc_connected_clients Number of clients connected to gRPC server
# TYPE cortex_grpc_connected_clients gauge
cortex_grpc_connected_clients 1

# HELP cortex_grpc_method_errors_total Number of clients connected to gRPC server
# TYPE cortex_grpc_method_errors_total counter
cortex_grpc_method_errors_total{method="/grpc.health.v1.Health/Check"} 1

# HELP cortex_grpc_request_size_bytes Size of gRPC requests.
# TYPE cortex_grpc_request_size_bytes histogram
cortex_grpc_request_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="1.048576e+06"} 1
cortex_grpc_request_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="2.62144e+06"} 1
cortex_grpc_request_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="5.24288e+06"} 1
cortex_grpc_request_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="1.048576e+07"} 2
cortex_grpc_request_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="2.62144e+07"} 2
cortex_grpc_request_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="5.24288e+07"} 2
cortex_grpc_request_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="1.048576e+08"} 2
cortex_grpc_request_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="2.62144e+08"} 2
cortex_grpc_request_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="+Inf"} 2
cortex_grpc_request_size_bytes_sum{method="/grpc.health.v1.Health/Check"} 8.388613e+06
cortex_grpc_request_size_bytes_count{method="/grpc.health.v1.Health/Check"} 2

# HELP cortex_grpc_response_size_bytes Size of gRPC responses.
# TYPE cortex_grpc_response_size_bytes histogram
cortex_grpc_response_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="1.048576e+06"} 1
cortex_grpc_response_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="2.62144e+06"} 1
cortex_grpc_response_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="5.24288e+06"} 1
cortex_grpc_response_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="1.048576e+07"} 1
cortex_grpc_response_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="2.62144e+07"} 1
cortex_grpc_response_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="5.24288e+07"} 1
cortex_grpc_response_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="1.048576e+08"} 1
cortex_grpc_response_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="2.62144e+08"} 1
cortex_grpc_response_size_bytes_bucket{method="/grpc.health.v1.Health/Check",le="+Inf"} 1
cortex_grpc_response_size_bytes_sum{method="/grpc.health.v1.Health/Check"} 7
cortex_grpc_response_size_bytes_count{method="/grpc.health.v1.Health/Check"} 1
`), "cortex_grpc_connected_clients", "cortex_grpc_request_size_bytes", "cortex_grpc_response_size_bytes", "cortex_grpc_method_errors_total")
	require.NoError(t, err)

	closed = true
	require.NoError(t, conn.Close())

	// Give server little time to update connected clients metric.
	time.Sleep(1 * time.Second)
	err = testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_grpc_connected_clients Number of clients connected to gRPC server
# TYPE cortex_grpc_connected_clients gauge
cortex_grpc_connected_clients 0
`), "cortex_grpc_connected_clients")
	require.NoError(t, err)
}

// TestGrpcStatsStreaming opens one bidirectional Process stream against a real
// gRPC server using the stats handler. The client receives five growing
// messages and answers each one, checking after every exchange that the stream
// is counted as a single inflight request. After the client closes its side,
// the test verifies that the inflight gauge is back at zero and that the
// request/response size histograms contain the expected observations.
func TestGrpcStatsStreaming(t *testing.T) {
	reg := prometheus.NewRegistry()
	stats := NewStatsHandler(reg)

	serv := grpc.NewServer(grpc.StatsHandler(stats), grpc.MaxSendMsgSize(10e6), grpc.MaxRecvMsgSize(10e6))
	defer serv.GracefulStop()

	// Services must be registered before Serve is invoked; registering after
	// the serving goroutine has started is a race that can abort the process.
	frontend.RegisterFrontendServer(serv, &frontendServer{log: t.Log})

	listener, err := net.Listen("tcp", "localhost:0")
	require.NoError(t, err)

	go func() {
		// Use t.Errorf here: FailNow (used by require) must only be called
		// from the goroutine running the test function.
		if err := serv.Serve(listener); err != nil {
			t.Errorf("gRPC server stopped with error: %v", err)
		}
	}()

	conn, err := grpc.Dial(listener.Addr().String(), grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(10e6), grpc.MaxCallSendMsgSize(10e6)))
	require.NoError(t, err)
	defer func() {
		require.NoError(t, conn.Close())
	}()

	fc := frontend.NewFrontendClient(conn)

	s, err := fc.Process(context.Background())
	require.NoError(t, err)

	for ix := 0; ix < 5; ix++ {
		req, err := s.Recv()
		require.NoError(t, err)

		// Echo the (large) request headers back, so that responses have a
		// predictable, non-trivial size.
		msg := &frontend.ProcessResponse{HttpResponse: &httpgrpc.HTTPResponse{
			Code:    200,
			Headers: req.HttpRequest.Headers,
		}}
		t.Log("Client sending:", msg.Size())
		err = s.Send(msg)
		require.NoError(t, err)

		err = testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_grpc_inflight_requests Number of inflight RPC calls
# TYPE cortex_grpc_inflight_requests gauge
cortex_grpc_inflight_requests{method="/frontend.Frontend/Process"} 1
`), "cortex_grpc_inflight_requests")
		require.NoError(t, err)
	}
	require.NoError(t, s.CloseSend())

	// Wait a second to make sure server notices close.
	time.Sleep(1 * time.Second)
	err = testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_grpc_inflight_requests Number of inflight RPC calls
# TYPE cortex_grpc_inflight_requests gauge
cortex_grpc_inflight_requests{method="/frontend.Frontend/Process"} 0
`), "cortex_grpc_inflight_requests")
	require.NoError(t, err)

	err = testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_grpc_request_size_bytes Size of gRPC requests.
# TYPE cortex_grpc_request_size_bytes histogram
cortex_grpc_request_size_bytes_bucket{method="/frontend.Frontend/Process",le="1.048576e+06"} 1
cortex_grpc_request_size_bytes_bucket{method="/frontend.Frontend/Process",le="2.62144e+06"} 4
cortex_grpc_request_size_bytes_bucket{method="/frontend.Frontend/Process",le="5.24288e+06"} 5
cortex_grpc_request_size_bytes_bucket{method="/frontend.Frontend/Process",le="1.048576e+07"} 5
cortex_grpc_request_size_bytes_bucket{method="/frontend.Frontend/Process",le="2.62144e+07"} 5
cortex_grpc_request_size_bytes_bucket{method="/frontend.Frontend/Process",le="5.24288e+07"} 5
cortex_grpc_request_size_bytes_bucket{method="/frontend.Frontend/Process",le="1.048576e+08"} 5
cortex_grpc_request_size_bytes_bucket{method="/frontend.Frontend/Process",le="2.62144e+08"} 5
cortex_grpc_request_size_bytes_bucket{method="/frontend.Frontend/Process",le="+Inf"} 5
cortex_grpc_request_size_bytes_sum{method="/frontend.Frontend/Process"} 8.017448e+06
cortex_grpc_request_size_bytes_count{method="/frontend.Frontend/Process"} 5

# HELP cortex_grpc_response_size_bytes Size of gRPC responses.
# TYPE cortex_grpc_response_size_bytes histogram
cortex_grpc_response_size_bytes_bucket{method="/frontend.Frontend/Process",le="1.048576e+06"} 0
cortex_grpc_response_size_bytes_bucket{method="/frontend.Frontend/Process",le="2.62144e+06"} 2
cortex_grpc_response_size_bytes_bucket{method="/frontend.Frontend/Process",le="5.24288e+06"} 4
cortex_grpc_response_size_bytes_bucket{method="/frontend.Frontend/Process",le="1.048576e+07"} 6
cortex_grpc_response_size_bytes_bucket{method="/frontend.Frontend/Process",le="2.62144e+07"} 6
cortex_grpc_response_size_bytes_bucket{method="/frontend.Frontend/Process",le="5.24288e+07"} 6
cortex_grpc_response_size_bytes_bucket{method="/frontend.Frontend/Process",le="1.048576e+08"} 6
cortex_grpc_response_size_bytes_bucket{method="/frontend.Frontend/Process",le="2.62144e+08"} 6
cortex_grpc_response_size_bytes_bucket{method="/frontend.Frontend/Process",le="+Inf"} 6
cortex_grpc_response_size_bytes_sum{method="/frontend.Frontend/Process"} 2.2234511e+07
cortex_grpc_response_size_bytes_count{method="/frontend.Frontend/Process"} 6
`), "cortex_grpc_request_size_bytes", "cortex_grpc_response_size_bytes")

	require.NoError(t, err)
}

// frontendServer is a minimal Frontend service used by the streaming test.
// log receives a line describing every message the server is about to send.
type frontendServer struct {
	log func(args ...interface{})
}

// Process drives one stream: it sends a request whose size grows with each
// iteration, then waits for the client's reply, and repeats until either
// Send or Recv fails. That error is returned to the caller.
func (f frontendServer) Process(server frontend.Frontend_ProcessServer) error {
	for ix := 1; ; ix++ {
		// Half a MiB per iteration, used for both the URL and the largest header value.
		bulk := ix * 512 * 1024

		header := &httpgrpc.Header{
			Key: generateString(100 * ix),
			Values: []string{
				generateString(100 * ix),
				generateString(10000 * ix),
				generateString(bulk),
			},
		}

		req := &httpgrpc.HTTPRequest{
			Method:  fmt.Sprintf("%d", ix),
			Url:     generateString(bulk),
			Headers: []*httpgrpc.Header{header},
		}
		msg := &frontend.ProcessRequest{HttpRequest: req}

		f.log("Server sending:", msg.Size())
		if err := server.Send(msg); err != nil {
			return err
		}

		if _, err := server.Recv(); err != nil {
			return err
		}
	}
}

// generateString returns a string of exactly size bytes filled with random
// 7-bit characters in the range 0x20..0x7f. Random data is used so that the
// payload does not compress well.
//
// It panics if the system random source fails, which should not happen.
func generateString(size int) string {
	buf := make([]byte, size)
	if _, err := rand.Read(buf); err != nil {
		// Should not happen.
		panic(err)
	}

	// Protobuf complains about invalid UTF-8, so every byte is reduced to
	// 7-bit ASCII. The mask has to be applied BEFORE lifting control
	// characters: doing it afterwards turns bytes 0x80..0x9f back into
	// control characters (including NUL).
	for ix, b := range buf {
		b &= 0x7f
		if b < ' ' {
			b += ' '
		}
		buf[ix] = b
	}
	return string(buf)
}