From ca835ad313c16aa95ac12ef0f10a13a2458e6c47 Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Wed, 9 Oct 2024 14:25:43 +0530 Subject: [PATCH] use global thanos metrics and other changes (#1) use a global thanos metric. remove ruler changes, will be added in a separate pr rename configs --- pkg/bloombuild/common/tsdb.go | 5 +- pkg/logcli/query/query.go | 5 +- pkg/loki/common/common.go | 7 +-- pkg/loki/config_wrapper.go | 11 +--- pkg/loki/modules.go | 21 +++---- pkg/ruler/base/ruler_test.go | 2 +- pkg/ruler/base/storage.go | 13 +---- pkg/storage/bucket/client.go | 24 ++------ pkg/storage/bucket/client_test.go | 2 +- pkg/storage/chunk/client/client.go | 23 -------- .../client/gcp/gcs_thanos_object_client.go | 32 ++++------ pkg/storage/factory.go | 58 ++++++++----------- pkg/storage/store.go | 9 ++- .../stores/shipper/bloomshipper/store.go | 5 +- pkg/tool/audit/audit.go | 6 +- tools/tsdb/index-analyzer/main.go | 5 +- tools/tsdb/migrate-versions/main.go | 5 +- 17 files changed, 71 insertions(+), 162 deletions(-) diff --git a/pkg/bloombuild/common/tsdb.go b/pkg/bloombuild/common/tsdb.go index e465c0dbe6b44..f8a8a17a6eb10 100644 --- a/pkg/bloombuild/common/tsdb.go +++ b/pkg/bloombuild/common/tsdb.go @@ -188,13 +188,12 @@ func NewTSDBStores( stores: make([]TSDBStore, len(schemaCfg.Configs)), } - metrics := &client.Metrics{Registerer: reg} for i, cfg := range schemaCfg.Configs { if cfg.IndexType == types.TSDBType { var c client.ObjectClient var err error - if storeCfg.ThanosObjStore { - c, err = baseStore.NewObjectClientV2(component, cfg.ObjectType, storeCfg, metrics) + if storeCfg.UseThanosObjstore { + c, err = baseStore.NewObjectClientV2(component, cfg.ObjectType, storeCfg) } else { c, err = baseStore.NewObjectClient(cfg.ObjectType, storeCfg, clientMetrics) } diff --git a/pkg/logcli/query/query.go b/pkg/logcli/query/query.go index 3b5fc81f3368b..f0ed8471e40c0 100644 --- a/pkg/logcli/query/query.go +++ b/pkg/logcli/query/query.go @@ -540,9 +540,8 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string func GetObjectClient(store string, conf loki.Config, cm storage.ClientMetrics) (chunk.ObjectClient, error) { var c chunk.ObjectClient var err error - if conf.StorageConfig.ThanosObjStore { - metrics := &chunk.Metrics{Registerer: prometheus.DefaultRegisterer} - c, err = storage.NewObjectClientV2("log-cli-query", store, conf.StorageConfig, metrics) + if conf.StorageConfig.UseThanosObjstore { + c, err = storage.NewObjectClientV2("log-cli-query", store, conf.StorageConfig) } else { c, err = storage.NewObjectClient(store, conf.StorageConfig, cm) } diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go index 2e7e3d7666b98..7bd4436b34b94 100644 --- a/pkg/loki/common/common.go +++ b/pkg/loki/common/common.go @@ -59,8 +59,6 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Ring.RegisterFlagsWithPrefix("common.storage.", "collectors/", f) c.Ring.RegisterFlagsWithPrefix("common.storage.", "collectors/", throwaway) - f.BoolVar(&c.Storage.ThanosObjStore, "common.thanos.enable", false, "Enable the thanos.io/objstore to be the backend for object storage") - // instance related flags. c.InstanceInterfaceNames = netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, util_log.Logger) throwaway.StringVar(&c.InstanceAddr, "common.instance-addr", "", "Default advertised address to be used by Loki components.") @@ -81,8 +79,7 @@ type Storage struct { Hedging hedging.Config `yaml:"hedging"` COS ibmcloud.COSConfig `yaml:"cos"` CongestionControl congestion.Config `yaml:"congestion_control,omitempty"` - ThanosObjStore bool `yaml:"thanos_objstore"` - ObjStoreConf bucket.Config `yaml:"objstore_config"` + ObjectStore bucket.Config `yaml:"object_store"` } func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { @@ -97,7 +94,7 @@ func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { s.COS.RegisterFlagsWithPrefix(prefix, f) s.CongestionControl.RegisterFlagsWithPrefix(prefix, f) - s.ObjStoreConf.RegisterFlagsWithPrefix(prefix+"thanos.", f) + s.ObjectStore.RegisterFlagsWithPrefix(prefix+"object-store.", f) } type FilesystemConfig struct { diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index 50c0817e1a45c..9ec68deb541c7 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -18,7 +18,6 @@ import ( "github.com/grafana/loki/v3/pkg/util/cfg" lokiring "github.com/grafana/loki/v3/pkg/util/ring" - "github.com/grafana/loki/v3/pkg/ruler/rulestore" "github.com/grafana/loki/v3/pkg/ruler/rulestore/local" loki_net "github.com/grafana/loki/v3/pkg/util/net" ) @@ -567,17 +566,11 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error { } } - if !reflect.DeepEqual(cfg.Common.Storage.ObjStoreConf, defaults.StorageConfig.ObjStoreConf) { + if !reflect.DeepEqual(cfg.Common.Storage.ObjectStore, defaults.StorageConfig.ObjectStore) { configsFound++ applyConfig = func(r *ConfigWrapper) { - r.Ruler.StoreConfig.ThanosObjStore = r.Common.Storage.ThanosObjStore - r.Ruler.StoreConfig.ObjStoreConf = rulestore.Config{ - Config: r.Common.Storage.ObjStoreConf, - } - r.StorageConfig.ThanosObjStore = r.Common.Storage.ThanosObjStore - r.StorageConfig.ObjStoreConf = r.Common.Storage.ObjStoreConf - r.StorageConfig.Hedging = r.Common.Storage.Hedging + r.StorageConfig.ObjectStore = r.Common.Storage.ObjectStore } } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 64e93ffbc2f4f..2d6ea394ce140 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1237,12 +1237,8 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) { return nil, err } } - overrides, err := validation.NewOverrides(t.Cfg.LimitsConfig, t.TenantLimits) - if err != nil { - return nil, err - } - t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, overrides, t.Cfg.StorageConfig.Hedging, t.ClientMetrics, ruler.GroupLoader{}, util_log.Logger) + t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, t.Cfg.StorageConfig.Hedging, t.ClientMetrics, ruler.GroupLoader{}, util_log.Logger) return } @@ -1411,7 +1407,6 @@ func (t *Loki) initCompactor() (services.Service, error) { } objectClients := make(map[config.DayTime]client.ObjectClient) - metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer} for _, periodConfig := range t.Cfg.SchemaConfig.Configs { if !config.IsObjectStorageIndex(periodConfig.IndexType) { continue @@ -1419,8 +1414,8 @@ func (t *Loki) initCompactor() (services.Service, error) { var objectClient client.ObjectClient var err error - if t.Cfg.StorageConfig.ThanosObjStore { - objectClient, err = storage.NewObjectClientV2("compactor", periodConfig.ObjectType, t.Cfg.StorageConfig, metrics) + if t.Cfg.StorageConfig.UseThanosObjstore { + objectClient, err = storage.NewObjectClientV2("compactor", periodConfig.ObjectType, t.Cfg.StorageConfig) } else { objectClient, err = storage.NewObjectClient(periodConfig.ObjectType, t.Cfg.StorageConfig, t.ClientMetrics) } @@ -1434,9 +1429,8 @@ func (t *Loki) initCompactor() (services.Service, error) { var deleteRequestStoreClient client.ObjectClient if t.Cfg.CompactorConfig.RetentionEnabled { if deleteStore := t.Cfg.CompactorConfig.DeleteRequestStore; deleteStore != "" { - if t.Cfg.StorageConfig.ThanosObjStore { - metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer} - deleteRequestStoreClient, err = storage.NewObjectClientV2("compactor", deleteStore, t.Cfg.StorageConfig, metrics) + if t.Cfg.StorageConfig.UseThanosObjstore { + deleteRequestStoreClient, err = storage.NewObjectClientV2("compactor", deleteStore, t.Cfg.StorageConfig) } else { deleteRequestStoreClient, err = storage.NewObjectClient(deleteStore, t.Cfg.StorageConfig, t.ClientMetrics) } @@ -1754,9 +1748,8 @@ func (t *Loki) initAnalytics() (services.Service, error) { } var objectClient client.ObjectClient - if t.Cfg.StorageConfig.ThanosObjStore { - metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer} - objectClient, err = storage.NewObjectClientV2("analytics", period.ObjectType, t.Cfg.StorageConfig, metrics) + if t.Cfg.StorageConfig.UseThanosObjstore { + objectClient, err = storage.NewObjectClientV2("analytics", period.ObjectType, t.Cfg.StorageConfig) } else { objectClient, err = storage.NewObjectClient(period.ObjectType, t.Cfg.StorageConfig, t.ClientMetrics) } diff --git a/pkg/ruler/base/ruler_test.go b/pkg/ruler/base/ruler_test.go index f126a40e02ee9..b180c559d8d3b 100644 --- a/pkg/ruler/base/ruler_test.go +++ b/pkg/ruler/base/ruler_test.go @@ -205,7 +205,7 @@ func buildRuler(t *testing.T, rulerConfig Config, q storage.Querier, clientMetri require.NoError(t, rulerConfig.Validate(log.NewNopLogger())) engine, queryable, pusher, logger, overrides, reg := testSetup(t, q) - storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, nil, hedging.Config{}, clientMetrics, promRules.FileLoader{}, log.NewNopLogger()) + storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, hedging.Config{}, clientMetrics, promRules.FileLoader{}, log.NewNopLogger()) require.NoError(t, err) managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, reg, constants.Loki) diff --git a/pkg/ruler/base/storage.go b/pkg/ruler/base/storage.go index 1db17004f8066..3c311498c8bd0 100644 --- a/pkg/ruler/base/storage.go +++ b/pkg/ruler/base/storage.go @@ -27,7 +27,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" "github.com/grafana/loki/v3/pkg/storage/chunk/client/ibmcloud" "github.com/grafana/loki/v3/pkg/storage/chunk/client/openstack" - util_log "github.com/grafana/loki/v3/pkg/util/log" ) // RuleStoreConfig configures a rule store. @@ -45,9 +44,6 @@ type RuleStoreConfig struct { COS ibmcloud.COSConfig `yaml:"cos" doc:"description=Configures backend rule storage for IBM Cloud Object Storage (COS)."` Local local.Config `yaml:"local" doc:"description=Configures backend rule storage for a local file system directory."` - ThanosObjStore bool `yaml:"thanos_objstore"` - ObjStoreConf rulestore.Config `yaml:"objstore_config"` - mock rulestore.RuleStore `yaml:"-"` } @@ -86,7 +82,7 @@ func (cfg *RuleStoreConfig) IsDefaults() bool { // NewLegacyRuleStore returns a rule store backend client based on the provided cfg. // The client used by the function is based a legacy object store clients that shouldn't // be used anymore. -func NewLegacyRuleStore(cfg RuleStoreConfig, cfgProvider bucket.TenantConfigProvider, hedgeCfg hedging.Config, clientMetrics storage.ClientMetrics, loader promRules.GroupLoader, logger log.Logger) (rulestore.RuleStore, error) { +func NewLegacyRuleStore(cfg RuleStoreConfig, hedgeCfg hedging.Config, clientMetrics storage.ClientMetrics, loader promRules.GroupLoader, logger log.Logger) (rulestore.RuleStore, error) { if cfg.mock != nil { return cfg.mock, nil } @@ -98,10 +94,6 @@ func NewLegacyRuleStore(cfg RuleStoreConfig, cfgProvider bucket.TenantConfigProv var err error var client client.ObjectClient - if cfg.ThanosObjStore { - return NewRuleStore(context.Background(), cfg.ObjStoreConf, cfgProvider, loader, util_log.Logger, prometheus.DefaultRegisterer) - } - switch cfg.Type { case "azure": client, err = azure.NewBlobStorage(&cfg.Azure, clientMetrics.AzureMetrics, hedgeCfg) @@ -144,8 +136,7 @@ func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket. if cfg.Backend == local.Name { return local.NewLocalRulesClient(cfg.Local, loader) } - metrics := &bucket.Metrics{Registerer: reg} - bucketClient, err := bucket.NewClient(ctx, cfg.Config, "ruler-storage", logger, metrics) + bucketClient, err := bucket.NewClient(ctx, cfg.Config, "ruler-storage", logger) if err != nil { return nil, err } diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 504addbc66689..90597394004ac 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -46,6 +46,9 @@ var ( ErrUnsupportedStorageBackend = errors.New("unsupported storage backend") ErrInvalidCharactersInStoragePrefix = errors.New("storage prefix contains invalid characters, it may only contain digits and English alphabet letters") + + // TODO: Remove this global if we can tag all clients with component name + metrics = objstore.BucketMetrics(prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer), "") ) // StorageBackendConfig holds configuration for accessing long-term storage. @@ -154,12 +157,13 @@ func (cfg *Config) Validate() error { } // NewClient creates a new bucket client based on the configured backend -func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, metrics *Metrics) (objstore.InstrumentedBucket, error) { +func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger) (objstore.InstrumentedBucket, error) { var ( client objstore.Bucket err error ) + // TODO: add support for other backends that loki already supports switch cfg.Backend { case S3: client, err = s3.NewBucketClient(cfg.S3, name, logger) @@ -183,7 +187,7 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, client = NewPrefixedBucketClient(client, cfg.StoragePrefix) } - instrumentedClient := objstoretracing.WrapWithTraces(bucketWithMetrics(client, name, metrics)) + instrumentedClient := objstoretracing.WrapWithTraces(objstore.WrapWith(client, metrics)) // Wrap the client with any provided middleware for _, wrap := range cfg.Middlewares { @@ -195,19 +199,3 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, return instrumentedClient, nil } - -func bucketWithMetrics(bucketClient objstore.Bucket, name string, metrics *Metrics) objstore.Bucket { - if metrics == nil { - return bucketClient - } - - if metrics.BucketMetrics == nil { - reg := metrics.Registerer - reg = prometheus.WrapRegistererWithPrefix("loki_", reg) - reg = prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, reg) - // Save metrics to be assigned to other buckets created with the same component name - metrics.BucketMetrics = objstore.BucketMetrics(reg, "") - } - - return objstore.WrapWith(bucketClient, metrics.BucketMetrics) -} diff --git a/pkg/storage/bucket/client_test.go b/pkg/storage/bucket/client_test.go index 489f7d2f1f269..c2034d0fec40d 100644 --- a/pkg/storage/bucket/client_test.go +++ b/pkg/storage/bucket/client_test.go @@ -80,7 +80,7 @@ func TestNewClient(t *testing.T) { require.NoError(t, err) // Instance a new bucket client from the config - bucketClient, err := NewClient(context.Background(), cfg, "test", util_log.Logger, nil) + bucketClient, err := NewClient(context.Background(), cfg, "test", util_log.Logger) require.Equal(t, testData.expectedErr, err) if testData.expectedErr == nil { diff --git a/pkg/storage/chunk/client/client.go b/pkg/storage/chunk/client/client.go index d89c540b29efa..36b65d40b6c2e 100644 --- a/pkg/storage/chunk/client/client.go +++ b/pkg/storage/chunk/client/client.go @@ -4,9 +4,6 @@ import ( "context" "errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/thanos-io/objstore" - "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/stores/series/index" ) @@ -33,23 +30,3 @@ type Client interface { type ObjectAndIndexClient interface { PutChunksAndIndex(ctx context.Context, chunks []chunk.Chunk, index index.WriteBatch) error } - -// Metrics holds metric related configuration for the objstore. -type Metrics struct { - // Registerer is the prometheus registerer that will be used by the objstore - // to register bucket metrics. - Registerer prometheus.Registerer - // BucketMetrics are objstore metrics that are will wrap the bucket. - // This field is used to share metrics among buckets with from the same - // component. - // This should only be set by the function that interfaces with the bucket - // package. - BucketMetrics *objstore.Metrics - // HedgingBucketMetrics are objstore metrics that are will wrap the - // heding bucket. - // This field is used to share metrics among buckets with from the same - // component. - // This should only be set by the function that interfaces with the bucket - // package. - HedgingBucketMetrics *objstore.Metrics -} diff --git a/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go b/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go index a742f56da57b9..a84901e577737 100644 --- a/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go +++ b/pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go @@ -24,28 +24,16 @@ type GCSThanosObjectClient struct { hedgedClient objstore.Bucket } -func NewGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config, metrics *client.Metrics) (*GCSThanosObjectClient, error) { - objMetrics := &bucket.Metrics{ - Registerer: prometheus.WrapRegistererWith(prometheus.Labels{"hedging": "false"}, metrics.Registerer), - BucketMetrics: metrics.BucketMetrics, - } - client, err := newGCSThanosObjectClient(ctx, cfg, component, logger, false, hedgingCfg, objMetrics) +func NewGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config) (*GCSThanosObjectClient, error) { + client, err := newGCSThanosObjectClient(ctx, cfg, component, logger, false, hedgingCfg) if err != nil { return nil, err } - // Save metrics in case we need to create more buckets - metrics.BucketMetrics = objMetrics.BucketMetrics - hedgingObjMetrics := &bucket.Metrics{ - Registerer: prometheus.WrapRegistererWith(prometheus.Labels{"hedging": "true"}, metrics.Registerer), - BucketMetrics: metrics.HedgingBucketMetrics, - } - hedgedClient, err := newGCSThanosObjectClient(ctx, cfg, component, logger, true, hedgingCfg, hedgingObjMetrics) + hedgedClient, err := newGCSThanosObjectClient(ctx, cfg, component, logger, true, hedgingCfg) if err != nil { return nil, err } - // Save metrics in case we need to create more buckets - metrics.HedgingBucketMetrics = objMetrics.BucketMetrics return &GCSThanosObjectClient{ client: client, @@ -53,16 +41,16 @@ func NewGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component }, nil } -func newGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config, metrics *bucket.Metrics) (objstore.Bucket, error) { +func newGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config) (objstore.Bucket, error) { if hedging { - hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWith(prometheus.Labels{"hedging": "true"}, metrics.Registerer)) + hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer)) if err != nil { return nil, err } cfg.GCS.HTTP.Transport = hedgedTrasport } - return bucket.NewClient(ctx, cfg, component, logger, metrics) + return bucket.NewClient(ctx, cfg, component, logger) } func (s *GCSThanosObjectClient) Stop() {} @@ -75,7 +63,7 @@ func (s *GCSThanosObjectClient) ObjectExists(ctx context.Context, objectKey stri // GetAttributes returns the attributes of the specified object key from the configured GCS bucket. func (s *GCSThanosObjectClient) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) { attr := client.ObjectAttributes{} - thanosAttr, err := s.client.Attributes(ctx, objectKey) + thanosAttr, err := s.hedgedClient.Attributes(ctx, objectKey) if err != nil { return attr, err } @@ -91,12 +79,12 @@ func (s *GCSThanosObjectClient) PutObject(ctx context.Context, objectKey string, // GetObject returns a reader and the size for the specified object key from the configured GCS bucket. func (s *GCSThanosObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) { - reader, err := s.client.Get(ctx, objectKey) + reader, err := s.hedgedClient.Get(ctx, objectKey) if err != nil { return nil, 0, err } - attr, err := s.client.Attributes(ctx, objectKey) + attr, err := s.hedgedClient.Attributes(ctx, objectKey) if err != nil { return nil, 0, errors.Wrapf(err, "failed to get attributes for %s", objectKey) } @@ -105,7 +93,7 @@ func (s *GCSThanosObjectClient) GetObject(ctx context.Context, objectKey string) } func (s *GCSThanosObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) { - reader, err := s.client.GetRange(ctx, objectKey, offset, length) + reader, err := s.hedgedClient.GetRange(ctx, objectKey, offset, length) if err != nil { return nil, err } diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index 3f0ff8cb3e13d..e0b8666ae62e9 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -295,8 +295,8 @@ type Config struct { DisableBroadIndexQueries bool `yaml:"disable_broad_index_queries"` MaxParallelGetChunk int `yaml:"max_parallel_get_chunk"` - ThanosObjStore bool `yaml:"thanos_objstore"` - ObjStoreConf bucket.Config `yaml:"objstore_config"` + UseThanosObjstore bool `yaml:"thanos_objstore"` + ObjectStore bucket.Config `yaml:"object_store"` MaxChunkBatchSize int `yaml:"max_chunk_batch_size"` BoltDBShipperConfig boltdb.IndexCfg `yaml:"boltdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in the form of boltdb files. Required fields only required when boltdb-shipper is defined in config."` @@ -325,8 +325,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.Hedging.RegisterFlagsWithPrefix("store.", f) cfg.CongestionControl.RegisterFlagsWithPrefix("store.", f) - f.BoolVar(&cfg.ThanosObjStore, "thanos.enable", false, "Enable the thanos.io/objstore to be the backend for object storage") - cfg.ObjStoreConf.RegisterFlagsWithPrefix("thanos.", f) + f.BoolVar(&cfg.UseThanosObjstore, "use-thanos-objstore", false, "Enable the thanos.io/objstore to be the backend for object storage") + cfg.ObjectStore.RegisterFlagsWithPrefix("object-store.", f) cfg.IndexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "", f) f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Cache validity for active index entries. Should be no higher than -ingester.max-chunk-idle.") @@ -365,7 +365,7 @@ func (cfg *Config) Validate() error { if err := cfg.BloomShipperConfig.Validate(); err != nil { return errors.Wrap(err, "invalid bloom shipper config") } - if err := cfg.ObjStoreConf.Validate(); err != nil { + if err := cfg.ObjectStore.Validate(); err != nil { return err } @@ -406,9 +406,8 @@ func NewIndexClient(component string, periodCfg config.PeriodConfig, tableRange var objectClient client.ObjectClient var err error - if cfg.ThanosObjStore { - metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer} - objectClient, err = NewObjectClientV2(component, periodCfg.ObjectType, cfg, metrics) + if cfg.UseThanosObjstore { + objectClient, err = NewObjectClientV2(component, periodCfg.ObjectType, cfg) } else { registerer = prometheus.WrapRegistererWith(prometheus.Labels{"component": component}, registerer) objectClient, err = NewObjectClient(periodCfg.ObjectType, cfg, cm) @@ -488,9 +487,8 @@ func NewChunkClient(component, name string, cfg Config, schemaCfg config.SchemaC case types.StorageTypeInMemory: var c client.ObjectClient var err error - if cfg.ThanosObjStore { - metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer} - c, err = NewObjectClientV2(component, name, cfg, metrics) + if cfg.UseThanosObjstore { + c, err = NewObjectClientV2(component, name, cfg) } else { c, err = NewObjectClient(name, cfg, clientMetrics) } @@ -505,9 +503,8 @@ func NewChunkClient(component, name string, cfg Config, schemaCfg config.SchemaC case types.StorageTypeFileSystem: var c client.ObjectClient var err error - if cfg.ThanosObjStore { - metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer} - c, err = NewObjectClientV2(component, name, cfg, metrics) + if cfg.UseThanosObjstore { + c, err = NewObjectClientV2(component, name, cfg) } else { c, err = NewObjectClient(name, cfg, clientMetrics) } @@ -519,9 +516,8 @@ func NewChunkClient(component, name string, cfg Config, schemaCfg config.SchemaC case types.StorageTypeAWS, types.StorageTypeS3, types.StorageTypeAzure, types.StorageTypeBOS, types.StorageTypeSwift, types.StorageTypeCOS, types.StorageTypeAlibabaCloud: var c client.ObjectClient var err error - if cfg.ThanosObjStore { - metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer} - c, err = NewObjectClientV2(component, name, cfg, metrics) + if cfg.UseThanosObjstore { + c, err = NewObjectClientV2(component, name, cfg) } else { c, err = NewObjectClient(name, cfg, clientMetrics) } @@ -536,9 +532,8 @@ func NewChunkClient(component, name string, cfg Config, schemaCfg config.SchemaC case types.StorageTypeGCS: var c client.ObjectClient var err error - if cfg.ThanosObjStore { - metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer} - c, err = NewObjectClientV2(component, name, cfg, metrics) + if cfg.UseThanosObjstore { + c, err = NewObjectClientV2(component, name, cfg) } else { c, err = NewObjectClient(name, cfg, clientMetrics) } @@ -593,9 +588,8 @@ func NewTableClient(component, name string, periodCfg config.PeriodConfig, cfg C case util.StringsContain(types.SupportedIndexTypes, name): var objectClient client.ObjectClient var err error - if cfg.ThanosObjStore { - metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer} - objectClient, err = NewObjectClientV2(component, periodCfg.ObjectType, cfg, metrics) + if cfg.UseThanosObjstore { + objectClient, err = NewObjectClientV2(component, periodCfg.ObjectType, cfg) } else { objectClient, err = NewObjectClient(periodCfg.ObjectType, cfg, cm) } @@ -654,10 +648,7 @@ func (c *ClientMetrics) Unregister() { // NewObjectClient makes a new StorageClient with the prefix in the front. func NewObjectClient(name string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) { - metrics := &client.Metrics{ - Registerer: prometheus.DefaultRegisterer, - } - actual, err := internalNewObjectClient("", name, cfg, clientMetrics, metrics) + actual, err := internalNewObjectClient("", name, cfg, clientMetrics) if err != nil { return nil, err } @@ -671,11 +662,10 @@ func NewObjectClient(name string, cfg Config, clientMetrics ClientMetrics) (clie } // NewObjectClient makes a new StorageClient with the prefix in the front. -func NewObjectClientV2(component, name string, cfg Config, metrics *client.Metrics) (client.ObjectClient, error) { - // Statify internalNewObjectClient signature to be removed once the old objstore is removed +func NewObjectClientV2(component, name string, cfg Config) (client.ObjectClient, error) { + // TODO: Statify internalNewObjectClient signature to be removed once the old objstore is removed clientMetrics := ClientMetrics{} - - actual, err := internalNewObjectClient(component, name, cfg, clientMetrics, metrics) + actual, err := internalNewObjectClient(component, name, cfg, clientMetrics) if err != nil { return nil, err } @@ -689,7 +679,7 @@ func NewObjectClientV2(component, name string, cfg Config, metrics *client.Metri } // internalNewObjectClient makes the underlying StorageClient of the desired types. -func internalNewObjectClient(component, name string, cfg Config, clientMetrics ClientMetrics, metrics *client.Metrics) (client.ObjectClient, error) { +func internalNewObjectClient(component, name string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) { var ( namedStore string storeType = name @@ -743,8 +733,8 @@ func internalNewObjectClient(component, name string, cfg Config, clientMetrics C if cfg.CongestionControl.Enabled { gcsCfg.EnableRetries = false } - if cfg.ThanosObjStore { - return gcp.NewGCSThanosObjectClient(context.Background(), cfg.ObjStoreConf, component, util_log.Logger, cfg.Hedging, metrics) + if cfg.UseThanosObjstore { + return gcp.NewGCSThanosObjectClient(context.Background(), cfg.ObjectStore, component, util_log.Logger, cfg.Hedging) } return gcp.NewGCSObjectClient(context.Background(), gcsCfg, cfg.Hedging) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 75d912da8becb..07443453176fe 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -193,7 +193,6 @@ func NewStore(cfg Config, storeCfg config.ChunkStoreConfig, schemaCfg config.Sch } func (s *LokiStore) init() error { - metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer} for i, p := range s.schemaCfg.Configs { p := p chunkClient, err := s.chunkClientForPeriod(p) @@ -209,7 +208,7 @@ func (s *LokiStore) init() error { if i < len(s.schemaCfg.Configs)-1 { periodEndTime = config.DayTime{Time: s.schemaCfg.Configs[i+1].From.Time.Add(-time.Millisecond)} } - w, idx, stop, err := s.storeForPeriod(p, p.GetIndexTableNumberRange(periodEndTime), chunkClient, f, metrics) + w, idx, stop, err := s.storeForPeriod(p, p.GetIndexTableNumberRange(periodEndTime), chunkClient, f) if err != nil { return err } @@ -265,7 +264,7 @@ func shouldUseIndexGatewayClient(cfg indexshipper.Config) bool { return true } -func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.TableRange, chunkClient client.Client, f *fetcher.Fetcher, metrics *client.Metrics) (stores.ChunkWriter, index.ReaderWriter, func(), error) { +func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.TableRange, chunkClient client.Client, f *fetcher.Fetcher) (stores.ChunkWriter, index.ReaderWriter, func(), error) { component := fmt.Sprintf("index-store-%s-%s", p.IndexType, p.From.String()) indexClientReg := prometheus.WrapRegistererWith(prometheus.Labels{"component": component}, s.registerer) indexClientLogger := log.With(s.logger, "index-store", fmt.Sprintf("%s-%s", p.IndexType, p.From.String())) @@ -287,8 +286,8 @@ func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.Tabl var objectClient client.ObjectClient var err error - if s.cfg.ThanosObjStore { - objectClient, err = NewObjectClientV2(component, p.ObjectType, s.cfg, metrics) + if s.cfg.UseThanosObjstore { + objectClient, err = NewObjectClientV2(component, p.ObjectType, s.cfg) } else { objectClient, err = NewObjectClient(p.ObjectType, s.cfg, s.clientMetrics) } diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go index 1a76d6b4eaee9..5277c496569f2 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store.go +++ b/pkg/storage/stores/shipper/bloomshipper/store.go @@ -333,12 +333,11 @@ func NewBloomStore( } } - metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer} for _, periodicConfig := range periodicConfigs { var objectClient client.ObjectClient var err error - if storageConfig.ThanosObjStore { - objectClient, err = storage.NewObjectClientV2("bloom-store", periodicConfig.ObjectType, storageConfig, metrics) + if storageConfig.UseThanosObjstore { + objectClient, err = storage.NewObjectClientV2("bloom-store", periodicConfig.ObjectType, storageConfig) } else { objectClient, err = storage.NewObjectClient(periodicConfig.ObjectType, storageConfig, clientMetrics) } diff --git a/pkg/tool/audit/audit.go b/pkg/tool/audit/audit.go index be10e44c6d1d0..303a26cf085b6 100644 --- a/pkg/tool/audit/audit.go +++ b/pkg/tool/audit/audit.go @@ -10,7 +10,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/prometheus/client_golang/prometheus" progressbar "github.com/schollz/progressbar/v3" "go.uber.org/atomic" "golang.org/x/sync/errgroup" @@ -55,9 +54,8 @@ func GetObjectClient(cfg Config) (client.ObjectClient, error) { periodCfg := cfg.SchemaConfig.Configs[len(cfg.SchemaConfig.Configs)-1] // only check the last period. var objClient client.ObjectClient var err error - if cfg.StorageConfig.ThanosObjStore { - metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer} - objClient, err = storage.NewObjectClientV2("audit", periodCfg.ObjectType, cfg.StorageConfig, metrics) + if cfg.StorageConfig.UseThanosObjstore { + objClient, err = storage.NewObjectClientV2("audit", periodCfg.ObjectType, cfg.StorageConfig) } else { objClient, err = storage.NewObjectClient(periodCfg.ObjectType, cfg.StorageConfig, storage.NewClientMetrics()) } diff --git a/tools/tsdb/index-analyzer/main.go b/tools/tsdb/index-analyzer/main.go index 1a38979b9ec2f..0392b4a85ea3b 100644 --- a/tools/tsdb/index-analyzer/main.go +++ b/tools/tsdb/index-analyzer/main.go @@ -26,9 +26,8 @@ func main() { helpers.ExitErr("find period config for bucket", err) var objectClient client.ObjectClient - if conf.StorageConfig.ThanosObjStore { - metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer} - objectClient, err = storage.NewObjectClientV2("index-analyzer", periodCfg.ObjectType, conf.StorageConfig, metrics) + if conf.StorageConfig.UseThanosObjstore { + objectClient, err = storage.NewObjectClientV2("index-analyzer", periodCfg.ObjectType, conf.StorageConfig) } else { objectClient, err = storage.NewObjectClient(periodCfg.ObjectType, conf.StorageConfig, clientMetrics) } diff --git a/tools/tsdb/migrate-versions/main.go b/tools/tsdb/migrate-versions/main.go index e545644bcdf34..70238d835adb3 100644 --- a/tools/tsdb/migrate-versions/main.go +++ b/tools/tsdb/migrate-versions/main.go @@ -101,9 +101,8 @@ func main() { func migrateTables(pCfg config.PeriodConfig, storageCfg storage.Config, clientMetrics storage.ClientMetrics, tableRange config.TableRange) error { var objClient client.ObjectClient var err error - if storageCfg.ThanosObjStore { - metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer} - objClient, err = storage.NewObjectClientV2("tables-migration-tool", pCfg.ObjectType, storageCfg, metrics) + if storageCfg.UseThanosObjstore { + objClient, err = storage.NewObjectClientV2("tables-migration-tool", pCfg.ObjectType, storageCfg) } else { objClient, err = storage.NewObjectClient(pCfg.ObjectType, storageCfg, clientMetrics) }