Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
gRPC integration (#12)
Browse files Browse the repository at this point in the history
* Add initial stats API.

* fixed readme.md file

* fixed readme.md file

* fixing yaml file

* fixing yaml file

* remove go 1.7 and 1.6 from yaml file as we use 'sort.Slice' that is only supportted in 1.8 and above

* Added to README.md a section about modifying the reporting interval.

* Using inheritence instead of type switch for creating aggregationValue and aggregator. Also added equality as an interface to aggregationValue

* Deleted not used directories

* implemented automatic registration for measures on creation

* implemented codec for tags to be used in GRPC

* implemented codec for tags to be used in GRPC

* implemented codec for tags to be used in GRPC

* Fixed readme.md typos. Changed API to create a tagSet builder

* added convenience methods to various structs: String(), Equal(), Contains()... Also renamed some methods to be more succint and fixed some documentation issues

* added getter methods to interfaces and made some fields public to eventually support conversions to other formats

* addressed commnets for the pull request. Fixed both method names and readme.md.

* added installation and prereq comments to readme.md

* Fixed aggregationDistirbution to include oldest bucket in interval stats

* Added plugin interceptor for grpc stats

* Added all defaults measures and views to be collected for grpc stats

* Addressed PR comments. Modified readme.md to mention the requirement for Go1.9 to integrate with libraries still using golang.org/x/net/context.Context. Fixed the license headings to refer to opencensus instead of google.

* Made serverHandler and clientHandler private and added exported constructor for them.

* Requires Go 1.9 to take advantage of context type aliasing.

* Requiring Go 1.9 seems quite limiting. So I went ahead and replaced context by golang.org/x/net/context and reduced the requirement to 1.8

* Reworded comments for NewTestingAggregationDistributionValue

* Renamed NewTestingAggregationDistributionValue to NewDoNotUseTestingAggregationDistributionValue

* Fixed naming NewTestingAggregationDistributionValue to NewDoNotUseTestingAggregationDistributionValue in server_handler_test.go

* Added gofmt to .travis script. Fixed formatting issues
  • Loading branch information
acetechnologist authored Sep 22, 2017
1 parent 6da5433 commit 2d3c093
Show file tree
Hide file tree
Showing 17 changed files with 1,746 additions and 137 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ before_script:
- PKGS=$(go list ./... | grep -v /vendor/) # All the import paths, excluding vendor/ if any

script:
- gofmt -w ./
- go test -v -race $PKGS # Run all the tests with the race detector enabled
- 'if [[ $TRAVIS_GO_VERSION = 1.8* ]]; then ! golint ./... | grep -vE "(_mock|_string|\.pb)\.go:"; fi'
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ To install this package, you need to install Go and setup your Go workspace on y
$ go get -u github.com/census-instrumentation/opencensus-go

## Prerequisites
This requires Go 1.8 or later as it uses the convenience function sort.Slice(...) in few places.
This requires Go 1.8 or later as it uses the convenience function sort.Slice(...) introduced in Go 1.8.

## Tags

Expand Down
75 changes: 75 additions & 0 deletions plugins/grpc/handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2017, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package grpc

import (
"golang.org/x/net/context"

"google.golang.org/grpc/stats"
)

// Handlers is a composite type implementing the "google.golang.org/grpc/stats.Handler"
// interface to process lifecycle events from a GRPC client or server. Its only
// purpose is to allow for chaining others types implementing the "google.golang.org/grpc/stats.Handler"
// interface. In this package it allows both stats and tracing subHandlers to be chained together.
type Handlers struct {
subHandlers []stats.Handler
}

// TagConn can attach some information to the given context. The returned
// context will be used for stats handling. For conn stats handling, the
// context used in HandleConn for this connection will be derived from the
// context returned.
// For client RPC stats handling, the context used in HandleRPC is not derived
// from the context returned.
// For server RPC stats handling, the context used in HandleRPC for all RPCs on
// this connection will be derived from this returned context.
func (h Handlers) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
for _, sh := range h.subHandlers {
ctx = sh.TagConn(ctx, info)
}
return ctx
}

// HandleConn calls all registered connection subHandlers.
func (h Handlers) HandleConn(ctx context.Context, s stats.ConnStats) {
for _, sh := range h.subHandlers {
sh.HandleConn(ctx, s)
}
}

// TagRPC can attach some information to the given context. The returned
// context is used in the rest lifetime of the RPC.
func (h Handlers) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
for _, sh := range h.subHandlers {
ctx = sh.TagRPC(ctx, info)
}
return ctx
}

// HandleRPC calls all registered RPC subHandlers.
func (h Handlers) HandleRPC(ctx context.Context, s stats.RPCStats) {
for _, sh := range h.subHandlers {
sh.HandleRPC(ctx, s)
}
}

// AddHandler adds a handler to the list of registered subHandlers. This list
// contains all the subhandlers that will be called during HandleConn and
// HandleRPC.
func (h Handlers) AddHandler(sh stats.Handler) {
h.subHandlers = append(h.subHandlers, sh)
}
178 changes: 178 additions & 0 deletions plugins/grpc/stats/client_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// Copyright 2017, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package stats

import (
"strings"
"sync/atomic"
"time"

istats "github.com/census-instrumentation/opencensus-go/stats"
"github.com/census-instrumentation/opencensus-go/tags"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc/stats"
)

var (
// grpcClientConnKey is the key used to store client instrumentation
// connection related data into the context.
grpcClientConnKey *grpcInstrumentationKey
// grpcClientRPCKey is the key used to store client instrumentation RPC
// related data into the context.
grpcClientRPCKey *grpcInstrumentationKey
)

// clientHandler is the type implementing the "google.golang.org/grpc/stats.Handler"
// interface to process lifecycle events from the GRPC client.
type clientHandler struct{}

// NewClientHandler returns the "google.golang.org/grpc/stats.Handler"
// implementation for the grpc client.
func NewClientHandler() stats.Handler {
return clientHandler{}
}

// TagConn adds connection related data to the given context and returns the
// new context.
func (ch clientHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
// Do nothing. This is here to satisfy the interface "google.golang.org/grpc/stats.Handler"
return ctx
}

// HandleConn processes the connection events.
func (ch clientHandler) HandleConn(ctx context.Context, s stats.ConnStats) {
// Do nothing. This is here to satisfy the interface "google.golang.org/grpc/stats.Handler"
}

// TagRPC gets the github.com/census-instrumentation/opencensus-go/tags.TagsSet
// populated by the application code, serializes its tags into the GRPC
// metadata in order to be sent to the server.
func (ch clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
startTime := time.Now()
if ctx == nil {
if glog.V(2) {
glog.Infoln("clientHandler.TagRPC called with nil context")
}
return ctx
}

if info == nil {
if glog.V(2) {
glog.Infof("clientHandler.TagRPC called with nil info.", info.FullMethodName)
}
return ctx
}
names := strings.Split(info.FullMethodName, "/")
if len(names) != 3 {
if glog.V(2) {
glog.Infof("clientHandler.TagRPC called with info.FullMethodName bad format. got %v, want '/$service/$method/'", info.FullMethodName)
}
return ctx
}
serviceName := names[1]
methodName := names[2]

d := &rpcData{
startTime: startTime,
}

ts := tags.FromContext(ctx)
encoded := tags.EncodeToFullSignature(ts)
ctx = stats.SetTags(ctx, encoded)

tsb := tags.NewTagSetBuilder(ts)
tsb.UpsertString(keyService, serviceName)
tsb.UpsertString(keyMethod, methodName)

// TODO(acetechnologist): should we be recording this later? What is the
// point of updating d.reqLen & d.reqCount if we update now?
ctx = tags.NewContext(ctx, tsb.Build())
istats.RecordInt64(ctx, RPCClientStartedCount, 1)

return context.WithValue(ctx, grpcClientRPCKey, d)
}

// HandleRPC processes the RPC events.
func (ch clientHandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
switch st := s.(type) {
case *stats.Begin, *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer:
// do nothing for client
case *stats.OutPayload:
ch.handleRPCOutPayload(ctx, st)
case *stats.InPayload:
ch.handleRPCInPayload(ctx, st)
case *stats.End:
ch.handleRPCEnd(ctx, st)
default:
glog.Infof("unexpected stats: %T", st)
}
}

func (ch clientHandler) handleRPCOutPayload(ctx context.Context, s *stats.OutPayload) {
d, ok := ctx.Value(grpcClientRPCKey).(*rpcData)
if !ok {
if glog.V(2) {
glog.Infoln("clientHandler.handleRPCOutPayload failed to retrieve *rpcData from context")
}
return
}

istats.RecordInt64(ctx, RPCClientRequestBytes, int64(s.Length))
atomic.AddUint64(&d.reqCount, 1)
}

func (ch clientHandler) handleRPCInPayload(ctx context.Context, s *stats.InPayload) {
d, ok := ctx.Value(grpcClientRPCKey).(*rpcData)
if !ok {
if glog.V(2) {
glog.Infoln("clientHandler.handleRPCInPayload failed to retrieve *rpcData from context")
}
return
}

istats.RecordInt64(ctx, RPCClientResponseBytes, int64(s.Length))
atomic.AddUint64(&d.respCount, 1)
}

func (ch clientHandler) handleRPCEnd(ctx context.Context, s *stats.End) {
d, ok := ctx.Value(grpcClientRPCKey).(*rpcData)
if !ok {
if glog.V(2) {
glog.Infoln("clientHandler.handleRPCEnd failed to retrieve *rpcData from context")
}
return
}
elapsedTime := time.Since(d.startTime)

var measurements []istats.Measurement
measurements = append(measurements, RPCClientRequestCount.Is(int64(d.reqCount)))
measurements = append(measurements, RPCClientResponseCount.Is(int64(d.respCount)))
measurements = append(measurements, RPCClientFinishedCount.Is(1))
measurements = append(measurements, RPCClientRoundTripLatency.Is(float64(elapsedTime)/float64(time.Millisecond)))

if s.Error != nil {
errorCode := s.Error.Error()
ts := tags.FromContext(ctx)
tsb := tags.NewTagSetBuilder(ts)
tsb.UpsertString(keyOpStatus, errorCode)

ctx = tags.NewContext(ctx, tsb.Build())
measurements = append(measurements, RPCClientErrorCount.Is(1))
}

istats.Record(ctx, measurements...)
}
Loading

0 comments on commit 2d3c093

Please sign in to comment.