Skip to content

Commit

Permalink
use global thanos metrics and other changes (#1)
Browse files Browse the repository at this point in the history
    use a global thanos metric.
    remove ruler changes, will be added in a separate pr
    rename configs
  • Loading branch information
ashwanthgoli authored Oct 9, 2024
1 parent e908566 commit ca835ad
Show file tree
Hide file tree
Showing 17 changed files with 71 additions and 162 deletions.
5 changes: 2 additions & 3 deletions pkg/bloombuild/common/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,12 @@ func NewTSDBStores(
stores: make([]TSDBStore, len(schemaCfg.Configs)),
}

metrics := &client.Metrics{Registerer: reg}
for i, cfg := range schemaCfg.Configs {
if cfg.IndexType == types.TSDBType {
var c client.ObjectClient
var err error
if storeCfg.ThanosObjStore {
c, err = baseStore.NewObjectClientV2(component, cfg.ObjectType, storeCfg, metrics)
if storeCfg.UseThanosObjstore {
c, err = baseStore.NewObjectClientV2(component, cfg.ObjectType, storeCfg)
} else {
c, err = baseStore.NewObjectClient(cfg.ObjectType, storeCfg, clientMetrics)
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/logcli/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,9 +540,8 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
func GetObjectClient(store string, conf loki.Config, cm storage.ClientMetrics) (chunk.ObjectClient, error) {
var c chunk.ObjectClient
var err error
if conf.StorageConfig.ThanosObjStore {
metrics := &chunk.Metrics{Registerer: prometheus.DefaultRegisterer}
c, err = storage.NewObjectClientV2("log-cli-query", store, conf.StorageConfig, metrics)
if conf.StorageConfig.UseThanosObjstore {
c, err = storage.NewObjectClientV2("log-cli-query", store, conf.StorageConfig)
} else {
c, err = storage.NewObjectClient(store, conf.StorageConfig, cm)
}
Expand Down
7 changes: 2 additions & 5 deletions pkg/loki/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Ring.RegisterFlagsWithPrefix("common.storage.", "collectors/", f)
c.Ring.RegisterFlagsWithPrefix("common.storage.", "collectors/", throwaway)

f.BoolVar(&c.Storage.ThanosObjStore, "common.thanos.enable", false, "Enable the thanos.io/objstore to be the backend for object storage")

// instance related flags.
c.InstanceInterfaceNames = netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, util_log.Logger)
throwaway.StringVar(&c.InstanceAddr, "common.instance-addr", "", "Default advertised address to be used by Loki components.")
Expand All @@ -81,8 +79,7 @@ type Storage struct {
Hedging hedging.Config `yaml:"hedging"`
COS ibmcloud.COSConfig `yaml:"cos"`
CongestionControl congestion.Config `yaml:"congestion_control,omitempty"`
ThanosObjStore bool `yaml:"thanos_objstore"`
ObjStoreConf bucket.Config `yaml:"objstore_config"`
ObjectStore bucket.Config `yaml:"object_store"`
}

func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
Expand All @@ -97,7 +94,7 @@ func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
s.COS.RegisterFlagsWithPrefix(prefix, f)
s.CongestionControl.RegisterFlagsWithPrefix(prefix, f)

s.ObjStoreConf.RegisterFlagsWithPrefix(prefix+"thanos.", f)
s.ObjectStore.RegisterFlagsWithPrefix(prefix+"object-store.", f)
}

type FilesystemConfig struct {
Expand Down
11 changes: 2 additions & 9 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/grafana/loki/v3/pkg/util/cfg"
lokiring "github.com/grafana/loki/v3/pkg/util/ring"

"github.com/grafana/loki/v3/pkg/ruler/rulestore"
"github.com/grafana/loki/v3/pkg/ruler/rulestore/local"
loki_net "github.com/grafana/loki/v3/pkg/util/net"
)
Expand Down Expand Up @@ -567,17 +566,11 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
}
}

if !reflect.DeepEqual(cfg.Common.Storage.ObjStoreConf, defaults.StorageConfig.ObjStoreConf) {
if !reflect.DeepEqual(cfg.Common.Storage.ObjectStore, defaults.StorageConfig.ObjectStore) {
configsFound++

applyConfig = func(r *ConfigWrapper) {
r.Ruler.StoreConfig.ThanosObjStore = r.Common.Storage.ThanosObjStore
r.Ruler.StoreConfig.ObjStoreConf = rulestore.Config{
Config: r.Common.Storage.ObjStoreConf,
}
r.StorageConfig.ThanosObjStore = r.Common.Storage.ThanosObjStore
r.StorageConfig.ObjStoreConf = r.Common.Storage.ObjStoreConf
r.StorageConfig.Hedging = r.Common.Storage.Hedging
r.StorageConfig.ObjectStore = r.Common.Storage.ObjectStore
}
}

Expand Down
21 changes: 7 additions & 14 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1237,12 +1237,8 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) {
return nil, err
}
}
overrides, err := validation.NewOverrides(t.Cfg.LimitsConfig, t.TenantLimits)
if err != nil {
return nil, err
}

t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, overrides, t.Cfg.StorageConfig.Hedging, t.ClientMetrics, ruler.GroupLoader{}, util_log.Logger)
t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, t.Cfg.StorageConfig.Hedging, t.ClientMetrics, ruler.GroupLoader{}, util_log.Logger)

return
}
Expand Down Expand Up @@ -1411,16 +1407,15 @@ func (t *Loki) initCompactor() (services.Service, error) {
}

objectClients := make(map[config.DayTime]client.ObjectClient)
metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer}
for _, periodConfig := range t.Cfg.SchemaConfig.Configs {
if !config.IsObjectStorageIndex(periodConfig.IndexType) {
continue
}

var objectClient client.ObjectClient
var err error
if t.Cfg.StorageConfig.ThanosObjStore {
objectClient, err = storage.NewObjectClientV2("compactor", periodConfig.ObjectType, t.Cfg.StorageConfig, metrics)
if t.Cfg.StorageConfig.UseThanosObjstore {
objectClient, err = storage.NewObjectClientV2("compactor", periodConfig.ObjectType, t.Cfg.StorageConfig)
} else {
objectClient, err = storage.NewObjectClient(periodConfig.ObjectType, t.Cfg.StorageConfig, t.ClientMetrics)
}
Expand All @@ -1434,9 +1429,8 @@ func (t *Loki) initCompactor() (services.Service, error) {
var deleteRequestStoreClient client.ObjectClient
if t.Cfg.CompactorConfig.RetentionEnabled {
if deleteStore := t.Cfg.CompactorConfig.DeleteRequestStore; deleteStore != "" {
if t.Cfg.StorageConfig.ThanosObjStore {
metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer}
deleteRequestStoreClient, err = storage.NewObjectClientV2("compactor", deleteStore, t.Cfg.StorageConfig, metrics)
if t.Cfg.StorageConfig.UseThanosObjstore {
deleteRequestStoreClient, err = storage.NewObjectClientV2("compactor", deleteStore, t.Cfg.StorageConfig)
} else {
deleteRequestStoreClient, err = storage.NewObjectClient(deleteStore, t.Cfg.StorageConfig, t.ClientMetrics)
}
Expand Down Expand Up @@ -1754,9 +1748,8 @@ func (t *Loki) initAnalytics() (services.Service, error) {
}

var objectClient client.ObjectClient
if t.Cfg.StorageConfig.ThanosObjStore {
metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer}
objectClient, err = storage.NewObjectClientV2("analytics", period.ObjectType, t.Cfg.StorageConfig, metrics)
if t.Cfg.StorageConfig.UseThanosObjstore {
objectClient, err = storage.NewObjectClientV2("analytics", period.ObjectType, t.Cfg.StorageConfig)
} else {
objectClient, err = storage.NewObjectClient(period.ObjectType, t.Cfg.StorageConfig, t.ClientMetrics)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/base/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func buildRuler(t *testing.T, rulerConfig Config, q storage.Querier, clientMetri
require.NoError(t, rulerConfig.Validate(log.NewNopLogger()))

engine, queryable, pusher, logger, overrides, reg := testSetup(t, q)
storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, nil, hedging.Config{}, clientMetrics, promRules.FileLoader{}, log.NewNopLogger())
storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, hedging.Config{}, clientMetrics, promRules.FileLoader{}, log.NewNopLogger())
require.NoError(t, err)

managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, reg, constants.Loki)
Expand Down
13 changes: 2 additions & 11 deletions pkg/ruler/base/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/ibmcloud"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/openstack"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

// RuleStoreConfig configures a rule store.
Expand All @@ -45,9 +44,6 @@ type RuleStoreConfig struct {
COS ibmcloud.COSConfig `yaml:"cos" doc:"description=Configures backend rule storage for IBM Cloud Object Storage (COS)."`
Local local.Config `yaml:"local" doc:"description=Configures backend rule storage for a local file system directory."`

ThanosObjStore bool `yaml:"thanos_objstore"`
ObjStoreConf rulestore.Config `yaml:"objstore_config"`

mock rulestore.RuleStore `yaml:"-"`
}

Expand Down Expand Up @@ -86,7 +82,7 @@ func (cfg *RuleStoreConfig) IsDefaults() bool {
// NewLegacyRuleStore returns a rule store backend client based on the provided cfg.
// The client used by the function is based a legacy object store clients that shouldn't
// be used anymore.
func NewLegacyRuleStore(cfg RuleStoreConfig, cfgProvider bucket.TenantConfigProvider, hedgeCfg hedging.Config, clientMetrics storage.ClientMetrics, loader promRules.GroupLoader, logger log.Logger) (rulestore.RuleStore, error) {
func NewLegacyRuleStore(cfg RuleStoreConfig, hedgeCfg hedging.Config, clientMetrics storage.ClientMetrics, loader promRules.GroupLoader, logger log.Logger) (rulestore.RuleStore, error) {
if cfg.mock != nil {
return cfg.mock, nil
}
Expand All @@ -98,10 +94,6 @@ func NewLegacyRuleStore(cfg RuleStoreConfig, cfgProvider bucket.TenantConfigProv
var err error
var client client.ObjectClient

if cfg.ThanosObjStore {
return NewRuleStore(context.Background(), cfg.ObjStoreConf, cfgProvider, loader, util_log.Logger, prometheus.DefaultRegisterer)
}

switch cfg.Type {
case "azure":
client, err = azure.NewBlobStorage(&cfg.Azure, clientMetrics.AzureMetrics, hedgeCfg)
Expand Down Expand Up @@ -144,8 +136,7 @@ func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket.
if cfg.Backend == local.Name {
return local.NewLocalRulesClient(cfg.Local, loader)
}
metrics := &bucket.Metrics{Registerer: reg}
bucketClient, err := bucket.NewClient(ctx, cfg.Config, "ruler-storage", logger, metrics)
bucketClient, err := bucket.NewClient(ctx, cfg.Config, "ruler-storage", logger)
if err != nil {
return nil, err
}
Expand Down
24 changes: 6 additions & 18 deletions pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ var (

ErrUnsupportedStorageBackend = errors.New("unsupported storage backend")
ErrInvalidCharactersInStoragePrefix = errors.New("storage prefix contains invalid characters, it may only contain digits and English alphabet letters")

// TODO: Remove this global if we can tag all clients with component name
metrics = objstore.BucketMetrics(prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer), "")
)

// StorageBackendConfig holds configuration for accessing long-term storage.
Expand Down Expand Up @@ -154,12 +157,13 @@ func (cfg *Config) Validate() error {
}

// NewClient creates a new bucket client based on the configured backend
func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, metrics *Metrics) (objstore.InstrumentedBucket, error) {
func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger) (objstore.InstrumentedBucket, error) {
var (
client objstore.Bucket
err error
)

// TODO: add support for other backends that loki already supports
switch cfg.Backend {
case S3:
client, err = s3.NewBucketClient(cfg.S3, name, logger)
Expand All @@ -183,7 +187,7 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger,
client = NewPrefixedBucketClient(client, cfg.StoragePrefix)
}

instrumentedClient := objstoretracing.WrapWithTraces(bucketWithMetrics(client, name, metrics))
instrumentedClient := objstoretracing.WrapWithTraces(objstore.WrapWith(client, metrics))

// Wrap the client with any provided middleware
for _, wrap := range cfg.Middlewares {
Expand All @@ -195,19 +199,3 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger,

return instrumentedClient, nil
}

func bucketWithMetrics(bucketClient objstore.Bucket, name string, metrics *Metrics) objstore.Bucket {
if metrics == nil {
return bucketClient
}

if metrics.BucketMetrics == nil {
reg := metrics.Registerer
reg = prometheus.WrapRegistererWithPrefix("loki_", reg)
reg = prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, reg)
// Save metrics to be assigned to other buckets created with the same component name
metrics.BucketMetrics = objstore.BucketMetrics(reg, "")
}

return objstore.WrapWith(bucketClient, metrics.BucketMetrics)
}
2 changes: 1 addition & 1 deletion pkg/storage/bucket/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestNewClient(t *testing.T) {
require.NoError(t, err)

// Instance a new bucket client from the config
bucketClient, err := NewClient(context.Background(), cfg, "test", util_log.Logger, nil)
bucketClient, err := NewClient(context.Background(), cfg, "test", util_log.Logger)
require.Equal(t, testData.expectedErr, err)

if testData.expectedErr == nil {
Expand Down
23 changes: 0 additions & 23 deletions pkg/storage/chunk/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ import (
"context"
"errors"

"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"

"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/stores/series/index"
)
Expand All @@ -33,23 +30,3 @@ type Client interface {
type ObjectAndIndexClient interface {
PutChunksAndIndex(ctx context.Context, chunks []chunk.Chunk, index index.WriteBatch) error
}

// Metrics holds metric related configuration for the objstore.
type Metrics struct {
// Registerer is the prometheus registerer that will be used by the objstore
// to register bucket metrics.
Registerer prometheus.Registerer
// BucketMetrics are objstore metrics that are will wrap the bucket.
// This field is used to share metrics among buckets with from the same
// component.
// This should only be set by the function that interfaces with the bucket
// package.
BucketMetrics *objstore.Metrics
// HedgingBucketMetrics are objstore metrics that are will wrap the
// heding bucket.
// This field is used to share metrics among buckets with from the same
// component.
// This should only be set by the function that interfaces with the bucket
// package.
HedgingBucketMetrics *objstore.Metrics
}
32 changes: 10 additions & 22 deletions pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,33 @@ type GCSThanosObjectClient struct {
hedgedClient objstore.Bucket
}

func NewGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config, metrics *client.Metrics) (*GCSThanosObjectClient, error) {
objMetrics := &bucket.Metrics{
Registerer: prometheus.WrapRegistererWith(prometheus.Labels{"hedging": "false"}, metrics.Registerer),
BucketMetrics: metrics.BucketMetrics,
}
client, err := newGCSThanosObjectClient(ctx, cfg, component, logger, false, hedgingCfg, objMetrics)
func NewGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config) (*GCSThanosObjectClient, error) {
client, err := newGCSThanosObjectClient(ctx, cfg, component, logger, false, hedgingCfg)
if err != nil {
return nil, err
}
// Save metrics in case we need to create more buckets
metrics.BucketMetrics = objMetrics.BucketMetrics

hedgingObjMetrics := &bucket.Metrics{
Registerer: prometheus.WrapRegistererWith(prometheus.Labels{"hedging": "true"}, metrics.Registerer),
BucketMetrics: metrics.HedgingBucketMetrics,
}
hedgedClient, err := newGCSThanosObjectClient(ctx, cfg, component, logger, true, hedgingCfg, hedgingObjMetrics)
hedgedClient, err := newGCSThanosObjectClient(ctx, cfg, component, logger, true, hedgingCfg)
if err != nil {
return nil, err
}
// Save metrics in case we need to create more buckets
metrics.HedgingBucketMetrics = objMetrics.BucketMetrics

return &GCSThanosObjectClient{
client: client,
hedgedClient: hedgedClient,
}, nil
}

func newGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config, metrics *bucket.Metrics) (objstore.Bucket, error) {
func newGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config) (objstore.Bucket, error) {
if hedging {
hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWith(prometheus.Labels{"hedging": "true"}, metrics.Registerer))
hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer))
if err != nil {
return nil, err
}

cfg.GCS.HTTP.Transport = hedgedTrasport
}
return bucket.NewClient(ctx, cfg, component, logger, metrics)
return bucket.NewClient(ctx, cfg, component, logger)
}

func (s *GCSThanosObjectClient) Stop() {}
Expand All @@ -75,7 +63,7 @@ func (s *GCSThanosObjectClient) ObjectExists(ctx context.Context, objectKey stri
// GetAttributes returns the attributes of the specified object key from the configured GCS bucket.
func (s *GCSThanosObjectClient) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
attr := client.ObjectAttributes{}
thanosAttr, err := s.client.Attributes(ctx, objectKey)
thanosAttr, err := s.hedgedClient.Attributes(ctx, objectKey)
if err != nil {
return attr, err
}
Expand All @@ -91,12 +79,12 @@ func (s *GCSThanosObjectClient) PutObject(ctx context.Context, objectKey string,

// GetObject returns a reader and the size for the specified object key from the configured GCS bucket.
func (s *GCSThanosObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
reader, err := s.client.Get(ctx, objectKey)
reader, err := s.hedgedClient.Get(ctx, objectKey)
if err != nil {
return nil, 0, err
}

attr, err := s.client.Attributes(ctx, objectKey)
attr, err := s.hedgedClient.Attributes(ctx, objectKey)
if err != nil {
return nil, 0, errors.Wrapf(err, "failed to get attributes for %s", objectKey)
}
Expand All @@ -105,7 +93,7 @@ func (s *GCSThanosObjectClient) GetObject(ctx context.Context, objectKey string)
}

func (s *GCSThanosObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
reader, err := s.client.GetRange(ctx, objectKey, offset, length)
reader, err := s.hedgedClient.GetRange(ctx, objectKey, offset, length)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit ca835ad

Please sign in to comment.