Skip to content

Commit

Permalink
Removed tags; Simplified interceptor code; Added logging fields edita…
Browse files Browse the repository at this point in the history
…bility.

Fixes #382

Signed-off-by: Bartlomiej Plotka <[email protected]>

Removed tags; Simplified interceptor code; Added logging fields editability.

Fixes #382

Signed-off-by: Bartlomiej Plotka <[email protected]>

Fixed tests.

Signed-off-by: Bartlomiej Plotka <[email protected]>

Fixed open metrics test.

Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed Aug 7, 2021
1 parent 274df59 commit ff74a41
Show file tree
Hide file tree
Showing 47 changed files with 585 additions and 2,009 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,4 @@ coverage.txt
vendor/

.envrc
.bin
.bin
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ Types of changes:
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## v2

### Changed

* `tags` removed. Use `logging.ExtractFields` to read logging fields from logging interceptor for your local request logger. Use `logging.InjectFields` to inject custom fields to logging interceptor to client context or interceptor before logging interceptor.

## [Unreleased]

### Added
Expand Down
10 changes: 4 additions & 6 deletions interceptors/auth/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"google.golang.org/grpc/status"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tags"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
)

var tokenInfoKey struct{}
Expand All @@ -38,12 +38,10 @@ func exampleAuthFunc(ctx context.Context) (context.Context, error) {
return nil, status.Errorf(codes.Unauthenticated, "invalid auth token: %v", err)
}

tags.Extract(ctx).Set("auth.sub", userClaimFromToken(tokenInfo))
ctx = logging.InjectFields(ctx, logging.Fields{"auth.sub", userClaimFromToken(tokenInfo)})

// WARNING: in production define your own type to avoid context collisions
newCtx := context.WithValue(ctx, tokenInfoKey, tokenInfo)

return newCtx, nil
// WARNING: In production define your own type to avoid context collisions.
return context.WithValue(ctx, tokenInfoKey, tokenInfo), nil
}

// Simple example of server initialization code
Expand Down
5 changes: 2 additions & 3 deletions interceptors/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ import (
func UnaryClientInterceptor(reportable ClientReportable) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
r := newReport(Unary, method)
reporter, newCtx := reportable.ClientReporter(ctx, req, r.rpcType, r.service, r.method)
reporter, newCtx := reportable.ClientReporter(ctx, CallMeta{ReqProtoOrNil: req, Typ: r.rpcType, Service: r.service, Method: r.method})

reporter.PostMsgSend(req, nil, time.Since(r.startTime))
err := invoker(newCtx, method, req, reply, cc, opts...)
reporter.PostMsgReceive(reply, err, time.Since(r.startTime))

reporter.PostCall(err, time.Since(r.startTime))
return err
}
Expand All @@ -32,7 +31,7 @@ func UnaryClientInterceptor(reportable ClientReportable) grpc.UnaryClientInterce
func StreamClientInterceptor(reportable ClientReportable) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
r := newReport(clientStreamType(desc), method)
reporter, newCtx := reportable.ClientReporter(ctx, nil, r.rpcType, r.service, r.method)
reporter, newCtx := reportable.ClientReporter(ctx, CallMeta{ReqProtoOrNil: nil, Typ: r.rpcType, Service: r.service, Method: r.method})

clientStream, err := streamer(newCtx, desc, cc, method, opts...)
if err != nil {
Expand Down
51 changes: 18 additions & 33 deletions interceptors/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import (
)

type mockReport struct {
typ GRPCType
svcName, methodName string
CallMeta

postCalls []error
postMsgSends []error
Expand All @@ -38,11 +37,13 @@ type mockReportable struct {

// Equal replaces require.Equal as google.golang.org/grpc/status errors are not easily comparable.
func (m *mockReportable) Equal(t *testing.T, expected []*mockReport) {
t.Helper()

require.Len(t, expected, len(m.reports))
for i, e := range m.reports {
require.Equal(t, expected[i].typ, e.typ, "%v", i)
require.Equal(t, expected[i].svcName, e.svcName, "%v", i)
require.Equal(t, expected[i].methodName, e.methodName, "%v", i)
require.Equal(t, expected[i].Typ, e.Typ, "%v", i)
require.Equal(t, expected[i].Service, e.Service, "%v", i)
require.Equal(t, expected[i].Method, e.Method, "%v", i)

require.Len(t, expected[i].postCalls, len(e.postCalls), "%v", i)
for k, err := range e.postCalls {
Expand Down Expand Up @@ -111,14 +112,14 @@ func (m *mockReportable) PostMsgReceive(_ interface{}, err error, _ time.Duratio
m.curr.postMsgReceives = append(m.curr.postMsgReceives, err)
}

func (m *mockReportable) ClientReporter(ctx context.Context, _ interface{}, typ GRPCType, serviceName string, methodName string) (Reporter, context.Context) {
m.curr = &mockReport{typ: typ, svcName: serviceName, methodName: methodName}
func (m *mockReportable) ClientReporter(ctx context.Context, c CallMeta) (Reporter, context.Context) {
m.curr = &mockReport{CallMeta: c}
m.reports = append(m.reports, m.curr)
return m, ctx
}

func (m *mockReportable) ServerReporter(ctx context.Context, _ interface{}, typ GRPCType, serviceName string, methodName string) (Reporter, context.Context) {
m.curr = &mockReport{typ: typ, svcName: serviceName, methodName: methodName}
func (m *mockReportable) ServerReporter(ctx context.Context, c CallMeta) (Reporter, context.Context) {
m.curr = &mockReport{CallMeta: c}
m.reports = append(m.reports, m.curr)
return m, ctx
}
Expand Down Expand Up @@ -206,9 +207,7 @@ func (s *ClientInterceptorTestSuite) TestUnaryReporting() {
_, err := s.testClient.PingEmpty(s.ctx, &testpb.PingEmptyRequest{}) // should return with code=OK
require.NoError(s.T(), err)
s.mock.Equal(s.T(), []*mockReport{{
typ: Unary,
svcName: testpb.TestServiceFullName,
methodName: "PingEmpty",
CallMeta: CallMeta{Typ: Unary, Service: testpb.TestServiceFullName, Method: "PingEmpty"},
postCalls: []error{nil},
postMsgReceives: []error{nil},
postMsgSends: []error{nil},
Expand All @@ -218,9 +217,7 @@ func (s *ClientInterceptorTestSuite) TestUnaryReporting() {
_, err = s.testClient.PingError(s.ctx, &testpb.PingErrorRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
require.Error(s.T(), err)
s.mock.Equal(s.T(), []*mockReport{{
typ: Unary,
svcName: testpb.TestServiceFullName,
methodName: "PingError",
CallMeta: CallMeta{Typ: Unary, Service: testpb.TestServiceFullName, Method: "PingError"},
postCalls: []error{status.Errorf(codes.FailedPrecondition, "Userspace error.")},
postMsgReceives: []error{status.Errorf(codes.FailedPrecondition, "Userspace error.")},
postMsgSends: []error{nil},
Expand All @@ -233,9 +230,7 @@ func (s *ClientInterceptorTestSuite) TestStartedListReporting() {

// Even without reading, we should get initial mockReport.
s.mock.Equal(s.T(), []*mockReport{{
typ: ServerStream,
svcName: testpb.TestServiceFullName,
methodName: "PingList",
CallMeta: CallMeta{Typ: ServerStream, Service: testpb.TestServiceFullName, Method: "PingList"},
postMsgSends: []error{nil},
}})

Expand All @@ -244,14 +239,10 @@ func (s *ClientInterceptorTestSuite) TestStartedListReporting() {

// Even without reading, we should get initial mockReport.
s.mock.Equal(s.T(), []*mockReport{{
typ: ServerStream,
svcName: testpb.TestServiceFullName,
methodName: "PingList",
CallMeta: CallMeta{Typ: ServerStream, Service: testpb.TestServiceFullName, Method: "PingList"},
postMsgSends: []error{nil},
}, {
typ: ServerStream,
svcName: testpb.TestServiceFullName,
methodName: "PingList",
CallMeta: CallMeta{Typ: ServerStream, Service: testpb.TestServiceFullName, Method: "PingList"},
postMsgSends: []error{nil},
}})
}
Expand All @@ -273,9 +264,7 @@ func (s *ClientInterceptorTestSuite) TestListReporting() {
require.EqualValues(s.T(), testpb.ListResponseCount, count, "Number of received msg on the wire must match")

s.mock.Equal(s.T(), []*mockReport{{
typ: ServerStream,
svcName: testpb.TestServiceFullName,
methodName: "PingList",
CallMeta: CallMeta{Typ: ServerStream, Service: testpb.TestServiceFullName, Method: "PingList"},
postCalls: []error{nil},
postMsgReceives: append(make([]error, testpb.ListResponseCount), io.EOF),
postMsgSends: []error{nil},
Expand All @@ -298,9 +287,7 @@ func (s *ClientInterceptorTestSuite) TestListReporting() {
require.Equal(s.T(), codes.FailedPrecondition, st.Code(), "Recv must return FailedPrecondition, otherwise the test is wrong")

s.mock.Equal(s.T(), []*mockReport{{
typ: ServerStream,
svcName: testpb.TestServiceFullName,
methodName: "PingList",
CallMeta: CallMeta{Typ: ServerStream, Service: testpb.TestServiceFullName, Method: "PingList"},
postCalls: []error{status.Errorf(codes.FailedPrecondition, "foobar"), status.Errorf(codes.FailedPrecondition, "foobar")},
postMsgReceives: []error{status.Errorf(codes.FailedPrecondition, "foobar"), status.Errorf(codes.FailedPrecondition, "foobar")},
postMsgSends: []error{nil},
Expand Down Expand Up @@ -344,9 +331,7 @@ func (s *ClientInterceptorTestSuite) TestBiStreamingReporting() {

require.EqualValues(s.T(), 100, count, "Number of received msg on the wire must match")
s.mock.Equal(s.T(), []*mockReport{{
typ: BidiStream,
svcName: testpb.TestServiceFullName,
methodName: "PingStream",
CallMeta: CallMeta{Typ: BidiStream, Service: testpb.TestServiceFullName, Method: "PingStream"},
postCalls: []error{nil},
postMsgReceives: append(make([]error, 100), io.EOF),
postMsgSends: make([]error, 100),
Expand Down
15 changes: 7 additions & 8 deletions interceptors/logging/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
// Licensed under the Apache License 2.0.

/*
logging is a "parent" package for gRPC logging middlewares.
Package logging is a "parent" package for gRPC logging middlewares.
The gRPC logging middleware populates request-scoped data to `grpc_ctxtags.Tags` that relate to the current gRPC call
(e.g. service and method names).
The gRPC logging middleware populates request-scoped data to `logging.Fields` that relate to the current gRPC call
(e.g. service and method names). You can leverage that data using `logging.ExtractFields` and `logging.InjectFields`.
Once the gRPC logging middleware has added the gRPC specific Tags to the ctx they will then be written with the logs
that are made using the `ctx_logrus` or `ctx_zap` loggers.
Once the gRPC logging middleware has added the gRPC specific Fields to the ctx they will then be written with the log lines.
All logging middleware will emit a final log statement. It is based on the error returned by the handler function,
the gRPC status code, an error (if any) and it emit at a level controlled via `WithLevels`.
the gRPC status code, an error (if any) and it emits at a level controlled via `WithLevels`. You can control this behavior
using `WithDecider`.
This parent package
This particular package is intended for use by other middleware, logging or otherwise. It contains interfaces that other
logging middlewares *could* share . This allows code to be shared between different implementations.
logging middlewares *could* share. This allows code to be shared between different implementations.
Field names
Expand All @@ -31,6 +31,5 @@ Implementations:
* providers/zerolog
* providers/phuslog
See relevant packages below.
*/
package logging
100 changes: 47 additions & 53 deletions interceptors/logging/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,20 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/peer"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tags"
)

// extractFields returns all fields from tags.
func extractFields(tags tags.Tags) Fields {
var fields Fields
for k, v := range tags.Values() {
fields = append(fields, k, v)
}
return fields
}

type reporter struct {
interceptors.CallMeta

ctx context.Context
typ interceptors.GRPCType
service, method string
startCallLogged bool
opts *options
logger Logger
kind string
startCallLogged bool

opts *options
logger Logger
}

func (c *reporter) logMessage(logger Logger, err error, msg string, duration time.Duration) {
Expand All @@ -40,12 +32,11 @@ func (c *reporter) logMessage(logger Logger, err error, msg string, duration tim
if err != nil {
logger = logger.With("grpc.error", fmt.Sprintf("%v", err))
}
logger = logger.With(extractFields(tags.Extract(c.ctx))...)
logger.With(c.opts.durationFieldFunc(duration)...).Log(c.opts.levelFunc(code), msg)
}

func (c *reporter) PostCall(err error, duration time.Duration) {
switch c.opts.shouldLog(interceptors.FullMethod(c.service, c.method), err) {
switch c.opts.shouldLog(c.FullMethod(), err) {
case LogFinishCall, LogStartAndFinishCall:
if err == io.EOF {
err = nil
Expand All @@ -60,7 +51,7 @@ func (c *reporter) PostMsgSend(_ interface{}, err error, duration time.Duration)
if c.startCallLogged {
return
}
switch c.opts.shouldLog(interceptors.FullMethod(c.service, c.method), err) {
switch c.opts.shouldLog(c.FullMethod(), err) {
case LogStartAndFinishCall:
c.startCallLogged = true
c.logMessage(c.logger, err, "started call", duration)
Expand All @@ -71,68 +62,71 @@ func (c *reporter) PostMsgReceive(_ interface{}, err error, duration time.Durati
if c.startCallLogged {
return
}
switch c.opts.shouldLog(interceptors.FullMethod(c.service, c.method), err) {
switch c.opts.shouldLog(c.FullMethod(), err) {
case LogStartAndFinishCall:
c.startCallLogged = true
c.logMessage(c.logger, err, "started call", duration)
}
}

type reportable struct {
opts *options
logger Logger
}

func (r *reportable) ServerReporter(ctx context.Context, _ interface{}, typ interceptors.GRPCType, service string, method string) (interceptors.Reporter, context.Context) {
return r.reporter(ctx, typ, service, method, KindServerFieldValue)
}
func reportable(logger Logger, opts *options) interceptors.CommonReportableFunc {
return func(ctx context.Context, c interceptors.CallMeta, isClient bool) (interceptors.Reporter, context.Context) {
kind := KindServerFieldValue
if isClient {
kind = KindClientFieldValue
}

func (r *reportable) ClientReporter(ctx context.Context, _ interface{}, typ interceptors.GRPCType, service string, method string) (interceptors.Reporter, context.Context) {
return r.reporter(ctx, typ, service, method, KindClientFieldValue)
}
fields := newCommonFields(kind, c)
if !isClient {
if peer, ok := peer.FromContext(ctx); ok {
fields = append(fields, "peer.address", peer.Addr.String())
}
}
fields = fields.AppendUnique(ExtractFields(ctx))

func (r *reportable) reporter(ctx context.Context, typ interceptors.GRPCType, service string, method string, kind string) (interceptors.Reporter, context.Context) {
fields := commonFields(kind, typ, service, method)
fields = append(fields, "grpc.start_time", time.Now().Format(r.opts.timestampFormat))
if d, ok := ctx.Deadline(); ok {
fields = append(fields, "grpc.request.deadline", d.Format(r.opts.timestampFormat))
singleUseFields := []string{"grpc.start_time", time.Now().Format(opts.timestampFormat)}
if d, ok := ctx.Deadline(); ok {
singleUseFields = append(singleUseFields, "grpc.request.deadline", d.Format(opts.timestampFormat))
}
return &reporter{
CallMeta: c,
ctx: ctx,
startCallLogged: false,
opts: opts,
logger: logger.With(fields...).With(singleUseFields...),
kind: kind,
}, InjectFields(ctx, fields)
}
return &reporter{
ctx: ctx,
typ: typ,
service: service,
method: method,
startCallLogged: false,
opts: r.opts,
logger: r.logger.With(fields...),
kind: kind,
}, ctx
}

// UnaryClientInterceptor returns a new unary client interceptor that optionally logs the execution of external gRPC calls.
// Logger will use all tags (from tags package) available in current context as fields.
// Logger will read existing and write new logging.Fields available in current context.
// See `ExtractFields` and `InjectFields` for details.
func UnaryClientInterceptor(logger Logger, opts ...Option) grpc.UnaryClientInterceptor {
o := evaluateClientOpt(opts)
return interceptors.UnaryClientInterceptor(&reportable{logger: logger, opts: o})
return interceptors.UnaryClientInterceptor(reportable(logger, o))
}

// StreamClientInterceptor returns a new streaming client interceptor that optionally logs the execution of external gRPC calls.
// Logger will use all tags (from tags package) available in current context as fields.
// Logger will read existing and write new logging.Fields available in current context.
// See `ExtractFields` and `InjectFields` for details.
func StreamClientInterceptor(logger Logger, opts ...Option) grpc.StreamClientInterceptor {
o := evaluateClientOpt(opts)
return interceptors.StreamClientInterceptor(&reportable{logger: logger, opts: o})
return interceptors.StreamClientInterceptor(reportable(logger, o))
}

// UnaryServerInterceptor returns a new unary server interceptors that optionally logs endpoint handling.
// Logger will use all tags (from tags package) available in current context as fields.
// Logger will read existing and write new logging.Fields available in current context.
// See `ExtractFields` and `InjectFields` for details.
func UnaryServerInterceptor(logger Logger, opts ...Option) grpc.UnaryServerInterceptor {
o := evaluateServerOpt(opts)
return interceptors.UnaryServerInterceptor(&reportable{logger: logger, opts: o})
return interceptors.UnaryServerInterceptor(reportable(logger, o))
}

// StreamServerInterceptor returns a new stream server interceptors that optionally logs endpoint handling.
// Logger will use all tags (from tags package) available in current context as fields.
// Logger will read existing and write new logging.Fields available in current context.
// See `ExtractFields` and `InjectFields` for details..
func StreamServerInterceptor(logger Logger, opts ...Option) grpc.StreamServerInterceptor {
o := evaluateServerOpt(opts)
return interceptors.StreamServerInterceptor(&reportable{logger: logger, opts: o})
return interceptors.StreamServerInterceptor(reportable(logger, o))
}
Loading

0 comments on commit ff74a41

Please sign in to comment.