diff --git a/CHANGELOG.md b/CHANGELOG.md
index c26aeb95163..da41e6ebbec 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@
 * [ENHANCEMENT] Ingester: exposed `-blocks-storage.tsdb.wal-segment-size-bytes` config option to customise the TSDB WAL segment max size. #3476
 * [ENHANCEMENT] Compactor: concurrently run blocks cleaner for multiple tenants. Concurrency can be configured via `-compactor.cleanup-concurrency`. #3483
 * [ENHANCEMENT] Compactor: shuffle tenants before running compaction. #3483
+* [ENHANCEMENT] Compactor: wait for a stable ring at startup, when sharding is enabled. #3484
 * [BUGFIX] Blocks storage ingester: fixed some cases leading to a TSDB WAL corruption after a partial write to disk. #3423
 * [BUGFIX] Blocks storage: Fix the race between ingestion and `/flush` call resulting in overlapping blocks. #3422
 * [BUGFIX] Querier: fixed `-querier.max-query-into-future` which wasn't correctly enforced on range queries. #3452
diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md
index f07b1f1c76e..6c71cb42ede 100644
--- a/docs/blocks-storage/compactor.md
+++ b/docs/blocks-storage/compactor.md
@@ -38,6 +38,14 @@ The compactor sharding is based on the Cortex [hash ring](../architecture.md#the
 This feature can be enabled via `-compactor.sharding-enabled=true` and requires the backend [hash ring](../architecture.md#the-hash-ring) to be configured via `-compactor.ring.*` flags (or their respective YAML config options).
 
+### Waiting for stable ring at startup
+
+In the event of a cluster cold start or a scale-up of 2+ compactor instances at the same time, we may end up in a situation where each new compactor instance starts at a slightly different time, and thus each one runs its first compaction based on a different state of the ring. This is not a critical condition, but it may be inefficient, because multiple compactor replicas may start compacting the same tenant at nearly the same time.
+
+To reduce the likelihood of this happening, the compactor waits for a stable ring at startup. A ring is considered stable if no instance is added to or removed from the ring for at least `-compactor.ring.wait-stability-min-duration`. If the ring keeps changing after `-compactor.ring.wait-stability-max-duration`, the compactor stops waiting for a stable ring and proceeds with the normal startup.
+
+To disable this waiting logic, you can start the compactor with `-compactor.ring.wait-stability-min-duration=0`.
+
 ## Soft and hard blocks deletion
 
 When the compactor successfully compacts some source blocks into a larger block, source blocks are deleted from the storage. Blocks deletion is not immediate, but follows a two steps process:
@@ -193,6 +201,15 @@ compactor:
     # CLI flag: -compactor.ring.heartbeat-timeout
     [heartbeat_timeout: <duration> | default = 1m]
 
+    # Minimum time to wait for ring stability at startup. 0 to disable.
+    # CLI flag: -compactor.ring.wait-stability-min-duration
+    [wait_stability_min_duration: <duration> | default = 1m]
+
+    # Maximum time to wait for ring stability at startup. If the compactor ring
+    # keeps changing after this period of time, the compactor will start anyway.
+    # CLI flag: -compactor.ring.wait-stability-max-duration
+    [wait_stability_max_duration: <duration> | default = 5m]
+
     # Name of network interface to read address from.
     # CLI flag: -compactor.ring.instance-interface-names
     [instance_interface_names: <list of string> | default = [eth0 en0]]
diff --git a/docs/blocks-storage/compactor.template b/docs/blocks-storage/compactor.template
index 102bb0aebdd..7764d6c8b2c 100644
--- a/docs/blocks-storage/compactor.template
+++ b/docs/blocks-storage/compactor.template
@@ -38,6 +38,14 @@ The compactor sharding is based on the Cortex [hash ring](../architecture.md#the
 This feature can be enabled via `-compactor.sharding-enabled=true` and requires the backend [hash ring](../architecture.md#the-hash-ring) to be configured via `-compactor.ring.*` flags (or their respective YAML config options).
 
+### Waiting for stable ring at startup
+
+In the event of a cluster cold start or a scale-up of 2+ compactor instances at the same time, we may end up in a situation where each new compactor instance starts at a slightly different time, and thus each one runs its first compaction based on a different state of the ring. This is not a critical condition, but it may be inefficient, because multiple compactor replicas may start compacting the same tenant at nearly the same time.
+
+To reduce the likelihood of this happening, the compactor waits for a stable ring at startup. A ring is considered stable if no instance is added to or removed from the ring for at least `-compactor.ring.wait-stability-min-duration`. If the ring keeps changing after `-compactor.ring.wait-stability-max-duration`, the compactor stops waiting for a stable ring and proceeds with the normal startup.
+
+To disable this waiting logic, you can start the compactor with `-compactor.ring.wait-stability-min-duration=0`.
+
 ## Soft and hard blocks deletion
 
 When the compactor successfully compacts some source blocks into a larger block, source blocks are deleted from the storage. Blocks deletion is not immediate, but follows a two steps process:
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index b7cbd6c2ddc..a8ffaa2a688 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -3944,6 +3944,15 @@ sharding_ring:
   # CLI flag: -compactor.ring.heartbeat-timeout
   [heartbeat_timeout: <duration> | default = 1m]
 
+  # Minimum time to wait for ring stability at startup. 0 to disable.
+  # CLI flag: -compactor.ring.wait-stability-min-duration
+  [wait_stability_min_duration: <duration> | default = 1m]
+
+  # Maximum time to wait for ring stability at startup. If the compactor ring
+  # keeps changing after this period of time, the compactor will start anyway.
+  # CLI flag: -compactor.ring.wait-stability-max-duration
+  [wait_stability_max_duration: <duration> | default = 5m]
+
   # Name of network interface to read address from.
   # CLI flag: -compactor.ring.instance-interface-names
   [instance_interface_names: <list of string> | default = [eth0 en0]]
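Before moving on to the code, a quick illustration of how the two new options are wired. This is not part of the change: the sketch below assumes it lives inside the `compactor` package (where `RingConfig` and `RegisterFlags` from `pkg/compactor/compactor_ring.go` below are visible), and the flag values are simply the documented defaults passed explicitly.

```go
package compactor

import (
	"flag"
	"fmt"
)

// waitStabilityFlagsExample is an illustrative sketch only (not part of this
// PR): it parses the new wait-stability flags into a RingConfig and prints
// the resulting durations.
func waitStabilityFlagsExample() {
	fs := flag.NewFlagSet("compactor", flag.PanicOnError)

	cfg := RingConfig{}
	cfg.RegisterFlags(fs)

	// The values below match the documented defaults.
	_ = fs.Parse([]string{
		"-compactor.ring.wait-stability-min-duration=1m",
		"-compactor.ring.wait-stability-max-duration=5m",
	})

	fmt.Println(cfg.WaitStabilityMinDuration, cfg.WaitStabilityMaxDuration) // 1m0s 5m0s
}
```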
diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go
index 89d6c16ef6a..e566fe7f943 100644
--- a/pkg/compactor/compactor.go
+++ b/pkg/compactor/compactor.go
@@ -266,6 +266,22 @@ func (c *Compactor) starting(ctx context.Context) error {
 			return err
 		}
 		level.Info(c.logger).Log("msg", "compactor is ACTIVE in the ring")
+
+		// In the event of a cluster cold start or scale up of 2+ compactor instances at the same
+		// time, we may end up in a situation where each new compactor instance starts at a slightly
+		// different time and thus each one starts with a different state of the ring. It's better
+		// to just wait for ring stability for a short time.
+		if c.compactorCfg.ShardingRing.WaitStabilityMinDuration > 0 {
+			minWaiting := c.compactorCfg.ShardingRing.WaitStabilityMinDuration
+			maxWaiting := c.compactorCfg.ShardingRing.WaitStabilityMaxDuration
+
+			level.Info(c.logger).Log("msg", "waiting until compactor ring topology is stable", "min_waiting", minWaiting.String(), "max_waiting", maxWaiting.String())
+			if err := ring.WaitRingStability(ctx, c.ring, ring.Compactor, minWaiting, maxWaiting); err != nil {
+				level.Warn(c.logger).Log("msg", "compactor ring topology is not stable after the max waiting time, proceeding anyway")
+			} else {
+				level.Info(c.logger).Log("msg", "compactor ring topology is stable")
+			}
+		}
 	}
 
 	// Create the blocks cleaner (service).
diff --git a/pkg/compactor/compactor_ring.go b/pkg/compactor/compactor_ring.go
index 77f8bde3d53..3c2fa0d418d 100644
--- a/pkg/compactor/compactor_ring.go
+++ b/pkg/compactor/compactor_ring.go
@@ -22,6 +22,10 @@ type RingConfig struct {
 	HeartbeatPeriod  time.Duration `yaml:"heartbeat_period"`
 	HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
 
+	// Wait ring stability.
+	WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration"`
+	WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration"`
+
 	// Instance details
 	InstanceID             string   `yaml:"instance_id" doc:"hidden"`
 	InstanceInterfaceNames []string `yaml:"instance_interface_names"`
@@ -45,6 +49,10 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&cfg.HeartbeatPeriod, "compactor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring.")
 	f.DurationVar(&cfg.HeartbeatTimeout, "compactor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which compactors are considered unhealthy within the ring.")
 
+	// Wait stability flags.
+	f.DurationVar(&cfg.WaitStabilityMinDuration, "compactor.ring.wait-stability-min-duration", time.Minute, "Minimum time to wait for ring stability at startup. 0 to disable.")
+	f.DurationVar(&cfg.WaitStabilityMaxDuration, "compactor.ring.wait-stability-max-duration", 5*time.Minute, "Maximum time to wait for ring stability at startup. If the compactor ring keeps changing after this period of time, the compactor will start anyway.")
+
 	// Instance flags
 	cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
 	f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), "compactor.ring.instance-interface-names", "Name of network interface to read address from.")
diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go
index d4090bcaa45..66584dda5c9 100644
--- a/pkg/compactor/compactor_test.go
+++ b/pkg/compactor/compactor_test.go
@@ -679,6 +679,8 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
 		cfg.ShardingEnabled = true
 		cfg.ShardingRing.InstanceID = fmt.Sprintf("compactor-%d", i)
 		cfg.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.%d", i)
+		cfg.ShardingRing.WaitStabilityMinDuration = 3 * time.Second
+		cfg.ShardingRing.WaitStabilityMaxDuration = 10 * time.Second
 		cfg.ShardingRing.KVStore.Mock = kvstore
 
 		c, tsdbCompactor, l, _, cleanup := prepare(t, cfg, bucketClient)
@@ -700,26 +702,6 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
 		require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
 	}
 
-	// Wait until each compactor sees all ACTIVE compactors in the ring
-	for _, c := range compactors {
-		cortex_testutil.Poll(t, 10*time.Second, len(compactors), func() interface{} {
-			// it is safe to access c.ring here, since we know that all compactors are Running now
-			rs, err := c.ring.GetAllHealthy(ring.Compactor)
-			if err != nil {
-				return 0
-			}
-
-			numActive := 0
-			for _, i := range rs.Ingesters {
-				if i.GetState() == ring.ACTIVE {
-					numActive++
-				}
-			}
-
-			return numActive
-		})
-	}
-
 	// Wait until a run has been completed on each compactor
 	for _, c := range compactors {
 		cortex_testutil.Poll(t, 10*time.Second, 1.0, func() interface{} {
@@ -855,6 +837,10 @@ func prepareConfig() Config {
 	compactorCfg.retryMinBackoff = 0
 	compactorCfg.retryMaxBackoff = 0
 
+	// Do not wait for ring stability by default, in order to speed up tests.
+	compactorCfg.ShardingRing.WaitStabilityMinDuration = 0
+	compactorCfg.ShardingRing.WaitStabilityMaxDuration = 0
+
 	return compactorCfg
 }
diff --git a/pkg/ring/util.go b/pkg/ring/util.go
index c234d05b9ce..6f28988eeda 100644
--- a/pkg/ring/util.go
+++ b/pkg/ring/util.go
@@ -82,6 +82,40 @@ func WaitInstanceState(ctx context.Context, r *Ring, instanceID string, state In
 	return backoff.Err()
 }
 
+// WaitRingStability monitors the ring topology for the provided operation and waits until it
+// stays stable for at least minStability.
+func WaitRingStability(ctx context.Context, r *Ring, op Operation, minStability, maxWaiting time.Duration) error {
+	// Configure the max waiting time as a context deadline.
+	ctx, cancel := context.WithTimeout(ctx, maxWaiting)
+	defer cancel()
+
+	// Get the initial ring state.
+	ringLastState, _ := r.GetAllHealthy(op) // nolint:errcheck
+	ringLastStateTs := time.Now()
+
+	const pollingFrequency = time.Second
+	pollingTicker := time.NewTicker(pollingFrequency)
+	defer pollingTicker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-pollingTicker.C:
+			// We ignore the error because in case of error it will return an empty
+			// replication set, which we use to compare with the previous state.
+			currRingState, _ := r.GetAllHealthy(op) // nolint:errcheck
+
+			if HasReplicationSetChanged(ringLastState, currRingState) {
+				ringLastState = currRingState
+				ringLastStateTs = time.Now()
+			} else if time.Since(ringLastStateTs) >= minStability {
+				return nil
+			}
+		}
+	}
+}
+
 // getZones return the list zones from the provided tokens. The returned list
 // is guaranteed to be sorted.
 func getZones(tokens map[string][]TokenDesc) []string {
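`WaitRingStability` delegates the change detection to `HasReplicationSetChanged`, which is not shown in this diff. As a rough mental model only (the real helper lives elsewhere in the ring package and may compare additional fields such as tokens or timestamps), a comparison of two `ReplicationSet` values could look like the following sketch, using the `Ingesters`, `Addr`, and `State` fields that appear in the code above:

```go
// Rough sketch only, not the actual HasReplicationSetChanged implementation:
// report whether the set of instances, or any instance state, differs between
// two consecutive polls of the ring.
func hasReplicationSetChangedSketch(before, after ReplicationSet) bool {
	if len(before.Ingesters) != len(after.Ingesters) {
		return true
	}

	// Index the previous observation by address, then compare per instance.
	prev := make(map[string]IngesterDesc, len(before.Ingesters))
	for _, desc := range before.Ingesters {
		prev[desc.Addr] = desc
	}

	for _, curr := range after.Ingesters {
		p, ok := prev[curr.Addr]
		if !ok || p.State != curr.State {
			return true
		}
	}

	return false
}
```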
diff --git a/pkg/ring/util_test.go b/pkg/ring/util_test.go
index d5175af8bd4..05c871f379a 100644
--- a/pkg/ring/util_test.go
+++ b/pkg/ring/util_test.go
@@ -1,7 +1,13 @@
 package ring
 
 import (
+	"context"
+	"fmt"
 	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
 func TestGenerateTokens(t *testing.T) {
@@ -34,3 +40,141 @@ func TestGenerateTokensIgnoresOldTokens(t *testing.T) {
 		}
 	}
 }
+
+func TestWaitRingStabilityShouldReturnAsSoonAsMinStabilityIsReachedOnNoChanges(t *testing.T) {
+	t.Parallel()
+
+	const (
+		minStability = 2 * time.Second
+		maxWaiting   = 10 * time.Second
+	)
+
+	// Init the ring.
+	ringDesc := &Desc{Ingesters: map[string]IngesterDesc{
+		"instance-1": {Addr: "127.0.0.1", State: ACTIVE, Timestamp: time.Now().Unix()},
+		"instance-2": {Addr: "127.0.0.2", State: PENDING, Timestamp: time.Now().Unix()},
+		"instance-3": {Addr: "127.0.0.3", State: JOINING, Timestamp: time.Now().Unix()},
+		"instance-4": {Addr: "127.0.0.4", State: LEAVING, Timestamp: time.Now().Unix()},
+		"instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: time.Now().Unix()},
+	}}
+
+	ring := &Ring{
+		cfg:              Config{HeartbeatTimeout: time.Minute},
+		ringDesc:         ringDesc,
+		ringTokens:       ringDesc.getTokens(),
+		ringTokensByZone: ringDesc.getTokensByZone(),
+		ringZones:        getZones(ringDesc.getTokensByZone()),
+		strategy:         &DefaultReplicationStrategy{},
+	}
+
+	startTime := time.Now()
+	require.NoError(t, WaitRingStability(context.Background(), ring, Reporting, minStability, maxWaiting))
+	elapsedTime := time.Since(startTime)
+
+	assert.InDelta(t, minStability, elapsedTime, float64(2*time.Second))
+}
+
+func TestWaitRingStabilityShouldReturnOnceMinStabilityHasBeenReached(t *testing.T) {
+	t.Parallel()
+
+	const (
+		minStability     = 3 * time.Second
+		addInstanceAfter = 2 * time.Second
+		maxWaiting       = 10 * time.Second
+	)
+
+	// Init the ring.
+	ringDesc := &Desc{Ingesters: map[string]IngesterDesc{
+		"instance-1": {Addr: "instance-1", State: ACTIVE, Timestamp: time.Now().Unix()},
+		"instance-2": {Addr: "instance-2", State: PENDING, Timestamp: time.Now().Unix()},
+		"instance-3": {Addr: "instance-3", State: JOINING, Timestamp: time.Now().Unix()},
+		"instance-4": {Addr: "instance-4", State: LEAVING, Timestamp: time.Now().Unix()},
+		"instance-5": {Addr: "instance-5", State: ACTIVE, Timestamp: time.Now().Unix()},
+	}}
+
+	ring := &Ring{
+		cfg:              Config{HeartbeatTimeout: time.Minute},
+		ringDesc:         ringDesc,
+		ringTokens:       ringDesc.getTokens(),
+		ringTokensByZone: ringDesc.getTokensByZone(),
+		ringZones:        getZones(ringDesc.getTokensByZone()),
+		strategy:         &DefaultReplicationStrategy{},
+	}
+
+	// Add 1 new instance after some time.
+	go func() {
+		time.Sleep(addInstanceAfter)
+
+		ring.mtx.Lock()
+		defer ring.mtx.Unlock()
+
+		instanceID := fmt.Sprintf("instance-%d", len(ringDesc.Ingesters)+1)
+		ringDesc.Ingesters[instanceID] = IngesterDesc{Addr: instanceID, State: ACTIVE, Timestamp: time.Now().Unix()}
+		ring.ringDesc = ringDesc
+		ring.ringTokens = ringDesc.getTokens()
+		ring.ringTokensByZone = ringDesc.getTokensByZone()
+		ring.ringZones = getZones(ringDesc.getTokensByZone())
+	}()
+
+	startTime := time.Now()
+	require.NoError(t, WaitRingStability(context.Background(), ring, Reporting, minStability, maxWaiting))
+	elapsedTime := time.Since(startTime)
+
+	assert.InDelta(t, minStability+addInstanceAfter, elapsedTime, float64(2*time.Second))
+}
+
+func TestWaitRingStabilityShouldReturnErrorIfMaxWaitingIsReached(t *testing.T) {
+	t.Parallel()
+
+	const (
+		minStability = 2 * time.Second
+		maxWaiting   = 7 * time.Second
+	)
+
+	// Init the ring.
+	ringDesc := &Desc{Ingesters: map[string]IngesterDesc{
+		"instance-1": {Addr: "instance-1", State: ACTIVE, Timestamp: time.Now().Unix()},
+		"instance-2": {Addr: "instance-2", State: PENDING, Timestamp: time.Now().Unix()},
+		"instance-3": {Addr: "instance-3", State: JOINING, Timestamp: time.Now().Unix()},
+		"instance-4": {Addr: "instance-4", State: LEAVING, Timestamp: time.Now().Unix()},
+		"instance-5": {Addr: "instance-5", State: ACTIVE, Timestamp: time.Now().Unix()},
+	}}
+
+	ring := &Ring{
+		cfg:              Config{HeartbeatTimeout: time.Minute},
+		ringDesc:         ringDesc,
+		ringTokens:       ringDesc.getTokens(),
+		ringTokensByZone: ringDesc.getTokensByZone(),
+		ringZones:        getZones(ringDesc.getTokensByZone()),
+		strategy:         &DefaultReplicationStrategy{},
+	}
+
+	// Keep changing the ring.
+	done := make(chan struct{})
+	defer close(done)
+	go func() {
+		for {
+			select {
+			case <-done:
+				return
+			case <-time.After(time.Second):
+				ring.mtx.Lock()
+
+				instanceID := fmt.Sprintf("instance-%d", len(ringDesc.Ingesters)+1)
+				ringDesc.Ingesters[instanceID] = IngesterDesc{Addr: instanceID, State: ACTIVE, Timestamp: time.Now().Unix()}
+				ring.ringDesc = ringDesc
+				ring.ringTokens = ringDesc.getTokens()
+				ring.ringTokensByZone = ringDesc.getTokensByZone()
+				ring.ringZones = getZones(ringDesc.getTokensByZone())
+
+				ring.mtx.Unlock()
+			}
+		}
+	}()
+
+	startTime := time.Now()
+	require.Equal(t, context.DeadlineExceeded, WaitRingStability(context.Background(), ring, Reporting, minStability, maxWaiting))
+	elapsedTime := time.Since(startTime)
+
+	assert.InDelta(t, maxWaiting, elapsedTime, float64(2*time.Second))
+}
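A closing note for reviewers: the stability check introduced above boils down to a small polling pattern. Re-read some state once per second, reset a timestamp whenever it changes, and return once it has been unchanged for the minimum stability window, with the maximum wait enforced through a context deadline. The standalone program below is a simplified, generic sketch of that pattern; it is not part of this change, and the `poll` callback is a stand-in for `Ring.GetAllHealthy` plus the change check.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitQuiet is a generic, simplified version of the pattern WaitRingStability
// uses: poll a value, reset the "last change" timestamp whenever the value
// differs from the previous observation, and return nil once it has stayed
// unchanged for minStability. The caller bounds the total wait through ctx.
func waitQuiet(ctx context.Context, poll func() string, minStability time.Duration) error {
	last := poll()
	lastChange := time.Now()

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if curr := poll(); curr != last {
				last, lastChange = curr, time.Now()
			} else if time.Since(lastChange) >= minStability {
				return nil
			}
		}
	}
}

func main() {
	// Mirror of the "max waiting" behaviour: if the value never settles, the
	// deadline surfaces as context.DeadlineExceeded, as in the last test above.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// A constant value is immediately "stable", so this returns after ~2s.
	err := waitQuiet(ctx, func() string { return "steady" }, 2*time.Second)
	fmt.Println(err) // <nil>
}
```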