Skip to content

Commit e4c4ba5

Browse files
committed
Unify with common gRPC client options.
Signed-off-by: Tom Wilkie <[email protected]>
1 parent e0468d8 commit e4c4ba5

File tree

4 files changed

+16
-10
lines changed

4 files changed

+16
-10
lines changed

Diff for: pkg/chunk/gcp/bigtable_index_client.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ type storageClientV1 struct {
6565
// NewStorageClientV1 returns a new v1 StorageClient.
6666
func NewStorageClientV1(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) {
6767
opts := instrumentation()
68-
opts = append(opts, option.WithGRPCDialOption(cfg.GRPCClientConfig.DialOption()))
68+
for _, dialOption := range cfg.GRPCClientConfig.DialOptions() {
69+
opts = append(opts, option.WithGRPCDialOption(dialOption))
70+
}
6971

7072
client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...)
7173
if err != nil {

Diff for: pkg/ingester/client/client.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,8 @@ func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error
5050
middleware.StreamClientUserHeaderInterceptor,
5151
cortex_middleware.PrometheusGRPCStreamInstrumentation(ingesterClientRequestDuration),
5252
)),
53-
cfg.GRPCClientConfig.DialOption(),
54-
grpc.WithBalancer(grpc.RoundRobin(grpcclient.NewPoolResolver(10))),
5553
}
54+
opts = append(opts, cfg.GRPCClientConfig.DialOptions()...)
5655
conn, err := grpc.Dial(addr, opts...)
5756
if err != nil {
5857
return nil, err

Diff for: pkg/querier/frontend/worker.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -212,14 +212,14 @@ func (w *worker) process(ctx context.Context, c Frontend_ProcessClient) error {
212212
}
213213

214214
func (w *worker) connect(address string) (FrontendClient, error) {
215-
conn, err := grpc.Dial(
216-
address,
215+
opts := []grpc.DialOption{
217216
grpc.WithInsecure(),
218217
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
219218
middleware.ClientUserHeaderInterceptor,
220219
)),
221-
w.cfg.GRPCClientConfig.DialOption(),
222-
)
220+
}
221+
opts = append(opts, w.cfg.GRPCClientConfig.DialOptions()...)
222+
conn, err := grpc.Dial(address, opts...)
223223
if err != nil {
224224
return nil, err
225225
}

Diff for: pkg/util/grpcclient/grpcclient.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ type Config struct {
1111
MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
1212
MaxSendMsgSize int `yaml:"max_send_msg_size"`
1313
UseGzipCompression bool `yaml:"use_gzip_compression"`
14+
ConnectionPoolSize int `yaml:"connect_pool_size"`
1415
}
1516

1617
// RegisterFlags registers flags.
1718
func (cfg *Config) RegisterFlags(prefix string, f *flag.FlagSet) {
1819
f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).")
1920
f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 16<<20, "gRPC client max send message size (bytes).")
2021
f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false, "Use compression when sending messages.")
22+
f.IntVar(&cfg.ConnectionPoolSize, prefix+".grpc-connection-pool-size", 1, "Number of connections to keep active per endpoint.")
2123
}
2224

2325
// CallOptions returns the config in terms of CallOptions.
@@ -31,7 +33,10 @@ func (cfg *Config) CallOptions() []grpc.CallOption {
3133
return opts
3234
}
3335

34-
// DialOption returns the config as a grpc.DialOptions.
35-
func (cfg *Config) DialOption() grpc.DialOption {
36-
return grpc.WithDefaultCallOptions(cfg.CallOptions()...)
36+
// DialOptions returns the config as a grpc.DialOptions.
37+
func (cfg *Config) DialOptions() []grpc.DialOption {
38+
return []grpc.DialOption{
39+
grpc.WithDefaultCallOptions(cfg.CallOptions()...),
40+
grpc.WithBalancer(grpc.RoundRobin(NewPoolResolver(cfg.ConnectionPoolSize))),
41+
}
3742
}

0 commit comments

Comments
 (0)