-
Notifications
You must be signed in to change notification settings - Fork 327
gRPC integration #12
gRPC integration #12
Changes from 29 commits
4cecef5
45542ec
88f0d49
ce9353e
9714c63
0ee8a94
f1d95ba
de85e90
62d36a5
412e8ae
8496b0c
8c1b0f5
e2b9e39
a086caa
2447f0b
dacbcce
9a7ba3f
48af633
da0f143
0a2a0d2
2f7f729
c86ab26
85a5395
174ba23
695335d
b5d7b91
fb0ec85
47e4a1b
827f7c2
58b58e5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
// Copyright 2017, OpenCensus Authors | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some files are There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch. Thank you. Fixed now. |
||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
// | ||
|
||
package grpc | ||
|
||
import ( | ||
"golang.org/x/net/context" | ||
|
||
"google.golang.org/grpc/stats" | ||
) | ||
|
||
// Handlers is a composite type implementing the "google.golang.org/grpc/stats.Handler" | ||
// interface to process lifecycle events from a GRPC client or server. Its only | ||
// purpose is to allow for chaining others types implementing the "google.golang.org/grpc/stats.Handler" | ||
// interface. In this package it allows both stats and tracing subHandlers to be chained together. | ||
type Handlers struct { | ||
subHandlers []stats.Handler | ||
} | ||
|
||
// TagConn can attach some information to the given context. The returned | ||
// context will be used for stats handling. For conn stats handling, the | ||
// context used in HandleConn for this connection will be derived from the | ||
// context returned. | ||
// For client RPC stats handling, the context used in HandleRPC is not derived | ||
// from the context returned. | ||
// For server RPC stats handling, the context used in HandleRPC for all RPCs on | ||
// this connection will be derived from this returned context. | ||
func (h Handlers) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { | ||
for _, sh := range h.subHandlers { | ||
ctx = sh.TagConn(ctx, info) | ||
} | ||
return ctx | ||
} | ||
|
||
// HandleConn calls all registered connection subHandlers. | ||
func (h Handlers) HandleConn(ctx context.Context, s stats.ConnStats) { | ||
for _, sh := range h.subHandlers { | ||
sh.HandleConn(ctx, s) | ||
} | ||
} | ||
|
||
// TagRPC can attach some information to the given context. The returned | ||
// context is used in the rest lifetime of the RPC. | ||
func (h Handlers) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { | ||
for _, sh := range h.subHandlers { | ||
ctx = sh.TagRPC(ctx, info) | ||
} | ||
return ctx | ||
} | ||
|
||
// HandleRPC calls all registered RPC subHandlers. | ||
func (h Handlers) HandleRPC(ctx context.Context, s stats.RPCStats) { | ||
for _, sh := range h.subHandlers { | ||
sh.HandleRPC(ctx, s) | ||
} | ||
} | ||
|
||
// AddHandler adds a handler to the list of registered subHandlers. This list | ||
// contains all the subhandlers that will be called during HandleConn and | ||
// HandleRPC. | ||
func (h Handlers) AddHandler(sh stats.Handler) { | ||
h.subHandlers = append(h.subHandlers, sh) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
// Copyright 2017, OpenCensus Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
// | ||
|
||
package stats | ||
|
||
import ( | ||
"golang.org/x/net/context" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
We do those checks on travis in gRPC: https://github.com/menghanl/grpc-go/blob/master/vet.sh#L51-L54 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. Fixed. |
||
"strings" | ||
"sync/atomic" | ||
"time" | ||
|
||
istats "github.com/census-instrumentation/opencensus-go/stats" | ||
"github.com/census-instrumentation/opencensus-go/tags" | ||
"github.com/golang/glog" | ||
"google.golang.org/grpc/stats" | ||
) | ||
|
||
var ( | ||
// grpcClientConnKey is the key used to store client instrumentation | ||
// connection related data into the context. | ||
grpcClientConnKey *grpcInstrumentationKey | ||
// grpcClientRPCKey is the key used to store client instrumentation RPC | ||
// related data into the context. | ||
grpcClientRPCKey *grpcInstrumentationKey | ||
) | ||
|
||
// clientHandler is the type implementing the "google.golang.org/grpc/stats.Handler" | ||
// interface to process lifecycle events from the GRPC client. | ||
type clientHandler struct{} | ||
|
||
// NewClientHandler returns the "google.golang.org/grpc/stats.Handler" | ||
// implementation for the grpc client. | ||
func NewClientHandler() stats.Handler { | ||
return clientHandler{} | ||
} | ||
|
||
// TagConn adds connection related data to the given context and returns the | ||
// new context. | ||
func (ch clientHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { | ||
// Do nothing. This is here to satisfy the interface "google.golang.org/grpc/stats.Handler" | ||
return ctx | ||
} | ||
|
||
// HandleConn processes the connection events. | ||
func (ch clientHandler) HandleConn(ctx context.Context, s stats.ConnStats) { | ||
// Do nothing. This is here to satisfy the interface "google.golang.org/grpc/stats.Handler" | ||
} | ||
|
||
// TagRPC gets the github.com/census-instrumentation/opencensus-go/tags.TagsSet | ||
// populated by the application code, serializes its tags into the GRPC | ||
// metadata in order to be sent to the server. | ||
func (ch clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { | ||
startTime := time.Now() | ||
if ctx == nil { | ||
if glog.V(2) { | ||
glog.Infoln("clientHandler.TagRPC called with nil context") | ||
} | ||
return ctx | ||
} | ||
|
||
if info == nil { | ||
if glog.V(2) { | ||
glog.Infof("clientHandler.TagRPC called with nil info.", info.FullMethodName) | ||
} | ||
return ctx | ||
} | ||
names := strings.Split(info.FullMethodName, "/") | ||
if len(names) != 3 { | ||
if glog.V(2) { | ||
glog.Infof("clientHandler.TagRPC called with info.FullMethodName bad format. got %v, want '/$service/$method/'", info.FullMethodName) | ||
} | ||
return ctx | ||
} | ||
serviceName := names[1] | ||
methodName := names[2] | ||
|
||
d := &rpcData{ | ||
startTime: startTime, | ||
} | ||
|
||
ts := tags.FromContext(ctx) | ||
encoded := tags.EncodeToFullSignature(ts) | ||
ctx = stats.SetTags(ctx, encoded) | ||
|
||
tsb := tags.NewTagSetBuilder(ts) | ||
tsb.UpsertString(keyService, serviceName) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does this function return a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To make it easier to use the shortcut version when modifying multiple tags. It returns the same TagSetBuilder. It is just a syntactic sugar to allow for the short form: |
||
tsb.UpsertString(keyMethod, methodName) | ||
|
||
// TODO(acetechnologist): should we be recording this later? What is the | ||
// point of updating d.reqLen & d.reqCount if we update now? | ||
ctx = tags.NewContext(ctx, tsb.Build()) | ||
istats.RecordInt64(ctx, RPCClientStartedCount, 1) | ||
|
||
return context.WithValue(ctx, grpcClientRPCKey, d) | ||
} | ||
|
||
// HandleRPC processes the RPC events. | ||
func (ch clientHandler) HandleRPC(ctx context.Context, s stats.RPCStats) { | ||
switch st := s.(type) { | ||
case *stats.Begin, *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer: | ||
// do nothing for client | ||
case *stats.OutPayload: | ||
ch.handleRPCOutPayload(ctx, st) | ||
case *stats.InPayload: | ||
ch.handleRPCInPayload(ctx, st) | ||
case *stats.End: | ||
ch.handleRPCEnd(ctx, st) | ||
default: | ||
glog.Infof("unexpected stats: %T", st) | ||
} | ||
} | ||
|
||
func (ch clientHandler) handleRPCOutPayload(ctx context.Context, s *stats.OutPayload) { | ||
d, ok := ctx.Value(grpcClientRPCKey).(*rpcData) | ||
if !ok { | ||
if glog.V(2) { | ||
glog.Infoln("clientHandler.handleRPCOutPayload failed to retrieve *rpcData from context") | ||
} | ||
return | ||
} | ||
|
||
istats.RecordInt64(ctx, RPCClientRequestBytes, int64(s.Length)) | ||
atomic.AddUint64(&d.reqCount, 1) | ||
} | ||
|
||
func (ch clientHandler) handleRPCInPayload(ctx context.Context, s *stats.InPayload) { | ||
d, ok := ctx.Value(grpcClientRPCKey).(*rpcData) | ||
if !ok { | ||
if glog.V(2) { | ||
glog.Infoln("clientHandler.handleRPCInPayload failed to retrieve *rpcData from context") | ||
} | ||
return | ||
} | ||
|
||
istats.RecordInt64(ctx, RPCClientResponseBytes, int64(s.Length)) | ||
atomic.AddUint64(&d.respCount, 1) | ||
} | ||
|
||
func (ch clientHandler) handleRPCEnd(ctx context.Context, s *stats.End) { | ||
d, ok := ctx.Value(grpcClientRPCKey).(*rpcData) | ||
if !ok { | ||
if glog.V(2) { | ||
glog.Infoln("clientHandler.handleRPCEnd failed to retrieve *rpcData from context") | ||
} | ||
return | ||
} | ||
elapsedTime := time.Since(d.startTime) | ||
|
||
var measurements []istats.Measurement | ||
measurements = append(measurements, RPCClientRequestCount.Is(int64(d.reqCount))) | ||
measurements = append(measurements, RPCClientResponseCount.Is(int64(d.respCount))) | ||
measurements = append(measurements, RPCClientFinishedCount.Is(1)) | ||
measurements = append(measurements, RPCClientRoundTripLatency.Is(float64(elapsedTime)/float64(time.Millisecond))) | ||
|
||
if s.Error != nil { | ||
errorCode := s.Error.Error() | ||
ts := tags.FromContext(ctx) | ||
tsb := tags.NewTagSetBuilder(ts) | ||
tsb.UpsertString(keyOpStatus, errorCode) | ||
|
||
ctx = tags.NewContext(ctx, tsb.Build()) | ||
measurements = append(measurements, RPCClientErrorCount.Is(1)) | ||
} | ||
|
||
istats.Record(ctx, measurements...) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to keep this line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No we don't. Deleted it.