Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate Ports for GRPC and HTTP requests in Query Server #2387

Merged
merged 13 commits into from
Sep 11, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (
queryHostPort = "query.host-port"
queryPort = "query.port"
queryPortWarning = "(deprecated, will be removed after 2020-08-31 or in release v1.20.0, whichever is later)"
queryHTTPHostPort = "query.http-server.host-port"
queryGRPCHostPort = "query.grpc-server.host-port"
queryBasePath = "query.base-path"
queryStaticFiles = "query.static-files"
queryUIConfig = "query.ui-config"
Expand All @@ -56,8 +58,12 @@ var tlsFlagsConfig = tlscfg.ServerFlagsConfig{

// QueryOptions holds configuration for query service
type QueryOptions struct {
// HostPort is the host:port address that the query service listens o n
// HostPort is the host:port address that the query service listens on
HostPort string
// HTTPHostPort is the host:port address that the query service listens in on for http requests
HTTPHostPort string
// GRPCHostPort is the host:port address that the query service listens in on for gRPC requests
GRPCHostPort string
// BasePath is the prefix for all UI and API HTTP routes
BasePath string
// StaticAssets is the path for the static assets for the UI (https://github.com/uber/jaeger-ui)
Expand All @@ -66,7 +72,7 @@ type QueryOptions struct {
UIConfig string
// BearerTokenPropagation activate/deactivate bearer token propagation to storage
BearerTokenPropagation bool
// TLS configures secure transport
// TLS configures secure transport`
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
TLS tlscfg.Options
// AdditionalHeaders
AdditionalHeaders http.Header
Expand All @@ -78,6 +84,8 @@ type QueryOptions struct {
func AddFlags(flagSet *flag.FlagSet) {
flagSet.Var(&config.StringSlice{}, queryAdditionalHeaders, `Additional HTTP response headers. Can be specified multiple times. Format: "Key: Value"`)
flagSet.String(queryHostPort, ports.PortToHostPort(ports.QueryHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server")
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
flagSet.String(queryHTTPHostPort, ports.PortToHostPort(ports.QueryHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server")
flagSet.String(queryGRPCHostPort, ports.PortToHostPort(ports.QueryGRPC), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server")
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
flagSet.Int(queryPort, 0, queryPortWarning+" see --"+queryHostPort)
flagSet.String(queryBasePath, "/", "The base path for all HTTP routes, e.g. /jaeger; useful when running behind a reverse proxy")
flagSet.String(queryStaticFiles, "", "The directory path override for the static assets for the UI")
Expand All @@ -86,14 +94,33 @@ func AddFlags(flagSet *flag.FlagSet) {
flagSet.Duration(queryMaxClockSkewAdjust, time.Second, "The maximum delta by which span timestamps may be adjusted in the UI due to clock skew; set to 0s to disable clock skew adjustments")
}

// InitPortsConfigFromViper initializes the port numbers and TLS configuration of ports
func (qOpts *QueryOptions) InitPortsConfigFromViper(v *viper.Viper, logger *zap.Logger) *QueryOptions {
qOpts.HTTPHostPort = v.GetString(queryHTTPHostPort)
qOpts.GRPCHostPort = v.GetString(queryGRPCHostPort)
qOpts.HostPort = ports.GetAddressFromCLIOptions(v.GetInt(queryPort), v.GetString(queryHostPort))

qOpts.TLS = tlsFlagsConfig.InitFromViper(v)

// query.host-port is not defined and atleast one of query.grpc-server.host-port or query.http-server.host-port is defined
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
// user intends to use the separate flags.
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
if !(v.IsSet(queryHostPort) || v.IsSet(queryPort)) && (v.IsSet(queryHTTPHostPort) || v.IsSet(queryGRPCHostPort)) {
return qOpts
}
logger.Warn(fmt.Sprintf("Use of %s and %s is deprecated. use %s and %s instead", queryPort, queryHostPort, queryHTTPHostPort, queryGRPCHostPort))
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
qOpts.HTTPHostPort = qOpts.HostPort
qOpts.GRPCHostPort = qOpts.HostPort
return qOpts

}

// InitFromViper initializes QueryOptions with properties from viper
func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) *QueryOptions {
qOpts.HostPort = ports.GetAddressFromCLIOptions(v.GetInt(queryPort), v.GetString(queryHostPort))
qOpts = qOpts.InitPortsConfigFromViper(v, logger)
qOpts.BasePath = v.GetString(queryBasePath)
qOpts.StaticAssets = v.GetString(queryStaticFiles)
qOpts.UIConfig = v.GetString(queryUIConfig)
qOpts.BearerTokenPropagation = v.GetBool(queryTokenPropagation)
qOpts.TLS = tlsFlagsConfig.InitFromViper(v)
qOpts.MaxClockSkewAdjust = v.GetDuration(queryMaxClockSkewAdjust)

stringSlice := v.GetStringSlice(queryAdditionalHeaders)
Expand Down
51 changes: 51 additions & 0 deletions cmd/query/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/mocks"
spanstore_mocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)
Expand Down Expand Up @@ -60,6 +61,56 @@ func TestQueryBuilderFlags(t *testing.T) {
assert.Equal(t, 10*time.Second, qOpts.MaxClockSkewAdjust)
}

func TestQueryBuilderFlagsSeparatePorts(t *testing.T) {
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
"--query.static-files=/dev/null",
"--query.ui-config=some.json",
"--query.base-path=/jaeger",
"--query.http-server.host-port=127.0.0.1:8080",
"--query.additional-headers=access-control-allow-origin:blerg",
"--query.additional-headers=whatever:thing",
"--query.max-clock-skew-adjustment=10s",
})
qOpts := new(QueryOptions).InitFromViper(v, zap.NewNop())
assert.Equal(t, "/dev/null", qOpts.StaticAssets)
assert.Equal(t, "some.json", qOpts.UIConfig)
assert.Equal(t, "/jaeger", qOpts.BasePath)
assert.Equal(t, "127.0.0.1:8080", qOpts.HTTPHostPort)
assert.Equal(t, ports.PortToHostPort(ports.QueryGRPC), qOpts.GRPCHostPort)

assert.Equal(t, http.Header{
"Access-Control-Allow-Origin": []string{"blerg"},
"Whatever": []string{"thing"},
}, qOpts.AdditionalHeaders)
assert.Equal(t, 10*time.Second, qOpts.MaxClockSkewAdjust)
}

func TestQueryBuilderFlagsSeparateNoPorts(t *testing.T) {
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--query.static-files=/dev/null",
"--query.ui-config=some.json",
"--query.base-path=/jaeger",
"--query.additional-headers=access-control-allow-origin:blerg",
"--query.additional-headers=whatever:thing",
"--query.max-clock-skew-adjustment=10s",
})
qOpts := new(QueryOptions).InitFromViper(v, zap.NewNop())
assert.Equal(t, "/dev/null", qOpts.StaticAssets)
assert.Equal(t, "some.json", qOpts.UIConfig)
assert.Equal(t, "/jaeger", qOpts.BasePath)
assert.Equal(t, ports.PortToHostPort(ports.QueryHTTP), qOpts.HTTPHostPort)
assert.Equal(t, ports.PortToHostPort(ports.QueryHTTP), qOpts.GRPCHostPort)
assert.Equal(t, ports.PortToHostPort(ports.QueryHTTP), qOpts.HostPort)

assert.Equal(t, http.Header{
"Access-Control-Allow-Origin": []string{"blerg"},
"Whatever": []string{"thing"},
}, qOpts.AdditionalHeaders)
assert.Equal(t, 10*time.Second, qOpts.MaxClockSkewAdjust)
}

func TestQueryBuilderBadHeadersFlags(t *testing.T) {
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
Expand Down
107 changes: 88 additions & 19 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/netutils"
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

Expand All @@ -42,13 +43,27 @@ type Server struct {
tracer opentracing.Tracer // TODO make part of flags.Service

conn net.Listener
grpcConn net.Listener
httpConn net.Listener
grpcServer *grpc.Server
httpServer *http.Server
separatePorts bool
unavailableChannel chan healthcheck.Status
}

// NewServer creates and initializes Server
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) {

httpPort, err := ports.HostPortToPort(options.HTTPHostPort)
if err != nil {
return nil, err
}
grpcPort, err := ports.HostPortToPort(options.GRPCHostPort)
if err != nil {
return nil, err
}
separatePorts := (grpcPort != httpPort)
rjs211 marked this conversation as resolved.
Show resolved Hide resolved

grpcServer, err := createGRPCServer(querySvc, options, logger, tracer)
if err != nil {
return nil, err
Expand All @@ -61,6 +76,7 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *Que
tracer: tracer,
grpcServer: grpcServer,
httpServer: createHTTPServer(querySvc, options, tracer, logger),
separatePorts: separatePorts,
unavailableChannel: make(chan healthcheck.Status),
}, nil
}
Expand Down Expand Up @@ -117,11 +133,27 @@ func createHTTPServer(querySvc *querysvc.QueryService, queryOpts *QueryOptions,
}
}

// Start http, GRPC and cmux servers concurrently
func (s *Server) Start() error {
// initListener initialises listeners of the server
func (s *Server) initListener() (cmux.CMux, error) {
if s.separatePorts { // use separate ports and listeners each for gRPC and HTTP requests
var err error
s.grpcConn, err = net.Listen("tcp", s.queryOptions.GRPCHostPort)
if err != nil {
return nil, err
}

s.httpConn, err = net.Listen("tcp", s.queryOptions.HTTPHostPort)
if err != nil {
return nil, err
}
s.logger.Info("Query server started")
return nil, nil
}

// old behavior using cmux
conn, err := net.Listen("tcp", s.queryOptions.HostPort)
if err != nil {
return err
return nil, err
}
s.conn = conn

Expand All @@ -138,16 +170,46 @@ func (s *Server) Start() error {
// cmux server acts as a reverse-proxy between HTTP and GRPC backends.
cmuxServer := cmux.New(s.conn)

grpcListener := cmuxServer.MatchWithWriters(
s.grpcConn = cmuxServer.MatchWithWriters(
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"),
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"),
)
httpListener := cmuxServer.Match(cmux.Any())
s.httpConn = cmuxServer.Match(cmux.Any())
s.queryOptions.HTTPHostPort = s.queryOptions.HostPort
s.queryOptions.GRPCHostPort = s.queryOptions.HostPort

return cmuxServer, nil

}

// Start http, GRPC and cmux servers concurrently
func (s *Server) Start() error {
cmuxServer, err := s.initListener()
if err != nil {
return err
}

var tcpPort int
if !s.separatePorts {
if port, err := netutils.GetPort(s.conn.Addr()); err == nil {
tcpPort = port
}

rjs211 marked this conversation as resolved.
Show resolved Hide resolved
}
var httpPort int
if port, err := netutils.GetPort(s.httpConn.Addr()); err == nil {
httpPort = port
}

var grpcPort int
if port, err := netutils.GetPort(s.grpcConn.Addr()); err == nil {
grpcPort = port
}

go func() {
s.logger.Info("Starting HTTP server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HostPort))
s.logger.Info("Starting HTTP server", zap.Int("port", httpPort), zap.String("addr", s.queryOptions.HTTPHostPort))

switch err := s.httpServer.Serve(httpListener); err {
switch err := s.httpServer.Serve(s.httpConn); err {
case nil, http.ErrServerClosed, cmux.ErrListenerClosed:
// normal exit, nothing to do
default:
Expand All @@ -158,25 +220,27 @@ func (s *Server) Start() error {

// Start GRPC server concurrently
go func() {
s.logger.Info("Starting GRPC server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HostPort))
s.logger.Info("Starting GRPC server", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPCHostPort))

if err := s.grpcServer.Serve(grpcListener); err != nil {
if err := s.grpcServer.Serve(s.grpcConn); err != nil {
s.logger.Error("Could not start GRPC server", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
}()

// Start cmux server concurrently.
go func() {
s.logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HostPort))
if !s.separatePorts {
go func() {
s.logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HostPort))

err := cmuxServer.Serve()
// TODO: Remove string comparison when https://github.com/soheilhy/cmux/pull/69 is merged
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
s.logger.Error("Could not start multiplexed server", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
}()
err := cmuxServer.Serve()
// TODO: Remove string comparison when https://github.com/soheilhy/cmux/pull/69 is merged
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
s.logger.Error("Could not start multiplexed server", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
}()
}

return nil
}
Expand All @@ -186,6 +250,11 @@ func (s *Server) Close() error {
s.queryOptions.TLS.Close()
s.grpcServer.Stop()
s.httpServer.Close()
s.conn.Close()
if s.separatePorts {
s.httpConn.Close()
s.grpcConn.Close()
} else {
s.conn.Close()
}
return nil
}
Loading