Skip to content

Commit

Permalink
Merge pull request #359 from carlosms/grpc-log-tags
Browse files Browse the repository at this point in the history
Add log fields metadata to gRPC calls
  • Loading branch information
carlosms authored Jan 25, 2019
2 parents 4a594f5 + 6ba788a commit 9d24df0
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 33 deletions.
11 changes: 11 additions & 0 deletions cmd/server-test/dummy_analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ func (suite *DummyIntegrationSuite) TestSuccessReview() {
})
}

func (suite *DummyIntegrationSuite) TestGRPCLogs() {
// Check that 'event-id' log field is sent from lookoutd to the dummy analyzer,
// and then received back when receiving a grpc call from the analyzer.
suite.sendEvent(successJSON)
suite.GrepAll(suite.r, []string{
`processing pull request`,
`msg="gRPC streaming server call started" analyzer=Dummy app=lookoutd event-id=16ee0f607886b841c7633ab4cea5334cbc2022a1 event-type="*pb.ReviewEvent" grpc.method=GetChanges`,
`status=success`,
})
}

func (suite *DummyIntegrationSuite) TestReviewDontPostSameComment() {
fixture := fixtures.GetByName("incremental-pr")

Expand Down
2 changes: 1 addition & 1 deletion cmd/server-test/multi_analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (suite *MultiDummyIntegrationSuite) TestSuccessReview() {

str := suite.GrepAll(suite.r, []string{
"processing pull request",
`msg="posting analysis" app=lookoutd comments=4`,
`msg="posting analysis" analyzer=Dummy1 app=lookoutd comments=4`,
`status=success`,
})

Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (s *Server) concurrentRequest(ctx context.Context, conf map[string]lookout.
var result *lookout.AnalyzerComments
defer func() { commentsCh <- result }()

aLogger := ctxlog.Get(ctx).With(log.Fields{
ctx, aLogger := ctxlog.WithLogFields(ctx, log.Fields{
"analyzer": name,
})

Expand Down
21 changes: 17 additions & 4 deletions util/grpchelper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpchelper
import (
"context"

"github.com/grpc-ecosystem/go-grpc-middleware"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
log "gopkg.in/src-d/go-log.v1"
Expand All @@ -15,8 +16,14 @@ var LogAsDebug = false
// NewServer creates new grpc.Server with custom message size
func NewServer(opts ...grpc.ServerOption) *grpc.Server {
opts = append(opts,
grpc.StreamInterceptor(StreamServerInterceptor(log.DefaultLogger, LogAsDebug)),
grpc.UnaryInterceptor(UnaryServerInterceptor(log.DefaultLogger, LogAsDebug)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
CtxlogStreamServerInterceptor,
LogStreamServerInterceptor(LogAsDebug),
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
CtxlogUnaryServerInterceptor,
LogUnaryServerInterceptor(LogAsDebug),
)),
)

return pb.NewServer(opts...)
Expand All @@ -25,8 +32,14 @@ func NewServer(opts ...grpc.ServerOption) *grpc.Server {
// DialContext creates a client connection to the given target with custom message size
func DialContext(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts = append(opts,
grpc.WithStreamInterceptor(StreamClientInterceptor(log.DefaultLogger, LogAsDebug)),
grpc.WithUnaryInterceptor(UnaryClientInterceptor(log.DefaultLogger, LogAsDebug)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
LogStreamClientInterceptor(LogAsDebug),
CtxlogStreamClientInterceptor,
)),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
LogUnaryClientInterceptor(LogAsDebug),
CtxlogUnaryClientInterceptor,
)),
)

return pb.DialContext(ctx, target, opts...)
Expand Down
86 changes: 86 additions & 0 deletions util/grpchelper/logfields.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package grpchelper

import (
"context"
"encoding/json"

"github.com/src-d/lookout/util/ctxlog"

"github.com/grpc-ecosystem/go-grpc-middleware"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
log "gopkg.in/src-d/go-log.v1"
)

const logFieldsKey = "log-fields"

// CtxlogUnaryClientInterceptor is a unary client interceptor that adds the
// ctxlog log.Fields to the grpc metadata, with the key 'logFieldsKey'.
func CtxlogUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx = setLogFieldsMetadata(ctx)
return invoker(ctx, method, req, reply, cc, opts...)
}

// CtxlogStreamClientInterceptor is a streaming client interceptor that adds the
// ctxlog log.Fields to the grpc metadata, with the key 'logFieldsKey'.
func CtxlogStreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
ctx = setLogFieldsMetadata(ctx)
return streamer(ctx, desc, cc, method, opts...)
}

// setLogFieldsMetadata returns a new context with the ctxlog log.Fields stored
// into the grpc metadata, with the key 'logFieldsKey'.
func setLogFieldsMetadata(ctx context.Context) context.Context {
f := ctxlog.Fields(ctx)

// Delete the fields that should not cross to the gRPC server logs
delete(f, "app")

bytes, err := json.Marshal(f)
if err != nil {
ctxlog.Get(ctx).Errorf(err, "log.Fields could not be marshaled to JSON")
return ctx
}

return metadata.AppendToOutgoingContext(ctx, logFieldsKey, string(bytes))
}

// CtxlogUnaryServerInterceptor is a unary server interceptor that adds
// to the context a ctxlog configured with the log Fields found in the request
// metadata.
func CtxlogUnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ctx = setContextLogger(ctx)
return handler(ctx, req)
}

// CtxlogStreamServerInterceptor is a streaming server interceptor that
// adds to the context a ctxlog configured with the log Fields found in the
// request metadata.
func CtxlogStreamServerInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
wrapped := grpc_middleware.WrapServerStream(stream)
wrapped.WrappedContext = setContextLogger(stream.Context())

return handler(srv, wrapped)
}

// setContextLogger returns a new context containing a ctxlog configured with
// the log Fields found in the given ctx metadata.
func setContextLogger(ctx context.Context) context.Context {
md, ok := metadata.FromIncomingContext(ctx)
if !ok || len(md[logFieldsKey]) == 0 {
return ctx
}

var f log.Fields
err := json.Unmarshal([]byte(md[logFieldsKey][0]), &f)
if err != nil {
ctxlog.Get(ctx).Errorf(err, "log.Fields could not be unmarshaled from JSON")
return ctx
}

// Delete the fields that we don't want overwritten by a gRPC client
delete(f, "app")

newCtx, _ := ctxlog.WithLogFields(ctx, f)
return newCtx
}
60 changes: 33 additions & 27 deletions util/grpchelper/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import (
"path"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/src-d/lookout/util/ctxlog"

"github.com/grpc-ecosystem/go-grpc-middleware/logging"
"google.golang.org/grpc"
log "gopkg.in/src-d/go-log.v1"
Expand All @@ -19,93 +20,98 @@ func getLogFn(l log.Logger, asDebug bool) func(msg string, args ...interface{})
return l.Infof
}

// UnaryServerInterceptor returns a new unary server interceptors that logs request/response.
func UnaryServerInterceptor(l log.Logger, asDebug bool) grpc.UnaryServerInterceptor {
// LogUnaryServerInterceptor returns a new unary server interceptor that logs
// request/response.
func LogUnaryServerInterceptor(asDebug bool) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
startTime := time.Now()

l := newServerRequestLogger(l, info.FullMethod)
getLogFn(l, asDebug)("unary server call started")
l := newServerRequestLogger(ctx, info.FullMethod)
getLogFn(l, asDebug)("gRPC unary server call started")

resp, err := handler(ctx, req)

getLogFn(newResponseLogger(l, startTime, err), asDebug)("unary server call finished")
getLogFn(newResponseLogger(l, startTime, err), asDebug)("gRPC unary server call finished")

return resp, err
}
}

// StreamServerInterceptor returns a new streaming server interceptor that logs request/response.
func StreamServerInterceptor(l log.Logger, asDebug bool) grpc.StreamServerInterceptor {
// LogStreamServerInterceptor returns a new streaming server interceptor that
// logs request/response.
func LogStreamServerInterceptor(asDebug bool) grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
startTime := time.Now()

l := newServerRequestLogger(l, info.FullMethod)
getLogFn(l, asDebug)("streaming server call started")
l := newServerRequestLogger(stream.Context(), info.FullMethod)
getLogFn(l, asDebug)("gRPC streaming server call started")

wrapped := grpc_middleware.WrapServerStream(stream)
err := handler(srv, wrapped)
err := handler(srv, stream)

getLogFn(newResponseLogger(l, startTime, err), asDebug)("streaming server call finished")
getLogFn(newResponseLogger(l, startTime, err), asDebug)("gRPC streaming server call finished")

return err
}
}

// UnaryClientInterceptor returns a new unary client interceptor that logs the execution of external gRPC calls.
func UnaryClientInterceptor(l log.Logger, asDebug bool) grpc.UnaryClientInterceptor {
// LogUnaryClientInterceptor returns a new unary client interceptor that logs
// the execution of external gRPC calls.
func LogUnaryClientInterceptor(asDebug bool) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
startTime := time.Now()

l := newClientRequestLogger(l, method)
getLogFn(l, asDebug)("unary client call started")
l := newClientRequestLogger(ctx, method)
getLogFn(l, asDebug)("gRPC unary client call started")

err := invoker(ctx, method, req, reply, cc, opts...)

getLogFn(newResponseLogger(l, startTime, err), asDebug)("streaming client call finished")
getLogFn(newResponseLogger(l, startTime, err), asDebug)("gRPC unary client call finished")

return err
}
}

// StreamClientInterceptor returns a new striming client interceptor that logs the execution of external gRPC calls.
func StreamClientInterceptor(l log.Logger, asDebug bool) grpc.StreamClientInterceptor {
// LogStreamClientInterceptor returns a new streaming client interceptor that
// logs the execution of external gRPC calls.
func LogStreamClientInterceptor(asDebug bool) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
startTime := time.Now()

l := newClientRequestLogger(l, method)
getLogFn(l, asDebug)("streaming client call started")
l := newClientRequestLogger(ctx, method)
getLogFn(l, asDebug)("gRPC streaming client call started")

clientStream, err := streamer(ctx, desc, cc, method, opts...)

getLogFn(newResponseLogger(l, startTime, err), asDebug)("streaming client call finished")
getLogFn(newResponseLogger(l, startTime, err), asDebug)("gRPC streaming client call finished")

return clientStream, err
}
}

func newServerRequestLogger(l log.Logger, fullMethod string) log.Logger {
func newServerRequestLogger(ctx context.Context, fullMethod string) log.Logger {
service := path.Dir(fullMethod)[1:]
method := path.Base(fullMethod)

return l.With(log.Fields{
_, logger := ctxlog.WithLogFields(ctx, log.Fields{
"system": "grpc",
"span.kind": "server",
"grpc.service": service,
"grpc.method": method,
})
return logger
}

func newClientRequestLogger(l log.Logger, fullMethod string) log.Logger {
func newClientRequestLogger(ctx context.Context, fullMethod string) log.Logger {
service := path.Dir(fullMethod)[1:]
method := path.Base(fullMethod)

return l.With(log.Fields{
_, logger := ctxlog.WithLogFields(ctx, log.Fields{
"system": "grpc",
"span.kind": "client",
"grpc.service": service,
"grpc.method": method,
})
return logger
}

func newResponseLogger(l log.Logger, startTime time.Time, err error) log.Logger {
Expand Down

0 comments on commit 9d24df0

Please sign in to comment.