Skip to content

Commit

Permalink
Added a prometheus provider
Browse files Browse the repository at this point in the history
Signed-off-by: Yash Sharma <[email protected]>
  • Loading branch information
yashrsharma44 committed Jan 15, 2021
1 parent 08b17eb commit 60b2d64
Show file tree
Hide file tree
Showing 15 changed files with 970 additions and 4 deletions.
8 changes: 4 additions & 4 deletions interceptors/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,16 @@ type monitoredClientStream struct {
}

func (s *monitoredClientStream) SendMsg(m interface{}) error {
start := time.Now()
timer := s.reporter.StartTimeCall(time.Now(), "send")
err := s.ClientStream.SendMsg(m)
s.reporter.PostMsgSend(m, err, time.Since(start))
s.reporter.PostMsgSend(m, err, timer.ObserveDuration())
return err
}

func (s *monitoredClientStream) RecvMsg(m interface{}) error {
start := time.Now()
timer := s.reporter.StartTimeCall(time.Now(), "recv")
err := s.ClientStream.RecvMsg(m)
s.reporter.PostMsgReceive(m, err, time.Since(start))
s.reporter.PostMsgReceive(m, err, timer.ObserveDuration())

if err == nil {
return nil
Expand Down
4 changes: 4 additions & 0 deletions interceptors/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type mockedReporter struct {
postMsgReceives []error
}

func (m *mockedReporter) StartTimeCall(startTime time.Time, _ string) Timer {
return EmptyTimer
}

func (m *mockedReporter) PostCall(err error, _ time.Duration) {
m.m.Lock()
defer m.m.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions interceptors/logging/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (c *reporter) logMessage(logger Logger, err error, msg string, duration tim
logger.With(c.opts.durationFieldFunc(duration)...).Log(c.opts.levelFunc(code), msg)
}

func (c *reporter) StartTimeCall(startTime time.Time, callType string) interceptors.Timer {
return interceptors.EmptyTimer
}

func (c *reporter) PostCall(err error, duration time.Duration) {
switch c.opts.shouldLog(interceptors.FullMethod(c.service, c.method), err) {
case LogFinishCall, LogStartAndFinishCall:
Expand Down
8 changes: 8 additions & 0 deletions interceptors/logging/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ type serverPayloadReporter struct {
logger Logger
}

func (c *serverPayloadReporter) StartTimeCall(_ time.Time, _ string) interceptors.Timer {
return interceptors.EmptyTimer
}

func (c *serverPayloadReporter) PostCall(error, time.Duration) {}

func (c *serverPayloadReporter) PostMsgSend(req interface{}, err error, duration time.Duration) {
Expand All @@ -50,6 +54,10 @@ type clientPayloadReporter struct {
logger Logger
}

func (c *clientPayloadReporter) StartTimeCall(_ time.Time, _ string) interceptors.Timer {
return interceptors.EmptyTimer
}

func (c *clientPayloadReporter) PostCall(error, time.Duration) {}

func (c *clientPayloadReporter) PostMsgSend(req interface{}, err error, duration time.Duration) {
Expand Down
28 changes: 28 additions & 0 deletions interceptors/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,32 @@ import (

type GRPCType string

// timer is a helper interface to time functions.
// Useful for interceptors to record the total
// time elapsed since completion of a call.
type Timer interface {
ObserveDuration() time.Duration
}

// NoOp timer
type noOpTimer struct {
}

func (noOpTimer) ObserveDuration() time.Duration {
return 0
}

// Default Timer
type DefaultTimer struct {
startTime time.Time
}

func (tm *DefaultTimer) ObserveDuration() time.Duration {
return time.Since(tm.startTime)
}

var EmptyTimer = &noOpTimer{}

const (
Unary GRPCType = "unary"
ClientStream GRPCType = "client_stream"
Expand Down Expand Up @@ -51,6 +77,7 @@ type ServerReportable interface {
}

type Reporter interface {
StartTimeCall(time.Time, string) Timer
PostCall(err error, rpcDuration time.Duration)

PostMsgSend(reqProto interface{}, err error, sendDuration time.Duration)
Expand All @@ -61,6 +88,7 @@ var _ Reporter = NoopReporter{}

type NoopReporter struct{}

func (NoopReporter) StartTimeCall(time.Time, string) Timer { return EmptyTimer }
func (NoopReporter) PostCall(error, time.Duration) {}
func (NoopReporter) PostMsgSend(interface{}, error, time.Duration) {}
func (NoopReporter) PostMsgReceive(interface{}, error, time.Duration) {}
Expand Down
4 changes: 4 additions & 0 deletions interceptors/tags/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ type reporter struct {
initial bool
}

func (c *reporter) StartTimeCall(_ time.Time, _ string) interceptors.Timer {
return interceptors.EmptyTimer
}

func (c *reporter) PostCall(error, time.Duration) {}

func (c *reporter) PostMsgSend(interface{}, error, time.Duration) {}
Expand Down
4 changes: 4 additions & 0 deletions interceptors/tracing/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ type opentracingClientReporter struct {
clientSpan opentracing.Span
}

func (o *opentracingClientReporter) StartTimeCall(_ time.Time, _ string) interceptors.Timer {
return interceptors.EmptyTimer
}

func (o *opentracingClientReporter) PostCall(err error, _ time.Duration) {
// Finish span.
if err != nil && err != io.EOF {
Expand Down
4 changes: 4 additions & 0 deletions interceptors/tracing/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ type opentracingServerReporter struct {
serverSpan opentracing.Span
}

func (o *opentracingServerReporter) StartTimeCall(_ time.Time, _ string) interceptors.Timer {
return interceptors.EmptyTimer
}

func (o *opentracingServerReporter) PostCall(err error, _ time.Duration) {
// Finish span and log context information.
tags := tags.Extract(o.ctx)
Expand Down
168 changes: 168 additions & 0 deletions providers/prometheus/client_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package metrics

import (
prom "github.com/prometheus/client_golang/prometheus"
)

// ClientMetrics represents a collection of metrics to be registered on a
// Prometheus metrics registry for a gRPC client.
type ClientMetrics struct {
clientStartedCounter *prom.CounterVec
clientHandledCounter *prom.CounterVec
clientStreamMsgReceived *prom.CounterVec
clientStreamMsgSent *prom.CounterVec

clientHandledHistogramEnabled bool
clientHandledHistogramOpts prom.HistogramOpts
clientHandledHistogram *prom.HistogramVec

clientStreamRecvHistogramEnabled bool
clientStreamRecvHistogramOpts prom.HistogramOpts
clientStreamRecvHistogram *prom.HistogramVec

clientStreamSendHistogramEnabled bool
clientStreamSendHistogramOpts prom.HistogramOpts
clientStreamSendHistogram *prom.HistogramVec
}

// NewClientMetrics returns a ClientMetrics object. Use a new instance of
// ClientMetrics when not using the default Prometheus metrics registry, for
// example when wanting to control which metrics are added to a registry as
// opposed to automatically adding metrics via init functions.
func NewClientMetrics(counterOpts ...CounterOption) *ClientMetrics {
opts := counterOptions(counterOpts)
return &ClientMetrics{
clientStartedCounter: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_client_started_total",
Help: "Total number of RPCs started on the client.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),

clientHandledCounter: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_client_handled_total",
Help: "Total number of RPCs completed by the client, regardless of success or failure.",
}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),

clientStreamMsgReceived: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_client_msg_received_total",
Help: "Total number of RPC stream messages received by the client.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),

clientStreamMsgSent: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_client_msg_sent_total",
Help: "Total number of gRPC stream messages sent by the client.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),

clientHandledHistogramEnabled: false,
clientHandledHistogramOpts: prom.HistogramOpts{
Name: "grpc_client_handling_seconds",
Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
Buckets: prom.DefBuckets,
},
clientHandledHistogram: nil,
clientStreamRecvHistogramEnabled: false,
clientStreamRecvHistogramOpts: prom.HistogramOpts{
Name: "grpc_client_msg_recv_handling_seconds",
Help: "Histogram of response latency (seconds) of the gRPC single message receive.",
Buckets: prom.DefBuckets,
},
clientStreamRecvHistogram: nil,
clientStreamSendHistogramEnabled: false,
clientStreamSendHistogramOpts: prom.HistogramOpts{
Name: "grpc_client_msg_send_handling_seconds",
Help: "Histogram of response latency (seconds) of the gRPC single message send.",
Buckets: prom.DefBuckets,
},
clientStreamSendHistogram: nil,
}
}

// Describe sends the super-set of all possible descriptors of metrics
// collected by this Collector to the provided channel and returns once
// the last descriptor has been sent.
func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) {
m.clientStartedCounter.Describe(ch)
m.clientHandledCounter.Describe(ch)
m.clientStreamMsgReceived.Describe(ch)
m.clientStreamMsgSent.Describe(ch)
if m.clientHandledHistogramEnabled {
m.clientHandledHistogram.Describe(ch)
}
if m.clientStreamRecvHistogramEnabled {
m.clientStreamRecvHistogram.Describe(ch)
}
if m.clientStreamSendHistogramEnabled {
m.clientStreamSendHistogram.Describe(ch)
}
}

// Collect is called by the Prometheus registry when collecting
// metrics. The implementation sends each collected metric via the
// provided channel and returns once the last metric has been sent.
func (m *ClientMetrics) Collect(ch chan<- prom.Metric) {
m.clientStartedCounter.Collect(ch)
m.clientHandledCounter.Collect(ch)
m.clientStreamMsgReceived.Collect(ch)
m.clientStreamMsgSent.Collect(ch)
if m.clientHandledHistogramEnabled {
m.clientHandledHistogram.Collect(ch)
}
if m.clientStreamRecvHistogramEnabled {
m.clientStreamRecvHistogram.Collect(ch)
}
if m.clientStreamSendHistogramEnabled {
m.clientStreamSendHistogram.Collect(ch)
}
}

// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func (m *ClientMetrics) EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
for _, o := range opts {
o(&m.clientHandledHistogramOpts)
}
if !m.clientHandledHistogramEnabled {
m.clientHandledHistogram = prom.NewHistogramVec(
m.clientHandledHistogramOpts,
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
}
m.clientHandledHistogramEnabled = true
}

// EnableClientStreamReceiveTimeHistogram turns on recording of single message receive time of streaming RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func (m *ClientMetrics) EnableClientStreamReceiveTimeHistogram(opts ...HistogramOption) {
for _, o := range opts {
o(&m.clientStreamRecvHistogramOpts)
}

if !m.clientStreamRecvHistogramEnabled {
m.clientStreamRecvHistogram = prom.NewHistogramVec(
m.clientStreamRecvHistogramOpts,
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
}

m.clientStreamRecvHistogramEnabled = true
}

// EnableClientStreamSendTimeHistogram turns on recording of single message send time of streaming RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func (m *ClientMetrics) EnableClientStreamSendTimeHistogram(opts ...HistogramOption) {
for _, o := range opts {
o(&m.clientStreamSendHistogramOpts)
}

if !m.clientStreamSendHistogramEnabled {
m.clientStreamSendHistogram = prom.NewHistogramVec(
m.clientStreamSendHistogramOpts,
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
}

m.clientStreamSendHistogramEnabled = true
}
24 changes: 24 additions & 0 deletions providers/prometheus/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package metrics

type grpcType string

const (
Unary grpcType = "unary"
ClientStream grpcType = "client_stream"
ServerStream grpcType = "server_stream"
BidiStream grpcType = "bidi_stream"
)

type Kind string

const (
KindClient Kind = "client"
KindServer Kind = "server"
)

type RPCMethod string

const (
Send RPCMethod = "send"
Receive RPCMethod = "recv"
)
9 changes: 9 additions & 0 deletions providers/prometheus/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus

go 1.15

require (
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.2
github.com/prometheus/client_golang v1.9.0
google.golang.org/grpc v1.35.0
)
Loading

0 comments on commit 60b2d64

Please sign in to comment.