1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@
* [ENHANCEMENT] Ingester: exposed `-blocks-storage.tsdb.wal-segment-size-bytes` config option to customise the TSDB WAL segment max size. #3476
* [ENHANCEMENT] Compactor: concurrently run blocks cleaner for multiple tenants. Concurrency can be configured via `-compactor.cleanup-concurrency`. #3483
* [ENHANCEMENT] Compactor: shuffle tenants before running compaction. #3483
* [ENHANCEMENT] Compactor: wait for a stable ring at startup, when sharding is enabled. #3484
* [BUGFIX] Blocks storage ingester: fixed some cases leading to a TSDB WAL corruption after a partial write to disk. #3423
* [BUGFIX] Blocks storage: Fix the race between ingestion and `/flush` call resulting in overlapping blocks. #3422
* [BUGFIX] Querier: fixed `-querier.max-query-into-future` which wasn't correctly enforced on range queries. #3452
17 changes: 17 additions & 0 deletions docs/blocks-storage/compactor.md
@@ -38,6 +38,14 @@ The compactor sharding is based on the Cortex [hash ring](../architecture.md#the

This feature can be enabled via `-compactor.sharding-enabled=true` and requires the backend [hash ring](../architecture.md#the-hash-ring) to be configured via `-compactor.ring.*` flags (or their respective YAML config options).

### Waiting for stable ring at startup

In the event of a cluster cold start or a scale up of 2+ compactor instances at the same time, each new compactor instance may start at a slightly different time and thus run its first compaction based on a different state of the ring. This is not a critical condition, but it may be inefficient, because multiple compactor replicas may start compacting the same tenant at nearly the same time.

To reduce the likelihood of this happening, the compactor waits for a stable ring at startup. A ring is considered stable if no instance is added to or removed from the ring for at least `-compactor.ring.wait-stability-min-duration`. If the ring keeps changing after `-compactor.ring.wait-stability-max-duration`, the compactor stops waiting for a stable ring and proceeds with its normal startup.

To disable this waiting logic, you can start the compactor with `-compactor.ring.wait-stability-min-duration=0`.
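
As a rough illustration of the behaviour described above, the following is a minimal, self-contained sketch of the waiting loop: poll the ring, reset a stability timer whenever the topology changes, and give up once the maximum waiting time has elapsed. This is only a sketch; the actual helper introduced by this PR is `ring.WaitRingStability` in `pkg/ring/util.go`, and the `getState` function below is a hypothetical placeholder for reading the current ring topology.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitStableRing returns nil once getState() has reported the same value for
// at least minStability, or a context error once maxWaiting has elapsed.
func waitStableRing(ctx context.Context, getState func() string, minStability, maxWaiting time.Duration) error {
	// Bound the total waiting time with a context deadline.
	ctx, cancel := context.WithTimeout(ctx, maxWaiting)
	defer cancel()

	lastState := getState()
	lastChange := time.Now()

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// maxWaiting reached (or the caller cancelled): stop waiting.
			return ctx.Err()
		case <-ticker.C:
			if curr := getState(); curr != lastState {
				// Topology changed: reset the stability timer.
				lastState = curr
				lastChange = time.Now()
			} else if time.Since(lastChange) >= minStability {
				// Unchanged for long enough: consider the ring stable.
				return nil
			}
		}
	}
}

func main() {
	// With a never-changing "ring", this returns after roughly 2 seconds.
	err := waitStableRing(context.Background(), func() string { return "ring-v1" }, 2*time.Second, 10*time.Second)
	fmt.Println("ring stable:", err == nil)
}
```

With the default settings, for example, the compactor starts as soon as the ring topology has been unchanged for 1 minute and, even if the ring keeps changing, no later than roughly 5 minutes after startup.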

## Soft and hard blocks deletion

When the compactor successfully compacts some source blocks into a larger block, the source blocks are deleted from storage. Blocks deletion is not immediate, but follows a two-step process:
@@ -193,6 +201,15 @@ compactor:
# CLI flag: -compactor.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

# Minimum time to wait for ring stability at startup. 0 to disable.
# CLI flag: -compactor.ring.wait-stability-min-duration
[wait_stability_min_duration: <duration> | default = 1m]

# Maximum time to wait for ring stability at startup. If the compactor ring
# keeps changing after this period of time, the compactor will start anyway.
# CLI flag: -compactor.ring.wait-stability-max-duration
[wait_stability_max_duration: <duration> | default = 5m]

# Name of network interface to read address from.
# CLI flag: -compactor.ring.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]
8 changes: 8 additions & 0 deletions docs/blocks-storage/compactor.template
@@ -38,6 +38,14 @@ The compactor sharding is based on the Cortex [hash ring](../architecture.md#the

This feature can be enabled via `-compactor.sharding-enabled=true` and requires the backend [hash ring](../architecture.md#the-hash-ring) to be configured via `-compactor.ring.*` flags (or their respective YAML config options).

### Waiting for stable ring at startup

In the event of a cluster cold start or a scale up of 2+ compactor instances at the same time, each new compactor instance may start at a slightly different time and thus run its first compaction based on a different state of the ring. This is not a critical condition, but it may be inefficient, because multiple compactor replicas may start compacting the same tenant at nearly the same time.

To reduce the likelihood of this happening, the compactor waits for a stable ring at startup. A ring is considered stable if no instance is added to or removed from the ring for at least `-compactor.ring.wait-stability-min-duration`. If the ring keeps changing after `-compactor.ring.wait-stability-max-duration`, the compactor stops waiting for a stable ring and proceeds with its normal startup.

To disable this waiting logic, you can start the compactor with `-compactor.ring.wait-stability-min-duration=0`.

## Soft and hard blocks deletion

When the compactor successfully compacts some source blocks into a larger block, the source blocks are deleted from storage. Blocks deletion is not immediate, but follows a two-step process:
9 changes: 9 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -3944,6 +3944,15 @@ sharding_ring:
# CLI flag: -compactor.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

# Minimum time to wait for ring stability at startup. 0 to disable.
# CLI flag: -compactor.ring.wait-stability-min-duration
[wait_stability_min_duration: <duration> | default = 1m]

# Maximum time to wait for ring stability at startup. If the compactor ring
# keeps changing after this period of time, the compactor will start anyway.
# CLI flag: -compactor.ring.wait-stability-max-duration
[wait_stability_max_duration: <duration> | default = 5m]

# Name of network interface to read address from.
# CLI flag: -compactor.ring.instance-interface-names
[instance_interface_names: <list of string> | default = [eth0 en0]]
16 changes: 16 additions & 0 deletions pkg/compactor/compactor.go
@@ -266,6 +266,22 @@ func (c *Compactor) starting(ctx context.Context) error {
return err
}
level.Info(c.logger).Log("msg", "compactor is ACTIVE in the ring")

// In the event of a cluster cold start or scale up of 2+ compactor instances at the same
// time, we may end up in a situation where each new compactor instance starts at a slightly
// different time and thus each one starts with a different state of the ring. It's better
// to just wait for ring stability for a short time.
if c.compactorCfg.ShardingRing.WaitStabilityMinDuration > 0 {
minWaiting := c.compactorCfg.ShardingRing.WaitStabilityMinDuration
maxWaiting := c.compactorCfg.ShardingRing.WaitStabilityMaxDuration

level.Info(c.logger).Log("msg", "waiting until compactor ring topology is stable", "min_waiting", minWaiting.String(), "max_waiting", maxWaiting.String())
if err := ring.WaitRingStability(ctx, c.ring, ring.Compactor, minWaiting, maxWaiting); err != nil {
level.Warn(c.logger).Log("msg", "compactor ring topology is not stable after the max waiting time, proceeding anyway")
} else {
level.Info(c.logger).Log("msg", "compactor ring topology is stable")
}
}
}

// Create the blocks cleaner (service).
8 changes: 8 additions & 0 deletions pkg/compactor/compactor_ring.go
@@ -22,6 +22,10 @@ type RingConfig struct {
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`

// Wait ring stability.
WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration"`
WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration"`

// Instance details
InstanceID string `yaml:"instance_id" doc:"hidden"`
InstanceInterfaceNames []string `yaml:"instance_interface_names"`
@@ -45,6 +49,10 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.HeartbeatPeriod, "compactor.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring.")
f.DurationVar(&cfg.HeartbeatTimeout, "compactor.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which compactors are considered unhealthy within the ring.")

// Wait stability flags.
f.DurationVar(&cfg.WaitStabilityMinDuration, "compactor.ring.wait-stability-min-duration", time.Minute, "Minimum time to wait for ring stability at startup. 0 to disable.")
f.DurationVar(&cfg.WaitStabilityMaxDuration, "compactor.ring.wait-stability-max-duration", 5*time.Minute, "Maximum time to wait for ring stability at startup. If the compactor ring keeps changing after this period of time, the compactor will start anyway.")

// Instance flags
cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), "compactor.ring.instance-interface-names", "Name of network interface to read address from.")
26 changes: 6 additions & 20 deletions pkg/compactor/compactor_test.go
@@ -679,6 +679,8 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
cfg.ShardingEnabled = true
cfg.ShardingRing.InstanceID = fmt.Sprintf("compactor-%d", i)
cfg.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.%d", i)
cfg.ShardingRing.WaitStabilityMinDuration = 3 * time.Second
cfg.ShardingRing.WaitStabilityMaxDuration = 10 * time.Second
cfg.ShardingRing.KVStore.Mock = kvstore

c, tsdbCompactor, l, _, cleanup := prepare(t, cfg, bucketClient)
@@ -700,26 +702,6 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
}

// Wait until each compactor sees all ACTIVE compactors in the ring
for _, c := range compactors {
cortex_testutil.Poll(t, 10*time.Second, len(compactors), func() interface{} {
// it is safe to access c.ring here, since we know that all compactors are Running now
rs, err := c.ring.GetAllHealthy(ring.Compactor)
if err != nil {
return 0
}

numActive := 0
for _, i := range rs.Ingesters {
if i.GetState() == ring.ACTIVE {
numActive++
}
}

return numActive
})
}

// Wait until a run has been completed on each compactor
for _, c := range compactors {
cortex_testutil.Poll(t, 10*time.Second, 1.0, func() interface{} {
@@ -855,6 +837,10 @@ func prepareConfig() Config {
compactorCfg.retryMinBackoff = 0
compactorCfg.retryMaxBackoff = 0

// Do not wait for ring stability by default, in order to speed up tests.
compactorCfg.ShardingRing.WaitStabilityMinDuration = 0
compactorCfg.ShardingRing.WaitStabilityMaxDuration = 0

return compactorCfg
}

34 changes: 34 additions & 0 deletions pkg/ring/util.go
@@ -82,6 +82,40 @@ func WaitInstanceState(ctx context.Context, r *Ring, instanceID string, state In
return backoff.Err()
}

// WaitRingStability monitors the ring topology for the provided operation and waits until it
// has been stable for at least minStability, giving up once maxWaiting has elapsed.
func WaitRingStability(ctx context.Context, r *Ring, op Operation, minStability, maxWaiting time.Duration) error {
// Configure the max waiting time as a context deadline.
Contributor:

I'd suggest removing `maxWaiting` and relying on the context only. The client calling this can already set up a context deadline, if needed. The code right now returns `context.DeadlineExceeded` when `maxWaiting` is reached, which the client may confuse with its own context deadline.

Contributor Author:

I thought the same, but using it from the caller's perspective is more annoying, because it's not something you can do inline. You need to get a new context, defer the `cancel()`, and then call `WaitRingStability()` (see the caller-side sketch after this file's diff).

ctx, cancel := context.WithTimeout(ctx, maxWaiting)
defer cancel()

// Get the initial ring state.
ringLastState, _ := r.GetAllHealthy(op) // nolint:errcheck
ringLastStateTs := time.Now()

const pollingFrequency = time.Second
pollingTicker := time.NewTicker(pollingFrequency)
defer pollingTicker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-pollingTicker.C:
// We ignore the error because, in case of error, an empty replication set is
// returned, which we then compare with the previous state.
currRingState, _ := r.GetAllHealthy(op) // nolint:errcheck

if HasReplicationSetChanged(ringLastState, currRingState) {
ringLastState = currRingState
ringLastStateTs = time.Now()
} else if time.Since(ringLastStateTs) >= minStability {
return nil
}
}
}
}

// getZones returns the list of zones from the provided tokens. The returned list
// is guaranteed to be sorted.
func getZones(tokens map[string][]TokenDesc) []string {
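Following up on the review discussion above about dropping `maxWaiting` in favour of a caller-provided context deadline, the self-contained sketch below shows what the caller side would look like with a hypothetical context-only signature: the caller has to derive a timeout context, defer `cancel()`, and only then call the helper, and a returned `context.DeadlineExceeded` becomes ambiguous. The `waitRingStability` stub below is illustrative only; it is not the function added by this PR.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitRingStability is a stand-in with the hypothetical context-only
// signature; it simply pretends the ring becomes stable after minStability.
func waitRingStability(ctx context.Context, minStability time.Duration) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(minStability):
		return nil
	}
}

func main() {
	// The caller-side boilerplate the author's reply refers to: derive a
	// context with the max waiting time, defer cancel(), then call the helper.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second /* maxWaiting */)
	defer cancel()

	err := waitRingStability(ctx, 2*time.Second /* minStability */)
	if errors.Is(err, context.DeadlineExceeded) {
		// As the reviewer notes, this error is ambiguous: it could mean the max
		// waiting time expired or that an outer deadline was inherited via ctx.
		fmt.Println("ring not stable before the deadline, proceeding anyway")
		return
	}
	fmt.Println("ring stable:", err == nil)
}
```

This is the trade-off both comments describe: keeping `maxWaiting` lets callers invoke the wait inline, at the cost of the function owning a deadline that can be confused with the caller's own.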
144 changes: 144 additions & 0 deletions pkg/ring/util_test.go
@@ -1,7 +1,13 @@
package ring

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestGenerateTokens(t *testing.T) {
@@ -34,3 +40,141 @@ func TestGenerateTokensIgnoresOldTokens(t *testing.T) {
}
}
}

func TestWaitRingStabilityShouldReturnAsSoonAsMinStabilityIsReachedOnNoChanges(t *testing.T) {
t.Parallel()

const (
minStability = 2 * time.Second
maxWaiting = 10 * time.Second
)

// Init the ring.
ringDesc := &Desc{Ingesters: map[string]IngesterDesc{
"instance-1": {Addr: "127.0.0.1", State: ACTIVE, Timestamp: time.Now().Unix()},
"instance-2": {Addr: "127.0.0.2", State: PENDING, Timestamp: time.Now().Unix()},
"instance-3": {Addr: "127.0.0.3", State: JOINING, Timestamp: time.Now().Unix()},
"instance-4": {Addr: "127.0.0.4", State: LEAVING, Timestamp: time.Now().Unix()},
"instance-5": {Addr: "127.0.0.5", State: ACTIVE, Timestamp: time.Now().Unix()},
}}

ring := &Ring{
cfg: Config{HeartbeatTimeout: time.Minute},
ringDesc: ringDesc,
ringTokens: ringDesc.getTokens(),
ringTokensByZone: ringDesc.getTokensByZone(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: &DefaultReplicationStrategy{},
}

startTime := time.Now()
require.NoError(t, WaitRingStability(context.Background(), ring, Reporting, minStability, maxWaiting))
elapsedTime := time.Since(startTime)

assert.InDelta(t, minStability, elapsedTime, float64(2*time.Second))
}

func TestWaitRingStabilityShouldReturnOnceMinStabilityHasBeenReached(t *testing.T) {
t.Parallel()

const (
minStability = 3 * time.Second
addInstanceAfter = 2 * time.Second
maxWaiting = 10 * time.Second
)

// Init the ring.
ringDesc := &Desc{Ingesters: map[string]IngesterDesc{
"instance-1": {Addr: "instance-1", State: ACTIVE, Timestamp: time.Now().Unix()},
"instance-2": {Addr: "instance-2", State: PENDING, Timestamp: time.Now().Unix()},
"instance-3": {Addr: "instance-3", State: JOINING, Timestamp: time.Now().Unix()},
"instance-4": {Addr: "instance-4", State: LEAVING, Timestamp: time.Now().Unix()},
"instance-5": {Addr: "instance-5", State: ACTIVE, Timestamp: time.Now().Unix()},
}}

ring := &Ring{
cfg: Config{HeartbeatTimeout: time.Minute},
ringDesc: ringDesc,
ringTokens: ringDesc.getTokens(),
ringTokensByZone: ringDesc.getTokensByZone(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: &DefaultReplicationStrategy{},
}

// Add 1 new instance after some time.
go func() {
time.Sleep(addInstanceAfter)

ring.mtx.Lock()
defer ring.mtx.Unlock()

instanceID := fmt.Sprintf("instance-%d", len(ringDesc.Ingesters)+1)
ringDesc.Ingesters[instanceID] = IngesterDesc{Addr: instanceID, State: ACTIVE, Timestamp: time.Now().Unix()}
ring.ringDesc = ringDesc
ring.ringTokens = ringDesc.getTokens()
ring.ringTokensByZone = ringDesc.getTokensByZone()
ring.ringZones = getZones(ringDesc.getTokensByZone())
}()

startTime := time.Now()
require.NoError(t, WaitRingStability(context.Background(), ring, Reporting, minStability, maxWaiting))
elapsedTime := time.Since(startTime)

assert.InDelta(t, minStability+addInstanceAfter, elapsedTime, float64(2*time.Second))
}

func TestWaitRingStabilityShouldReturnErrorIfMaxWaitingIsReached(t *testing.T) {
t.Parallel()

const (
minStability = 2 * time.Second
maxWaiting = 7 * time.Second
)

// Init the ring.
ringDesc := &Desc{Ingesters: map[string]IngesterDesc{
"instance-1": {Addr: "instance-1", State: ACTIVE, Timestamp: time.Now().Unix()},
"instance-2": {Addr: "instance-2", State: PENDING, Timestamp: time.Now().Unix()},
"instance-3": {Addr: "instance-3", State: JOINING, Timestamp: time.Now().Unix()},
"instance-4": {Addr: "instance-4", State: LEAVING, Timestamp: time.Now().Unix()},
"instance-5": {Addr: "instance-5", State: ACTIVE, Timestamp: time.Now().Unix()},
}}

ring := &Ring{
cfg: Config{HeartbeatTimeout: time.Minute},
ringDesc: ringDesc,
ringTokens: ringDesc.getTokens(),
ringTokensByZone: ringDesc.getTokensByZone(),
ringZones: getZones(ringDesc.getTokensByZone()),
strategy: &DefaultReplicationStrategy{},
}

// Keep changing the ring.
done := make(chan struct{})
defer close(done)
go func() {
for {
select {
case <-done:
return
case <-time.After(time.Second):
ring.mtx.Lock()

instanceID := fmt.Sprintf("instance-%d", len(ringDesc.Ingesters)+1)
ringDesc.Ingesters[instanceID] = IngesterDesc{Addr: instanceID, State: ACTIVE, Timestamp: time.Now().Unix()}
ring.ringDesc = ringDesc
ring.ringTokens = ringDesc.getTokens()
ring.ringTokensByZone = ringDesc.getTokensByZone()
ring.ringZones = getZones(ringDesc.getTokensByZone())

ring.mtx.Unlock()
}
}
}()

startTime := time.Now()
require.Equal(t, context.DeadlineExceeded, WaitRingStability(context.Background(), ring, Reporting, minStability, maxWaiting))
elapsedTime := time.Since(startTime)

assert.InDelta(t, maxWaiting, elapsedTime, float64(2*time.Second))
}