From 454ffc6f5896a924b1b0772e0afaf3ea8bbb86c0 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 24 Jun 2020 14:07:31 +0530 Subject: [PATCH 1/7] add purpose label to metric being used for tracing cassandra session and relevant tests Signed-off-by: Sandeep Sukhani --- pkg/chunk/cassandra/fixtures.go | 7 +- pkg/chunk/cassandra/storage_client.go | 16 ++-- pkg/chunk/cassandra/storage_client_test.go | 85 ++++++++++++++++++++++ pkg/chunk/cassandra/table_client.go | 5 +- pkg/chunk/schema_config.go | 6 +- pkg/chunk/storage/factory.go | 16 ++-- pkg/chunk/storage/factory_test.go | 46 +++++++++++- pkg/cortex/modules.go | 15 ++-- 8 files changed, 167 insertions(+), 29 deletions(-) diff --git a/pkg/chunk/cassandra/fixtures.go b/pkg/chunk/cassandra/fixtures.go index feb2df25219..a5bba381d7b 100644 --- a/pkg/chunk/cassandra/fixtures.go +++ b/pkg/chunk/cassandra/fixtures.go @@ -34,18 +34,19 @@ func (f *fixture) Clients() (chunk.IndexClient, chunk.Client, chunk.TableClient, // Get a SchemaConfig with the defaults. schemaConfig := testutils.DefaultSchemaConfig("cassandra") + schemaStart := schemaConfig.Configs[0].From.String() - storageClient, err := NewStorageClient(cfg, schemaConfig) + storageClient, err := NewStorageClient(cfg, schemaConfig, schemaStart, nil) if err != nil { return nil, nil, nil, schemaConfig, nil, err } - objectClient, err := NewObjectClient(cfg, schemaConfig) + objectClient, err := NewObjectClient(cfg, schemaConfig, schemaStart, nil) if err != nil { return nil, nil, nil, schemaConfig, nil, err } - tableClient, err := NewTableClient(context.Background(), cfg) + tableClient, err := NewTableClient(context.Background(), cfg, nil) if err != nil { return nil, nil, nil, schemaConfig, nil, err } diff --git a/pkg/chunk/cassandra/storage_client.go b/pkg/chunk/cassandra/storage_client.go index c4a0fa56f29..1e3e5a4267a 100644 --- a/pkg/chunk/cassandra/storage_client.go +++ b/pkg/chunk/cassandra/storage_client.go @@ -89,7 +89,7 @@ func (cfg *Config) Validate() error { return nil } -func (cfg *Config) session(name string) (*gocql.Session, error) { +func (cfg *Config) session(name, purpose string, registerer prometheus.Registerer) (*gocql.Session, error) { consistency, err := gocql.ParseConsistencyWrapper(cfg.Consistency) if err != nil { return nil, errors.WithStack(err) @@ -107,7 +107,7 @@ func (cfg *Config) session(name string) (*gocql.Session, error) { cluster.NumConns = cfg.NumConnections cluster.Logger = log.With(pkgutil.Logger, "module", "gocql", "client", name) cluster.Registerer = prometheus.WrapRegistererWith( - prometheus.Labels{"client": name}, prometheus.DefaultRegisterer) + prometheus.Labels{"client": name, "purpose": purpose}, registerer) if cfg.Retries > 0 { cluster.RetryPolicy = &gocql.ExponentialBackoffRetryPolicy{ NumRetries: cfg.Retries, @@ -222,15 +222,15 @@ type StorageClient struct { } // NewStorageClient returns a new StorageClient. 
-func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (*StorageClient, error) { +func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig, purpose string, registerer prometheus.Registerer) (*StorageClient, error) { pkgutil.WarnExperimentalUse("Cassandra Backend") - readSession, err := cfg.session("index-read") + readSession, err := cfg.session("index-read", purpose, registerer) if err != nil { return nil, errors.WithStack(err) } - writeSession, err := cfg.session("index-write") + writeSession, err := cfg.session("index-write", purpose, registerer) if err != nil { return nil, errors.WithStack(err) } @@ -407,15 +407,15 @@ type ObjectClient struct { } // NewObjectClient returns a new ObjectClient. -func NewObjectClient(cfg Config, schemaCfg chunk.SchemaConfig) (*ObjectClient, error) { +func NewObjectClient(cfg Config, schemaCfg chunk.SchemaConfig, purpose string, registerer prometheus.Registerer) (*ObjectClient, error) { pkgutil.WarnExperimentalUse("Cassandra Backend") - readSession, err := cfg.session("chunks-read") + readSession, err := cfg.session("chunks-read", purpose, registerer) if err != nil { return nil, errors.WithStack(err) } - writeSession, err := cfg.session("chunks-write") + writeSession, err := cfg.session("chunks-write", purpose, registerer) if err != nil { return nil, errors.WithStack(err) } diff --git a/pkg/chunk/cassandra/storage_client_test.go b/pkg/chunk/cassandra/storage_client_test.go index 83c4ed17189..dbd27bbcf98 100644 --- a/pkg/chunk/cassandra/storage_client_test.go +++ b/pkg/chunk/cassandra/storage_client_test.go @@ -1,9 +1,11 @@ package cassandra import ( + "os" "testing" "github.com/gocql/gocql" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -79,3 +81,86 @@ func TestConfig_setClusterConfig_authWithPasswordAndPasswordFile(t *testing.T) { } assert.Error(t, cfg.Validate()) } + +type sessionDetails struct { + name, purpose string +} + +func TestConfig_Session(t *testing.T) { + addresses := os.Getenv("CASSANDRA_TEST_ADDRESSES") + if addresses == "" { + return + } + + var cfg Config + flagext.DefaultValues(&cfg) + cfg.Addresses = addresses + cfg.Keyspace = "test" + cfg.Consistency = "QUORUM" + cfg.ReplicationFactor = 1 + + for _, tc := range []struct { + name string + sessionDetails1 sessionDetails + sessionDetails2 sessionDetails + panicsWithErr string + }{ + { + name: "same name and purpose", + sessionDetails1: sessionDetails{ + name: "session-test", + purpose: "foo", + }, + sessionDetails2: sessionDetails{ + name: "session-test", + purpose: "foo", + }, + panicsWithErr: "duplicate metrics collector registration attempted", + }, + { + name: "same name and different purposes", + sessionDetails1: sessionDetails{ + name: "session-test", + purpose: "foo", + }, + sessionDetails2: sessionDetails{ + name: "session-test", + purpose: "bar", + }, + }, + { + name: "same name and empty purpose", + sessionDetails1: sessionDetails{ + name: "session-test", + }, + sessionDetails2: sessionDetails{ + name: "session-test", + }, + panicsWithErr: "duplicate metrics collector registration attempted", + }, + } { + t.Run(tc.name, func(t *testing.T) { + registerer := prometheus.NewRegistry() + + session1, err := cfg.session(tc.sessionDetails1.name, tc.sessionDetails1.purpose, registerer) + require.NoError(t, err) + + defer func() { + session1.Close() + }() + + if tc.panicsWithErr != "" { + require.PanicsWithError(t, tc.panicsWithErr, func() { + _, _ = cfg.session(tc.sessionDetails2.name, 
tc.sessionDetails2.purpose, registerer) + }) + } else { + session2, err := cfg.session(tc.sessionDetails2.name, tc.sessionDetails2.purpose, registerer) + require.NoError(t, err) + + defer func() { + session2.Close() + }() + } + }) + } +} diff --git a/pkg/chunk/cassandra/table_client.go b/pkg/chunk/cassandra/table_client.go index ee242e354c7..a27fb090cdd 100644 --- a/pkg/chunk/cassandra/table_client.go +++ b/pkg/chunk/cassandra/table_client.go @@ -6,6 +6,7 @@ import ( "github.com/gocql/gocql" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/cortexproject/cortex/pkg/chunk" ) @@ -16,8 +17,8 @@ type tableClient struct { } // NewTableClient returns a new TableClient. -func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error) { - session, err := cfg.session("table-manager") +func NewTableClient(ctx context.Context, cfg Config, registerer prometheus.Registerer) (chunk.TableClient, error) { + session, err := cfg.session("table-manager", "", registerer) if err != nil { return nil, errors.WithStack(err) } diff --git a/pkg/chunk/schema_config.go b/pkg/chunk/schema_config.go index 2ffc73714ad..3b268f87d26 100644 --- a/pkg/chunk/schema_config.go +++ b/pkg/chunk/schema_config.go @@ -49,7 +49,7 @@ type DayTime struct { // MarshalYAML implements yaml.Marshaller. func (d DayTime) MarshalYAML() (interface{}, error) { - return d.Time.Time().Format("2006-01-02"), nil + return d.String(), nil } // UnmarshalYAML implements yaml.Unmarshaller. @@ -66,6 +66,10 @@ func (d *DayTime) UnmarshalYAML(unmarshal func(interface{}) error) error { return nil } +func (d *DayTime) String() string { + return d.Time.Time().Format("2006-01-02") +} + // SchemaConfig contains the config for our chunk index schemas type SchemaConfig struct { Configs []PeriodConfig `yaml:"configs"` diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index 8d87a7d34f3..2e3d2083805 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -153,7 +153,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf stores := chunk.NewCompositeStore(cacheGenNumLoader) for _, s := range schemaCfg.Configs { - index, err := NewIndexClient(s.IndexType, cfg, schemaCfg) + index, err := NewIndexClient(s.IndexType, cfg, schemaCfg, s.From.String(), reg) if err != nil { return nil, errors.Wrap(err, "error creating index client") } @@ -163,7 +163,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf if objectStoreType == "" { objectStoreType = s.IndexType } - chunks, err := NewChunkClient(objectStoreType, cfg, schemaCfg) + chunks, err := NewChunkClient(objectStoreType, cfg, schemaCfg, s.From.String(), reg) if err != nil { return nil, errors.Wrap(err, "error creating object client") } @@ -180,7 +180,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf } // NewIndexClient makes a new index client of the desired type. 
-func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) { +func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, purpose string, registerer prometheus.Registerer) (chunk.IndexClient, error) { if indexClientFactory, ok := customIndexStores[name]; ok { if indexClientFactory.indexClientFactoryFunc != nil { return indexClientFactory.indexClientFactoryFunc() @@ -208,7 +208,7 @@ func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chun cfg.GCPStorageConfig.DistributeKeys = true return gcp.NewStorageClientColumnKey(context.Background(), cfg.GCPStorageConfig, schemaCfg) case "cassandra": - return cassandra.NewStorageClient(cfg.CassandraStorageConfig, schemaCfg) + return cassandra.NewStorageClient(cfg.CassandraStorageConfig, schemaCfg, purpose, registerer) case "boltdb": return local.NewBoltDBIndexClient(cfg.BoltDBConfig) case "grpc-store": @@ -219,7 +219,7 @@ func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chun } // NewChunkClient makes a new chunk.Client of the desired types. -func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.Client, error) { +func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, purpose string, registerer prometheus.Registerer) (chunk.Client, error) { switch name { case "inmemory": return chunk.NewMockStorage(), nil @@ -245,7 +245,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chun case "swift": return newChunkClientFromStore(openstack.NewSwiftObjectClient(cfg.Swift, chunk.DirDelim)) case "cassandra": - return cassandra.NewObjectClient(cfg.CassandraStorageConfig, schemaCfg) + return cassandra.NewObjectClient(cfg.CassandraStorageConfig, schemaCfg, purpose, registerer) case "filesystem": store, err := local.NewFSObjectClient(cfg.FSConfig) if err != nil { @@ -267,7 +267,7 @@ func newChunkClientFromStore(store chunk.ObjectClient, err error) (chunk.Client, } // NewTableClient makes a new table client based on the configuration. 
-func NewTableClient(name string, cfg Config) (chunk.TableClient, error) { +func NewTableClient(name string, cfg Config, registerer prometheus.Registerer) (chunk.TableClient, error) { if indexClientFactory, ok := customIndexStores[name]; ok { if indexClientFactory.tableClientFactoryFunc != nil { return indexClientFactory.tableClientFactoryFunc() @@ -289,7 +289,7 @@ func NewTableClient(name string, cfg Config) (chunk.TableClient, error) { case "gcp", "gcp-columnkey", "bigtable", "bigtable-hashed": return gcp.NewTableClient(context.Background(), cfg.GCPStorageConfig) case "cassandra": - return cassandra.NewTableClient(context.Background(), cfg.CassandraStorageConfig) + return cassandra.NewTableClient(context.Background(), cfg.CassandraStorageConfig, registerer) case "boltdb": return local.NewTableClient(cfg.BoltDBConfig.Directory) case "grpc-store": diff --git a/pkg/chunk/storage/factory_test.go b/pkg/chunk/storage/factory_test.go index 0b16dc7b2a2..475da18a95a 100644 --- a/pkg/chunk/storage/factory_test.go +++ b/pkg/chunk/storage/factory_test.go @@ -5,11 +5,14 @@ import ( "os" "reflect" "testing" + "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/cassandra" "github.com/cortexproject/cortex/pkg/chunk/local" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/validation" @@ -135,7 +138,7 @@ func TestCustomIndexClient(t *testing.T) { RegisterIndexStore(tc.indexClientName, tc.indexClientFactories.indexClientFactoryFunc, tc.indexClientFactories.tableClientFactoryFunc) } - indexClient, err := NewIndexClient(tc.indexClientName, cfg, schemaCfg) + indexClient, err := NewIndexClient(tc.indexClientName, cfg, schemaCfg, "test", nil) if tc.errorExpected { require.Error(t, err) } else { @@ -143,7 +146,7 @@ func TestCustomIndexClient(t *testing.T) { require.Equal(t, tc.expectedIndexClientType, reflect.TypeOf(indexClient)) } - tableClient, err := NewTableClient(tc.indexClientName, cfg) + tableClient, err := NewTableClient(tc.indexClientName, cfg, nil) if tc.errorExpected { require.Error(t, err) } else { @@ -154,6 +157,45 @@ func TestCustomIndexClient(t *testing.T) { } } +func TestCassandraInMultipleSchemas(t *testing.T) { + addresses := os.Getenv("CASSANDRA_TEST_ADDRESSES") + if addresses == "" { + return + } + + // cassandra config + var cassandraCfg cassandra.Config + flagext.DefaultValues(&cassandraCfg) + cassandraCfg.Addresses = addresses + cassandraCfg.Keyspace = "test" + cassandraCfg.Consistency = "QUORUM" + cassandraCfg.ReplicationFactor = 1 + + // build schema with cassandra in multiple periodic configs + schemaCfg := chunk.DefaultSchemaConfig("cassandra", "v1", model.Now().Add(-7*24*time.Hour)) + newSchemaCfg := schemaCfg.Configs[0] + newSchemaCfg.Schema = "v2" + newSchemaCfg.From = chunk.DayTime{Time: model.Now()} + + schemaCfg.Configs = append(schemaCfg.Configs, newSchemaCfg) + + var ( + cfg Config + storeConfig chunk.StoreConfig + defaults validation.Limits + ) + flagext.DefaultValues(&cfg, &storeConfig, &defaults) + cfg.CassandraStorageConfig = cassandraCfg + + limits, err := validation.NewOverrides(defaults, nil) + require.NoError(t, err) + + store, err := NewStore(cfg, storeConfig, schemaCfg, limits, prometheus.NewRegistry(), nil) + require.NoError(t, err) + + store.Stop() +} + // useful for cleaning up state after tests func unregisterAllCustomIndexStores() { customIndexStores = 
map[string]indexStoreFactories{} diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 7100608d853..964d7aed1c4 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -329,7 +329,7 @@ func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) { } var indexClient chunk.IndexClient - indexClient, err = storage.NewIndexClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage, t.Cfg.Schema) + indexClient, err = storage.NewIndexClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage, t.Cfg.Schema, "delete-requests", prometheus.DefaultRegisterer) if err != nil { return } @@ -425,7 +425,7 @@ func (t *Cortex) initTableManager() (services.Service, error) { os.Exit(1) } - tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.Cfg.Storage) + tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.Cfg.Storage, prometheus.DefaultRegisterer) if err != nil { return nil, err } @@ -435,9 +435,14 @@ func (t *Cortex) initTableManager() (services.Service, error) { var extraTables []chunk.ExtraTables if t.Cfg.PurgerConfig.Enable { - deleteStoreTableClient, err := storage.NewTableClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage) - if err != nil { - return nil, err + var deleteStoreTableClient chunk.TableClient + if lastConfig.IndexType == t.Cfg.Storage.DeleteStoreConfig.Store { + deleteStoreTableClient = tableClient + } else { + deleteStoreTableClient, err = storage.NewTableClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage, prometheus.DefaultRegisterer) + if err != nil { + return nil, err + } } extraTables = append(extraTables, chunk.ExtraTables{TableClient: deleteStoreTableClient, Tables: t.Cfg.Storage.DeleteStoreConfig.GetTables()}) From 0a2e2f2d34d60821a5b7aca8af9ddd3a4b4280f5 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 24 Jun 2020 14:18:58 +0530 Subject: [PATCH 2/7] update changelog Signed-off-by: Sandeep Sukhani --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d11817d6397..02d7325d8e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ * [BUGFIX] Ingester: Flushing chunks via `/flush` endpoint could previously lead to panic, if chunks were already flushed before and then removed from memory during the flush caused by `/flush` handler. Immediate flush now doesn't cause chunks to be flushed again. Samples received during flush triggered via `/flush` handler are no longer discarded. #2778 * [BUGFIX] Prometheus upgraded. #2849 * Fixed unknown symbol error during head compaction +* [BUGFIX] Fix panic when using cassandra as store for both index and delete requests. 
#2774 ## 1.2.0 / 2020-07-01 From 361fdd7a420cf2aae8da3d4919760c37249b1fcc Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 1 Jul 2020 12:04:08 +0530 Subject: [PATCH 3/7] passing registerer wrapped with purpose to index client factory functions Signed-off-by: Sandeep Sukhani --- pkg/chunk/cassandra/fixtures.go | 5 +- pkg/chunk/cassandra/storage_client.go | 16 ++-- pkg/chunk/cassandra/storage_client_test.go | 85 ---------------------- pkg/chunk/cassandra/table_client.go | 2 +- pkg/chunk/storage/factory.go | 15 ++-- pkg/chunk/storage/factory_test.go | 2 +- pkg/cortex/modules.go | 4 +- 7 files changed, 24 insertions(+), 105 deletions(-) diff --git a/pkg/chunk/cassandra/fixtures.go b/pkg/chunk/cassandra/fixtures.go index a5bba381d7b..b55cb16af5f 100644 --- a/pkg/chunk/cassandra/fixtures.go +++ b/pkg/chunk/cassandra/fixtures.go @@ -34,14 +34,13 @@ func (f *fixture) Clients() (chunk.IndexClient, chunk.Client, chunk.TableClient, // Get a SchemaConfig with the defaults. schemaConfig := testutils.DefaultSchemaConfig("cassandra") - schemaStart := schemaConfig.Configs[0].From.String() - storageClient, err := NewStorageClient(cfg, schemaConfig, schemaStart, nil) + storageClient, err := NewStorageClient(cfg, schemaConfig, nil) if err != nil { return nil, nil, nil, schemaConfig, nil, err } - objectClient, err := NewObjectClient(cfg, schemaConfig, schemaStart, nil) + objectClient, err := NewObjectClient(cfg, schemaConfig, nil) if err != nil { return nil, nil, nil, schemaConfig, nil, err } diff --git a/pkg/chunk/cassandra/storage_client.go b/pkg/chunk/cassandra/storage_client.go index 1e3e5a4267a..a19a5b8ef88 100644 --- a/pkg/chunk/cassandra/storage_client.go +++ b/pkg/chunk/cassandra/storage_client.go @@ -89,7 +89,7 @@ func (cfg *Config) Validate() error { return nil } -func (cfg *Config) session(name, purpose string, registerer prometheus.Registerer) (*gocql.Session, error) { +func (cfg *Config) session(name string, registerer prometheus.Registerer) (*gocql.Session, error) { consistency, err := gocql.ParseConsistencyWrapper(cfg.Consistency) if err != nil { return nil, errors.WithStack(err) @@ -107,7 +107,7 @@ func (cfg *Config) session(name, purpose string, registerer prometheus.Registere cluster.NumConns = cfg.NumConnections cluster.Logger = log.With(pkgutil.Logger, "module", "gocql", "client", name) cluster.Registerer = prometheus.WrapRegistererWith( - prometheus.Labels{"client": name, "purpose": purpose}, registerer) + prometheus.Labels{"client": name}, registerer) if cfg.Retries > 0 { cluster.RetryPolicy = &gocql.ExponentialBackoffRetryPolicy{ NumRetries: cfg.Retries, @@ -222,15 +222,15 @@ type StorageClient struct { } // NewStorageClient returns a new StorageClient. -func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig, purpose string, registerer prometheus.Registerer) (*StorageClient, error) { +func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer) (*StorageClient, error) { pkgutil.WarnExperimentalUse("Cassandra Backend") - readSession, err := cfg.session("index-read", purpose, registerer) + readSession, err := cfg.session("index-read", registerer) if err != nil { return nil, errors.WithStack(err) } - writeSession, err := cfg.session("index-write", purpose, registerer) + writeSession, err := cfg.session("index-write", registerer) if err != nil { return nil, errors.WithStack(err) } @@ -407,15 +407,15 @@ type ObjectClient struct { } // NewObjectClient returns a new ObjectClient. 
-func NewObjectClient(cfg Config, schemaCfg chunk.SchemaConfig, purpose string, registerer prometheus.Registerer) (*ObjectClient, error) { +func NewObjectClient(cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer) (*ObjectClient, error) { pkgutil.WarnExperimentalUse("Cassandra Backend") - readSession, err := cfg.session("chunks-read", purpose, registerer) + readSession, err := cfg.session("chunks-read", registerer) if err != nil { return nil, errors.WithStack(err) } - writeSession, err := cfg.session("chunks-write", purpose, registerer) + writeSession, err := cfg.session("chunks-write", registerer) if err != nil { return nil, errors.WithStack(err) } diff --git a/pkg/chunk/cassandra/storage_client_test.go b/pkg/chunk/cassandra/storage_client_test.go index dbd27bbcf98..83c4ed17189 100644 --- a/pkg/chunk/cassandra/storage_client_test.go +++ b/pkg/chunk/cassandra/storage_client_test.go @@ -1,11 +1,9 @@ package cassandra import ( - "os" "testing" "github.com/gocql/gocql" - "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -81,86 +79,3 @@ func TestConfig_setClusterConfig_authWithPasswordAndPasswordFile(t *testing.T) { } assert.Error(t, cfg.Validate()) } - -type sessionDetails struct { - name, purpose string -} - -func TestConfig_Session(t *testing.T) { - addresses := os.Getenv("CASSANDRA_TEST_ADDRESSES") - if addresses == "" { - return - } - - var cfg Config - flagext.DefaultValues(&cfg) - cfg.Addresses = addresses - cfg.Keyspace = "test" - cfg.Consistency = "QUORUM" - cfg.ReplicationFactor = 1 - - for _, tc := range []struct { - name string - sessionDetails1 sessionDetails - sessionDetails2 sessionDetails - panicsWithErr string - }{ - { - name: "same name and purpose", - sessionDetails1: sessionDetails{ - name: "session-test", - purpose: "foo", - }, - sessionDetails2: sessionDetails{ - name: "session-test", - purpose: "foo", - }, - panicsWithErr: "duplicate metrics collector registration attempted", - }, - { - name: "same name and different purposes", - sessionDetails1: sessionDetails{ - name: "session-test", - purpose: "foo", - }, - sessionDetails2: sessionDetails{ - name: "session-test", - purpose: "bar", - }, - }, - { - name: "same name and empty purpose", - sessionDetails1: sessionDetails{ - name: "session-test", - }, - sessionDetails2: sessionDetails{ - name: "session-test", - }, - panicsWithErr: "duplicate metrics collector registration attempted", - }, - } { - t.Run(tc.name, func(t *testing.T) { - registerer := prometheus.NewRegistry() - - session1, err := cfg.session(tc.sessionDetails1.name, tc.sessionDetails1.purpose, registerer) - require.NoError(t, err) - - defer func() { - session1.Close() - }() - - if tc.panicsWithErr != "" { - require.PanicsWithError(t, tc.panicsWithErr, func() { - _, _ = cfg.session(tc.sessionDetails2.name, tc.sessionDetails2.purpose, registerer) - }) - } else { - session2, err := cfg.session(tc.sessionDetails2.name, tc.sessionDetails2.purpose, registerer) - require.NoError(t, err) - - defer func() { - session2.Close() - }() - } - }) - } -} diff --git a/pkg/chunk/cassandra/table_client.go b/pkg/chunk/cassandra/table_client.go index a27fb090cdd..fc269e26409 100644 --- a/pkg/chunk/cassandra/table_client.go +++ b/pkg/chunk/cassandra/table_client.go @@ -18,7 +18,7 @@ type tableClient struct { // NewTableClient returns a new TableClient. 
func NewTableClient(ctx context.Context, cfg Config, registerer prometheus.Registerer) (chunk.TableClient, error) { - session, err := cfg.session("table-manager", "", registerer) + session, err := cfg.session("table-manager", registerer) if err != nil { return nil, errors.WithStack(err) } diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index 2e3d2083805..1572fee2f16 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -153,7 +153,10 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf stores := chunk.NewCompositeStore(cacheGenNumLoader) for _, s := range schemaCfg.Configs { - index, err := NewIndexClient(s.IndexType, cfg, schemaCfg, s.From.String(), reg) + reg := prometheus.WrapRegistererWith( + prometheus.Labels{"purpose": s.From.String()}, reg) + + index, err := NewIndexClient(s.IndexType, cfg, schemaCfg, reg) if err != nil { return nil, errors.Wrap(err, "error creating index client") } @@ -163,7 +166,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf if objectStoreType == "" { objectStoreType = s.IndexType } - chunks, err := NewChunkClient(objectStoreType, cfg, schemaCfg, s.From.String(), reg) + chunks, err := NewChunkClient(objectStoreType, cfg, schemaCfg, reg) if err != nil { return nil, errors.Wrap(err, "error creating object client") } @@ -180,7 +183,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf } // NewIndexClient makes a new index client of the desired type. -func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, purpose string, registerer prometheus.Registerer) (chunk.IndexClient, error) { +func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer) (chunk.IndexClient, error) { if indexClientFactory, ok := customIndexStores[name]; ok { if indexClientFactory.indexClientFactoryFunc != nil { return indexClientFactory.indexClientFactoryFunc() @@ -208,7 +211,7 @@ func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, purpo cfg.GCPStorageConfig.DistributeKeys = true return gcp.NewStorageClientColumnKey(context.Background(), cfg.GCPStorageConfig, schemaCfg) case "cassandra": - return cassandra.NewStorageClient(cfg.CassandraStorageConfig, schemaCfg, purpose, registerer) + return cassandra.NewStorageClient(cfg.CassandraStorageConfig, schemaCfg, registerer) case "boltdb": return local.NewBoltDBIndexClient(cfg.BoltDBConfig) case "grpc-store": @@ -219,7 +222,7 @@ func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, purpo } // NewChunkClient makes a new chunk.Client of the desired types. 
-func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, purpose string, registerer prometheus.Registerer) (chunk.Client, error) { +func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer) (chunk.Client, error) { switch name { case "inmemory": return chunk.NewMockStorage(), nil @@ -245,7 +248,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, purpo case "swift": return newChunkClientFromStore(openstack.NewSwiftObjectClient(cfg.Swift, chunk.DirDelim)) case "cassandra": - return cassandra.NewObjectClient(cfg.CassandraStorageConfig, schemaCfg, purpose, registerer) + return cassandra.NewObjectClient(cfg.CassandraStorageConfig, schemaCfg, registerer) case "filesystem": store, err := local.NewFSObjectClient(cfg.FSConfig) if err != nil { diff --git a/pkg/chunk/storage/factory_test.go b/pkg/chunk/storage/factory_test.go index 475da18a95a..e2b09c0e0a2 100644 --- a/pkg/chunk/storage/factory_test.go +++ b/pkg/chunk/storage/factory_test.go @@ -138,7 +138,7 @@ func TestCustomIndexClient(t *testing.T) { RegisterIndexStore(tc.indexClientName, tc.indexClientFactories.indexClientFactoryFunc, tc.indexClientFactories.tableClientFactoryFunc) } - indexClient, err := NewIndexClient(tc.indexClientName, cfg, schemaCfg, "test", nil) + indexClient, err := NewIndexClient(tc.indexClientName, cfg, schemaCfg, nil) if tc.errorExpected { require.Error(t, err) } else { diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 964d7aed1c4..bc648a0ee77 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -329,7 +329,9 @@ func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) { } var indexClient chunk.IndexClient - indexClient, err = storage.NewIndexClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage, t.Cfg.Schema, "delete-requests", prometheus.DefaultRegisterer) + reg := prometheus.WrapRegistererWith( + prometheus.Labels{"purpose": "delete-requests"}, prometheus.DefaultRegisterer) + indexClient, err = storage.NewIndexClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage, t.Cfg.Schema, reg) if err != nil { return } From b69195a7e0803a0462b64ad7adb3af6ea93ddc32 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 1 Jul 2020 13:26:43 +0530 Subject: [PATCH 4/7] add registerer for metrics in dynamodb clients Signed-off-by: Sandeep Sukhani --- pkg/chunk/aws/dynamodb_storage_client.go | 98 ++++++----------------- pkg/chunk/aws/dynamodb_table_client.go | 31 +++---- pkg/chunk/aws/fixtures.go | 4 + pkg/chunk/aws/metrics.go | 60 ++++++++++++++ pkg/chunk/aws/metrics_autoscaling.go | 2 +- pkg/chunk/aws/metrics_autoscaling_test.go | 2 + pkg/chunk/storage/factory.go | 6 +- 7 files changed, 113 insertions(+), 90 deletions(-) create mode 100644 pkg/chunk/aws/metrics.go diff --git a/pkg/chunk/aws/dynamodb_storage_client.go b/pkg/chunk/aws/dynamodb_storage_client.go index 126de4e209d..ff5a0ae0499 100644 --- a/pkg/chunk/aws/dynamodb_storage_client.go +++ b/pkg/chunk/aws/dynamodb_storage_client.go @@ -50,55 +50,6 @@ const ( validationException = "ValidationException" ) -var ( - dynamoRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "cortex", - Name: "dynamo_request_duration_seconds", - Help: "Time spent doing DynamoDB requests.", - - // DynamoDB latency seems to range from a few ms to a several seconds and is - // important. So use 9 buckets from 1ms to just over 1 minute (65s). 
- Buckets: prometheus.ExponentialBuckets(0.001, 4, 9), - }, []string{"operation", "status_code"})) - dynamoConsumedCapacity = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "dynamo_consumed_capacity_total", - Help: "The capacity units consumed by operation.", - }, []string{"operation", tableNameLabel}) - dynamoThrottled = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "dynamo_throttled_total", - Help: "The total number of throttled events.", - }, []string{"operation", tableNameLabel}) - dynamoFailures = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "dynamo_failures_total", - Help: "The total number of errors while storing chunks to the chunk store.", - }, []string{tableNameLabel, errorReasonLabel, "operation"}) - dynamoDroppedRequests = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "dynamo_dropped_requests_total", - Help: "The total number of requests which were dropped due to errors encountered from dynamo.", - }, []string{tableNameLabel, errorReasonLabel, "operation"}) - dynamoQueryPagesCount = prometheus.NewHistogram(prometheus.HistogramOpts{ - Namespace: "cortex", - Name: "dynamo_query_pages_count", - Help: "Number of pages per query.", - // Most queries will have one page, however this may increase with fuzzy - // metric names. - Buckets: prometheus.ExponentialBuckets(1, 4, 6), - }) -) - -func init() { - dynamoRequestDuration.Register() - prometheus.MustRegister(dynamoConsumedCapacity) - prometheus.MustRegister(dynamoThrottled) - prometheus.MustRegister(dynamoFailures) - prometheus.MustRegister(dynamoQueryPagesCount) - prometheus.MustRegister(dynamoDroppedRequests) -} - // DynamoDBConfig specifies config for a DynamoDB database. type DynamoDBConfig struct { DynamoDB flagext.URLValue `yaml:"dynamodb_url"` @@ -148,20 +99,22 @@ type dynamoDBStorageClient struct { // of boilerplate. batchGetItemRequestFn func(ctx context.Context, input *dynamodb.BatchGetItemInput) dynamoDBRequest batchWriteItemRequestFn func(ctx context.Context, input *dynamodb.BatchWriteItemInput) dynamoDBRequest + + metrics *metrics } // NewDynamoDBIndexClient makes a new DynamoDB-backed IndexClient. -func NewDynamoDBIndexClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) { - return newDynamoDBStorageClient(cfg, schemaCfg) +func NewDynamoDBIndexClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (chunk.IndexClient, error) { + return newDynamoDBStorageClient(cfg, schemaCfg, reg) } // NewDynamoDBChunkClient makes a new DynamoDB-backed chunk.Client. -func NewDynamoDBChunkClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig) (chunk.Client, error) { - return newDynamoDBStorageClient(cfg, schemaCfg) +func NewDynamoDBChunkClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (chunk.Client, error) { + return newDynamoDBStorageClient(cfg, schemaCfg, reg) } // newDynamoDBStorageClient makes a new DynamoDB-backed IndexClient and chunk.Client. 
-func newDynamoDBStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig) (*dynamoDBStorageClient, error) { +func newDynamoDBStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (*dynamoDBStorageClient, error) { dynamoDB, err := dynamoClientFromURL(cfg.DynamoDB.URL) if err != nil { return nil, err @@ -172,6 +125,7 @@ func newDynamoDBStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig) schemaCfg: schemaCfg, DynamoDB: dynamoDB, writeThrottle: rate.NewLimiter(rate.Limit(cfg.ThrottleLimit), dynamoDBMaxWriteBatchSize), + metrics: newMetrics(reg), } client.batchGetItemRequestFn = client.batchGetItemRequest client.batchWriteItemRequestFn = client.batchWriteItemRequest @@ -187,7 +141,7 @@ func (a dynamoDBStorageClient) NewWriteBatch() chunk.WriteBatch { return dynamoDBWriteBatch(map[string][]*dynamodb.WriteRequest{}) } -func logWriteRetry(ctx context.Context, unprocessed dynamoDBWriteBatch) { +func logWriteRetry(unprocessed dynamoDBWriteBatch, dynamoThrottled *prometheus.CounterVec) { for table, reqs := range unprocessed { dynamoThrottled.WithLabelValues("DynamoDB.BatchWriteItem", table).Add(float64(len(reqs))) for _, req := range reqs { @@ -225,25 +179,25 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), }) - err := instrument.CollectedRequest(ctx, "DynamoDB.BatchWriteItem", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(ctx, "DynamoDB.BatchWriteItem", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { return request.Send() }) resp := request.Data().(*dynamodb.BatchWriteItemOutput) for _, cc := range resp.ConsumedCapacity { - dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchWriteItem", *cc.TableName). + a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchWriteItem", *cc.TableName). Add(float64(*cc.CapacityUnits)) } if err != nil { for tableName := range requests { - recordDynamoError(tableName, err, "DynamoDB.BatchWriteItem") + recordDynamoError(tableName, err, "DynamoDB.BatchWriteItem", a.metrics.dynamoFailures) } // If we get provisionedThroughputExceededException, then no items were processed, // so back off and retry all. if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) { - logWriteRetry(ctx, requests) + logWriteRetry(requests, a.metrics.dynamoThrottled) unprocessed.TakeReqs(requests, -1) _ = a.writeThrottle.WaitN(ctx, len(requests)) backoff.Wait() @@ -256,7 +210,7 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write // recording the drop counter separately from recordDynamoError(), as the error code alone may not provide enough context // to determine if a request was dropped (or not) for tableName := range requests { - dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchWriteItem").Inc() + a.metrics.dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchWriteItem").Inc() } continue } @@ -268,7 +222,7 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write // If there are unprocessed items, retry those items. 
unprocessedItems := dynamoDBWriteBatch(resp.UnprocessedItems) if len(unprocessedItems) > 0 { - logWriteRetry(ctx, unprocessedItems) + logWriteRetry(unprocessedItems, a.metrics.dynamoThrottled) _ = a.writeThrottle.WaitN(ctx, unprocessedItems.Len()) unprocessed.TakeReqs(unprocessedItems, -1) } @@ -329,11 +283,11 @@ func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery pageCount := 0 defer func() { - dynamoQueryPagesCount.Observe(float64(pageCount)) + a.metrics.dynamoQueryPagesCount.Observe(float64(pageCount)) }() retryer := newRetryer(ctx, a.cfg.backoffConfig) - err := instrument.CollectedRequest(ctx, "DynamoDB.QueryPages", dynamoRequestDuration, instrument.ErrorCode, func(innerCtx context.Context) error { + err := instrument.CollectedRequest(ctx, "DynamoDB.QueryPages", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(innerCtx context.Context) error { if sp := ot.SpanFromContext(innerCtx); sp != nil { sp.SetTag("tableName", query.TableName) sp.SetTag("hashValue", query.HashValue) @@ -345,12 +299,12 @@ func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery } if cc := output.ConsumedCapacity; cc != nil { - dynamoConsumedCapacity.WithLabelValues("DynamoDB.QueryPages", *cc.TableName). + a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.QueryPages", *cc.TableName). Add(float64(*cc.CapacityUnits)) } return callback(query, &dynamoDBReadResponse{items: output.Items}) - }, retryer.withRetries, withErrorHandler(query.TableName, "DynamoDB.QueryPages")) + }, retryer.withRetries, withErrorHandler(query.TableName, "DynamoDB.QueryPages", a.metrics.dynamoFailures)) }) if err != nil { return errors.Wrapf(err, "QueryPages error: table=%v", query.TableName) @@ -481,19 +435,19 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), }) - err := instrument.CollectedRequest(ctx, "DynamoDB.BatchGetItemPages", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + err := instrument.CollectedRequest(ctx, "DynamoDB.BatchGetItemPages", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { return request.Send() }) response := request.Data().(*dynamodb.BatchGetItemOutput) for _, cc := range response.ConsumedCapacity { - dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchGetItemPages", *cc.TableName). + a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchGetItemPages", *cc.TableName). 
Add(float64(*cc.CapacityUnits)) } if err != nil { for tableName := range requests { - recordDynamoError(tableName, err, "DynamoDB.BatchGetItemPages") + recordDynamoError(tableName, err, "DynamoDB.BatchGetItemPages", a.metrics.dynamoFailures) } // If we get provisionedThroughputExceededException, then no items were processed, @@ -509,7 +463,7 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c // recording the drop counter separately from recordDynamoError(), as the error code alone may not provide enough context // to determine if a request was dropped (or not) for tableName := range requests { - dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchGetItemPages").Inc() + a.metrics.dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchGetItemPages").Inc() } continue } @@ -792,17 +746,17 @@ func (b dynamoDBReadRequest) TakeReqs(from dynamoDBReadRequest, max int) { } } -func withErrorHandler(tableName, operation string) func(req *request.Request) { +func withErrorHandler(tableName, operation string, dynamoFailures *prometheus.CounterVec) func(req *request.Request) { return func(req *request.Request) { req.Handlers.CompleteAttempt.PushBack(func(req *request.Request) { if req.Error != nil { - recordDynamoError(tableName, req.Error, operation) + recordDynamoError(tableName, req.Error, operation, dynamoFailures) } }) } } -func recordDynamoError(tableName string, err error, operation string) { +func recordDynamoError(tableName string, err error, operation string, dynamoFailures *prometheus.CounterVec) { if awsErr, ok := err.(awserr.Error); ok { dynamoFailures.WithLabelValues(tableName, awsErr.Code(), operation).Add(float64(1)) } else { diff --git a/pkg/chunk/aws/dynamodb_table_client.go b/pkg/chunk/aws/dynamodb_table_client.go index 8d8571f7adf..a0689004640 100644 --- a/pkg/chunk/aws/dynamodb_table_client.go +++ b/pkg/chunk/aws/dynamodb_table_client.go @@ -10,6 +10,7 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/weaveworks/common/instrument" "golang.org/x/time/rate" @@ -35,10 +36,11 @@ type dynamoTableClient struct { DynamoDB dynamodbiface.DynamoDBAPI callManager callManager autoscale autoscale + metrics *metrics } // NewDynamoDBTableClient makes a new DynamoTableClient. 
-func NewDynamoDBTableClient(cfg DynamoDBConfig) (chunk.TableClient, error) { +func NewDynamoDBTableClient(cfg DynamoDBConfig, reg prometheus.Registerer) (chunk.TableClient, error) { dynamoDB, err := dynamoClientFromURL(cfg.DynamoDB.URL) if err != nil { return nil, err @@ -51,7 +53,7 @@ func NewDynamoDBTableClient(cfg DynamoDBConfig) (chunk.TableClient, error) { var autoscale autoscale if cfg.Metrics.URL != "" { - autoscale, err = newMetrics(cfg) + autoscale, err = newAutoScale(cfg) if err != nil { return nil, err } @@ -61,6 +63,7 @@ func NewDynamoDBTableClient(cfg DynamoDBConfig) (chunk.TableClient, error) { DynamoDB: dynamoDB, callManager: callManager, autoscale: autoscale, + metrics: newMetrics(reg), }, nil } @@ -95,7 +98,7 @@ func (d callManager) backoffAndRetry(ctx context.Context, fn func(context.Contex func (d dynamoTableClient) ListTables(ctx context.Context) ([]string, error) { table := []string{} err := d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.ListTablesPages", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.ListTablesPages", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { return d.DynamoDB.ListTablesPagesWithContext(ctx, &dynamodb.ListTablesInput{}, func(resp *dynamodb.ListTablesOutput, _ bool) bool { for _, s := range resp.TableNames { table = append(table, *s) @@ -121,7 +124,7 @@ func chunkTagsToDynamoDB(ts chunk.Tags) []*dynamodb.Tag { func (d dynamoTableClient) CreateTable(ctx context.Context, desc chunk.TableDesc) error { var tableARN *string if err := d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.CreateTable", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.CreateTable", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { input := &dynamodb.CreateTableInput{ TableName: aws.String(desc.Name), AttributeDefinitions: []*dynamodb.AttributeDefinition{ @@ -179,7 +182,7 @@ func (d dynamoTableClient) CreateTable(ctx context.Context, desc chunk.TableDesc tags := chunkTagsToDynamoDB(desc.Tags) if len(tags) > 0 { return d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.TagResource", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.TagResource", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { _, err := d.DynamoDB.TagResourceWithContext(ctx, &dynamodb.TagResourceInput{ ResourceArn: tableARN, Tags: tags, @@ -196,7 +199,7 @@ func (d dynamoTableClient) CreateTable(ctx context.Context, desc chunk.TableDesc func (d dynamoTableClient) DeleteTable(ctx context.Context, name string) error { if err := d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.DeleteTable", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.DeleteTable", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { input := &dynamodb.DeleteTableInput{TableName: aws.String(name)} _, err := d.DynamoDB.DeleteTableWithContext(ctx, input) if err != nil { @@ -215,7 +218,7 @@ func (d dynamoTableClient) DeleteTable(ctx 
context.Context, name string) error { func (d dynamoTableClient) DescribeTable(ctx context.Context, name string) (desc chunk.TableDesc, isActive bool, err error) { var tableARN *string err = d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.DescribeTable", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.DescribeTable", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { out, err := d.DynamoDB.DescribeTableWithContext(ctx, &dynamodb.DescribeTableInput{ TableName: aws.String(name), }) @@ -248,7 +251,7 @@ func (d dynamoTableClient) DescribeTable(ctx context.Context, name string) (desc } err = d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.ListTagsOfResource", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.ListTagsOfResource", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { out, err := d.DynamoDB.ListTagsOfResourceWithContext(ctx, &dynamodb.ListTagsOfResourceInput{ ResourceArn: tableARN, }) @@ -300,7 +303,7 @@ func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected ch !expected.UseOnDemandIOMode { level.Info(util.Logger).Log("msg", "updating provisioned throughput on table", "table", expected.Name, "old_read", current.ProvisionedRead, "old_write", current.ProvisionedWrite, "new_read", expected.ProvisionedRead, "new_write", expected.ProvisionedWrite) if err := d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.UpdateTable", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.UpdateTable", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { var dynamoBillingMode string updateTableInput := &dynamodb.UpdateTableInput{TableName: aws.String(expected.Name), ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ @@ -320,7 +323,7 @@ func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected ch return err }) }); err != nil { - recordDynamoError(expected.Name, err, "DynamoDB.UpdateTable") + recordDynamoError(expected.Name, err, "DynamoDB.UpdateTable", d.metrics.dynamoFailures) if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "LimitExceededException" { level.Warn(util.Logger).Log("msg", "update limit exceeded", "err", err) } else { @@ -331,14 +334,14 @@ func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected ch // moved the enabling of OnDemand mode to it's own block to reduce complexities & interactions with the various // settings used in provisioned mode. Unfortunately the boilerplate wrappers for retry and tracking needed to be copied. 
if err := d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.UpdateTable", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.UpdateTable", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { level.Info(util.Logger).Log("msg", "updating billing mode on table", "table", expected.Name, "old_mode", current.UseOnDemandIOMode, "new_mode", expected.UseOnDemandIOMode) updateTableInput := &dynamodb.UpdateTableInput{TableName: aws.String(expected.Name), BillingMode: aws.String(dynamodb.BillingModePayPerRequest)} _, err := d.DynamoDB.UpdateTableWithContext(ctx, updateTableInput) return err }) }); err != nil { - recordDynamoError(expected.Name, err, "DynamoDB.UpdateTable") + recordDynamoError(expected.Name, err, "DynamoDB.UpdateTable", d.metrics.dynamoFailures) if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "LimitExceededException" { level.Warn(util.Logger).Log("msg", "update limit exceeded", "err", err) } else { @@ -350,7 +353,7 @@ func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected ch if !current.Tags.Equals(expected.Tags) { var tableARN *string if err := d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.DescribeTable", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.DescribeTable", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { out, err := d.DynamoDB.DescribeTableWithContext(ctx, &dynamodb.DescribeTableInput{ TableName: aws.String(expected.Name), }) @@ -367,7 +370,7 @@ func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected ch } return d.backoffAndRetry(ctx, func(ctx context.Context) error { - return instrument.CollectedRequest(ctx, "DynamoDB.TagResource", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + return instrument.CollectedRequest(ctx, "DynamoDB.TagResource", d.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { _, err := d.DynamoDB.TagResourceWithContext(ctx, &dynamodb.TagResourceInput{ ResourceArn: tableARN, Tags: chunkTagsToDynamoDB(expected.Tags), diff --git a/pkg/chunk/aws/fixtures.go b/pkg/chunk/aws/fixtures.go index 0d23d9ee31e..24ab8cde736 100644 --- a/pkg/chunk/aws/fixtures.go +++ b/pkg/chunk/aws/fixtures.go @@ -35,12 +35,14 @@ var Fixtures = []testutils.Fixture{ dynamoDB := newMockDynamoDB(0, 0) table := &dynamoTableClient{ DynamoDB: dynamoDB, + metrics: newMetrics(nil), } index := &dynamoDBStorageClient{ DynamoDB: dynamoDB, batchGetItemRequestFn: dynamoDB.batchGetItemRequest, batchWriteItemRequestFn: dynamoDB.batchWriteItemRequest, schemaCfg: schemaConfig, + metrics: newMetrics(nil), } object := objectclient.NewClient(&S3ObjectClient{ S3: newMockS3(), @@ -68,6 +70,7 @@ func dynamoDBFixture(provisionedErr, gangsize, maxParallelism int) testutils.Fix schemaCfg := testutils.DefaultSchemaConfig("aws") table := &dynamoTableClient{ DynamoDB: dynamoDB, + metrics: newMetrics(nil), } storage := &dynamoDBStorageClient{ cfg: DynamoDBConfig{ @@ -84,6 +87,7 @@ func dynamoDBFixture(provisionedErr, gangsize, maxParallelism int) testutils.Fix batchGetItemRequestFn: dynamoDB.batchGetItemRequest, batchWriteItemRequestFn: dynamoDB.batchWriteItemRequest, schemaCfg: schemaCfg, + metrics: newMetrics(nil), } return storage, 
storage, table, schemaCfg, testutils.CloserFunc(func() error {
 		table.Stop()
diff --git a/pkg/chunk/aws/metrics.go b/pkg/chunk/aws/metrics.go
new file mode 100644
index 00000000000..f04501a8e5e
--- /dev/null
+++ b/pkg/chunk/aws/metrics.go
@@ -0,0 +1,60 @@
+package aws
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/weaveworks/common/instrument"
+)
+
+type metrics struct {
+	dynamoRequestDuration  *instrument.HistogramCollector
+	dynamoConsumedCapacity *prometheus.CounterVec
+	dynamoThrottled        *prometheus.CounterVec
+	dynamoFailures         *prometheus.CounterVec
+	dynamoDroppedRequests  *prometheus.CounterVec
+	dynamoQueryPagesCount  prometheus.Histogram
+}
+
+func newMetrics(r prometheus.Registerer) *metrics {
+	m := metrics{}
+
+	m.dynamoRequestDuration = instrument.NewHistogramCollector(promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "cortex",
+		Name:      "dynamo_request_duration_seconds",
+		Help:      "Time spent doing DynamoDB requests.",
+
+		// DynamoDB latency seems to range from a few ms to several seconds and is
+		// important. So use 9 buckets from 1ms to just over 1 minute (65s).
+		Buckets: prometheus.ExponentialBuckets(0.001, 4, 9),
+	}, []string{"operation", "status_code"}))
+	m.dynamoConsumedCapacity = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+		Namespace: "cortex",
+		Name:      "dynamo_consumed_capacity_total",
+		Help:      "The capacity units consumed by operation.",
+	}, []string{"operation", tableNameLabel})
+	m.dynamoThrottled = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+		Namespace: "cortex",
+		Name:      "dynamo_throttled_total",
+		Help:      "The total number of throttled events.",
+	}, []string{"operation", tableNameLabel})
+	m.dynamoFailures = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+		Namespace: "cortex",
+		Name:      "dynamo_failures_total",
+		Help:      "The total number of errors while storing chunks to the chunk store.",
+	}, []string{tableNameLabel, errorReasonLabel, "operation"})
+	m.dynamoDroppedRequests = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+		Namespace: "cortex",
+		Name:      "dynamo_dropped_requests_total",
+		Help:      "The total number of requests which were dropped due to errors encountered from dynamo.",
+	}, []string{tableNameLabel, errorReasonLabel, "operation"})
+	m.dynamoQueryPagesCount = promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+		Namespace: "cortex",
+		Name:      "dynamo_query_pages_count",
+		Help:      "Number of pages per query.",
+		// Most queries will have one page, however this may increase with fuzzy
+		// metric names.
+ Buckets: prometheus.ExponentialBuckets(1, 4, 6), + }) + + return &m +} diff --git a/pkg/chunk/aws/metrics_autoscaling.go b/pkg/chunk/aws/metrics_autoscaling.go index 3df859cbdcf..15b8133e8aa 100644 --- a/pkg/chunk/aws/metrics_autoscaling.go +++ b/pkg/chunk/aws/metrics_autoscaling.go @@ -78,7 +78,7 @@ type metricsData struct { readErrorRates map[string]float64 } -func newMetrics(cfg DynamoDBConfig) (*metricsData, error) { +func newAutoScale(cfg DynamoDBConfig) (*metricsData, error) { client, err := promApi.NewClient(promApi.Config{Address: cfg.Metrics.URL}) if err != nil { return nil, err diff --git a/pkg/chunk/aws/metrics_autoscaling_test.go b/pkg/chunk/aws/metrics_autoscaling_test.go index 3ebf8745694..992024e2776 100644 --- a/pkg/chunk/aws/metrics_autoscaling_test.go +++ b/pkg/chunk/aws/metrics_autoscaling_test.go @@ -139,6 +139,7 @@ func TestTableManagerMetricsAutoScaling(t *testing.T) { }, tableLastUpdated: make(map[string]time.Time), }, + metrics: newMetrics(nil), } indexWriteScale := fixtureWriteScale() @@ -299,6 +300,7 @@ func TestTableManagerMetricsReadAutoScaling(t *testing.T) { tableLastUpdated: make(map[string]time.Time), tableReadLastUpdated: make(map[string]time.Time), }, + metrics: newMetrics(nil), } indexReadScale := fixtureReadScale() diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index 1572fee2f16..533853ec9f0 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -202,7 +202,7 @@ func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis if len(path) > 0 { level.Warn(util.Logger).Log("msg", "ignoring DynamoDB URL path", "path", path) } - return aws.NewDynamoDBIndexClient(cfg.AWSStorageConfig.DynamoDBConfig, schemaCfg) + return aws.NewDynamoDBIndexClient(cfg.AWSStorageConfig.DynamoDBConfig, schemaCfg, registerer) case "gcp": return gcp.NewStorageClientV1(context.Background(), cfg.GCPStorageConfig, schemaCfg) case "gcp-columnkey", "bigtable": @@ -236,7 +236,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis if len(path) > 0 { level.Warn(util.Logger).Log("msg", "ignoring DynamoDB URL path", "path", path) } - return aws.NewDynamoDBChunkClient(cfg.AWSStorageConfig.DynamoDBConfig, schemaCfg) + return aws.NewDynamoDBChunkClient(cfg.AWSStorageConfig.DynamoDBConfig, schemaCfg, registerer) case "azure": return newChunkClientFromStore(azure.NewBlobStorage(&cfg.AzureStorageConfig, chunk.DirDelim)) case "gcp": @@ -288,7 +288,7 @@ func NewTableClient(name string, cfg Config, registerer prometheus.Registerer) ( if len(path) > 0 { level.Warn(util.Logger).Log("msg", "ignoring DynamoDB URL path", "path", path) } - return aws.NewDynamoDBTableClient(cfg.AWSStorageConfig.DynamoDBConfig) + return aws.NewDynamoDBTableClient(cfg.AWSStorageConfig.DynamoDBConfig, registerer) case "gcp", "gcp-columnkey", "bigtable", "bigtable-hashed": return gcp.NewTableClient(context.Background(), cfg.GCPStorageConfig) case "cassandra": From 8a3e8ac1a7e4f2fb32d4d29fec7750d744e93603 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Mon, 6 Jul 2020 17:12:19 +0530 Subject: [PATCH 5/7] changes suggested from PR review Signed-off-by: Sandeep Sukhani --- .../aws/{metrics.go => dynamodb_metrics.go} | 6 ++--- pkg/chunk/aws/dynamodb_storage_client.go | 26 +++++++++---------- pkg/chunk/aws/dynamodb_table_client.go | 8 +++--- pkg/chunk/aws/metrics_autoscaling.go | 2 +- pkg/chunk/storage/factory.go | 12 ++++++--- pkg/cortex/modules.go | 21 ++++++++------- 6 files changed, 40 insertions(+), 35 deletions(-) rename 
From 8a3e8ac1a7e4f2fb32d4d29fec7750d744e93603 Mon Sep 17 00:00:00 2001
From: Sandeep Sukhani
Date: Mon, 6 Jul 2020 17:12:19 +0530
Subject: [PATCH 5/7] changes suggested from PR review

Signed-off-by: Sandeep Sukhani
---
 .../aws/{metrics.go => dynamodb_metrics.go}   |  6 ++---
 pkg/chunk/aws/dynamodb_storage_client.go      | 26 +++++++++----------
 pkg/chunk/aws/dynamodb_table_client.go        |  8 +++---
 pkg/chunk/aws/metrics_autoscaling.go          |  2 +-
 pkg/chunk/storage/factory.go                  | 12 ++++++---
 pkg/cortex/modules.go                         | 21 ++++++++-------
 6 files changed, 40 insertions(+), 35 deletions(-)
 rename pkg/chunk/aws/{metrics.go => dynamodb_metrics.go} (95%)

diff --git a/pkg/chunk/aws/metrics.go b/pkg/chunk/aws/dynamodb_metrics.go
similarity index 95%
rename from pkg/chunk/aws/metrics.go
rename to pkg/chunk/aws/dynamodb_metrics.go
index f04501a8e5e..58533b44146 100644
--- a/pkg/chunk/aws/metrics.go
+++ b/pkg/chunk/aws/dynamodb_metrics.go
@@ -6,7 +6,7 @@ import (
 	"github.com/weaveworks/common/instrument"
 )
 
-type metrics struct {
+type dynamoDBMetrics struct {
 	dynamoRequestDuration  *instrument.HistogramCollector
 	dynamoConsumedCapacity *prometheus.CounterVec
 	dynamoThrottled        *prometheus.CounterVec
@@ -15,8 +15,8 @@ type metrics struct {
 	dynamoQueryPagesCount prometheus.Histogram
 }
 
-func newMetrics(r prometheus.Registerer) *metrics {
-	m := metrics{}
+func newMetrics(r prometheus.Registerer) *dynamoDBMetrics {
+	m := dynamoDBMetrics{}
 
 	m.dynamoRequestDuration = instrument.NewHistogramCollector(promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
 		Namespace: "cortex",
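For readers unfamiliar with github.com/weaveworks/common/instrument: the HistogramCollector built in newMetrics is consumed through instrument.CollectedRequest, which times a callback and records the duration under operation and status_code labels. A rough sketch of a call site, not the exact Cortex code (assumes it sits inside package aws next to newMetrics, importing context and the instrument package):

    // Sketch only: doBatchWriteExample is a hypothetical helper.
    func doBatchWriteExample(ctx context.Context, m *dynamoDBMetrics) error {
        return instrument.CollectedRequest(ctx, "DynamoDB.BatchWriteItem", m.dynamoRequestDuration, instrument.ErrorCode,
            func(ctx context.Context) error {
                // The real code issues the AWS SDK request here; returning its
                // error lets instrument derive the status_code label.
                return nil
            })
    }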
diff --git a/pkg/chunk/aws/dynamodb_storage_client.go b/pkg/chunk/aws/dynamodb_storage_client.go
index ff5a0ae0499..b1e25ec8c7e 100644
--- a/pkg/chunk/aws/dynamodb_storage_client.go
+++ b/pkg/chunk/aws/dynamodb_storage_client.go
@@ -100,7 +100,7 @@ type dynamoDBStorageClient struct {
 	batchGetItemRequestFn   func(ctx context.Context, input *dynamodb.BatchGetItemInput) dynamoDBRequest
 	batchWriteItemRequestFn func(ctx context.Context, input *dynamodb.BatchWriteItemInput) dynamoDBRequest
 
-	metrics *metrics
+	metrics *dynamoDBMetrics
 }
 
 // NewDynamoDBIndexClient makes a new DynamoDB-backed IndexClient.
@@ -141,9 +141,9 @@ func (a dynamoDBStorageClient) NewWriteBatch() chunk.WriteBatch {
 	return dynamoDBWriteBatch(map[string][]*dynamodb.WriteRequest{})
 }
 
-func logWriteRetry(unprocessed dynamoDBWriteBatch, dynamoThrottled *prometheus.CounterVec) {
+func logWriteRetry(unprocessed dynamoDBWriteBatch, metrics *dynamoDBMetrics) {
 	for table, reqs := range unprocessed {
-		dynamoThrottled.WithLabelValues("DynamoDB.BatchWriteItem", table).Add(float64(len(reqs)))
+		metrics.dynamoThrottled.WithLabelValues("DynamoDB.BatchWriteItem", table).Add(float64(len(reqs)))
 		for _, req := range reqs {
 			item := req.PutRequest.Item
 			var hash, rnge string
@@ -191,13 +191,13 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write
 
 		if err != nil {
 			for tableName := range requests {
-				recordDynamoError(tableName, err, "DynamoDB.BatchWriteItem", a.metrics.dynamoFailures)
+				recordDynamoError(tableName, err, "DynamoDB.BatchWriteItem", a.metrics)
 			}
 
 			// If we get provisionedThroughputExceededException, then no items were processed,
 			// so back off and retry all.
 			if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) {
-				logWriteRetry(requests, a.metrics.dynamoThrottled)
+				logWriteRetry(requests, a.metrics)
 				unprocessed.TakeReqs(requests, -1)
 				_ = a.writeThrottle.WaitN(ctx, len(requests))
 				backoff.Wait()
@@ -222,7 +222,7 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write
 		// If there are unprocessed items, retry those items.
 		unprocessedItems := dynamoDBWriteBatch(resp.UnprocessedItems)
 		if len(unprocessedItems) > 0 {
-			logWriteRetry(unprocessedItems, a.metrics.dynamoThrottled)
+			logWriteRetry(unprocessedItems, a.metrics)
 			_ = a.writeThrottle.WaitN(ctx, unprocessedItems.Len())
 			unprocessed.TakeReqs(unprocessedItems, -1)
 		}
@@ -304,7 +304,7 @@ func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery
 			}
 
 			return callback(query, &dynamoDBReadResponse{items: output.Items})
-		}, retryer.withRetries, withErrorHandler(query.TableName, "DynamoDB.QueryPages", a.metrics.dynamoFailures))
+		}, retryer.withRetries, withErrorHandler(query.TableName, "DynamoDB.QueryPages", a.metrics))
 	})
 	if err != nil {
 		return errors.Wrapf(err, "QueryPages error: table=%v", query.TableName)
@@ -447,7 +447,7 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c
 
 		if err != nil {
 			for tableName := range requests {
-				recordDynamoError(tableName, err, "DynamoDB.BatchGetItemPages", a.metrics.dynamoFailures)
+				recordDynamoError(tableName, err, "DynamoDB.BatchGetItemPages", a.metrics)
 			}
 
 			// If we get provisionedThroughputExceededException, then no items were processed,
@@ -746,21 +746,21 @@ func (b dynamoDBReadRequest) TakeReqs(from dynamoDBReadRequest, max int) {
 	}
 }
 
-func withErrorHandler(tableName, operation string, dynamoFailures *prometheus.CounterVec) func(req *request.Request) {
+func withErrorHandler(tableName, operation string, metrics *dynamoDBMetrics) func(req *request.Request) {
 	return func(req *request.Request) {
 		req.Handlers.CompleteAttempt.PushBack(func(req *request.Request) {
 			if req.Error != nil {
-				recordDynamoError(tableName, req.Error, operation, dynamoFailures)
+				recordDynamoError(tableName, req.Error, operation, metrics)
 			}
 		})
 	}
 }
 
-func recordDynamoError(tableName string, err error, operation string, dynamoFailures *prometheus.CounterVec) {
+func recordDynamoError(tableName string, err error, operation string, metrics *dynamoDBMetrics) {
 	if awsErr, ok := err.(awserr.Error); ok {
-		dynamoFailures.WithLabelValues(tableName, awsErr.Code(), operation).Add(float64(1))
+		metrics.dynamoFailures.WithLabelValues(tableName, awsErr.Code(), operation).Add(float64(1))
 	} else {
-		dynamoFailures.WithLabelValues(tableName, otherError, operation).Add(float64(1))
+		metrics.dynamoFailures.WithLabelValues(tableName, otherError, operation).Add(float64(1))
 	}
 }
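Passing the whole *dynamoDBMetrics struct instead of individual counters keeps these helper signatures stable as metrics are added or renamed. A hypothetical test sketch for recordDynamoError, using testutil.ToFloat64 to read the counter back (assumes it sits inside package aws; the table name is made up; imports fmt, awserr, dynamodb, prometheus and prometheus/testutil):

    // Sketch only: exampleRecordFailure is a hypothetical helper.
    func exampleRecordFailure() {
        m := newMetrics(prometheus.NewRegistry())

        // Simulate a throttled write against a made-up table name.
        simulated := awserr.New(dynamodb.ErrCodeProvisionedThroughputExceededException, "throttled", nil)
        recordDynamoError("chunks_2020_28", simulated, "DynamoDB.BatchWriteItem", m)

        // Label order follows the declaration on dynamoFailures:
        // table name, error reason, operation.
        v := testutil.ToFloat64(m.dynamoFailures.WithLabelValues(
            "chunks_2020_28", dynamodb.ErrCodeProvisionedThroughputExceededException, "DynamoDB.BatchWriteItem"))
        fmt.Println(v) // 1
    }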
diff --git a/pkg/chunk/aws/dynamodb_table_client.go b/pkg/chunk/aws/dynamodb_table_client.go
index a0689004640..a2ab3b8096b 100644
--- a/pkg/chunk/aws/dynamodb_table_client.go
+++ b/pkg/chunk/aws/dynamodb_table_client.go
@@ -36,7 +36,7 @@ type dynamoTableClient struct {
 	DynamoDB    dynamodbiface.DynamoDBAPI
 	callManager callManager
 	autoscale   autoscale
-	metrics     *metrics
+	metrics     *dynamoDBMetrics
 }
 
 // NewDynamoDBTableClient makes a new DynamoTableClient.
@@ -53,7 +53,7 @@ func NewDynamoDBTableClient(cfg DynamoDBConfig, reg prometheus.Registerer) (chun
 
 	var autoscale autoscale
 	if cfg.Metrics.URL != "" {
-		autoscale, err = newAutoScale(cfg)
+		autoscale, err = newMetricsAutoScaling(cfg)
 		if err != nil {
 			return nil, err
 		}
@@ -323,7 +323,7 @@ func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected ch
 			return err
 		})
 	}); err != nil {
-		recordDynamoError(expected.Name, err, "DynamoDB.UpdateTable", d.metrics.dynamoFailures)
+		recordDynamoError(expected.Name, err, "DynamoDB.UpdateTable", d.metrics)
 		if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "LimitExceededException" {
 			level.Warn(util.Logger).Log("msg", "update limit exceeded", "err", err)
 		} else {
@@ -341,7 +341,7 @@ func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected ch
 			return err
 		})
 	}); err != nil {
-		recordDynamoError(expected.Name, err, "DynamoDB.UpdateTable", d.metrics.dynamoFailures)
+		recordDynamoError(expected.Name, err, "DynamoDB.UpdateTable", d.metrics)
 		if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "LimitExceededException" {
 			level.Warn(util.Logger).Log("msg", "update limit exceeded", "err", err)
 		} else {
diff --git a/pkg/chunk/aws/metrics_autoscaling.go b/pkg/chunk/aws/metrics_autoscaling.go
index 15b8133e8aa..fea098c8233 100644
--- a/pkg/chunk/aws/metrics_autoscaling.go
+++ b/pkg/chunk/aws/metrics_autoscaling.go
@@ -78,7 +78,7 @@ type metricsData struct {
 	readErrorRates map[string]float64
 }
 
-func newAutoScale(cfg DynamoDBConfig) (*metricsData, error) {
+func newMetricsAutoScaling(cfg DynamoDBConfig) (*metricsData, error) {
 	client, err := promApi.NewClient(promApi.Config{Address: cfg.Metrics.URL})
 	if err != nil {
 		return nil, err
diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go
index 533853ec9f0..1629ead9247 100644
--- a/pkg/chunk/storage/factory.go
+++ b/pkg/chunk/storage/factory.go
@@ -153,10 +153,10 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
 	stores := chunk.NewCompositeStore(cacheGenNumLoader)
 
 	for _, s := range schemaCfg.Configs {
-		reg := prometheus.WrapRegistererWith(
-			prometheus.Labels{"purpose": s.From.String()}, reg)
+		indexClientReg := prometheus.WrapRegistererWith(
+			prometheus.Labels{"component": "index-store-" + s.From.String()}, reg)
 
-		index, err := NewIndexClient(s.IndexType, cfg, schemaCfg, reg)
+		index, err := NewIndexClient(s.IndexType, cfg, schemaCfg, indexClientReg)
 		if err != nil {
 			return nil, errors.Wrap(err, "error creating index client")
 		}
@@ -166,7 +166,11 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
 		if objectStoreType == "" {
 			objectStoreType = s.IndexType
 		}
-		chunks, err := NewChunkClient(objectStoreType, cfg, schemaCfg, reg)
+
+		chunkClientReg := prometheus.WrapRegistererWith(
+			prometheus.Labels{"component": "chunk-store-" + s.From.String()}, reg)
+
+		chunks, err := NewChunkClient(objectStoreType, cfg, schemaCfg, chunkClientReg)
 		if err != nil {
 			return nil, errors.Wrap(err, "error creating object client")
 		}
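Note that wrapped registerers compose: NewStore wraps once with a component label here, and a backend can wrap again, as the Cassandra client does with its per-session client label, so a single collector ends up exported with both constant labels. A self-contained sketch (the label values are illustrative):

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
    )

    func main() {
        base := prometheus.NewRegistry()

        // Outer wrap, per schema period, as in NewStore above.
        componentReg := prometheus.WrapRegistererWith(
            prometheus.Labels{"component": "index-store-2020-07-01"}, base)

        // Inner wrap, as a backend such as the Cassandra client adds per session.
        clientReg := prometheus.WrapRegistererWith(
            prometheus.Labels{"client": "index-read"}, componentReg)

        promauto.With(clientReg).NewCounter(prometheus.CounterOpts{
            Name: "example_requests_total",
            Help: "Example counter.",
        }).Inc()

        // The gathered series carries both injected labels.
        mfs, _ := base.Gather()
        for _, lp := range mfs[0].GetMetric()[0].GetLabel() {
            fmt.Printf("%s=%q\n", lp.GetName(), lp.GetValue())
        }
        // client="index-read"
        // component="index-store-2020-07-01"
    }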
diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go
index bc648a0ee77..2057e9916ca 100644
--- a/pkg/cortex/modules.go
+++ b/pkg/cortex/modules.go
@@ -330,7 +330,7 @@ func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) {
 
 	var indexClient chunk.IndexClient
 	reg := prometheus.WrapRegistererWith(
-		prometheus.Labels{"purpose": "delete-requests"}, prometheus.DefaultRegisterer)
+		prometheus.Labels{"component": DeleteRequestsStore}, prometheus.DefaultRegisterer)
 	indexClient, err = storage.NewIndexClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage, t.Cfg.Schema, reg)
 	if err != nil {
 		return
@@ -427,7 +427,10 @@ func (t *Cortex) initTableManager() (services.Service, error) {
 		os.Exit(1)
 	}
 
-	tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.Cfg.Storage, prometheus.DefaultRegisterer)
+	reg := prometheus.WrapRegistererWith(
+		prometheus.Labels{"component": "table-manager-index-chunk-" + lastConfig.From.String()}, prometheus.DefaultRegisterer)
+
+	tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.Cfg.Storage, reg)
 	if err != nil {
 		return nil, err
 	}
@@ -437,14 +440,12 @@ func (t *Cortex) initTableManager() (services.Service, error) {
 
 	var extraTables []chunk.ExtraTables
 	if t.Cfg.PurgerConfig.Enable {
-		var deleteStoreTableClient chunk.TableClient
-		if lastConfig.IndexType == t.Cfg.Storage.DeleteStoreConfig.Store {
-			deleteStoreTableClient = tableClient
-		} else {
-			deleteStoreTableClient, err = storage.NewTableClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage, prometheus.DefaultRegisterer)
-			if err != nil {
-				return nil, err
-			}
+		reg := prometheus.WrapRegistererWith(
+			prometheus.Labels{"component": "table-manager-" + DeleteRequestsStore}, prometheus.DefaultRegisterer)
+
+		deleteStoreTableClient, err := storage.NewTableClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage, reg)
+		if err != nil {
+			return nil, err
 		}
 
 		extraTables = append(extraTables, chunk.ExtraTables{TableClient: deleteStoreTableClient, Tables: t.Cfg.Storage.DeleteStoreConfig.GetTables()})
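A note on the initTableManager change above: previously the delete-store table client was only created when its store type differed from the main index store, reusing tableClient otherwise, precisely because registering two identical sets of collectors on the shared default registerer would fail. With each client now given its own component-labelled view of the registerer, that sharing is no longer needed, so the code unconditionally builds a dedicated delete-store client; the `:=` declaration also keeps err local to the block.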
 * [CHANGE] Experimental TSDB: the store-gateway service is required in a Cortex cluster running with the experimental blocks storage. Removed the `-experimental.tsdb.store-gateway-enabled` CLI flag and `store_gateway_enabled` YAML config option. The store-gateway is now always enabled when the storage engine is `tsdb`. #2822
 * [CHANGE] Ingester: Chunks flushed via /flush stay in memory until retention period is reached. This affects `cortex_ingester_memory_chunks` metric. #2778
 * [CHANGE] Querier: the error message returned when the query time range exceeds `-store.max-query-length` has changed from `invalid query, length > limit (X > Y)` to `the query time range exceeds the limit (query length: X, limit: Y)`. #2826
+* [CHANGE] Add `component` label to metrics exposed by chunk, delete and index store clients. #2774
 * [FEATURE] Introduced `ruler.for-outage-tolerance`, Max time to tolerate outage for restoring "for" state of alert. #2783
 * [FEATURE] Introduced `ruler.for-grace-period`, Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. #2783
 * [FEATURE] Introduced `ruler.resend-delay`, Minimum amount of time to wait before resending an alert to Alertmanager. #2783
diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go
index 2057e9916ca..fafbdcb5c9b 100644
--- a/pkg/cortex/modules.go
+++ b/pkg/cortex/modules.go
@@ -428,7 +428,7 @@ func (t *Cortex) initTableManager() (services.Service, error) {
 	}
 
 	reg := prometheus.WrapRegistererWith(
-		prometheus.Labels{"component": "table-manager-index-chunk-" + lastConfig.From.String()}, prometheus.DefaultRegisterer)
+		prometheus.Labels{"component": "table-manager-store"}, prometheus.DefaultRegisterer)
 
 	tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.Cfg.Storage, reg)
 	if err != nil {
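Taken together, the series exposed by the store clients now carry a stable `component` label in place of the earlier `purpose` label: per-schema-period detail remains available on the index and chunk store clients, whose component values embed the period start date (e.g. `index-store-2020-07-01`), while the table-manager client uses the fixed value `table-manager-store`. Operators can then slice client traffic by component, for example with a query such as `sum by (component) (rate(cortex_dynamo_failures_total[5m]))`.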