Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use global thanos metrics and other changes #1

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/bloombuild/common/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,12 @@ func NewTSDBStores(
stores: make([]TSDBStore, len(schemaCfg.Configs)),
}

metrics := &client.Metrics{Registerer: reg}
for i, cfg := range schemaCfg.Configs {
if cfg.IndexType == types.TSDBType {
var c client.ObjectClient
var err error
if storeCfg.ThanosObjStore {
c, err = baseStore.NewObjectClientV2(component, cfg.ObjectType, storeCfg, metrics)
if storeCfg.UseThanosObjstore {
c, err = baseStore.NewObjectClientV2(component, cfg.ObjectType, storeCfg)
} else {
c, err = baseStore.NewObjectClient(cfg.ObjectType, storeCfg, clientMetrics)
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/logcli/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,9 +540,8 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
func GetObjectClient(store string, conf loki.Config, cm storage.ClientMetrics) (chunk.ObjectClient, error) {
var c chunk.ObjectClient
var err error
if conf.StorageConfig.ThanosObjStore {
metrics := &chunk.Metrics{Registerer: prometheus.DefaultRegisterer}
c, err = storage.NewObjectClientV2("log-cli-query", store, conf.StorageConfig, metrics)
if conf.StorageConfig.UseThanosObjstore {
c, err = storage.NewObjectClientV2("log-cli-query", store, conf.StorageConfig)
} else {
c, err = storage.NewObjectClient(store, conf.StorageConfig, cm)
}
Expand Down
7 changes: 2 additions & 5 deletions pkg/loki/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Ring.RegisterFlagsWithPrefix("common.storage.", "collectors/", f)
c.Ring.RegisterFlagsWithPrefix("common.storage.", "collectors/", throwaway)

f.BoolVar(&c.Storage.ThanosObjStore, "common.thanos.enable", false, "Enable the thanos.io/objstore to be the backend for object storage")

// instance related flags.
c.InstanceInterfaceNames = netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, util_log.Logger)
throwaway.StringVar(&c.InstanceAddr, "common.instance-addr", "", "Default advertised address to be used by Loki components.")
Expand All @@ -81,8 +79,7 @@ type Storage struct {
Hedging hedging.Config `yaml:"hedging"`
COS ibmcloud.COSConfig `yaml:"cos"`
CongestionControl congestion.Config `yaml:"congestion_control,omitempty"`
ThanosObjStore bool `yaml:"thanos_objstore"`
ObjStoreConf bucket.Config `yaml:"objstore_config"`
ObjectStore bucket.Config `yaml:"object_store"`
}

func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
Expand All @@ -97,7 +94,7 @@ func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
s.COS.RegisterFlagsWithPrefix(prefix, f)
s.CongestionControl.RegisterFlagsWithPrefix(prefix, f)

s.ObjStoreConf.RegisterFlagsWithPrefix(prefix+"thanos.", f)
s.ObjectStore.RegisterFlagsWithPrefix(prefix+"object-store.", f)
}

type FilesystemConfig struct {
Expand Down
11 changes: 2 additions & 9 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/grafana/loki/v3/pkg/util/cfg"
lokiring "github.com/grafana/loki/v3/pkg/util/ring"

"github.com/grafana/loki/v3/pkg/ruler/rulestore"
"github.com/grafana/loki/v3/pkg/ruler/rulestore/local"
loki_net "github.com/grafana/loki/v3/pkg/util/net"
)
Expand Down Expand Up @@ -567,17 +566,11 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
}
}

if !reflect.DeepEqual(cfg.Common.Storage.ObjStoreConf, defaults.StorageConfig.ObjStoreConf) {
if !reflect.DeepEqual(cfg.Common.Storage.ObjectStore, defaults.StorageConfig.ObjectStore) {
configsFound++

applyConfig = func(r *ConfigWrapper) {
r.Ruler.StoreConfig.ThanosObjStore = r.Common.Storage.ThanosObjStore
r.Ruler.StoreConfig.ObjStoreConf = rulestore.Config{
Config: r.Common.Storage.ObjStoreConf,
}
r.StorageConfig.ThanosObjStore = r.Common.Storage.ThanosObjStore
r.StorageConfig.ObjStoreConf = r.Common.Storage.ObjStoreConf
r.StorageConfig.Hedging = r.Common.Storage.Hedging
r.StorageConfig.ObjectStore = r.Common.Storage.ObjectStore
}
}

Expand Down
21 changes: 7 additions & 14 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1237,12 +1237,8 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) {
return nil, err
}
}
overrides, err := validation.NewOverrides(t.Cfg.LimitsConfig, t.TenantLimits)
if err != nil {
return nil, err
}

t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, overrides, t.Cfg.StorageConfig.Hedging, t.ClientMetrics, ruler.GroupLoader{}, util_log.Logger)
t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, t.Cfg.StorageConfig.Hedging, t.ClientMetrics, ruler.GroupLoader{}, util_log.Logger)

return
}
Expand Down Expand Up @@ -1411,16 +1407,15 @@ func (t *Loki) initCompactor() (services.Service, error) {
}

objectClients := make(map[config.DayTime]client.ObjectClient)
metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer}
for _, periodConfig := range t.Cfg.SchemaConfig.Configs {
if !config.IsObjectStorageIndex(periodConfig.IndexType) {
continue
}

var objectClient client.ObjectClient
var err error
if t.Cfg.StorageConfig.ThanosObjStore {
objectClient, err = storage.NewObjectClientV2("compactor", periodConfig.ObjectType, t.Cfg.StorageConfig, metrics)
if t.Cfg.StorageConfig.UseThanosObjstore {
objectClient, err = storage.NewObjectClientV2("compactor", periodConfig.ObjectType, t.Cfg.StorageConfig)
} else {
objectClient, err = storage.NewObjectClient(periodConfig.ObjectType, t.Cfg.StorageConfig, t.ClientMetrics)
}
Expand All @@ -1434,9 +1429,8 @@ func (t *Loki) initCompactor() (services.Service, error) {
var deleteRequestStoreClient client.ObjectClient
if t.Cfg.CompactorConfig.RetentionEnabled {
if deleteStore := t.Cfg.CompactorConfig.DeleteRequestStore; deleteStore != "" {
if t.Cfg.StorageConfig.ThanosObjStore {
metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer}
deleteRequestStoreClient, err = storage.NewObjectClientV2("compactor", deleteStore, t.Cfg.StorageConfig, metrics)
if t.Cfg.StorageConfig.UseThanosObjstore {
deleteRequestStoreClient, err = storage.NewObjectClientV2("compactor", deleteStore, t.Cfg.StorageConfig)
} else {
deleteRequestStoreClient, err = storage.NewObjectClient(deleteStore, t.Cfg.StorageConfig, t.ClientMetrics)
}
Expand Down Expand Up @@ -1754,9 +1748,8 @@ func (t *Loki) initAnalytics() (services.Service, error) {
}

var objectClient client.ObjectClient
if t.Cfg.StorageConfig.ThanosObjStore {
metrics := &client.Metrics{Registerer: prometheus.DefaultRegisterer}
objectClient, err = storage.NewObjectClientV2("analytics", period.ObjectType, t.Cfg.StorageConfig, metrics)
if t.Cfg.StorageConfig.UseThanosObjstore {
objectClient, err = storage.NewObjectClientV2("analytics", period.ObjectType, t.Cfg.StorageConfig)
} else {
objectClient, err = storage.NewObjectClient(period.ObjectType, t.Cfg.StorageConfig, t.ClientMetrics)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/base/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func buildRuler(t *testing.T, rulerConfig Config, q storage.Querier, clientMetri
require.NoError(t, rulerConfig.Validate(log.NewNopLogger()))

engine, queryable, pusher, logger, overrides, reg := testSetup(t, q)
storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, nil, hedging.Config{}, clientMetrics, promRules.FileLoader{}, log.NewNopLogger())
storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, hedging.Config{}, clientMetrics, promRules.FileLoader{}, log.NewNopLogger())
require.NoError(t, err)

managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, reg, constants.Loki)
Expand Down
13 changes: 2 additions & 11 deletions pkg/ruler/base/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/ibmcloud"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/openstack"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

// RuleStoreConfig configures a rule store.
Expand All @@ -45,9 +44,6 @@ type RuleStoreConfig struct {
COS ibmcloud.COSConfig `yaml:"cos" doc:"description=Configures backend rule storage for IBM Cloud Object Storage (COS)."`
Local local.Config `yaml:"local" doc:"description=Configures backend rule storage for a local file system directory."`

ThanosObjStore bool `yaml:"thanos_objstore"`
ObjStoreConf rulestore.Config `yaml:"objstore_config"`

mock rulestore.RuleStore `yaml:"-"`
}

Expand Down Expand Up @@ -86,7 +82,7 @@ func (cfg *RuleStoreConfig) IsDefaults() bool {
// NewLegacyRuleStore returns a rule store backend client based on the provided cfg.
// The client used by the function is based a legacy object store clients that shouldn't
// be used anymore.
func NewLegacyRuleStore(cfg RuleStoreConfig, cfgProvider bucket.TenantConfigProvider, hedgeCfg hedging.Config, clientMetrics storage.ClientMetrics, loader promRules.GroupLoader, logger log.Logger) (rulestore.RuleStore, error) {
func NewLegacyRuleStore(cfg RuleStoreConfig, hedgeCfg hedging.Config, clientMetrics storage.ClientMetrics, loader promRules.GroupLoader, logger log.Logger) (rulestore.RuleStore, error) {
if cfg.mock != nil {
return cfg.mock, nil
}
Expand All @@ -98,10 +94,6 @@ func NewLegacyRuleStore(cfg RuleStoreConfig, cfgProvider bucket.TenantConfigProv
var err error
var client client.ObjectClient

if cfg.ThanosObjStore {
return NewRuleStore(context.Background(), cfg.ObjStoreConf, cfgProvider, loader, util_log.Logger, prometheus.DefaultRegisterer)
}

switch cfg.Type {
case "azure":
client, err = azure.NewBlobStorage(&cfg.Azure, clientMetrics.AzureMetrics, hedgeCfg)
Expand Down Expand Up @@ -144,8 +136,7 @@ func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket.
if cfg.Backend == local.Name {
return local.NewLocalRulesClient(cfg.Local, loader)
}
metrics := &bucket.Metrics{Registerer: reg}
bucketClient, err := bucket.NewClient(ctx, cfg.Config, "ruler-storage", logger, metrics)
bucketClient, err := bucket.NewClient(ctx, cfg.Config, "ruler-storage", logger)
if err != nil {
return nil, err
}
Expand Down
24 changes: 6 additions & 18 deletions pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ var (

ErrUnsupportedStorageBackend = errors.New("unsupported storage backend")
ErrInvalidCharactersInStoragePrefix = errors.New("storage prefix contains invalid characters, it may only contain digits and English alphabet letters")

// TODO: Remove this global if we can tag all clients with component name
metrics = objstore.BucketMetrics(prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer), "")
)

// StorageBackendConfig holds configuration for accessing long-term storage.
Expand Down Expand Up @@ -154,12 +157,13 @@ func (cfg *Config) Validate() error {
}

// NewClient creates a new bucket client based on the configured backend
func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, metrics *Metrics) (objstore.InstrumentedBucket, error) {
func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger) (objstore.InstrumentedBucket, error) {
var (
client objstore.Bucket
err error
)

// TODO: add support for other backends that loki already supports
switch cfg.Backend {
case S3:
client, err = s3.NewBucketClient(cfg.S3, name, logger)
Expand All @@ -183,7 +187,7 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger,
client = NewPrefixedBucketClient(client, cfg.StoragePrefix)
}

instrumentedClient := objstoretracing.WrapWithTraces(bucketWithMetrics(client, name, metrics))
instrumentedClient := objstoretracing.WrapWithTraces(objstore.WrapWith(client, metrics))

// Wrap the client with any provided middleware
for _, wrap := range cfg.Middlewares {
Expand All @@ -195,19 +199,3 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger,

return instrumentedClient, nil
}

func bucketWithMetrics(bucketClient objstore.Bucket, name string, metrics *Metrics) objstore.Bucket {
if metrics == nil {
return bucketClient
}

if metrics.BucketMetrics == nil {
reg := metrics.Registerer
reg = prometheus.WrapRegistererWithPrefix("loki_", reg)
reg = prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, reg)
// Save metrics to be assigned to other buckets created with the same component name
metrics.BucketMetrics = objstore.BucketMetrics(reg, "")
}

return objstore.WrapWith(bucketClient, metrics.BucketMetrics)
}
2 changes: 1 addition & 1 deletion pkg/storage/bucket/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestNewClient(t *testing.T) {
require.NoError(t, err)

// Instance a new bucket client from the config
bucketClient, err := NewClient(context.Background(), cfg, "test", util_log.Logger, nil)
bucketClient, err := NewClient(context.Background(), cfg, "test", util_log.Logger)
require.Equal(t, testData.expectedErr, err)

if testData.expectedErr == nil {
Expand Down
23 changes: 0 additions & 23 deletions pkg/storage/chunk/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ import (
"context"
"errors"

"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"

"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/stores/series/index"
)
Expand All @@ -33,23 +30,3 @@ type Client interface {
type ObjectAndIndexClient interface {
PutChunksAndIndex(ctx context.Context, chunks []chunk.Chunk, index index.WriteBatch) error
}

// Metrics holds metric related configuration for the objstore.
type Metrics struct {
// Registerer is the prometheus registerer that will be used by the objstore
// to register bucket metrics.
Registerer prometheus.Registerer
// BucketMetrics are objstore metrics that are will wrap the bucket.
// This field is used to share metrics among buckets with from the same
// component.
// This should only be set by the function that interfaces with the bucket
// package.
BucketMetrics *objstore.Metrics
// HedgingBucketMetrics are objstore metrics that are will wrap the
// heding bucket.
// This field is used to share metrics among buckets with from the same
// component.
// This should only be set by the function that interfaces with the bucket
// package.
HedgingBucketMetrics *objstore.Metrics
}
32 changes: 10 additions & 22 deletions pkg/storage/chunk/client/gcp/gcs_thanos_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,33 @@ type GCSThanosObjectClient struct {
hedgedClient objstore.Bucket
}

func NewGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config, metrics *client.Metrics) (*GCSThanosObjectClient, error) {
objMetrics := &bucket.Metrics{
Registerer: prometheus.WrapRegistererWith(prometheus.Labels{"hedging": "false"}, metrics.Registerer),
BucketMetrics: metrics.BucketMetrics,
}
client, err := newGCSThanosObjectClient(ctx, cfg, component, logger, false, hedgingCfg, objMetrics)
func NewGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config) (*GCSThanosObjectClient, error) {
client, err := newGCSThanosObjectClient(ctx, cfg, component, logger, false, hedgingCfg)
if err != nil {
return nil, err
}
// Save metrics in case we need to create more buckets
metrics.BucketMetrics = objMetrics.BucketMetrics

hedgingObjMetrics := &bucket.Metrics{
Registerer: prometheus.WrapRegistererWith(prometheus.Labels{"hedging": "true"}, metrics.Registerer),
BucketMetrics: metrics.HedgingBucketMetrics,
}
hedgedClient, err := newGCSThanosObjectClient(ctx, cfg, component, logger, true, hedgingCfg, hedgingObjMetrics)
hedgedClient, err := newGCSThanosObjectClient(ctx, cfg, component, logger, true, hedgingCfg)
if err != nil {
return nil, err
}
// Save metrics in case we need to create more buckets
metrics.HedgingBucketMetrics = objMetrics.BucketMetrics

return &GCSThanosObjectClient{
client: client,
hedgedClient: hedgedClient,
}, nil
}

func newGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config, metrics *bucket.Metrics) (objstore.Bucket, error) {
func newGCSThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config) (objstore.Bucket, error) {
if hedging {
hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWith(prometheus.Labels{"hedging": "true"}, metrics.Registerer))
hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer))
if err != nil {
return nil, err
}

cfg.GCS.HTTP.Transport = hedgedTrasport
}
return bucket.NewClient(ctx, cfg, component, logger, metrics)
return bucket.NewClient(ctx, cfg, component, logger)
}

func (s *GCSThanosObjectClient) Stop() {}
Expand All @@ -75,7 +63,7 @@ func (s *GCSThanosObjectClient) ObjectExists(ctx context.Context, objectKey stri
// GetAttributes returns the attributes of the specified object key from the configured GCS bucket.
func (s *GCSThanosObjectClient) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
attr := client.ObjectAttributes{}
thanosAttr, err := s.client.Attributes(ctx, objectKey)
thanosAttr, err := s.hedgedClient.Attributes(ctx, objectKey)
if err != nil {
return attr, err
}
Expand All @@ -91,12 +79,12 @@ func (s *GCSThanosObjectClient) PutObject(ctx context.Context, objectKey string,

// GetObject returns a reader and the size for the specified object key from the configured GCS bucket.
func (s *GCSThanosObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
reader, err := s.client.Get(ctx, objectKey)
reader, err := s.hedgedClient.Get(ctx, objectKey)
if err != nil {
return nil, 0, err
}

attr, err := s.client.Attributes(ctx, objectKey)
attr, err := s.hedgedClient.Attributes(ctx, objectKey)
if err != nil {
return nil, 0, errors.Wrapf(err, "failed to get attributes for %s", objectKey)
}
Expand All @@ -105,7 +93,7 @@ func (s *GCSThanosObjectClient) GetObject(ctx context.Context, objectKey string)
}

func (s *GCSThanosObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
reader, err := s.client.GetRange(ctx, objectKey, offset, length)
reader, err := s.hedgedClient.GetRange(ctx, objectKey, offset, length)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading