Skip to content

Commit

Permalink
grpc: add prometheus server and client prometheus metrics (#642)
Browse files Browse the repository at this point in the history
  • Loading branch information
ggilmore authored Sep 12, 2023
1 parent 40a9a23 commit 3ce1f2b
Show file tree
Hide file tree
Showing 20 changed files with 2,015 additions and 1,183 deletions.
32 changes: 24 additions & 8 deletions api_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"math/rand"
"reflect"

proto "github.com/sourcegraph/zoekt/grpc/v1"
proto "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -243,11 +243,11 @@ func (lfm *LineFragmentMatch) ToProto() *proto.LineFragmentMatch {

func FlushReasonFromProto(p proto.FlushReason) FlushReason {
switch p {
case proto.FlushReason_TIMER_EXPIRED:
case proto.FlushReason_FLUSH_REASON_TIMER_EXPIRED:
return FlushReasonTimerExpired
case proto.FlushReason_FINAL_FLUSH:
case proto.FlushReason_FLUSH_REASON_FINAL_FLUSH:
return FlushReasonFinalFlush
case proto.FlushReason_MAX_SIZE:
case proto.FlushReason_FLUSH_REASON_MAX_SIZE:
return FlushReasonMaxSize
default:
return FlushReason(0)
Expand All @@ -257,13 +257,13 @@ func FlushReasonFromProto(p proto.FlushReason) FlushReason {
func (fr FlushReason) ToProto() proto.FlushReason {
switch fr {
case FlushReasonTimerExpired:
return proto.FlushReason_TIMER_EXPIRED
return proto.FlushReason_FLUSH_REASON_TIMER_EXPIRED
case FlushReasonFinalFlush:
return proto.FlushReason_FINAL_FLUSH
return proto.FlushReason_FLUSH_REASON_FINAL_FLUSH
case FlushReasonMaxSize:
return proto.FlushReason_MAX_SIZE
return proto.FlushReason_FLUSH_REASON_MAX_SIZE
default:
return proto.FlushReason_UNKNOWN
return proto.FlushReason_FLUSH_REASON_UNKNOWN_UNSPECIFIED
}
}

Expand Down Expand Up @@ -345,6 +345,14 @@ func (p *Progress) ToProto() *proto.Progress {
}
}

func SearchResultFromStreamProto(p *proto.StreamSearchResponse, repoURLs, lineFragments map[string]string) *SearchResult {
if p == nil {
return nil
}

return SearchResultFromProto(p.GetResponseChunk(), repoURLs, lineFragments)
}

func SearchResultFromProto(p *proto.SearchResponse, repoURLs, lineFragments map[string]string) *SearchResult {
if p == nil {
return nil
Expand Down Expand Up @@ -384,6 +392,14 @@ func (sr *SearchResult) ToProto() *proto.SearchResponse {
}
}

func (sr *SearchResult) ToStreamProto() *proto.StreamSearchResponse {
if sr == nil {
return nil
}

return &proto.StreamSearchResponse{ResponseChunk: sr.ToProto()}
}

func RepositoryBranchFromProto(p *proto.RepositoryBranch) RepositoryBranch {
return RepositoryBranch{
Name: p.GetName(),
Expand Down
57 changes: 39 additions & 18 deletions api_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
webproto "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1"
"google.golang.org/protobuf/proto"

v1 "github.com/sourcegraph/zoekt/grpc/v1"
)

func TestProtoRoundtrip(t *testing.T) {
Expand Down Expand Up @@ -133,23 +132,45 @@ func TestProtoRoundtrip(t *testing.T) {
})

t.Run("SearchResult", func(t *testing.T) {
f := func(f1 *SearchResult) bool {
var repoURLs map[string]string
var lineFragments map[string]string
t.Run("unary", func(t *testing.T) {
f := func(f1 *SearchResult) bool {
var repoURLs map[string]string
var lineFragments map[string]string

if f1 != nil {
repoURLs = f1.RepoURLs
lineFragments = f1.LineFragments
}

if f1 != nil {
repoURLs = f1.RepoURLs
lineFragments = f1.LineFragments
p1 := f1.ToProto()
f2 := SearchResultFromProto(p1, repoURLs, lineFragments)

return reflect.DeepEqual(f1, f2)
}
if err := quick.Check(f, nil); err != nil {
t.Fatal(err)
}
})

p1 := f1.ToProto()
f2 := SearchResultFromProto(p1, repoURLs, lineFragments)
t.Run("stream", func(t *testing.T) {
f := func(f1 *SearchResult) bool {
var repoURLs map[string]string
var lineFragments map[string]string

return reflect.DeepEqual(f1, f2)
}
if err := quick.Check(f, nil); err != nil {
t.Fatal(err)
}
if f1 != nil {
repoURLs = f1.RepoURLs
lineFragments = f1.LineFragments
}

p1 := f1.ToStreamProto()
f2 := SearchResultFromStreamProto(p1, repoURLs, lineFragments)

return reflect.DeepEqual(f1, f2)
}
if err := quick.Check(f, nil); err != nil {
t.Fatal(err)
}
})
})

t.Run("Repository", func(t *testing.T) {
Expand Down Expand Up @@ -396,8 +417,8 @@ var (
exampleSearchResultBytes []byte

// The proto struct representation of the search result
exampleSearchResultProto = func() *v1.SearchResponse {
sr := new(v1.SearchResponse)
exampleSearchResultProto = func() *webproto.SearchResponse {
sr := new(webproto.SearchResponse)
err := proto.Unmarshal(exampleSearchResultBytes, sr)
if err != nil {
panic(err)
Expand Down Expand Up @@ -451,7 +472,7 @@ func BenchmarkProtoRoundtrip(b *testing.B) {
}

for _, buf := range buffers {
res := new(v1.SearchResponse)
res := new(webproto.SearchResponse)
err := proto.Unmarshal(buf, res)
if err != nil {
b.Fatal(err)
Expand Down
3 changes: 1 addition & 2 deletions cmd/zoekt-sourcegraph-indexserver/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"time"

"github.com/sourcegraph/log/logtest"

proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -28,7 +28,6 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"

"github.com/sourcegraph/zoekt"
proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
)

func TestIterateIndexOptions_Fingerprint(t *testing.T) {
Expand Down
28 changes: 27 additions & 1 deletion cmd/zoekt-sourcegraph-indexserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ import (
"text/tabwriter"
"time"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/openmetrics/v2"
"github.com/keegancsmith/tmpfriend"
"github.com/peterbourgon/ff/v3/ffcli"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
sglog "github.com/sourcegraph/log"
proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
"github.com/sourcegraph/zoekt/grpc/internalerrs"
"github.com/sourcegraph/zoekt/grpc/messagesize"
"go.uber.org/automaxprocs/maxprocs"
Expand All @@ -49,7 +51,6 @@ import (

"github.com/sourcegraph/zoekt"
"github.com/sourcegraph/zoekt/build"
proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
"github.com/sourcegraph/zoekt/debugserver"
"github.com/sourcegraph/zoekt/internal/profiler"
)
Expand Down Expand Up @@ -126,6 +127,9 @@ var (
Name: "index_num_stopped_tracking_total",
Help: "Counts the number of repos we stopped tracking.",
})

clientMetricsOnce sync.Once
clientMetrics *grpc_prometheus.ClientMetrics
)

// set of repositories that we want to capture separate indexing metrics for
Expand Down Expand Up @@ -1457,14 +1461,18 @@ func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB

func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
metrics := mustGetClientMetrics()

opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainStreamInterceptor(
grpc_prometheus.StreamClientInterceptor(metrics),
internalActorStreamInterceptor(),
internalerrs.LoggingStreamClientInterceptor(logger),
internalerrs.PrometheusStreamClientInterceptor,
),
grpc.WithChainUnaryInterceptor(
grpc_prometheus.UnaryClientInterceptor(metrics),
internalActorUnaryInterceptor(),
internalerrs.LoggingUnaryClientInterceptor(logger),
internalerrs.PrometheusUnaryClientInterceptor,
Expand Down Expand Up @@ -1494,6 +1502,24 @@ func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.Dia
return client, nil
}

// mustGetClientMetrics returns a singleton instance of the client metrics
// that are shared across all gRPC clients that this process creates.
//
// This function panics if the metrics cannot be registered with the default
// Prometheus registry.
func mustGetClientMetrics() *grpc_prometheus.ClientMetrics {
clientMetricsOnce.Do(func() {
clientMetrics = grpc_prometheus.NewRegisteredClientMetrics(prometheus.DefaultRegisterer,
grpc_prometheus.WithClientCounterOptions(),
grpc_prometheus.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
grpc_prometheus.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
grpc_prometheus.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
)
})

return clientMetrics
}

// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
Expand Down
1 change: 0 additions & 1 deletion cmd/zoekt-sourcegraph-indexserver/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

sglog "github.com/sourcegraph/log"
"github.com/sourcegraph/log/logtest"

"github.com/xeipuuv/gojsonschema"
"google.golang.org/grpc"

Expand Down
2 changes: 1 addition & 1 deletion cmd/zoekt-sourcegraph-indexserver/sg.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (

"github.com/go-git/go-git/v5"
retryablehttp "github.com/hashicorp/go-retryablehttp"
proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
"golang.org/x/net/trace"
"google.golang.org/grpc"

"github.com/sourcegraph/zoekt"
proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
)

// SourcegraphListResult is the return value of Sourcegraph.List. It is its
Expand Down
44 changes: 24 additions & 20 deletions grpc/server/server.go → cmd/zoekt-webserver/grpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"math"

"github.com/sourcegraph/zoekt/grpc/chunk"
proto "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/sourcegraph/zoekt"
v1 "github.com/sourcegraph/zoekt/grpc/v1"
"github.com/sourcegraph/zoekt/query"
"github.com/sourcegraph/zoekt/stream"
)
Expand All @@ -21,11 +21,11 @@ func NewServer(s zoekt.Streamer) *Server {
}

type Server struct {
v1.UnimplementedWebserverServiceServer
proto.UnimplementedWebserverServiceServer
streamer zoekt.Streamer
}

func (s *Server) Search(ctx context.Context, req *v1.SearchRequest) (*v1.SearchResponse, error) {
func (s *Server) Search(ctx context.Context, req *proto.SearchRequest) (*proto.SearchResponse, error) {
q, err := query.QFromProto(req.GetQuery())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
Expand All @@ -39,23 +39,25 @@ func (s *Server) Search(ctx context.Context, req *v1.SearchRequest) (*v1.SearchR
return res.ToProto(), nil
}

func (s *Server) StreamSearch(req *v1.SearchRequest, ss v1.WebserverService_StreamSearchServer) error {
q, err := query.QFromProto(req.GetQuery())
func (s *Server) StreamSearch(req *proto.StreamSearchRequest, ss proto.WebserverService_StreamSearchServer) error {
request := req.GetRequest()

q, err := query.QFromProto(request.GetQuery())
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}

sender := gRPCChunkSender(ss)
sampler := stream.NewSamplingSender(sender)

err = s.streamer.StreamSearch(ss.Context(), q, zoekt.SearchOptionsFromProto(req.GetOpts()), sampler)
err = s.streamer.StreamSearch(ss.Context(), q, zoekt.SearchOptionsFromProto(request.GetOpts()), sampler)
if err == nil {
sampler.Flush()
}
return err
}

func (s *Server) List(ctx context.Context, req *v1.ListRequest) (*v1.ListResponse, error) {
func (s *Server) List(ctx context.Context, req *proto.ListRequest) (*proto.ListResponse, error) {
q, err := query.QFromProto(req.GetQuery())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
Expand All @@ -70,12 +72,14 @@ func (s *Server) List(ctx context.Context, req *v1.ListRequest) (*v1.ListRespons
}

// gRPCChunkSender is a zoekt.Sender that sends small chunks of FileMatches to the provided gRPC stream.
func gRPCChunkSender(ss v1.WebserverService_StreamSearchServer) zoekt.Sender {
func gRPCChunkSender(ss proto.WebserverService_StreamSearchServer) zoekt.Sender {
f := func(r *zoekt.SearchResult) {
result := r.ToProto()
result := r.ToStreamProto().GetResponseChunk()

if len(result.GetFiles()) == 0 { // stats-only result, send it immediately
_ = ss.Send(result)
_ = ss.Send(&proto.StreamSearchResponse{
ResponseChunk: result,
})
return
}

Expand All @@ -84,10 +88,10 @@ func gRPCChunkSender(ss v1.WebserverService_StreamSearchServer) zoekt.Sender {
statsSent := false
numFilesSent := 0

sendFunc := func(filesChunk []*v1.FileMatch) error {
sendFunc := func(filesChunk []*proto.FileMatch) error {
numFilesSent += len(filesChunk)

var stats *v1.Stats
var stats *proto.Stats
if !statsSent { // We only send stats back on the first chunk
statsSent = true
stats = result.GetStats()
Expand All @@ -96,7 +100,7 @@ func gRPCChunkSender(ss v1.WebserverService_StreamSearchServer) zoekt.Sender {
progress := result.GetProgress()

if numFilesSent < len(result.GetFiles()) { // more chunks to come
progress = &v1.Progress{
progress = &proto.Progress{
Priority: result.GetProgress().GetPriority(),

// We want the client to consume the entire set of chunks - so we manually
Expand All @@ -108,14 +112,14 @@ func gRPCChunkSender(ss v1.WebserverService_StreamSearchServer) zoekt.Sender {
}
}

response := &v1.SearchResponse{
Files: filesChunk,

Stats: stats,
Progress: progress,
}
return ss.Send(&proto.StreamSearchResponse{
ResponseChunk: &proto.SearchResponse{
Files: filesChunk,

return ss.Send(response)
Stats: stats,
Progress: progress,
},
})
}

_ = chunk.SendAll(sendFunc, result.GetFiles()...)
Expand Down
Loading

0 comments on commit 3ce1f2b

Please sign in to comment.