From 32e127622a51464ab528cae6774f81958e3fc25b Mon Sep 17 00:00:00 2001 From: Daniel Deluiggi Date: Fri, 19 May 2023 14:04:56 -0700 Subject: [PATCH 1/2] Add ddb ring pull time config Signed-off-by: Daniel Deluiggi --- docs/blocks-storage/compactor.md | 4 ++ docs/blocks-storage/store-gateway.md | 4 ++ docs/configuration/config-file-reference.md | 28 ++++++++ pkg/ring/kv/dynamodb/client.go | 59 +++++++-------- pkg/ring/kv/dynamodb/client_test.go | 80 +++++++++++---------- 5 files changed, 108 insertions(+), 67 deletions(-) diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index 59db4162b71..cdc1c02faf1 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -217,6 +217,10 @@ compactor: # CLI flag: -compactor.ring.dynamodb.ttl-time [ttl: | default = 0s] + # Time to refresh local ring with information on dynamodb. + # CLI flag: -compactor.ring.dynamodb.puller-sync-time + [puller_sync_time: | default = 1m] + # The consul_config configures the consul client. # The CLI flags prefix for this block config is: compactor.ring [consul: ] diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 6ee5c8cd304..4a40926a06a 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -225,6 +225,10 @@ store_gateway: # CLI flag: -store-gateway.sharding-ring.dynamodb.ttl-time [ttl: | default = 0s] + # Time to refresh local ring with information on dynamodb. + # CLI flag: -store-gateway.sharding-ring.dynamodb.puller-sync-time + [puller_sync_time: | default = 1m] + # The consul_config configures the consul client. # The CLI flags prefix for this block config is: # store-gateway.sharding-ring diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 6f86710e188..3175d80cd70 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -313,6 +313,10 @@ sharding_ring: # CLI flag: -alertmanager.sharding-ring.dynamodb.ttl-time [ttl: | default = 0s] + # Time to refresh local ring with information on dynamodb. + # CLI flag: -alertmanager.sharding-ring.dynamodb.puller-sync-time + [puller_sync_time: | default = 1m] + # The consul_config configures the consul client. # The CLI flags prefix for this block config is: alertmanager.sharding-ring [consul: ] @@ -1878,6 +1882,10 @@ sharding_ring: # CLI flag: -compactor.ring.dynamodb.ttl-time [ttl: | default = 0s] + # Time to refresh local ring with information on dynamodb. + # CLI flag: -compactor.ring.dynamodb.puller-sync-time + [puller_sync_time: | default = 1m] + # The consul_config configures the consul client. # The CLI flags prefix for this block config is: compactor.ring [consul: ] @@ -2117,6 +2125,10 @@ ha_tracker: # CLI flag: -distributor.ha-tracker.dynamodb.ttl-time [ttl: | default = 0s] + # Time to refresh local ring with information on dynamodb. + # CLI flag: -distributor.ha-tracker.dynamodb.puller-sync-time + [puller_sync_time: | default = 1m] + # The consul_config configures the consul client. # The CLI flags prefix for this block config is: distributor.ha-tracker [consul: ] @@ -2194,6 +2206,10 @@ ring: # CLI flag: -distributor.ring.dynamodb.ttl-time [ttl: | default = 0s] + # Time to refresh local ring with information on dynamodb. + # CLI flag: -distributor.ring.dynamodb.puller-sync-time + [puller_sync_time: | default = 1m] + # The consul_config configures the consul client. # The CLI flags prefix for this block config is: distributor.ring [consul: ] @@ -2482,6 +2498,10 @@ lifecycler: # CLI flag: -dynamodb.ttl-time [ttl: | default = 0s] + # Time to refresh local ring with information on dynamodb. + # CLI flag: -dynamodb.puller-sync-time + [puller_sync_time: | default = 1m] + # The consul_config configures the consul client. [consul: ] @@ -3853,6 +3873,10 @@ ring: # CLI flag: -ruler.ring.dynamodb.ttl-time [ttl: | default = 0s] + # Time to refresh local ring with information on dynamodb. + # CLI flag: -ruler.ring.dynamodb.puller-sync-time + [puller_sync_time: | default = 1m] + # The consul_config configures the consul client. # The CLI flags prefix for this block config is: ruler.ring [consul: ] @@ -4712,6 +4736,10 @@ sharding_ring: # CLI flag: -store-gateway.sharding-ring.dynamodb.ttl-time [ttl: | default = 0s] + # Time to refresh local ring with information on dynamodb. + # CLI flag: -store-gateway.sharding-ring.dynamodb.puller-sync-time + [puller_sync_time: | default = 1m] + # The consul_config configures the consul client. # The CLI flags prefix for this block config is: store-gateway.sharding-ring [consul: ] diff --git a/pkg/ring/kv/dynamodb/client.go b/pkg/ring/kv/dynamodb/client.go index 34797b1516d..51f5020a6f7 100644 --- a/pkg/ring/kv/dynamodb/client.go +++ b/pkg/ring/kv/dynamodb/client.go @@ -17,16 +17,19 @@ import ( // Config to create a ConsulClient type Config struct { - Region string `yaml:"region"` - TableName string `yaml:"table_name"` - TTL time.Duration `yaml:"ttl"` + Region string `yaml:"region"` + TableName string `yaml:"table_name"` + TTL time.Duration `yaml:"ttl"` + PullerSyncTime time.Duration `yaml:"puller_sync_time"` } type Client struct { - kv dynamoDbClient - codec codec.Codec - ddbMetrics *dynamodbMetrics - logger log.Logger + kv dynamoDbClient + codec codec.Codec + ddbMetrics *dynamodbMetrics + logger log.Logger + pullerSyncTime time.Duration + backoffConfig backoff.Config staleDataLock sync.RWMutex staleData map[string]staleData @@ -37,22 +40,13 @@ type staleData struct { timestamp time.Time } -var ( - backoffConfig = backoff.Config{ - MinBackoff: 1 * time.Second, - MaxBackoff: 1 * time.Minute, - MaxRetries: 0, - } - - defaultLoopDelay = 1 * time.Minute -) - // RegisterFlags adds the flags required to config this to the given FlagSet // If prefix is not an empty string it should end with a period. func (cfg *Config) RegisterFlags(f *flag.FlagSet, prefix string) { f.StringVar(&cfg.Region, prefix+"dynamodb.region", "", "Region to access dynamodb.") f.StringVar(&cfg.TableName, prefix+"dynamodb.table-name", "", "Table name to use on dynamodb.") f.DurationVar(&cfg.TTL, prefix+"dynamodb.ttl-time", 0, "Time to expire items on dynamodb.") + f.DurationVar(&cfg.PullerSyncTime, prefix+"dynamodb.puller-sync-time", 60*time.Second, "Time to refresh local ring with information on dynamodb.") } func NewClient(cfg Config, cc codec.Codec, logger log.Logger, registerer prometheus.Registerer) (*Client, error) { @@ -63,12 +57,20 @@ func NewClient(cfg Config, cc codec.Codec, logger log.Logger, registerer prometh ddbMetrics := newDynamoDbMetrics(registerer) + backoffConfig := backoff.Config{ + MinBackoff: 1 * time.Second, + MaxBackoff: cfg.PullerSyncTime, + MaxRetries: 0, + } + c := &Client{ - kv: dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics}, - codec: cc, - logger: ddbLog(logger), - ddbMetrics: ddbMetrics, - staleData: make(map[string]staleData), + kv: dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics}, + codec: cc, + logger: ddbLog(logger), + ddbMetrics: ddbMetrics, + pullerSyncTime: cfg.PullerSyncTime, + staleData: make(map[string]staleData), + backoffConfig: backoffConfig, } level.Info(c.logger).Log("dynamodb kv initialized") return c, nil @@ -121,7 +123,7 @@ func (c *Client) Delete(ctx context.Context, key string) error { } func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { - bo := backoff.New(ctx, backoffConfig) + bo := backoff.New(ctx, c.backoffConfig) for bo.Ongoing() { resp, _, err := c.kv.Query(ctx, dynamodbKey{primaryKey: key}, false) if err != nil { @@ -190,7 +192,7 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou } func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool) { - bo := backoff.New(ctx, backoffConfig) + bo := backoff.New(ctx, c.backoffConfig) for bo.Ongoing() { out, _, err := c.kv.Query(ctx, dynamodbKey{ @@ -199,12 +201,11 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b if err != nil { level.Error(c.logger).Log("msg", "error WatchKey", "key", key, "err", err) - if bo.NumRetries() > 10 { + if bo.NumRetries() >= 10 { if staleData := c.getStaleData(key); staleData != nil { if !f(staleData) { return } - bo.Reset() } } bo.Wait() @@ -226,13 +227,13 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b select { case <-ctx.Done(): return - case <-time.After(defaultLoopDelay): + case <-time.After(c.pullerSyncTime): } } } func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) { - bo := backoff.New(ctx, backoffConfig) + bo := backoff.New(ctx, c.backoffConfig) for bo.Ongoing() { out, _, err := c.kv.Query(ctx, dynamodbKey{ @@ -259,7 +260,7 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, select { case <-ctx.Done(): return - case <-time.After(defaultLoopDelay): + case <-time.After(c.pullerSyncTime): } } } diff --git a/pkg/ring/kv/dynamodb/client_test.go b/pkg/ring/kv/dynamodb/client_test.go index 409cdbd28d7..7648b2fa79e 100644 --- a/pkg/ring/kv/dynamodb/client_test.go +++ b/pkg/ring/kv/dynamodb/client_test.go @@ -13,15 +13,25 @@ import ( "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/ring/kv/codec" + "github.com/cortexproject/cortex/pkg/util/backoff" ) const key = "test" +var ( + defaultBackoff = backoff.Config{ + MinBackoff: 1 * time.Millisecond, + MaxBackoff: 1 * time.Millisecond, + MaxRetries: 0, + } + defaultPullTime = 60 * time.Second +) + func Test_CAS_ErrorNoRetry(t *testing.T) { ddbMock := NewDynamodbClientMock() codecMock := &CodecMock{} descMock := &DescMock{} - c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry()) + c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) expectedErr := errors.Errorf("test") ddbMock.On("Query").Return(map[string][]byte{}, nil).Once() @@ -36,12 +46,10 @@ func Test_CAS_ErrorNoRetry(t *testing.T) { } func Test_CAS_Backoff(t *testing.T) { - backoffConfig.MinBackoff = 1 * time.Millisecond - backoffConfig.MaxBackoff = 1 * time.Millisecond ddbMock := NewDynamodbClientMock() codecMock := &CodecMock{} descMock := &DescMock{} - c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry()) + c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) expectedErr := errors.Errorf("test") ddbMock.On("Query").Return(map[string][]byte{}, expectedErr).Once() @@ -59,13 +67,15 @@ func Test_CAS_Backoff(t *testing.T) { } func Test_CAS_Failed(t *testing.T) { - backoffConfig.MinBackoff = 1 * time.Millisecond - backoffConfig.MaxBackoff = 1 * time.Millisecond - backoffConfig.MaxRetries = 10 + config := backoff.Config{ + MinBackoff: 1 * time.Millisecond, + MaxBackoff: 1 * time.Millisecond, + MaxRetries: 10, + } ddbMock := NewDynamodbClientMock() codecMock := &CodecMock{} descMock := &DescMock{} - c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry()) + c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, config) ddbMock.On("Query").Return(map[string][]byte{}, errors.Errorf("test")) @@ -81,7 +91,7 @@ func Test_CAS_Update(t *testing.T) { ddbMock := NewDynamodbClientMock() codecMock := &CodecMock{} descMock := &DescMock{} - c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry()) + c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) expectedUpdatedKeys := []string{"t1", "t2"} expectedUpdated := map[string][]byte{ expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]), @@ -112,7 +122,7 @@ func Test_CAS_Delete(t *testing.T) { ddbMock := NewDynamodbClientMock() codecMock := &CodecMock{} descMock := &DescMock{} - c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry()) + c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) expectedToDelete := []string{"test", "test2"} expectedBatch := []dynamodbKey{ {primaryKey: key, sortKey: expectedToDelete[0]}, @@ -139,7 +149,7 @@ func Test_CAS_Update_Delete(t *testing.T) { ddbMock := NewDynamodbClientMock() codecMock := &CodecMock{} descMock := &DescMock{} - c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry()) + c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) expectedUpdatedKeys := []string{"t1", "t2"} expectedUpdated := map[string][]byte{ expectedUpdatedKeys[0]: []byte(expectedUpdatedKeys[0]), @@ -172,13 +182,10 @@ func Test_CAS_Update_Delete(t *testing.T) { } func Test_WatchKey(t *testing.T) { - backoffConfig.MinBackoff = 1 * time.Millisecond - backoffConfig.MaxBackoff = 1 * time.Millisecond - defaultLoopDelay = 1 * time.Second ddbMock := NewDynamodbClientMock() codecMock := &CodecMock{} descMock := &DescMock{} - c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry()) + c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), 1*time.Second, defaultBackoff) timesCalled := 0 ddbMock.On("Query").Return(map[string][]byte{}, nil) @@ -194,11 +201,9 @@ func Test_WatchKey(t *testing.T) { } func Test_WatchKey_UpdateStale(t *testing.T) { - backoffConfig.MinBackoff = 1 * time.Millisecond - backoffConfig.MaxBackoff = 1 * time.Millisecond ddbMock := NewDynamodbClientMock() codecMock := &CodecMock{} - c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry()) + c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) staleData := &DescMock{} ddbMock.On("Query").Return(map[string][]byte{}, nil).Once() @@ -223,11 +228,9 @@ func Test_WatchKey_UpdateStale(t *testing.T) { } func Test_WatchPrefix(t *testing.T) { - backoffConfig.MinBackoff = 1 * time.Millisecond - backoffConfig.MaxBackoff = 1 * time.Millisecond ddbMock := NewDynamodbClientMock() codecMock := &CodecMock{} - c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry()) + c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) data := map[string][]byte{} dataKey := []string{"t1", "t2"} data[dataKey[0]] = []byte(dataKey[0]) @@ -252,7 +255,7 @@ func Test_WatchPrefix(t *testing.T) { func Test_UpdateStaleData(t *testing.T) { ddbMock := NewDynamodbClientMock() codecMock := &CodecMock{} - c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry()) + c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, defaultBackoff) staleData := &DescMock{} timestamp := time.Date(2000, 10, 10, 10, 10, 10, 10, time.UTC) @@ -265,30 +268,31 @@ func Test_UpdateStaleData(t *testing.T) { } // NewClientMock makes a new local dynamodb client. -func NewClientMock(ddbClient dynamoDbClient, cc codec.Codec, logger log.Logger, registerer prometheus.Registerer) *Client { - m := &Client{ - kv: ddbClient, - ddbMetrics: newDynamoDbMetrics(registerer), - codec: cc, - logger: logger, - staleData: make(map[string]staleData), +func NewClientMock(ddbClient dynamoDbClient, cc codec.Codec, logger log.Logger, registerer prometheus.Registerer, time time.Duration, config backoff.Config) *Client { + return &Client{ + kv: ddbClient, + ddbMetrics: newDynamoDbMetrics(registerer), + codec: cc, + logger: logger, + staleData: make(map[string]staleData), + pullerSyncTime: time, + backoffConfig: config, } - return m } -type mockDynamodbClient struct { +type MockDynamodbClient struct { mock.Mock } //revive:disable:unexported-return -func NewDynamodbClientMock() *mockDynamodbClient { - return &mockDynamodbClient{} +func NewDynamodbClientMock() *MockDynamodbClient { + return &MockDynamodbClient{} } //revive:enable:unexported-return -func (m *mockDynamodbClient) List(context.Context, dynamodbKey) ([]string, float64, error) { +func (m *MockDynamodbClient) List(context.Context, dynamodbKey) ([]string, float64, error) { args := m.Called() var err error if args.Get(1) != nil { @@ -296,7 +300,7 @@ func (m *mockDynamodbClient) List(context.Context, dynamodbKey) ([]string, float } return args.Get(0).([]string), 0, err } -func (m *mockDynamodbClient) Query(context.Context, dynamodbKey, bool) (map[string][]byte, float64, error) { +func (m *MockDynamodbClient) Query(context.Context, dynamodbKey, bool) (map[string][]byte, float64, error) { args := m.Called() var err error if args.Get(1) != nil { @@ -304,15 +308,15 @@ func (m *mockDynamodbClient) Query(context.Context, dynamodbKey, bool) (map[stri } return args.Get(0).(map[string][]byte), 0, err } -func (m *mockDynamodbClient) Delete(ctx context.Context, key dynamodbKey) error { +func (m *MockDynamodbClient) Delete(ctx context.Context, key dynamodbKey) error { m.Called(ctx, key) return nil } -func (m *mockDynamodbClient) Put(ctx context.Context, key dynamodbKey, data []byte) error { +func (m *MockDynamodbClient) Put(ctx context.Context, key dynamodbKey, data []byte) error { m.Called(ctx, key, data) return nil } -func (m *mockDynamodbClient) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error { +func (m *MockDynamodbClient) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error { m.Called(ctx, put, delete) return nil } From 0901e85d64a93f9540a15cd9afbe06341abceca9 Mon Sep 17 00:00:00 2001 From: Daniel Deluiggi Date: Mon, 22 May 2023 22:35:42 -0700 Subject: [PATCH 2/2] Add changelog Signed-off-by: Daniel Deluiggi --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ac212caa38..ecf855e56dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * [ENHANCEMENT] Update Go version to 1.20.4. #5299 * [ENHANCEMENT] Log: Avoid expensive log.Valuer evaluation for disallowed levels. #5297 * [ENHANCEMENT] Improving Performance on the API Gzip Handler. #5347 +* [ENHANCEMENT] Dynamodb: Add `puller-sync-time` to allow different pull time for ring. #5357 * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286 * [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293