Skip to content

Commit 2d276fa

Browse files
author
Annanay
committed
Use cmux, use stream in GetTraceResponse, regenerate *pb.go and swagger doc
Signed-off-by: Annanay <[email protected]>
1 parent d19f54b commit 2d276fa

File tree

7 files changed

+603
-1233
lines changed

7 files changed

+603
-1233
lines changed

Diff for: cmd/query/app/grpc_handler.go

+16-32
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2019 Jaegertracing Authors.
1+
// Copyright (c) 2019 The Jaeger Authors.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -24,58 +24,42 @@ import (
2424
"go.uber.org/zap"
2525
"google.golang.org/grpc"
2626

27+
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
2728
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
2829
"github.com/jaegertracing/jaeger/storage/dependencystore"
2930
"github.com/jaegertracing/jaeger/storage/spanstore"
3031
)
3132

3233
// GRPCHandler implements the GRPC endpoint of the query service.
3334
type GRPCHandler struct {
34-
spanReader spanstore.Reader
35-
archiveSpanReader spanstore.Reader
36-
dependencyReader dependencystore.Reader
37-
logger *zap.Logger
35+
queryService querysvc.QueryService
36+
logger *zap.Logger
37+
tracer opentracing.Tracer
3838
}
3939

4040
// NewGRPCHandler returns a GRPCHandler
41-
func NewGRPCHandler(spanReader spanstore.Reader, dependencyReader dependencystore.Reader, options ...HandlerOption) *GRPCHandler {
41+
func NewGRPCHandler(queryService querysvc.QueryService, logger *zap.Logger, tracer opentracing.Tracer) *GRPCHandler {
4242
gH := &GRPCHandler{
43-
spanReader: spanReader,
44-
dependencyReader: dependencyReader,
45-
}
46-
47-
if gH.logger == nil {
48-
gH.logger = zap.NewNop()
43+
queryService: queryService,
44+
logger: logger,
45+
tracer: tracer,
4946
}
5047

5148
return gH
5249
}
5350

54-
// NewCombinedHandler returns a handler where GRPC and HTTP are multiplexed.
55-
func NewCombinedHandler(grpcServer *grpc.Server, recoveryHandler http.Handler) http.Handler {
56-
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
57-
if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
58-
grpcServer.ServeHTTP(w, r)
59-
} else {
60-
recoveryHandler.ServeHTTP(w, r)
61-
}
62-
})
63-
}
64-
6551
// GetTrace is the GRPC handler to fetch traces.
6652
func (g *GRPCHandler) GetTrace(ctx context.Context, r *api_v2.GetTraceRequest) (*api_v2.GetTraceResponse, error) {
6753
ID := r.GetId()
6854

69-
trace, err := g.spanReader.GetTrace(ctx, ID)
55+
trace, err := g.queryService.GetTrace(ctx, ID)
7056
if err == spanstore.ErrTraceNotFound {
71-
if g.archiveSpanReader == nil {
72-
return &api_v2.GetTraceResponse{}, err
73-
}
74-
75-
trace, err = g.archiveSpanReader.GetTrace(ctx, traceID)
76-
if err != nil {
77-
g.logger.Error("Could not fetch spans from backend", zap.Error(err))
78-
}
57+
g.logger.Error("trace not found", zap.Error(err))
58+
return nil, err
59+
}
60+
if err != nil {
61+
g.logger.Error("Could not fetch spans from backend", zap.Error(err))
62+
return nil, err
7963
}
8064

8165
return &api_v2.GetTraceResponse{Trace: trace}, nil

Diff for: cmd/query/main.go

+32-21
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"github.com/gorilla/handlers"
2828
"github.com/opentracing/opentracing-go"
29+
"github.com/soheilhy/cmux"
2930
"github.com/spf13/cobra"
3031
"github.com/spf13/viper"
3132
jaegerClientConfig "github.com/uber/jaeger-client-go/config"
@@ -144,43 +145,53 @@ func main() {
144145
r.Handle(mBldr.HTTPRoute, h)
145146
}
146147

147-
portStr := ":" + strconv.Itoa(queryOpts.Port)
148148
compressHandler := handlers.CompressHandler(r)
149149
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)
150150

151+
// Create HTTP Server
152+
httpServer := &http.Server{
153+
Handler: recoveryHandler(compressHandler),
154+
}
155+
151156
// Create GRPC Server.
152157
grpcServer := grpc.NewServer()
153158

154-
// Add logging.
155159
grpclog.SetLoggerV2(grpclog.NewLoggerV2(ioutil.Discard, os.Stderr, os.Stderr))
156160

157-
// Register GRPC Handler.
158-
spanReaderGRPC, err := storageFactory.CreateSpanReader()
159-
if err != nil {
160-
logger.Fatal("Failed to create span reader", zap.Error(err))
161-
}
162-
spanReaderGRPC = storageMetrics.NewReadMetricsDecorator(spanReaderGRPC, baseFactory.Namespace(metrics.NSOptions{Name: "query-grpc", Tags: nil}))
163-
dependencyReaderGRPC, err := storageFactory.CreateDependencyReader()
161+
grpcHandler := app.NewGRPCHandler(queryService, logger, tracer)
162+
api_v2.RegisterQueryServiceHandler(grpcServer, grpcHandler)
163+
164+
// Prepare cmux conn.
165+
conn, err := net.Listen("tcp", fmt.Sprintf(":%d", queryOpts.Port))
164166
if err != nil {
165-
logger.Fatal("Failed to create dependency reader", zap.Error(err))
167+
logger.Fatal("Could not start listener", zap.Error(err))
166168
}
167169

168-
grpcHandler := NewGRPCHandler(spanReaderGRPC, dependencyReaderGRPC, apiHandlerOptions)
169-
api_v2.RegisterQueryServiceHandler(grpcServer, grpcHandler)
170+
// Create cmux server.
171+
// cmux will reverse-proxy between HTTP and GRPC backends.
172+
s := cmux.New(conn)
173+
174+
// Add GRPC and HTTP listeners.
175+
grpcL := s.Match(
176+
cmux.HTTP2HeaderField("content-type", "application/grpc"),
177+
cmux.HTTP2HeaderField("content-type", "application/grpc+proto"))
178+
httpL := s.Match(cmux.Any())
170179

171180
go func() {
172-
logger.Info("Starting HTTP+GRPC server", zap.Int("port", queryOpts.Port))
173-
conn, err := net.Listen("tcp", fmt.Sprintf(":%d", queryOpts.Port))
174-
if err != nil {
175-
logger.Fatal("Could not launch service", zap.Error(err))
176-
}
177-
178-
// Multiplexed server.
179-
// Use CombinedHandler here, which will "reverse-proxy" between HTTP and GRPC backends.
180-
srv.Serve(conn, NewCombinedHandler(grpcServer, recoveryHandler(compressHandler)))
181+
logger.Info("Starting HTTP server", zap.Int("port", queryOpts.Port))
182+
httpServer.Serve(httpL)
181183
hc.Set(healthcheck.Unavailable)
182184
}()
183185

186+
go func() {
187+
logger.Info("Starting GRPC server", zap.Int("port", queryOpts.Port))
188+
grpcServer.Serve(grpcL)
189+
hc.Set(healthcheck.Unavailable)
190+
}()
191+
192+
// Start cmux server.
193+
s.Serve()
194+
184195
hc.Ready()
185196
<-serverChannel
186197
logger.Info("Shutdown complete")

0 commit comments

Comments
 (0)