Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: add prometheus server and client prometheus metrics #642

Merged
merged 7 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions api_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"math/rand"
"reflect"

proto "github.com/sourcegraph/zoekt/grpc/v1"
proto "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -243,11 +243,11 @@ func (lfm *LineFragmentMatch) ToProto() *proto.LineFragmentMatch {

func FlushReasonFromProto(p proto.FlushReason) FlushReason {
switch p {
case proto.FlushReason_TIMER_EXPIRED:
case proto.FlushReason_FLUSH_REASON_TIMER_EXPIRED:
return FlushReasonTimerExpired
case proto.FlushReason_FINAL_FLUSH:
case proto.FlushReason_FLUSH_REASON_FINAL_FLUSH:
return FlushReasonFinalFlush
case proto.FlushReason_MAX_SIZE:
case proto.FlushReason_FLUSH_REASON_MAX_SIZE:
return FlushReasonMaxSize
default:
return FlushReason(0)
Expand All @@ -257,13 +257,13 @@ func FlushReasonFromProto(p proto.FlushReason) FlushReason {
func (fr FlushReason) ToProto() proto.FlushReason {
switch fr {
case FlushReasonTimerExpired:
return proto.FlushReason_TIMER_EXPIRED
return proto.FlushReason_FLUSH_REASON_TIMER_EXPIRED
case FlushReasonFinalFlush:
return proto.FlushReason_FINAL_FLUSH
return proto.FlushReason_FLUSH_REASON_FINAL_FLUSH
case FlushReasonMaxSize:
return proto.FlushReason_MAX_SIZE
return proto.FlushReason_FLUSH_REASON_MAX_SIZE
default:
return proto.FlushReason_UNKNOWN
return proto.FlushReason_FLUSH_REASON_UNKNOWN_UNSPECIFIED
}
}

Expand Down Expand Up @@ -345,6 +345,14 @@ func (p *Progress) ToProto() *proto.Progress {
}
}

func SearchResultFromStreamProto(p *proto.StreamSearchResponse, repoURLs, lineFragments map[string]string) *SearchResult {
if p == nil {
return nil
}

return SearchResultFromProto(p.GetResponseChunk(), repoURLs, lineFragments)
}

func SearchResultFromProto(p *proto.SearchResponse, repoURLs, lineFragments map[string]string) *SearchResult {
if p == nil {
return nil
Expand Down Expand Up @@ -384,6 +392,14 @@ func (sr *SearchResult) ToProto() *proto.SearchResponse {
}
}

func (sr *SearchResult) ToStreamProto() *proto.StreamSearchResponse {
if sr == nil {
return nil
}

return &proto.StreamSearchResponse{ResponseChunk: sr.ToProto()}
}

func RepositoryBranchFromProto(p *proto.RepositoryBranch) RepositoryBranch {
return RepositoryBranch{
Name: p.GetName(),
Expand Down
57 changes: 39 additions & 18 deletions api_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
webproto "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1"
"google.golang.org/protobuf/proto"

v1 "github.com/sourcegraph/zoekt/grpc/v1"
)

func TestProtoRoundtrip(t *testing.T) {
Expand Down Expand Up @@ -133,23 +132,45 @@ func TestProtoRoundtrip(t *testing.T) {
})

t.Run("SearchResult", func(t *testing.T) {
f := func(f1 *SearchResult) bool {
var repoURLs map[string]string
var lineFragments map[string]string
t.Run("unary", func(t *testing.T) {
f := func(f1 *SearchResult) bool {
var repoURLs map[string]string
var lineFragments map[string]string

if f1 != nil {
repoURLs = f1.RepoURLs
lineFragments = f1.LineFragments
}

if f1 != nil {
repoURLs = f1.RepoURLs
lineFragments = f1.LineFragments
p1 := f1.ToProto()
f2 := SearchResultFromProto(p1, repoURLs, lineFragments)

return reflect.DeepEqual(f1, f2)
}
if err := quick.Check(f, nil); err != nil {
t.Fatal(err)
}
})

p1 := f1.ToProto()
f2 := SearchResultFromProto(p1, repoURLs, lineFragments)
t.Run("stream", func(t *testing.T) {
f := func(f1 *SearchResult) bool {
var repoURLs map[string]string
var lineFragments map[string]string

return reflect.DeepEqual(f1, f2)
}
if err := quick.Check(f, nil); err != nil {
t.Fatal(err)
}
if f1 != nil {
repoURLs = f1.RepoURLs
lineFragments = f1.LineFragments
}

p1 := f1.ToStreamProto()
f2 := SearchResultFromStreamProto(p1, repoURLs, lineFragments)

return reflect.DeepEqual(f1, f2)
}
if err := quick.Check(f, nil); err != nil {
t.Fatal(err)
}
})
})

t.Run("Repository", func(t *testing.T) {
Expand Down Expand Up @@ -396,8 +417,8 @@ var (
exampleSearchResultBytes []byte

// The proto struct representation of the search result
exampleSearchResultProto = func() *v1.SearchResponse {
sr := new(v1.SearchResponse)
exampleSearchResultProto = func() *webproto.SearchResponse {
sr := new(webproto.SearchResponse)
err := proto.Unmarshal(exampleSearchResultBytes, sr)
if err != nil {
panic(err)
Expand Down Expand Up @@ -451,7 +472,7 @@ func BenchmarkProtoRoundtrip(b *testing.B) {
}

for _, buf := range buffers {
res := new(v1.SearchResponse)
res := new(webproto.SearchResponse)
err := proto.Unmarshal(buf, res)
if err != nil {
b.Fatal(err)
Expand Down
3 changes: 1 addition & 2 deletions cmd/zoekt-sourcegraph-indexserver/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"time"

"github.com/sourcegraph/log/logtest"

proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -28,7 +28,6 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"

"github.com/sourcegraph/zoekt"
proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
)

func TestIterateIndexOptions_Fingerprint(t *testing.T) {
Expand Down
28 changes: 27 additions & 1 deletion cmd/zoekt-sourcegraph-indexserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ import (
"text/tabwriter"
"time"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/openmetrics/v2"
"github.com/keegancsmith/tmpfriend"
"github.com/peterbourgon/ff/v3/ffcli"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
sglog "github.com/sourcegraph/log"
proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
"github.com/sourcegraph/zoekt/grpc/internalerrs"
"github.com/sourcegraph/zoekt/grpc/messagesize"
"go.uber.org/automaxprocs/maxprocs"
Expand All @@ -49,7 +51,6 @@ import (

"github.com/sourcegraph/zoekt"
"github.com/sourcegraph/zoekt/build"
proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
"github.com/sourcegraph/zoekt/debugserver"
"github.com/sourcegraph/zoekt/internal/profiler"
)
Expand Down Expand Up @@ -126,6 +127,9 @@ var (
Name: "index_num_stopped_tracking_total",
Help: "Counts the number of repos we stopped tracking.",
})

clientMetricsOnce sync.Once
clientMetrics *grpc_prometheus.ClientMetrics
)

// set of repositories that we want to capture separate indexing metrics for
Expand Down Expand Up @@ -1457,14 +1461,18 @@ func internalActorStreamInterceptor() grpc.StreamClientInterceptor {
const defaultGRPCMessageReceiveSizeBytes = 90 * 1024 * 1024 // 90 MB

func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.DialOption) (proto.ZoektConfigurationServiceClient, error) {
metrics := mustGetClientMetrics()

opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainStreamInterceptor(
grpc_prometheus.StreamClientInterceptor(metrics),
internalActorStreamInterceptor(),
internalerrs.LoggingStreamClientInterceptor(logger),
internalerrs.PrometheusStreamClientInterceptor,
),
grpc.WithChainUnaryInterceptor(
grpc_prometheus.UnaryClientInterceptor(metrics),
internalActorUnaryInterceptor(),
internalerrs.LoggingUnaryClientInterceptor(logger),
internalerrs.PrometheusUnaryClientInterceptor,
Expand Down Expand Up @@ -1494,6 +1502,24 @@ func dialGRPCClient(addr string, logger sglog.Logger, additionalOpts ...grpc.Dia
return client, nil
}

// mustGetClientMetrics returns a singleton instance of the client metrics
// that are shared across all gRPC clients that this process creates.
//
// This function panics if the metrics cannot be registered with the default
// Prometheus registry.
func mustGetClientMetrics() *grpc_prometheus.ClientMetrics {
clientMetricsOnce.Do(func() {
clientMetrics = grpc_prometheus.NewRegisteredClientMetrics(prometheus.DefaultRegisterer,
grpc_prometheus.WithClientCounterOptions(),
grpc_prometheus.WithClientHandlingTimeHistogram(), // record the overall request latency for a gRPC request
grpc_prometheus.WithClientStreamRecvHistogram(), // record how long it takes for a client to receive a message during a streaming RPC
grpc_prometheus.WithClientStreamSendHistogram(), // record how long it takes for a client to send a message during a streaming RPC
)
})

return clientMetrics
}

// addDefaultPort adds a default port to a URL if one is not specified.
//
// If the URL scheme is "http" and no port is specified, "80" is used.
Expand Down
1 change: 0 additions & 1 deletion cmd/zoekt-sourcegraph-indexserver/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

sglog "github.com/sourcegraph/log"
"github.com/sourcegraph/log/logtest"

"github.com/xeipuuv/gojsonschema"
"google.golang.org/grpc"

Expand Down
2 changes: 1 addition & 1 deletion cmd/zoekt-sourcegraph-indexserver/sg.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (

"github.com/go-git/go-git/v5"
retryablehttp "github.com/hashicorp/go-retryablehttp"
proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
"golang.org/x/net/trace"
"google.golang.org/grpc"

"github.com/sourcegraph/zoekt"
proto "github.com/sourcegraph/zoekt/cmd/zoekt-sourcegraph-indexserver/protos/sourcegraph/zoekt/configuration/v1"
)

// SourcegraphListResult is the return value of Sourcegraph.List. It is its
Expand Down
44 changes: 24 additions & 20 deletions grpc/server/server.go → cmd/zoekt-webserver/grpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"math"

"github.com/sourcegraph/zoekt/grpc/chunk"
proto "github.com/sourcegraph/zoekt/grpc/protos/zoekt/webserver/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/sourcegraph/zoekt"
v1 "github.com/sourcegraph/zoekt/grpc/v1"
"github.com/sourcegraph/zoekt/query"
"github.com/sourcegraph/zoekt/stream"
)
Expand All @@ -21,11 +21,11 @@ func NewServer(s zoekt.Streamer) *Server {
}

type Server struct {
v1.UnimplementedWebserverServiceServer
proto.UnimplementedWebserverServiceServer
streamer zoekt.Streamer
}

func (s *Server) Search(ctx context.Context, req *v1.SearchRequest) (*v1.SearchResponse, error) {
func (s *Server) Search(ctx context.Context, req *proto.SearchRequest) (*proto.SearchResponse, error) {
q, err := query.QFromProto(req.GetQuery())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
Expand All @@ -39,23 +39,25 @@ func (s *Server) Search(ctx context.Context, req *v1.SearchRequest) (*v1.SearchR
return res.ToProto(), nil
}

func (s *Server) StreamSearch(req *v1.SearchRequest, ss v1.WebserverService_StreamSearchServer) error {
q, err := query.QFromProto(req.GetQuery())
func (s *Server) StreamSearch(req *proto.StreamSearchRequest, ss proto.WebserverService_StreamSearchServer) error {
request := req.GetRequest()

q, err := query.QFromProto(request.GetQuery())
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}

sender := gRPCChunkSender(ss)
sampler := stream.NewSamplingSender(sender)

err = s.streamer.StreamSearch(ss.Context(), q, zoekt.SearchOptionsFromProto(req.GetOpts()), sampler)
err = s.streamer.StreamSearch(ss.Context(), q, zoekt.SearchOptionsFromProto(request.GetOpts()), sampler)
if err == nil {
sampler.Flush()
}
return err
}

func (s *Server) List(ctx context.Context, req *v1.ListRequest) (*v1.ListResponse, error) {
func (s *Server) List(ctx context.Context, req *proto.ListRequest) (*proto.ListResponse, error) {
q, err := query.QFromProto(req.GetQuery())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
Expand All @@ -70,12 +72,14 @@ func (s *Server) List(ctx context.Context, req *v1.ListRequest) (*v1.ListRespons
}

// gRPCChunkSender is a zoekt.Sender that sends small chunks of FileMatches to the provided gRPC stream.
func gRPCChunkSender(ss v1.WebserverService_StreamSearchServer) zoekt.Sender {
func gRPCChunkSender(ss proto.WebserverService_StreamSearchServer) zoekt.Sender {
f := func(r *zoekt.SearchResult) {
result := r.ToProto()
result := r.ToStreamProto().GetResponseChunk()

if len(result.GetFiles()) == 0 { // stats-only result, send it immediately
_ = ss.Send(result)
_ = ss.Send(&proto.StreamSearchResponse{
ResponseChunk: result,
})
return
}

Expand All @@ -84,10 +88,10 @@ func gRPCChunkSender(ss v1.WebserverService_StreamSearchServer) zoekt.Sender {
statsSent := false
numFilesSent := 0

sendFunc := func(filesChunk []*v1.FileMatch) error {
sendFunc := func(filesChunk []*proto.FileMatch) error {
numFilesSent += len(filesChunk)

var stats *v1.Stats
var stats *proto.Stats
if !statsSent { // We only send stats back on the first chunk
statsSent = true
stats = result.GetStats()
Expand All @@ -96,7 +100,7 @@ func gRPCChunkSender(ss v1.WebserverService_StreamSearchServer) zoekt.Sender {
progress := result.GetProgress()

if numFilesSent < len(result.GetFiles()) { // more chunks to come
progress = &v1.Progress{
progress = &proto.Progress{
Priority: result.GetProgress().GetPriority(),

// We want the client to consume the entire set of chunks - so we manually
Expand All @@ -108,14 +112,14 @@ func gRPCChunkSender(ss v1.WebserverService_StreamSearchServer) zoekt.Sender {
}
}

response := &v1.SearchResponse{
Files: filesChunk,

Stats: stats,
Progress: progress,
}
return ss.Send(&proto.StreamSearchResponse{
ResponseChunk: &proto.SearchResponse{
Files: filesChunk,

return ss.Send(response)
Stats: stats,
Progress: progress,
},
})
}

_ = chunk.SendAll(sendFunc, result.GetFiles()...)
Expand Down
Loading
Loading