Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [ENHANCEMENT] Update Go version to 1.20.4. #5299
* [ENHANCEMENT] Log: Avoid expensive log.Valuer evaluation for disallowed levels. #5297
* [ENHANCEMENT] Improving Performance on the API Gzip Handler. #5347
* [ENHANCEMENT] Dynamodb: Add `puller-sync-time` to allow different pull time for ring. #5357
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
4 changes: 4 additions & 0 deletions docs/blocks-storage/compactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ compactor:
# CLI flag: -compactor.ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -compactor.ring.dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: compactor.ring
[consul: <consul_config>]
Expand Down
4 changes: 4 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ store_gateway:
# CLI flag: -store-gateway.sharding-ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -store-gateway.sharding-ring.dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is:
# store-gateway.sharding-ring
Expand Down
28 changes: 28 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,10 @@ sharding_ring:
# CLI flag: -alertmanager.sharding-ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -alertmanager.sharding-ring.dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: alertmanager.sharding-ring
[consul: <consul_config>]
Expand Down Expand Up @@ -1878,6 +1882,10 @@ sharding_ring:
# CLI flag: -compactor.ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -compactor.ring.dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: compactor.ring
[consul: <consul_config>]
Expand Down Expand Up @@ -2117,6 +2125,10 @@ ha_tracker:
# CLI flag: -distributor.ha-tracker.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -distributor.ha-tracker.dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: distributor.ha-tracker
[consul: <consul_config>]
Expand Down Expand Up @@ -2194,6 +2206,10 @@ ring:
# CLI flag: -distributor.ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -distributor.ring.dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: distributor.ring
[consul: <consul_config>]
Expand Down Expand Up @@ -2482,6 +2498,10 @@ lifecycler:
# CLI flag: -dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
[consul: <consul_config>]

Expand Down Expand Up @@ -3853,6 +3873,10 @@ ring:
# CLI flag: -ruler.ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -ruler.ring.dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: ruler.ring
[consul: <consul_config>]
Expand Down Expand Up @@ -4712,6 +4736,10 @@ sharding_ring:
# CLI flag: -store-gateway.sharding-ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -store-gateway.sharding-ring.dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: store-gateway.sharding-ring
[consul: <consul_config>]
Expand Down
59 changes: 30 additions & 29 deletions pkg/ring/kv/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ import (

// Config to create a ConsulClient
type Config struct {
Region string `yaml:"region"`
TableName string `yaml:"table_name"`
TTL time.Duration `yaml:"ttl"`
Region string `yaml:"region"`
TableName string `yaml:"table_name"`
TTL time.Duration `yaml:"ttl"`
PullerSyncTime time.Duration `yaml:"puller_sync_time"`
}

type Client struct {
kv dynamoDbClient
codec codec.Codec
ddbMetrics *dynamodbMetrics
logger log.Logger
kv dynamoDbClient
codec codec.Codec
ddbMetrics *dynamodbMetrics
logger log.Logger
pullerSyncTime time.Duration
backoffConfig backoff.Config

staleDataLock sync.RWMutex
staleData map[string]staleData
Expand All @@ -37,22 +40,13 @@ type staleData struct {
timestamp time.Time
}

var (
backoffConfig = backoff.Config{
MinBackoff: 1 * time.Second,
MaxBackoff: 1 * time.Minute,
MaxRetries: 0,
}

defaultLoopDelay = 1 * time.Minute
)

// RegisterFlags adds the flags required to config this to the given FlagSet
// If prefix is not an empty string it should end with a period.
func (cfg *Config) RegisterFlags(f *flag.FlagSet, prefix string) {
f.StringVar(&cfg.Region, prefix+"dynamodb.region", "", "Region to access dynamodb.")
f.StringVar(&cfg.TableName, prefix+"dynamodb.table-name", "", "Table name to use on dynamodb.")
f.DurationVar(&cfg.TTL, prefix+"dynamodb.ttl-time", 0, "Time to expire items on dynamodb.")
f.DurationVar(&cfg.PullerSyncTime, prefix+"dynamodb.puller-sync-time", 60*time.Second, "Time to refresh local ring with information on dynamodb.")
}

func NewClient(cfg Config, cc codec.Codec, logger log.Logger, registerer prometheus.Registerer) (*Client, error) {
Expand All @@ -63,12 +57,20 @@ func NewClient(cfg Config, cc codec.Codec, logger log.Logger, registerer prometh

ddbMetrics := newDynamoDbMetrics(registerer)

backoffConfig := backoff.Config{
MinBackoff: 1 * time.Second,
MaxBackoff: cfg.PullerSyncTime,
MaxRetries: 0,
}

c := &Client{
kv: dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics},
codec: cc,
logger: ddbLog(logger),
ddbMetrics: ddbMetrics,
staleData: make(map[string]staleData),
kv: dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics},
codec: cc,
logger: ddbLog(logger),
ddbMetrics: ddbMetrics,
pullerSyncTime: cfg.PullerSyncTime,
staleData: make(map[string]staleData),
backoffConfig: backoffConfig,
}
level.Info(c.logger).Log("dynamodb kv initialized")
return c, nil
Expand Down Expand Up @@ -121,7 +123,7 @@ func (c *Client) Delete(ctx context.Context, key string) error {
}

func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
bo := backoff.New(ctx, backoffConfig)
bo := backoff.New(ctx, c.backoffConfig)
for bo.Ongoing() {
resp, _, err := c.kv.Query(ctx, dynamodbKey{primaryKey: key}, false)
if err != nil {
Expand Down Expand Up @@ -190,7 +192,7 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
}

func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool) {
bo := backoff.New(ctx, backoffConfig)
bo := backoff.New(ctx, c.backoffConfig)

for bo.Ongoing() {
out, _, err := c.kv.Query(ctx, dynamodbKey{
Expand All @@ -199,12 +201,11 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b
if err != nil {
level.Error(c.logger).Log("msg", "error WatchKey", "key", key, "err", err)

if bo.NumRetries() > 10 {
if bo.NumRetries() >= 10 {
if staleData := c.getStaleData(key); staleData != nil {
if !f(staleData) {
return
}
bo.Reset()
}
}
bo.Wait()
Expand All @@ -226,13 +227,13 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b
select {
case <-ctx.Done():
return
case <-time.After(defaultLoopDelay):
case <-time.After(c.pullerSyncTime):
}
}
}

func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) {
bo := backoff.New(ctx, backoffConfig)
bo := backoff.New(ctx, c.backoffConfig)

for bo.Ongoing() {
out, _, err := c.kv.Query(ctx, dynamodbKey{
Expand All @@ -259,7 +260,7 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string,
select {
case <-ctx.Done():
return
case <-time.After(defaultLoopDelay):
case <-time.After(c.pullerSyncTime):
}
}
}
Expand Down
Loading