From 269e396c67cf40a1eedb2c5d3108c99ecd490ed4 Mon Sep 17 00:00:00 2001 From: Roy Chiang Date: Thu, 5 Aug 2021 15:40:27 -0700 Subject: [PATCH 1/2] add memcached auto-discovery support from thanos (#4412) Signed-off-by: Roy Chiang --- CHANGELOG.md | 1 + docs/blocks-storage/querier.md | 12 ++ docs/blocks-storage/store-gateway.md | 12 ++ docs/configuration/arguments.md | 2 + docs/configuration/config-file-reference.md | 12 ++ go.mod | 2 +- go.sum | 5 +- pkg/storage/tsdb/memcache_client_config.go | 3 + .../thanos/pkg/block/metadata/meta.go | 1 + .../thanos/pkg/cacheutil/memcached_client.go | 57 ++++++--- .../thanos/pkg/discovery/memcache/provider.go | 114 ++++++++++++++++++ .../thanos/pkg/discovery/memcache/resolver.go | 111 +++++++++++++++++ .../thanos-io/thanos/pkg/objstore/testing.go | 2 +- .../thanos/pkg/promclient/promclient.go | 9 +- vendor/modules.txt | 3 +- 15 files changed, 319 insertions(+), 27 deletions(-) create mode 100644 vendor/github.com/thanos-io/thanos/pkg/discovery/memcache/provider.go create mode 100644 vendor/github.com/thanos-io/thanos/pkg/discovery/memcache/resolver.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d525e89fdf..06ece104b9a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ * [ENHANCEMENT] Exemplars are now emitted for all gRPC calls and many operations tracked by histograms. #4462 * [ENHANCEMENT] New options `-server.http-listen-network` and `-server.grpc-listen-network` allow binding as 'tcp4' or 'tcp6'. #4462 * [ENHANCEMENT] Rulers: Using shuffle sharding subring on GetRules API. #4466 +* [ENHANCEMENT] Support memcached auto-discovery via the `auto-discovery` flag, introduced by Thanos in https://github.com/thanos-io/thanos/pull/4487. The managed memcached services of both AWS and Google Cloud support auto-discovery, which returns a list of nodes of the memcached cluster. #4412 * [BUGFIX] Fixes a panic in the query-tee when comparing result. 
#4465 * [BUGFIX] Frontend: Fixes @ modifier functions (start/end) when splitting queries by time. #4464 * [BUGFIX] Compactor: compactor will no longer try to compact blocks that are already marked for deletion. Previously compactor would consider blocks marked for deletion within `-compactor.deletion-delay / 2` period as eligible for compaction. #4328 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index a7080d4eb2a..8870a428cc6 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -512,6 +512,10 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.max-item-size [max_item_size: | default = 1048576] + # Use memcached auto-discovery instead of DNS resolution + # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.auto-discovery + [auto_discovery: | default = false] + chunks_cache: # Backend for chunks cache, if not empty. Supported values: memcached. # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend @@ -559,6 +563,10 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.max-item-size [max_item_size: | default = 1048576] + # Use memcached auto-discovery instead of DNS resolution + # CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.auto-discovery + [auto_discovery: | default = false] + # Size of each subrange that bucket object is split into for better # caching. # CLI flag: -blocks-storage.bucket-store.chunks-cache.subrange-size @@ -625,6 +633,10 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.max-item-size [max_item_size: | default = 1048576] + # Use memcached auto-discovery instead of DNS resolution + # CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.auto-discovery + [auto_discovery: | default = false] + # How long to cache list of tenants in the bucket. 
# CLI flag: -blocks-storage.bucket-store.metadata-cache.tenants-list-ttl [tenants_list_ttl: | default = 15m] diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index a00297a4455..ca48d4ff8b9 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -576,6 +576,10 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.max-item-size [max_item_size: | default = 1048576] + # Use memcached auto-discovery instead of DNS resolution + # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.auto-discovery + [auto_discovery: | default = false] + chunks_cache: # Backend for chunks cache, if not empty. Supported values: memcached. # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend @@ -623,6 +627,10 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.max-item-size [max_item_size: | default = 1048576] + # Use memcached auto-discovery instead of DNS resolution + # CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.auto-discovery + [auto_discovery: | default = false] + # Size of each subrange that bucket object is split into for better # caching. # CLI flag: -blocks-storage.bucket-store.chunks-cache.subrange-size @@ -689,6 +697,10 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.max-item-size [max_item_size: | default = 1048576] + # Use memcached auto-discovery instead of DNS resolution + # CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.auto-discovery + [auto_discovery: | default = false] + # How long to cache list of tenants in the bucket. 
# CLI flag: -blocks-storage.bucket-store.metadata-cache.tenants-list-ttl [tenants_list_ttl: | default = 15m] diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index 5080d313eb4..745048b0258 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -533,6 +533,8 @@ The DNS service discovery, inspired from Thanos DNS SD, supports different disco If **no prefix** is provided, the provided IP or hostname will be used straightaway without pre-resolving it. +If you are using a managed memcached service from [Google Cloud](https://cloud.google.com/memorystore/docs/memcached/auto-discovery-overview), or [AWS](https://docs.aws.amazon.com/AmazonElastiCache/latest/mem-ug/AutoDiscovery.HowAutoDiscoveryWorks.html), use the [auto-discovery](./config-file-reference.md#memcached-client-config) flag instead of DNS discovery. + ## Logging of IP of reverse proxy If a reverse proxy is used in front of Cortex it might be diffult to troubleshoot errors. The following 3 settings can be used to log the IP address passed along by the reverse proxy in headers like X-Forwarded-For. diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 138f43b93cd..8aa5424a857 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4765,6 +4765,10 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.max-item-size [max_item_size: | default = 1048576] + # Use memcached auto-discovery instead of DNS resolution + # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.auto-discovery + [auto_discovery: | default = false] + chunks_cache: # Backend for chunks cache, if not empty. Supported values: memcached. 
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend @@ -4812,6 +4816,10 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.max-item-size [max_item_size: | default = 1048576] + # Use memcached auto-discovery instead of DNS resolution + # CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.auto-discovery + [auto_discovery: | default = false] + # Size of each subrange that bucket object is split into for better caching. # CLI flag: -blocks-storage.bucket-store.chunks-cache.subrange-size [subrange_size: | default = 16000] @@ -4877,6 +4885,10 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.max-item-size [max_item_size: | default = 1048576] + # Use memcached auto-discovery instead of DNS resolution + # CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.auto-discovery + [auto_discovery: | default = false] + # How long to cache list of tenants in the bucket. # CLI flag: -blocks-storage.bucket-store.metadata-cache.tenants-list-ttl [tenants_list_ttl: | default = 15m] diff --git a/go.mod b/go.mod index 35ecfd76003..beba2b426bc 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/sony/gobreaker v0.4.1 github.com/spf13/afero v1.2.2 github.com/stretchr/testify v1.7.0 - github.com/thanos-io/thanos v0.22.0 + github.com/thanos-io/thanos v0.19.1-0.20210803192524-baea4ce9ef52 github.com/uber/jaeger-client-go v2.29.1+incompatible github.com/weaveworks/common v0.0.0-20210901124008-1fa3f9fa874c go.etcd.io/bbolt v1.3.6 diff --git a/go.sum b/go.sum index 36a5c7ecde5..b1328d15c67 100644 --- a/go.sum +++ b/go.sum @@ -1715,8 +1715,8 @@ github.com/thanos-io/thanos v0.13.1-0.20210204123931-82545cdd16fe/go.mod h1:ZLDG github.com/thanos-io/thanos v0.13.1-0.20210224074000-659446cab117/go.mod h1:kdqFpzdkveIKpNNECVJd75RPvgsAifQgJymwCdfev1w= github.com/thanos-io/thanos v0.13.1-0.20210226164558-03dace0a1aa1/go.mod h1:gMCy4oCteKTT7VuXVvXLTPGzzjovX1VPE5p+HgL1hyU= 
github.com/thanos-io/thanos v0.13.1-0.20210401085038-d7dff0c84d17/go.mod h1:zU8KqE+6A+HksK4wiep8e/3UvCZLm+Wrw9AqZGaAm9k= -github.com/thanos-io/thanos v0.22.0 h1:bHTzC0ZaP5rBJ2pOeJ73+hO/p3U4tSshJoR3pumM6sA= -github.com/thanos-io/thanos v0.22.0/go.mod h1:SZDWz3phcUcBr4MYFoPFRvl+Z9Nbi45HlwQlwSZSt+Q= +github.com/thanos-io/thanos v0.19.1-0.20210803192524-baea4ce9ef52 h1:aRxV+ebdkd5XqY1+vkwmdOpc66OpWtcvSF1e0qiRKsU= +github.com/thanos-io/thanos v0.19.1-0.20210803192524-baea4ce9ef52/go.mod h1:Xskx78e0CYL6w0yDNOZHGdvwQMlsuzPsePmPtbp9Xuk= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= @@ -1900,6 +1900,7 @@ go.uber.org/atomic v1.8.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.2.0/go.mod h1:YfO3fm683kQpzETxlTGZhGIVmXAhaw3gxeBADbpZtnU= +go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q= go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= diff --git a/pkg/storage/tsdb/memcache_client_config.go b/pkg/storage/tsdb/memcache_client_config.go index 60c1a4e2861..9d736b6207c 100644 --- a/pkg/storage/tsdb/memcache_client_config.go +++ b/pkg/storage/tsdb/memcache_client_config.go @@ -18,6 +18,7 @@ type MemcachedClientConfig struct { MaxGetMultiConcurrency int `yaml:"max_get_multi_concurrency"` MaxGetMultiBatchSize int `yaml:"max_get_multi_batch_size"` MaxItemSize int 
`yaml:"max_item_size"` + AutoDiscovery bool `yaml:"auto_discovery"` } func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { @@ -29,6 +30,7 @@ func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefi f.IntVar(&cfg.MaxGetMultiConcurrency, prefix+"max-get-multi-concurrency", 100, "The maximum number of concurrent connections running get operations. If set to 0, concurrency is unlimited.") f.IntVar(&cfg.MaxGetMultiBatchSize, prefix+"max-get-multi-batch-size", 0, "The maximum number of keys a single underlying get operation should run. If more keys are specified, internally keys are split into multiple batches and fetched concurrently, honoring the max concurrency. If set to 0, the max batch size is unlimited.") f.IntVar(&cfg.MaxItemSize, prefix+"max-item-size", 1024*1024, "The maximum size of an item stored in memcached. Bigger items are not stored. If set to 0, no maximum size is enforced.") + f.BoolVar(&cfg.AutoDiscovery, prefix+"auto-discovery", false, "Use memcached auto-discovery instead of DNS resolution") } func (cfg *MemcachedClientConfig) GetAddresses() []string { @@ -59,5 +61,6 @@ func (cfg MemcachedClientConfig) ToMemcachedClientConfig() cacheutil.MemcachedCl MaxGetMultiBatchSize: cfg.MaxGetMultiBatchSize, MaxItemSize: model.Bytes(cfg.MaxItemSize), DNSProviderUpdateInterval: 30 * time.Second, + AutoDiscovery: cfg.AutoDiscovery, } } diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go b/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go index 0cbda37e191..f02c09a977c 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go +++ b/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go @@ -114,6 +114,7 @@ func (m *Matchers) UnmarshalYAML(value *yaml.Node) (err error) { type DeletionRequest struct { Matchers Matchers `json:"matchers" yaml:"matchers"` Intervals tombstones.Intervals `json:"intervals,omitempty" yaml:"intervals,omitempty"` + 
RequestID string `json:"request_id,omitempty" yaml:"request_id,omitempty"` } type File struct { diff --git a/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_client.go b/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_client.go index cb5adcb0d03..445138b3e6d 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_client.go +++ b/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_client.go @@ -21,6 +21,7 @@ import ( "gopkg.in/yaml.v2" "github.com/thanos-io/thanos/pkg/discovery/dns" + memcacheDiscovery "github.com/thanos-io/thanos/pkg/discovery/memcache" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/model" @@ -53,6 +54,7 @@ var ( MaxGetMultiConcurrency: 100, MaxGetMultiBatchSize: 0, DNSProviderUpdateInterval: 10 * time.Second, + AutoDiscovery: false, } ) @@ -114,6 +116,9 @@ type MemcachedClientConfig struct { // DNSProviderUpdateInterval specifies the DNS discovery update interval. DNSProviderUpdateInterval time.Duration `yaml:"dns_provider_update_interval"` + + // AutoDiscovery configures memached client to perform auto-discovery instead of DNS resolution + AutoDiscovery bool `yaml:"auto_discovery"` } func (c *MemcachedClientConfig) validate() error { @@ -153,8 +158,8 @@ type memcachedClient struct { // Name provides an identifier for the instantiated Client name string - // DNS provider used to keep the memcached servers list updated. - dnsProvider *dns.Provider + // Address provider used to keep the memcached servers list updated. + addressProvider AddressProvider // Channel used to notify internal goroutines when they should quit. stop chan struct{} @@ -177,6 +182,15 @@ type memcachedClient struct { dataSize *prometheus.HistogramVec } +// AddressProvider performs node address resolution given a list of clusters. 
+type AddressProvider interface { + // Resolves the provided list of memcached cluster to the actual nodes + Resolve(context.Context, []string) error + + // Returns the nodes + Addresses() []string +} + type memcachedGetMultiResult struct { items map[string]*memcache.Item err error @@ -220,20 +234,31 @@ func newMemcachedClient( reg prometheus.Registerer, name string, ) (*memcachedClient, error) { - dnsProvider := dns.NewProvider( - logger, - extprom.WrapRegistererWithPrefix("thanos_memcached_", reg), - dns.GolangResolverType, - ) + promRegisterer := extprom.WrapRegistererWithPrefix("thanos_memcached_", reg) + + var addressProvider AddressProvider + if config.AutoDiscovery { + addressProvider = memcacheDiscovery.NewProvider( + logger, + promRegisterer, + config.Timeout, + ) + } else { + addressProvider = dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix("thanos_memcached_", reg), + dns.GolangResolverType, + ) + } c := &memcachedClient{ - logger: log.With(logger, "name", name), - config: config, - client: client, - selector: selector, - dnsProvider: dnsProvider, - asyncQueue: make(chan func(), config.MaxAsyncBufferSize), - stop: make(chan struct{}, 1), + logger: log.With(logger, "name", name), + config: config, + client: client, + selector: selector, + addressProvider: addressProvider, + asyncQueue: make(chan func(), config.MaxAsyncBufferSize), + stop: make(chan struct{}, 1), getMultiGate: gate.New( extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg), config.MaxGetMultiConcurrency, @@ -561,11 +586,11 @@ func (c *memcachedClient) resolveAddrs() error { defer cancel() // If some of the dns resolution fails, log the error. 
- if err := c.dnsProvider.Resolve(ctx, c.config.Addresses); err != nil { + if err := c.addressProvider.Resolve(ctx, c.config.Addresses); err != nil { level.Error(c.logger).Log("msg", "failed to resolve addresses for memcached", "addresses", strings.Join(c.config.Addresses, ","), "err", err) } // Fail in case no server address is resolved. - servers := c.dnsProvider.Addresses() + servers := c.addressProvider.Addresses() if len(servers) == 0 { return fmt.Errorf("no server address resolved for %s", c.name) } diff --git a/vendor/github.com/thanos-io/thanos/pkg/discovery/memcache/provider.go b/vendor/github.com/thanos-io/thanos/pkg/discovery/memcache/provider.go new file mode 100644 index 00000000000..9bb13178240 --- /dev/null +++ b/vendor/github.com/thanos-io/thanos/pkg/discovery/memcache/provider.go @@ -0,0 +1,114 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package memcache + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/thanos/pkg/errutil" + "github.com/thanos-io/thanos/pkg/extprom" +) + +// Provider is a stateful cache for asynchronous memcached auto-discovery resolution. It provides a way to resolve +// addresses and obtain them. 
+type Provider struct { + sync.RWMutex + resolver Resolver + clusterConfigs map[string]*clusterConfig + logger log.Logger + + configVersion *extprom.TxGaugeVec + resolvedAddresses *extprom.TxGaugeVec + resolverFailuresCount prometheus.Counter + resolverLookupsCount prometheus.Counter +} + +func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time.Duration) *Provider { + p := &Provider{ + resolver: &memcachedAutoDiscovery{dialTimeout: dialTimeout}, + clusterConfigs: map[string]*clusterConfig{}, + configVersion: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{ + Name: "auto_discovery_config_version", + Help: "The current auto discovery config version", + }, []string{"addr"}), + resolvedAddresses: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{ + Name: "auto_discovery_resolved_addresses", + Help: "The number of memcached nodes found via auto discovery", + }, []string{"addr"}), + resolverLookupsCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "auto_discovery_total", + Help: "The number of memcache auto discovery attempts", + }), + resolverFailuresCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "auto_discovery_failures_total", + Help: "The number of memcache auto discovery failures", + }), + logger: logger, + } + return p +} + +// Resolve stores a list of nodes auto-discovered from the provided addresses. +func (p *Provider) Resolve(ctx context.Context, addresses []string) error { + clusterConfigs := map[string]*clusterConfig{} + errs := errutil.MultiError{} + + for _, address := range addresses { + clusterConfig, err := p.resolver.Resolve(ctx, address) + p.resolverLookupsCount.Inc() + + if err != nil { + level.Warn(p.logger).Log( + "msg", "failed to perform auto-discovery for memcached", + "address", address, + ) + errs.Add(err) + p.resolverFailuresCount.Inc() + + // Use cached values. 
+ p.RLock() + clusterConfigs[address] = p.clusterConfigs[address] + p.RUnlock() + } else { + clusterConfigs[address] = clusterConfig + } + } + + p.Lock() + defer p.Unlock() + + p.resolvedAddresses.ResetTx() + p.configVersion.ResetTx() + for address, config := range clusterConfigs { + p.resolvedAddresses.WithLabelValues(address).Set(float64(len(config.nodes))) + p.configVersion.WithLabelValues(address).Set(float64(config.version)) + } + p.resolvedAddresses.Submit() + p.configVersion.Submit() + + p.clusterConfigs = clusterConfigs + + return errs.Err() +} + +// Addresses returns the latest addresses present in the Provider. +func (p *Provider) Addresses() []string { + p.RLock() + defer p.RUnlock() + + var result []string + for _, config := range p.clusterConfigs { + for _, node := range config.nodes { + result = append(result, fmt.Sprintf("%s:%d", node.dns, node.port)) + } + } + return result +} diff --git a/vendor/github.com/thanos-io/thanos/pkg/discovery/memcache/resolver.go b/vendor/github.com/thanos-io/thanos/pkg/discovery/memcache/resolver.go new file mode 100644 index 00000000000..4e7406cfba7 --- /dev/null +++ b/vendor/github.com/thanos-io/thanos/pkg/discovery/memcache/resolver.go @@ -0,0 +1,111 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package memcache + +import ( + "bufio" + "context" + "fmt" + "net" + "strconv" + "strings" + "time" + + "github.com/thanos-io/thanos/pkg/runutil" +) + +type clusterConfig struct { + version int + nodes []node +} + +type node struct { + dns string + ip string + port int +} + +type Resolver interface { + Resolve(ctx context.Context, address string) (*clusterConfig, error) +} + +type memcachedAutoDiscovery struct { + dialTimeout time.Duration +} + +func (s *memcachedAutoDiscovery) Resolve(ctx context.Context, address string) (config *clusterConfig, err error) { + conn, err := net.DialTimeout("tcp", address, s.dialTimeout) + if err != nil { + return nil, err + } + defer runutil.CloseWithErrCapture(&err, conn, "closing connection") + + rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + if _, err := fmt.Fprintf(rw, "config get cluster\n"); err != nil { + return nil, err + } + if err := rw.Flush(); err != nil { + return nil, err + } + + config, err = s.parseConfig(rw.Reader) + if err != nil { + return nil, err + } + + return config, err +} + +func (s *memcachedAutoDiscovery) parseConfig(reader *bufio.Reader) (*clusterConfig, error) { + clusterConfig := new(clusterConfig) + + configMeta, err := reader.ReadString('\n') + if err != nil { + return nil, fmt.Errorf("failed to read config metadata: %s", err) + } + configMeta = strings.TrimSpace(configMeta) + + // First line should be "CONFIG cluster 0 [length-of-payload-] + configMetaComponents := strings.Split(configMeta, " ") + if len(configMetaComponents) != 4 { + return nil, fmt.Errorf("expected 4 components in config metadata, and received %d, meta: %s", len(configMetaComponents), configMeta) + } + + configSize, err := strconv.Atoi(configMetaComponents[3]) + if err != nil { + return nil, fmt.Errorf("failed to parse config size from metadata: %s, error: %s", configMeta, err) + } + + configVersion, err := reader.ReadString('\n') + if err != nil { + return nil, fmt.Errorf("failed to find config 
version: %s", err) + } + clusterConfig.version, err = strconv.Atoi(strings.TrimSpace(configVersion)) + if err != nil { + return nil, fmt.Errorf("failed to parser config version: %s", err) + } + + nodes, err := reader.ReadString('\n') + if err != nil { + return nil, fmt.Errorf("failed to read nodes: %s", err) + } + + if len(configVersion)+len(nodes) != configSize { + return nil, fmt.Errorf("expected %d in config payload, but got %d instead.", configSize, len(configVersion)+len(nodes)) + } + + for _, host := range strings.Split(strings.TrimSpace(nodes), " ") { + dnsIpPort := strings.Split(host, "|") + if len(dnsIpPort) != 3 { + return nil, fmt.Errorf("node not in expected format: %s", dnsIpPort) + } + port, err := strconv.Atoi(dnsIpPort[2]) + if err != nil { + return nil, fmt.Errorf("failed to parse port: %s, err: %s", dnsIpPort, err) + } + clusterConfig.nodes = append(clusterConfig.nodes, node{dns: dnsIpPort[0], ip: dnsIpPort[1], port: port}) + } + + return clusterConfig, nil +} diff --git a/vendor/github.com/thanos-io/thanos/pkg/objstore/testing.go b/vendor/github.com/thanos-io/thanos/pkg/objstore/testing.go index 6854f15ca92..897772a9d5f 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/objstore/testing.go +++ b/vendor/github.com/thanos-io/thanos/pkg/objstore/testing.go @@ -21,7 +21,7 @@ func CreateTemporaryTestBucketName(t testing.TB) string { src := rand.NewSource(time.Now().UnixNano()) // Bucket name need to conform: https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-s3-bucket-naming-requirements.html. 
- name := strings.Replace(strings.Replace(fmt.Sprintf("test_%x_%s", src.Int63(), strings.ToLower(t.Name())), "_", "-", -1), "/", "-", -1) + name := strings.ReplaceAll(strings.Replace(fmt.Sprintf("test_%x_%s", src.Int63(), strings.ToLower(t.Name())), "_", "-", -1), "/", "-") if len(name) >= 63 { name = name[:63] } diff --git a/vendor/github.com/thanos-io/thanos/pkg/promclient/promclient.go b/vendor/github.com/thanos-io/thanos/pkg/promclient/promclient.go index 989c09b6d57..614bf9df68b 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/promclient/promclient.go +++ b/vendor/github.com/thanos-io/thanos/pkg/promclient/promclient.go @@ -26,6 +26,7 @@ import ( "github.com/gogo/status" "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/promql" @@ -175,16 +176,12 @@ func (c *Client) ExternalLabels(ctx context.Context, base *url.URL) (labels.Labe if err := json.Unmarshal(body, &d); err != nil { return nil, errors.Wrapf(err, "unmarshal response: %v", string(body)) } - var cfg struct { - Global struct { - ExternalLabels map[string]string `yaml:"external_labels"` - } `yaml:"global"` - } + var cfg config.Config if err := yaml.Unmarshal([]byte(d.Data.YAML), &cfg); err != nil { return nil, errors.Wrapf(err, "parse Prometheus config: %v", d.Data.YAML) } - lset := labels.FromMap(cfg.Global.ExternalLabels) + lset := cfg.GlobalConfig.ExternalLabels sort.Sort(lset) return lset, nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index ff4eed84e78..8bb44ca3d63 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -605,7 +605,7 @@ github.com/stretchr/objx github.com/stretchr/testify/assert github.com/stretchr/testify/mock github.com/stretchr/testify/require -# github.com/thanos-io/thanos v0.22.0 +# github.com/thanos-io/thanos v0.19.1-0.20210803192524-baea4ce9ef52 ## explicit 
github.com/thanos-io/thanos/pkg/block github.com/thanos-io/thanos/pkg/block/indexheader @@ -619,6 +619,7 @@ github.com/thanos-io/thanos/pkg/discovery/cache github.com/thanos-io/thanos/pkg/discovery/dns github.com/thanos-io/thanos/pkg/discovery/dns/godns github.com/thanos-io/thanos/pkg/discovery/dns/miekgdns +github.com/thanos-io/thanos/pkg/discovery/memcache github.com/thanos-io/thanos/pkg/errutil github.com/thanos-io/thanos/pkg/exemplars/exemplarspb github.com/thanos-io/thanos/pkg/extprom From 2d05ccf0834bce83051845b8289c18ec4fe8725c Mon Sep 17 00:00:00 2001 From: Roy Chiang Date: Fri, 17 Sep 2021 16:59:36 -0700 Subject: [PATCH 2/2] update documentation Signed-off-by: Roy Chiang --- docs/blocks-storage/querier.md | 9 ++++++--- docs/blocks-storage/store-gateway.md | 9 ++++++--- docs/configuration/arguments.md | 2 +- docs/configuration/config-file-reference.md | 9 ++++++--- pkg/storage/tsdb/memcache_client_config.go | 2 +- 5 files changed, 20 insertions(+), 11 deletions(-) diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 8870a428cc6..9bfb2a44807 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -512,7 +512,8 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.max-item-size [max_item_size: | default = 1048576] - # Use memcached auto-discovery instead of DNS resolution + # Use memcached auto-discovery mechanism provided by some cloud provider + # like GCP and AWS # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.auto-discovery [auto_discovery: | default = false] @@ -563,7 +564,8 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.max-item-size [max_item_size: | default = 1048576] - # Use memcached auto-discovery instead of DNS resolution + # Use memcached auto-discovery mechanism provided by some cloud provider + # like GCP and AWS # CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.auto-discovery [auto_discovery: | 
default = false] @@ -633,7 +635,8 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.max-item-size [max_item_size: | default = 1048576] - # Use memcached auto-discovery instead of DNS resolution + # Use memcached auto-discovery mechanism provided by some cloud provider + # like GCP and AWS # CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.auto-discovery [auto_discovery: | default = false] diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index ca48d4ff8b9..d82fdcfd99c 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -576,7 +576,8 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.max-item-size [max_item_size: | default = 1048576] - # Use memcached auto-discovery instead of DNS resolution + # Use memcached auto-discovery mechanism provided by some cloud provider + # like GCP and AWS # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.auto-discovery [auto_discovery: | default = false] @@ -627,7 +628,8 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.max-item-size [max_item_size: | default = 1048576] - # Use memcached auto-discovery instead of DNS resolution + # Use memcached auto-discovery mechanism provided by some cloud provider + # like GCP and AWS # CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.auto-discovery [auto_discovery: | default = false] @@ -697,7 +699,8 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.max-item-size [max_item_size: | default = 1048576] - # Use memcached auto-discovery instead of DNS resolution + # Use memcached auto-discovery mechanism provided by some cloud provider + # like GCP and AWS # CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.auto-discovery [auto_discovery: | default = false] diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md 
index 745048b0258..7fdd3b712e4 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -533,7 +533,7 @@ The DNS service discovery, inspired from Thanos DNS SD, supports different disco If **no prefix** is provided, the provided IP or hostname will be used straightaway without pre-resolving it. -If you are using a managed memcached service from [Google Cloud](https://cloud.google.com/memorystore/docs/memcached/auto-discovery-overview), or [AWS](https://docs.aws.amazon.com/AmazonElastiCache/latest/mem-ug/AutoDiscovery.HowAutoDiscoveryWorks.html), use the [auto-discovery](./config-file-reference.md#memcached-client-config) flag instead of DNS discovery. +If you are using a managed memcached service from [Google Cloud](https://cloud.google.com/memorystore/docs/memcached/auto-discovery-overview) or [AWS](https://docs.aws.amazon.com/AmazonElastiCache/latest/mem-ug/AutoDiscovery.HowAutoDiscoveryWorks.html), use the [auto-discovery](./config-file-reference.md#memcached-client-config) flag instead of DNS discovery, and specify the service's discovery/configuration endpoint as the address, without any DNS service discovery prefix. 
## Logging of IP of reverse proxy diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 8aa5424a857..547ffdb6419 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4765,7 +4765,8 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.max-item-size [max_item_size: | default = 1048576] - # Use memcached auto-discovery instead of DNS resolution + # Use memcached auto-discovery mechanism provided by some cloud provider + # like GCP and AWS # CLI flag: -blocks-storage.bucket-store.index-cache.memcached.auto-discovery [auto_discovery: | default = false] @@ -4816,7 +4817,8 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.max-item-size [max_item_size: | default = 1048576] - # Use memcached auto-discovery instead of DNS resolution + # Use memcached auto-discovery mechanism provided by some cloud provider + # like GCP and AWS # CLI flag: -blocks-storage.bucket-store.chunks-cache.memcached.auto-discovery [auto_discovery: | default = false] @@ -4885,7 +4887,8 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.max-item-size [max_item_size: | default = 1048576] - # Use memcached auto-discovery instead of DNS resolution + # Use memcached auto-discovery mechanism provided by some cloud provider + # like GCP and AWS # CLI flag: -blocks-storage.bucket-store.metadata-cache.memcached.auto-discovery [auto_discovery: | default = false] diff --git a/pkg/storage/tsdb/memcache_client_config.go b/pkg/storage/tsdb/memcache_client_config.go index 9d736b6207c..becacd5f51e 100644 --- a/pkg/storage/tsdb/memcache_client_config.go +++ b/pkg/storage/tsdb/memcache_client_config.go @@ -30,7 +30,7 @@ func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefi f.IntVar(&cfg.MaxGetMultiConcurrency, prefix+"max-get-multi-concurrency", 100, "The maximum number of concurrent 
connections running get operations. If set to 0, concurrency is unlimited.") f.IntVar(&cfg.MaxGetMultiBatchSize, prefix+"max-get-multi-batch-size", 0, "The maximum number of keys a single underlying get operation should run. If more keys are specified, internally keys are split into multiple batches and fetched concurrently, honoring the max concurrency. If set to 0, the max batch size is unlimited.") f.IntVar(&cfg.MaxItemSize, prefix+"max-item-size", 1024*1024, "The maximum size of an item stored in memcached. Bigger items are not stored. If set to 0, no maximum size is enforced.") - f.BoolVar(&cfg.AutoDiscovery, prefix+"auto-discovery", false, "Use memcached auto-discovery instead of DNS resolution") + f.BoolVar(&cfg.AutoDiscovery, prefix+"auto-discovery", false, "Use memcached auto-discovery mechanism provided by some cloud provider like GCP and AWS") } func (cfg *MemcachedClientConfig) GetAddresses() []string {