diff --git a/CHANGELOG.md b/CHANGELOG.md index ccd3c8394..f20521261 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,5 +6,7 @@ * [CHANGE] Removed global metrics for KV package. Making a KV object will now require a prometheus registerer that will be used to register all relevant KV class metrics. #22 * [CHANGE] Added CHANGELOG.md and Pull Request template to reference the changelog * [CHANGE] Remove `cortex_` prefix for metrics registered in the ring. #46 +* [CHANGE] Rename `kv/kvtls` to `crypto/tls`. #39 * [ENHANCEMENT] Add middleware package. #38 * [ENHANCEMENT] Add limiter package. #41 +* [ENHANCEMENT] Add grpcclient, grpcencoding and grpcutil packages. #39 \ No newline at end of file diff --git a/kv/kvtls/test/ca.go b/crypto/tls/test/ca.go similarity index 100% rename from kv/kvtls/test/ca.go rename to crypto/tls/test/ca.go diff --git a/kv/kvtls/test/tls_integration_test.go b/crypto/tls/test/tls_integration_test.go similarity index 95% rename from kv/kvtls/test/tls_integration_test.go rename to crypto/tls/test/tls_integration_test.go index 23ca7069e..0e9d5f899 100644 --- a/kv/kvtls/test/tls_integration_test.go +++ b/crypto/tls/test/tls_integration_test.go @@ -27,13 +27,13 @@ import ( "google.golang.org/grpc/keepalive" "github.com/grafana/dskit/backoff" - "github.com/grafana/dskit/kv/kvtls" + "github.com/grafana/dskit/crypto/tls" ) type tcIntegrationClientServer struct { name string tlsGrpcEnabled bool - tlsConfig kvtls.ClientConfig + tlsConfig tls.ClientConfig httpExpectError func(*testing.T, error) grpcExpectError func(*testing.T, error) } @@ -183,14 +183,14 @@ func TestServerWithoutTlsEnabled(t *testing.T) { []tcIntegrationClientServer{ { name: "no-config", - tlsConfig: kvtls.ClientConfig{}, + tlsConfig: tls.ClientConfig{}, httpExpectError: errorContainsString("http: server gave HTTP response to HTTPS client"), grpcExpectError: nil, }, { name: "tls-enable", tlsGrpcEnabled: true, - tlsConfig: kvtls.ClientConfig{}, + tlsConfig: tls.ClientConfig{}, httpExpectError: errorContainsString("http: server gave HTTP response to HTTPS client"), grpcExpectError: errorContainsString("transport: authentication handshake failed: tls: first record does not look like a TLS handshake"), }, @@ -218,7 +218,7 @@ func TestServerWithLocalhostCertNoClientCertAuth(t *testing.T) { []tcIntegrationClientServer{ { name: "no-config", - tlsConfig: kvtls.ClientConfig{}, + tlsConfig: tls.ClientConfig{}, httpExpectError: errorContainsString("x509: certificate signed by unknown authority"), // For GRPC we expect this error as we try to connect without TLS to a TLS enabled server grpcExpectError: unavailableDescErr, @@ -226,21 +226,21 @@ func TestServerWithLocalhostCertNoClientCertAuth(t *testing.T) { { name: "grpc-tls-enabled", tlsGrpcEnabled: true, - tlsConfig: kvtls.ClientConfig{}, + tlsConfig: tls.ClientConfig{}, httpExpectError: errorContainsString("x509: certificate signed by unknown authority"), grpcExpectError: errorContainsString("x509: certificate signed by unknown authority"), }, { name: "tls-skip-verify", tlsGrpcEnabled: true, - tlsConfig: kvtls.ClientConfig{ + tlsConfig: tls.ClientConfig{ InsecureSkipVerify: true, }, }, { name: "tls-skip-verify-no-grpc-tls-enabled", tlsGrpcEnabled: false, - tlsConfig: kvtls.ClientConfig{ + tlsConfig: tls.ClientConfig{ InsecureSkipVerify: true, }, grpcExpectError: unavailableDescErr, @@ -248,14 +248,14 @@ func TestServerWithLocalhostCertNoClientCertAuth(t *testing.T) { { name: "ca-path-set", tlsGrpcEnabled: true, - tlsConfig: kvtls.ClientConfig{ + tlsConfig: tls.ClientConfig{ CAPath: certs.caCertFile, }, }, { name: "ca-path-no-grpc-tls-enabled", tlsGrpcEnabled: false, - tlsConfig: kvtls.ClientConfig{ + tlsConfig: tls.ClientConfig{ CAPath: certs.caCertFile, }, grpcExpectError: unavailableDescErr, @@ -283,7 +283,7 @@ func TestServerWithoutLocalhostCertNoClientCertAuth(t *testing.T) { []tcIntegrationClientServer{ { name: "no-config", - tlsConfig: kvtls.ClientConfig{}, + tlsConfig: tls.ClientConfig{}, httpExpectError: errorContainsString("x509: certificate is valid for my-other-name, not localhost"), // For GRPC we expect this error as we try to connect without TLS to a TLS enabled server grpcExpectError: unavailableDescErr, @@ -291,14 +291,14 @@ func TestServerWithoutLocalhostCertNoClientCertAuth(t *testing.T) { { name: "grpc-tls-enabled", tlsGrpcEnabled: true, - tlsConfig: kvtls.ClientConfig{}, + tlsConfig: tls.ClientConfig{}, httpExpectError: errorContainsString("x509: certificate is valid for my-other-name, not localhost"), grpcExpectError: errorContainsString("x509: certificate is valid for my-other-name, not localhost"), }, { name: "ca-path", tlsGrpcEnabled: true, - tlsConfig: kvtls.ClientConfig{ + tlsConfig: tls.ClientConfig{ CAPath: certs.caCertFile, }, httpExpectError: errorContainsString("x509: certificate is valid for my-other-name, not localhost"), @@ -307,7 +307,7 @@ func TestServerWithoutLocalhostCertNoClientCertAuth(t *testing.T) { { name: "server-name", tlsGrpcEnabled: true, - tlsConfig: kvtls.ClientConfig{ + tlsConfig: tls.ClientConfig{ CAPath: certs.caCertFile, ServerName: "my-other-name", }, @@ -315,7 +315,7 @@ func TestServerWithoutLocalhostCertNoClientCertAuth(t *testing.T) { { name: "tls-skip-verify", tlsGrpcEnabled: true, - tlsConfig: kvtls.ClientConfig{ + tlsConfig: tls.ClientConfig{ InsecureSkipVerify: true, }, }, @@ -352,7 +352,7 @@ func TestTLSServerWithLocalhostCertWithClientCertificateEnforcementUsingClientCA { name: "tls-skip-verify", tlsGrpcEnabled: true, - tlsConfig: kvtls.ClientConfig{ + tlsConfig: tls.ClientConfig{ InsecureSkipVerify: true, }, httpExpectError: badCertErr, @@ -361,7 +361,7 @@ func TestTLSServerWithLocalhostCertWithClientCertificateEnforcementUsingClientCA { name: "ca-path", tlsGrpcEnabled: true, - tlsConfig: kvtls.ClientConfig{ + tlsConfig: tls.ClientConfig{ CAPath: certs.caCertFile, }, httpExpectError: badCertErr, @@ -370,7 +370,7 @@ func TestTLSServerWithLocalhostCertWithClientCertificateEnforcementUsingClientCA { name: "ca-path-and-client-cert-ca1", tlsGrpcEnabled: true, - tlsConfig: kvtls.ClientConfig{ + tlsConfig: tls.ClientConfig{ CAPath: certs.caCertFile, CertPath: certs.client1CertFile, KeyPath: certs.client1KeyFile, @@ -379,7 +379,7 @@ func TestTLSServerWithLocalhostCertWithClientCertificateEnforcementUsingClientCA { name: "tls-skip-verify-and-client-cert-ca1", tlsGrpcEnabled: true, - tlsConfig: kvtls.ClientConfig{ + tlsConfig: tls.ClientConfig{ InsecureSkipVerify: true, CertPath: certs.client1CertFile, KeyPath: certs.client1KeyFile, @@ -388,7 +388,7 @@ func TestTLSServerWithLocalhostCertWithClientCertificateEnforcementUsingClientCA { name: "ca-cert-and-client-cert-ca2", tlsGrpcEnabled: true, - tlsConfig: kvtls.ClientConfig{ + tlsConfig: tls.ClientConfig{ CAPath: certs.caCertFile, CertPath: certs.client2CertFile, KeyPath: certs.client2KeyFile, @@ -423,7 +423,7 @@ func TestTLSServerWithLocalhostCertWithClientCertificateEnforcementUsingClientCA { name: "ca-cert-and-client-cert-ca1", tlsGrpcEnabled: true, - tlsConfig: kvtls.ClientConfig{ + tlsConfig: tls.ClientConfig{ CAPath: certs.caCertFile, CertPath: certs.client1CertFile, KeyPath: certs.client1KeyFile, @@ -432,7 +432,7 @@ func TestTLSServerWithLocalhostCertWithClientCertificateEnforcementUsingClientCA { name: "ca-cert-and-client-cert-ca2", tlsGrpcEnabled: true, - tlsConfig: kvtls.ClientConfig{ + tlsConfig: tls.ClientConfig{ CAPath: certs.caCertFile, CertPath: certs.client2CertFile, KeyPath: certs.client2KeyFile, @@ -577,8 +577,8 @@ type grpcConfig struct { BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"` BackoffConfig backoff.Config `yaml:"backoff_config"` - TLSEnabled bool `yaml:"tls_enabled"` - TLS kvtls.ClientConfig `yaml:",inline"` + TLSEnabled bool `yaml:"tls_enabled"` + TLS tls.ClientConfig `yaml:",inline"` } // RegisterFlags registers flags. diff --git a/kv/kvtls/tls.go b/crypto/tls/tls.go similarity index 99% rename from kv/kvtls/tls.go rename to crypto/tls/tls.go index 2338e02ba..9886b208d 100644 --- a/kv/kvtls/tls.go +++ b/crypto/tls/tls.go @@ -1,4 +1,4 @@ -package kvtls +package tls import ( "crypto/tls" diff --git a/kv/kvtls/tls_test.go b/crypto/tls/tls_test.go similarity index 92% rename from kv/kvtls/tls_test.go rename to crypto/tls/tls_test.go index 079c358cc..122a21c33 100644 --- a/kv/kvtls/tls_test.go +++ b/crypto/tls/tls_test.go @@ -1,9 +1,8 @@ -package kvtls +package tls import ( - "fmt" - "io/ioutil" "os" + "path/filepath" "testing" "github.com/stretchr/testify/assert" @@ -70,32 +69,28 @@ type x509Paths struct { } func newTestX509Files(t *testing.T, cert, key, ca []byte) x509Paths { - // create empty file - certsPath, err := ioutil.TempDir("", "*-x509") - require.NoError(t, err) + t.Helper() - t.Cleanup(func() { - os.RemoveAll(certsPath) - }) + certsPath := t.TempDir() paths := x509Paths{ - cert: fmt.Sprintf("%s/cert.pem", certsPath), - key: fmt.Sprintf("%s/key.pem", certsPath), - ca: fmt.Sprintf("%s/ca.pem", certsPath), + cert: filepath.Join(certsPath, "cert.pem"), + key: filepath.Join(certsPath, "key.pem"), + ca: filepath.Join(certsPath, "ca.pem"), } if cert != nil { - err = ioutil.WriteFile(paths.cert, cert, 0600) + err := os.WriteFile(paths.cert, cert, 0600) require.NoError(t, err) } if key != nil { - err = ioutil.WriteFile(paths.key, key, 0600) + err := os.WriteFile(paths.key, key, 0600) require.NoError(t, err) } if ca != nil { - err = ioutil.WriteFile(paths.ca, ca, 0600) + err := os.WriteFile(paths.ca, ca, 0600) require.NoError(t, err) } diff --git a/go.mod b/go.mod index 7bdfad196..4ad5dc184 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,8 @@ require ( github.com/hashicorp/go-msgpack v0.5.5 // indirect github.com/hashicorp/go-sockaddr v1.0.2 github.com/hashicorp/memberlist v0.2.3 - github.com/opentracing/opentracing-go v1.2.0 // indirect + github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e + github.com/opentracing/opentracing-go v1.2.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0 github.com/prometheus/common v0.26.0 diff --git a/go.sum b/go.sum index b175bc0f2..56c23c16a 100644 --- a/go.sum +++ b/go.sum @@ -162,6 +162,7 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.1.0/go.mod h1:f5nM7jw/oeRSadq3xC github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw= github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= github.com/hashicorp/consul/api v1.9.1 h1:SngrdG2L62qqLsUz85qcPhFZ78rPf8tcD5qjMgs6MME= github.com/hashicorp/consul/api v1.9.1/go.mod h1:XjsvQN+RJGWI2TWy1/kqaE16HrR2J/FWgkYjdZQsX9M= @@ -304,8 +305,9 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= -github.com/opentracing-contrib/go-grpc v0.0.0-20180928155321-4b5a12d3ff02 h1:0R5mDLI66Qw13qN80TRz85zthQ2nf2+uDyiV23w6c3Q= github.com/opentracing-contrib/go-grpc v0.0.0-20180928155321-4b5a12d3ff02/go.mod h1:JNdpVEzCpXBgIiv4ds+TzhN1hrtxq6ClLrTlT9OQRSc= +github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e h1:4cPxUYdgaGzZIT5/j0IfqOrrXmq6bG8AwvwisMXpdrg= +github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e/go.mod h1:DYR5Eij8rJl8h7gblRrOZ8g0kW1umSpKqYIBTgeDtLo= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9 h1:QsgXACQhd9QJhEmRumbsMQQvBtmdS0mafoVEBplWXEg= github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9/go.mod h1:PLldrQSroqzH70Xl+1DQcGnefIbqsKR7UDaiux3zV+w= @@ -495,6 +497,7 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190921015927-1a5e07d1ff72/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= diff --git a/grpcclient/backoff_retry.go b/grpcclient/backoff_retry.go new file mode 100644 index 000000000..21abbb786 --- /dev/null +++ b/grpcclient/backoff_retry.go @@ -0,0 +1,31 @@ +package grpcclient + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/grafana/dskit/backoff" +) + +// NewBackoffRetry gRPC middleware. +func NewBackoffRetry(cfg backoff.Config) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + backoff := backoff.New(ctx, cfg) + for backoff.Ongoing() { + err := invoker(ctx, method, req, reply, cc, opts...) + if err == nil { + return nil + } + + if status.Code(err) != codes.ResourceExhausted { + return err + } + + backoff.Wait() + } + return backoff.Err() + } +} diff --git a/grpcclient/grpcclient.go b/grpcclient/grpcclient.go new file mode 100644 index 000000000..6114d15d8 --- /dev/null +++ b/grpcclient/grpcclient.go @@ -0,0 +1,103 @@ +package grpcclient + +import ( + "flag" + "time" + + "github.com/go-kit/kit/log" + middleware "github.com/grpc-ecosystem/go-grpc-middleware" + "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/encoding/gzip" + "google.golang.org/grpc/keepalive" + + "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/crypto/tls" + "github.com/grafana/dskit/grpcencoding/snappy" +) + +// Config for a gRPC client. +type Config struct { + MaxRecvMsgSize int `yaml:"max_recv_msg_size"` + MaxSendMsgSize int `yaml:"max_send_msg_size"` + GRPCCompression string `yaml:"grpc_compression"` + RateLimit float64 `yaml:"rate_limit"` + RateLimitBurst int `yaml:"rate_limit_burst"` + + BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"` + BackoffConfig backoff.Config `yaml:"backoff_config"` + + TLSEnabled bool `yaml:"tls_enabled"` + TLS tls.ClientConfig `yaml:",inline"` +} + +// RegisterFlags registers flags. +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix("", f) +} + +// RegisterFlagsWithPrefix registers flags with prefix. +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).") + f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 16<<20, "gRPC client max send message size (bytes).") + f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)") + f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.") + f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.") + f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit ratelimits.") + f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", cfg.TLSEnabled, "Enable TLS in the GRPC client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to gRPC server will be used.") + + cfg.BackoffConfig.RegisterFlagsWithPrefix(prefix, f) + + cfg.TLS.RegisterFlagsWithPrefix(prefix, f) +} + +func (cfg *Config) Validate(log log.Logger) error { + switch cfg.GRPCCompression { + case gzip.Name, snappy.Name, "": + // valid + default: + return errors.Errorf("unsupported compression type: %s", cfg.GRPCCompression) + } + return nil +} + +// CallOptions returns the config in terms of CallOptions. +func (cfg *Config) CallOptions() []grpc.CallOption { + var opts []grpc.CallOption + opts = append(opts, grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize)) + opts = append(opts, grpc.MaxCallSendMsgSize(cfg.MaxSendMsgSize)) + if cfg.GRPCCompression != "" { + opts = append(opts, grpc.UseCompressor(cfg.GRPCCompression)) + } + return opts +} + +// DialOption returns the config as a grpc.DialOptions. +func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) { + var opts []grpc.DialOption + tlsOpts, err := cfg.TLS.GetGRPCDialOptions(cfg.TLSEnabled) + if err != nil { + return nil, err + } + opts = append(opts, tlsOpts...) + + if cfg.BackoffOnRatelimits { + unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewBackoffRetry(cfg.BackoffConfig)}, unaryClientInterceptors...) + } + + if cfg.RateLimit > 0 { + unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewRateLimiter(cfg)}, unaryClientInterceptors...) + } + + return append( + opts, + grpc.WithDefaultCallOptions(cfg.CallOptions()...), + grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(unaryClientInterceptors...)), + grpc.WithStreamInterceptor(middleware.ChainStreamClient(streamClientInterceptors...)), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: time.Second * 20, + Timeout: time.Second * 10, + PermitWithoutStream: true, + }), + ), nil +} diff --git a/grpcclient/instrumentation.go b/grpcclient/instrumentation.go new file mode 100644 index 000000000..b22a58834 --- /dev/null +++ b/grpcclient/instrumentation.go @@ -0,0 +1,23 @@ +package grpcclient + +import ( + otgrpc "github.com/opentracing-contrib/go-grpc" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/common/middleware" + "google.golang.org/grpc" + + dsmiddleware "github.com/grafana/dskit/middleware" +) + +func Instrument(requestDuration *prometheus.HistogramVec) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { + return []grpc.UnaryClientInterceptor{ + otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), + middleware.ClientUserHeaderInterceptor, + dsmiddleware.PrometheusGRPCUnaryInstrumentation(requestDuration), + }, []grpc.StreamClientInterceptor{ + otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()), + middleware.StreamClientUserHeaderInterceptor, + dsmiddleware.PrometheusGRPCStreamInstrumentation(requestDuration), + } +} diff --git a/grpcclient/ratelimit.go b/grpcclient/ratelimit.go new file mode 100644 index 000000000..59ba3b7f0 --- /dev/null +++ b/grpcclient/ratelimit.go @@ -0,0 +1,26 @@ +package grpcclient + +import ( + "context" + + "golang.org/x/time/rate" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// NewRateLimiter creates a UnaryClientInterceptor for client side rate limiting. +func NewRateLimiter(cfg *Config) grpc.UnaryClientInterceptor { + burst := cfg.RateLimitBurst + if burst == 0 { + burst = int(cfg.RateLimit) + } + limiter := rate.NewLimiter(rate.Limit(cfg.RateLimit), burst) + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + err := limiter.Wait(ctx) + if err != nil { + return status.Error(codes.ResourceExhausted, err.Error()) + } + return invoker(ctx, method, req, reply, cc, opts...) + } +} diff --git a/grpcclient/ratelimit_test.go b/grpcclient/ratelimit_test.go new file mode 100644 index 000000000..ad9f2f2ff --- /dev/null +++ b/grpcclient/ratelimit_test.go @@ -0,0 +1,36 @@ +package grpcclient_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/grafana/dskit/grpcclient" +) + +func TestRateLimiterFailureResultsInResourceExhaustedError(t *testing.T) { + config := grpcclient.Config{ + RateLimitBurst: 0, + RateLimit: 0, + } + conn := grpc.ClientConn{} + invoker := func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error { + return nil + } + + limiter := grpcclient.NewRateLimiter(&config) + err := limiter(context.Background(), "methodName", "", "expectedReply", &conn, invoker) + + if se, ok := err.(interface { + GRPCStatus() *status.Status + }); ok { + assert.Equal(t, se.GRPCStatus().Code(), codes.ResourceExhausted) + assert.Equal(t, se.GRPCStatus().Message(), "rate: Wait(n=1) exceeds limiter's burst 0") + } else { + assert.Fail(t, "Could not convert error into expected Status type") + } +} diff --git a/grpcencoding/snappy/snappy.go b/grpcencoding/snappy/snappy.go new file mode 100644 index 000000000..fe01b4ca3 --- /dev/null +++ b/grpcencoding/snappy/snappy.go @@ -0,0 +1,87 @@ +package snappy + +import ( + "io" + "sync" + + "github.com/golang/snappy" + "google.golang.org/grpc/encoding" +) + +// Name is the name registered for the snappy compressor. +const Name = "snappy" + +func init() { + encoding.RegisterCompressor(newCompressor()) +} + +type compressor struct { + writersPool sync.Pool + readersPool sync.Pool +} + +func newCompressor() *compressor { + c := &compressor{} + c.readersPool = sync.Pool{ + New: func() interface{} { + return snappy.NewReader(nil) + }, + } + c.writersPool = sync.Pool{ + New: func() interface{} { + return snappy.NewBufferedWriter(nil) + }, + } + return c +} + +func (c *compressor) Name() string { + return Name +} + +func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { + wr := c.writersPool.Get().(*snappy.Writer) + wr.Reset(w) + return writeCloser{wr, &c.writersPool}, nil +} + +func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { + dr := c.readersPool.Get().(*snappy.Reader) + dr.Reset(r) + return reader{dr, &c.readersPool}, nil +} + +type writeCloser struct { + writer *snappy.Writer + pool *sync.Pool +} + +func (w writeCloser) Write(p []byte) (n int, err error) { + return w.writer.Write(p) +} + +func (w writeCloser) Close() error { + defer func() { + w.writer.Reset(nil) + w.pool.Put(w.writer) + }() + + if w.writer != nil { + return w.writer.Close() + } + return nil +} + +type reader struct { + reader *snappy.Reader + pool *sync.Pool +} + +func (r reader) Read(p []byte) (n int, err error) { + n, err = r.reader.Read(p) + if err == io.EOF { + r.reader.Reset(nil) + r.pool.Put(r.reader) + } + return n, err +} diff --git a/grpcencoding/snappy/snappy_test.go b/grpcencoding/snappy/snappy_test.go new file mode 100644 index 000000000..315136f90 --- /dev/null +++ b/grpcencoding/snappy/snappy_test.go @@ -0,0 +1,71 @@ +package snappy + +import ( + "bytes" + "io" + "io/ioutil" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSnappy(t *testing.T) { + c := newCompressor() + assert.Equal(t, "snappy", c.Name()) + + tests := []struct { + test string + input string + }{ + {"empty", ""}, + {"short", "hello world"}, + {"long", strings.Repeat("123456789", 1024)}, + } + for _, test := range tests { + t.Run(test.test, func(t *testing.T) { + var buf bytes.Buffer + // Compress + w, err := c.Compress(&buf) + require.NoError(t, err) + n, err := w.Write([]byte(test.input)) + require.NoError(t, err) + assert.Len(t, test.input, n) + err = w.Close() + require.NoError(t, err) + // Decompress + r, err := c.Decompress(&buf) + require.NoError(t, err) + out, err := ioutil.ReadAll(r) + require.NoError(t, err) + assert.Equal(t, test.input, string(out)) + }) + } +} + +func BenchmarkSnappyCompress(b *testing.B) { + data := []byte(strings.Repeat("123456789", 1024)) + c := newCompressor() + b.ResetTimer() + for i := 0; i < b.N; i++ { + w, _ := c.Compress(ioutil.Discard) + _, _ = w.Write(data) + _ = w.Close() + } +} + +func BenchmarkSnappyDecompress(b *testing.B) { + data := []byte(strings.Repeat("123456789", 1024)) + c := newCompressor() + var buf bytes.Buffer + w, _ := c.Compress(&buf) + _, _ = w.Write(data) + reader := bytes.NewReader(buf.Bytes()) + b.ResetTimer() + for i := 0; i < b.N; i++ { + r, _ := c.Decompress(reader) + _, _ = ioutil.ReadAll(r) + _, _ = reader.Seek(0, io.SeekStart) + } +} diff --git a/grpcutil/dns_resolver.go b/grpcutil/dns_resolver.go new file mode 100644 index 000000000..ae5dc5cb4 --- /dev/null +++ b/grpcutil/dns_resolver.go @@ -0,0 +1,284 @@ +package grpcutil + +// Copied from https://github.com/grpc/grpc-go/tree/v1.29.x/naming. + +import ( + "context" + "errors" + "fmt" + "net" + "strconv" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" +) + +const ( + defaultPort = "443" + defaultFreq = time.Minute * 30 +) + +var ( + errMissingAddr = errors.New("missing address") + errWatcherClose = errors.New("watcher has been closed") + + lookupHost = net.DefaultResolver.LookupHost + lookupSRV = net.DefaultResolver.LookupSRV +) + +// NewDNSResolverWithFreq creates a DNS Resolver that can resolve DNS names, and +// create watchers that poll the DNS server using the frequency set by freq. +func NewDNSResolverWithFreq(freq time.Duration, logger log.Logger) (Resolver, error) { + return &dnsResolver{ + logger: logger, + freq: freq, + }, nil +} + +// NewDNSResolver creates a DNS Resolver that can resolve DNS names, and create +// watchers that poll the DNS server using the default frequency defined by defaultFreq. +func NewDNSResolver(logger log.Logger) (Resolver, error) { + return NewDNSResolverWithFreq(defaultFreq, logger) +} + +// dnsResolver handles name resolution for names following the DNS scheme +type dnsResolver struct { + logger log.Logger + // frequency of polling the DNS server that the watchers created by this resolver will use. + freq time.Duration +} + +// formatIP returns ok = false if addr is not a valid textual representation of an IP address. +// If addr is an IPv4 address, return the addr and ok = true. +// If addr is an IPv6 address, return the addr enclosed in square brackets and ok = true. +func formatIP(addr string) (addrIP string, ok bool) { + ip := net.ParseIP(addr) + if ip == nil { + return "", false + } + if ip.To4() != nil { + return addr, true + } + return "[" + addr + "]", true +} + +// parseTarget takes the user input target string, returns formatted host and port info. +// If target doesn't specify a port, set the port to be the defaultPort. +// If target is in IPv6 format and host-name is enclosed in square brackets, brackets +// are stripped when setting the host. +// examples: +// target: "www.google.com" returns host: "www.google.com", port: "443" +// target: "ipv4-host:80" returns host: "ipv4-host", port: "80" +// target: "[ipv6-host]" returns host: "ipv6-host", port: "443" +// target: ":80" returns host: "localhost", port: "80" +// target: ":" returns host: "localhost", port: "443" +func parseTarget(target string) (host, port string, err error) { + if target == "" { + return "", "", errMissingAddr + } + + if ip := net.ParseIP(target); ip != nil { + // target is an IPv4 or IPv6(without brackets) address + return target, defaultPort, nil + } + if host, port, err := net.SplitHostPort(target); err == nil { + // target has port, i.e ipv4-host:port, [ipv6-host]:port, host-name:port + if host == "" { + // Keep consistent with net.Dial(): If the host is empty, as in ":80", the local system is assumed. + host = "localhost" + } + if port == "" { + // If the port field is empty(target ends with colon), e.g. "[::1]:", defaultPort is used. + port = defaultPort + } + return host, port, nil + } + if host, port, err := net.SplitHostPort(target + ":" + defaultPort); err == nil { + // target doesn't have port + return host, port, nil + } + return "", "", fmt.Errorf("invalid target address %v", target) +} + +// Resolve creates a watcher that watches the name resolution of the target. +func (r *dnsResolver) Resolve(target string) (Watcher, error) { + host, port, err := parseTarget(target) + if err != nil { + return nil, err + } + + if net.ParseIP(host) != nil { + ipWatcher := &ipWatcher{ + updateChan: make(chan *Update, 1), + } + host, _ = formatIP(host) + ipWatcher.updateChan <- &Update{Op: Add, Addr: host + ":" + port} + return ipWatcher, nil + } + + ctx, cancel := context.WithCancel(context.Background()) + return &dnsWatcher{ + r: r, + logger: r.logger, + host: host, + port: port, + ctx: ctx, + cancel: cancel, + t: time.NewTimer(0), + }, nil +} + +// dnsWatcher watches for the name resolution update for a specific target +type dnsWatcher struct { + r *dnsResolver + logger log.Logger + host string + port string + // The latest resolved address set + curAddrs map[string]*Update + ctx context.Context + cancel context.CancelFunc + t *time.Timer +} + +// ipWatcher watches for the name resolution update for an IP address. +type ipWatcher struct { + updateChan chan *Update +} + +// Next returns the address resolution Update for the target. For IP address, +// the resolution is itself, thus polling name server is unnecessary. Therefore, +// Next() will return an Update the first time it is called, and will be blocked +// for all following calls as no Update exists until watcher is closed. +func (i *ipWatcher) Next() ([]*Update, error) { + u, ok := <-i.updateChan + if !ok { + return nil, errWatcherClose + } + return []*Update{u}, nil +} + +// Close closes the ipWatcher. +func (i *ipWatcher) Close() { + close(i.updateChan) +} + +// AddressType indicates the address type returned by name resolution. +type AddressType uint8 + +const ( + // Backend indicates the server is a backend server. + Backend AddressType = iota + // GRPCLB indicates the server is a grpclb load balancer. + GRPCLB +) + +// AddrMetadataGRPCLB contains the information the name resolver for grpclb should provide. The +// name resolver used by the grpclb balancer is required to provide this type of metadata in +// its address updates. +type AddrMetadataGRPCLB struct { + // AddrType is the type of server (grpc load balancer or backend). + AddrType AddressType + // ServerName is the name of the grpc load balancer. Used for authentication. + ServerName string +} + +// compileUpdate compares the old resolved addresses and newly resolved addresses, +// and generates an update list +func (w *dnsWatcher) compileUpdate(newAddrs map[string]*Update) []*Update { + var res []*Update + for a, u := range w.curAddrs { + if _, ok := newAddrs[a]; !ok { + u.Op = Delete + res = append(res, u) + } + } + for a, u := range newAddrs { + if _, ok := w.curAddrs[a]; !ok { + res = append(res, u) + } + } + return res +} + +func (w *dnsWatcher) lookupSRV() map[string]*Update { + newAddrs := make(map[string]*Update) + _, srvs, err := lookupSRV(w.ctx, "grpclb", "tcp", w.host) + if err != nil { + level.Info(w.logger).Log("msg", "failed DNS SRV record lookup", "err", err) + return nil + } + for _, s := range srvs { + lbAddrs, err := lookupHost(w.ctx, s.Target) + if err != nil { + level.Warn(w.logger).Log("msg", "failed load balancer address DNS lookup", "err", err) + continue + } + for _, a := range lbAddrs { + a, ok := formatIP(a) + if !ok { + level.Error(w.logger).Log("failed IP parsing", "err", err) + continue + } + addr := a + ":" + strconv.Itoa(int(s.Port)) + newAddrs[addr] = &Update{Addr: addr, + Metadata: AddrMetadataGRPCLB{AddrType: GRPCLB, ServerName: s.Target}} + } + } + return newAddrs +} + +func (w *dnsWatcher) lookupHost() map[string]*Update { + newAddrs := make(map[string]*Update) + addrs, err := lookupHost(w.ctx, w.host) + if err != nil { + level.Warn(w.logger).Log("msg", "failed DNS A record lookup", "err", err) + return nil + } + for _, a := range addrs { + a, ok := formatIP(a) + if !ok { + level.Error(w.logger).Log("msg", "failed IP parsing", "err", err) + continue + } + addr := a + ":" + w.port + newAddrs[addr] = &Update{Addr: addr} + } + return newAddrs +} + +func (w *dnsWatcher) lookup() []*Update { + newAddrs := w.lookupSRV() + if newAddrs == nil { + // If failed to get any balancer address (either no corresponding SRV for the + // target, or caused by failure during resolution/parsing of the balancer target), + // return any A record info available. + newAddrs = w.lookupHost() + } + result := w.compileUpdate(newAddrs) + w.curAddrs = newAddrs + return result +} + +// Next returns the resolved address update(delta) for the target. If there's no +// change, it will sleep for 30 mins and try to resolve again after that. +func (w *dnsWatcher) Next() ([]*Update, error) { + for { + select { + case <-w.ctx.Done(): + return nil, errWatcherClose + case <-w.t.C: + } + result := w.lookup() + // Next lookup should happen after an interval defined by w.r.freq. + w.t.Reset(w.r.freq) + if len(result) > 0 { + return result, nil + } + } +} + +func (w *dnsWatcher) Close() { + w.cancel() +} diff --git a/grpcutil/health_check.go b/grpcutil/health_check.go new file mode 100644 index 000000000..2b567b368 --- /dev/null +++ b/grpcutil/health_check.go @@ -0,0 +1,53 @@ +package grpcutil + +import ( + "context" + + "github.com/gogo/status" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/grafana/dskit/services" +) + +// HealthCheck fulfills the grpc_health_v1.HealthServer interface by ensuring +// the services being managed by the provided service manager are healthy. +type HealthCheck struct { + sm *services.Manager +} + +// NewHealthCheck returns a new HealthCheck for the provided service manager. +func NewHealthCheck(sm *services.Manager) *HealthCheck { + return &HealthCheck{ + sm: sm, + } +} + +// Check implements the grpc healthcheck. +func (h *HealthCheck) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { + if !h.isHealthy() { + return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil + } + + return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil +} + +// Watch implements the grpc healthcheck. +func (h *HealthCheck) Watch(_ *grpc_health_v1.HealthCheckRequest, _ grpc_health_v1.Health_WatchServer) error { + return status.Error(codes.Unimplemented, "Watching is not supported") +} + +// isHealthy returns whether the instance should be considered healthy. +func (h *HealthCheck) isHealthy() bool { + states := h.sm.ServicesByState() + + // Given this is an health check endpoint for the whole instance, we should consider + // it healthy after all services have been started (running) and until all + // services are terminated. Some services, like ingesters, are still + // fully functioning while stopping. + if len(states[services.New]) > 0 || len(states[services.Starting]) > 0 || len(states[services.Failed]) > 0 { + return false + } + + return len(states[services.Running]) > 0 || len(states[services.Stopping]) > 0 +} diff --git a/grpcutil/health_check_test.go b/grpcutil/health_check_test.go new file mode 100644 index 000000000..174f0f686 --- /dev/null +++ b/grpcutil/health_check_test.go @@ -0,0 +1,146 @@ +package grpcutil + +import ( + "context" + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/dskit/services" +) + +func TestHealthCheck_isHealthy(t *testing.T) { + tests := map[string]struct { + states []services.State + expected bool + }{ + "all services are new": { + states: []services.State{services.New, services.New}, + expected: false, + }, + "all services are starting": { + states: []services.State{services.Starting, services.Starting}, + expected: false, + }, + "some services are starting and some running": { + states: []services.State{services.Starting, services.Running}, + expected: false, + }, + "all services are running": { + states: []services.State{services.Running, services.Running}, + expected: true, + }, + "some services are stopping": { + states: []services.State{services.Running, services.Stopping}, + expected: true, + }, + "some services are terminated while others running": { + states: []services.State{services.Running, services.Terminated}, + expected: true, + }, + "all services are stopping": { + states: []services.State{services.Stopping, services.Stopping}, + expected: true, + }, + "some services are terminated while others stopping": { + states: []services.State{services.Stopping, services.Terminated}, + expected: true, + }, + "a service has failed while others are running": { + states: []services.State{services.Running, services.Failed}, + expected: false, + }, + "all services are terminated": { + states: []services.State{services.Terminated, services.Terminated}, + expected: false, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + var svcs []services.Service + for range testData.states { + svcs = append(svcs, &mockService{}) + } + + sm, err := services.NewManager(svcs...) + require.NoError(t, err) + + // Switch the state of each mocked services. + for i, s := range svcs { + s.(*mockService).switchState(testData.states[i]) + } + + h := NewHealthCheck(sm) + assert.Equal(t, testData.expected, h.isHealthy()) + }) + } +} + +type mockService struct { + services.Service + state services.State + listeners []services.Listener +} + +func (s *mockService) switchState(desiredState services.State) { + // Simulate all the states between the current state and the desired one. + orderedStates := []services.State{services.New, services.Starting, services.Running, services.Failed, services.Stopping, services.Terminated} + simulationStarted := false + + for _, orderedState := range orderedStates { + // Skip until we reach the current state. + if !simulationStarted && orderedState != s.state { + continue + } + + // Start the simulation once we reach the current state. + if orderedState == s.state { + simulationStarted = true + continue + } + + // Skip the failed state, unless it's the desired one. + if orderedState == services.Failed && desiredState != services.Failed { + continue + } + + s.state = orderedState + + // Synchronously call listeners to avoid flaky tests. + for _, listener := range s.listeners { + switch orderedState { + case services.Starting: + listener.Starting() + case services.Running: + listener.Running() + case services.Stopping: + listener.Stopping(services.Running) + case services.Failed: + listener.Failed(services.Running, errors.New("mocked error")) + case services.Terminated: + listener.Terminated(services.Stopping) + } + } + + if orderedState == desiredState { + break + } + } +} + +func (s *mockService) State() services.State { + return s.state +} + +func (s *mockService) AddListener(listener services.Listener) { + s.listeners = append(s.listeners, listener) +} + +func (s *mockService) StartAsync(_ context.Context) error { return nil } +func (s *mockService) AwaitRunning(_ context.Context) error { return nil } +func (s *mockService) StopAsync() {} +func (s *mockService) AwaitTerminated(_ context.Context) error { return nil } +func (s *mockService) FailureCase() error { return nil } diff --git a/grpcutil/naming.go b/grpcutil/naming.go new file mode 100644 index 000000000..441b3ad9f --- /dev/null +++ b/grpcutil/naming.go @@ -0,0 +1,41 @@ +package grpcutil + +// Copied from https://github.com/grpc/grpc-go/tree/v1.29.x/naming. + +// Operation defines the corresponding operations for a name resolution change. +type Operation uint8 + +const ( + // Add indicates a new address is added. + Add Operation = iota + // Delete indicates an existing address is deleted. + Delete +) + +// Update defines a name resolution update. Notice that it is not valid having both +// empty string Addr and nil Metadata in an Update. +type Update struct { + // Op indicates the operation of the update. + Op Operation + // Addr is the updated address. It is empty string if there is no address update. + Addr string + // Metadata is the updated metadata. It is nil if there is no metadata update. + // Metadata is not required for a custom naming implementation. + Metadata interface{} +} + +// Resolver creates a Watcher for a target to track its resolution changes. +type Resolver interface { + // Resolve creates a Watcher for target. + Resolve(target string) (Watcher, error) +} + +// Watcher watches for the updates on the specified target. +type Watcher interface { + // Next blocks until an update or error happens. It may return one or more + // updates. The first call should get the full set of the results. It should + // return an error if and only if Watcher cannot recover. + Next() ([]*Update, error) + // Close closes the Watcher. + Close() +} diff --git a/grpcutil/util.go b/grpcutil/util.go new file mode 100644 index 000000000..858a0e862 --- /dev/null +++ b/grpcutil/util.go @@ -0,0 +1,17 @@ +package grpcutil + +import ( + "github.com/gogo/status" + "google.golang.org/grpc/codes" +) + +// IsGRPCContextCanceled returns whether the input error is a GRPC error wrapping +// the context.Canceled error. +func IsGRPCContextCanceled(err error) bool { + s, ok := status.FromError(err) + if !ok { + return false + } + + return s.Code() == codes.Canceled +} diff --git a/kv/etcd/etcd.go b/kv/etcd/etcd.go index 963cc3a34..e72922ea3 100644 --- a/kv/etcd/etcd.go +++ b/kv/etcd/etcd.go @@ -14,9 +14,9 @@ import ( "go.etcd.io/etcd/pkg/transport" "github.com/grafana/dskit/backoff" + dstls "github.com/grafana/dskit/crypto/tls" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/kv/codec" - "github.com/grafana/dskit/kv/kvtls" ) // Config for a new etcd.Client. @@ -25,7 +25,7 @@ type Config struct { DialTimeout time.Duration `yaml:"dial_timeout"` MaxRetries int `yaml:"max_retries"` EnableTLS bool `yaml:"tls_enabled"` - TLS kvtls.ClientConfig `yaml:",inline"` + TLS dstls.ClientConfig `yaml:",inline"` UserName string `yaml:"username"` Password string `yaml:"password"` diff --git a/kv/memberlist/tcp_transport.go b/kv/memberlist/tcp_transport.go index 4c6badee0..19cf57d72 100644 --- a/kv/memberlist/tcp_transport.go +++ b/kv/memberlist/tcp_transport.go @@ -21,8 +21,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/atomic" + dstls "github.com/grafana/dskit/crypto/tls" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/kv/kvtls" ) type messageType uint8 @@ -58,7 +58,7 @@ type TCPTransportConfig struct { MetricsNamespace string `yaml:"-"` TLSEnabled bool `yaml:"tls_enabled"` - TLS kvtls.ClientConfig `yaml:",inline"` + TLS dstls.ClientConfig `yaml:",inline"` } // RegisterFlags registers flags.