Merged
Changes from 38 commits
42 commits
741f9b2
Add unit tests
MichelHollands Oct 26, 2020
91917fd
Add lock to avoid race condition in test
MichelHollands Oct 26, 2020
ba3d860
Add zone aware to GetAll
MichelHollands Oct 28, 2020
90d9013
Remove debug print statements
MichelHollands Oct 28, 2020
440d0fc
Add changelog entry
MichelHollands Oct 28, 2020
4362f5d
Fix comment
MichelHollands Oct 28, 2020
d9ab63a
Address replication set review comments
MichelHollands Nov 2, 2020
f752ce3
Reword and change changelong entry to enhancement
MichelHollands Nov 2, 2020
ff8ebcc
Address review comments in ring code
MichelHollands Nov 2, 2020
6e217ef
Do not return early and add more test cases
MichelHollands Nov 3, 2020
9cf6a92
Rename ingesters to instances
MichelHollands Nov 3, 2020
4b8e908
Add one more test
MichelHollands Nov 3, 2020
c4d560d
Update pkg/ring/replication_set.go
MichelHollands Nov 4, 2020
3531844
Update pkg/ring/replication_set.go : add sign off
MichelHollands Nov 4, 2020
6edb2b4
Add integration test
MichelHollands Nov 4, 2020
e04db17
Fix imports as per goimports
MichelHollands Nov 4, 2020
fa1c3b5
Address review comments and add extra tests
MichelHollands Nov 4, 2020
c0d7be0
Fix rebase
MichelHollands Nov 4, 2020
9ffa08d
Fix rebase in test
MichelHollands Nov 4, 2020
e10842f
Add lock around mockIngester call
MichelHollands Nov 5, 2020
4385980
Add lock around mockIngester call at correct place
MichelHollands Nov 5, 2020
be94fff
Handle nr of zones > replication factor
MichelHollands Nov 5, 2020
7f08749
Use util.Min instead of if statement
MichelHollands Nov 5, 2020
847dd71
Update pkg/ring/replication_set_test.go
MichelHollands Nov 5, 2020
7886450
Use atomic and sets
MichelHollands Nov 5, 2020
ccdb908
Fixed integration test and ReplicationSet.Do()
pracucci Nov 6, 2020
1259ed9
Added tracker unit tests
pracucci Nov 6, 2020
3facb35
Fixed TestReplicationSet_Do
pracucci Nov 6, 2020
c788f17
Commented ReplicationSet max errors and max unavailable zones
pracucci Nov 6, 2020
833eb89
Fixed GetReplicationSetForOperation() logic and improved unit tests
pracucci Nov 6, 2020
3351321
Improved tests
pracucci Nov 6, 2020
a87411b
Fixed tests flakyness
pracucci Nov 6, 2020
9902101
Fixed test
pracucci Nov 6, 2020
c7d39e9
Update documentation
MichelHollands Nov 9, 2020
75a5dc6
Add note about reads from zone aware clusters
MichelHollands Nov 9, 2020
2647294
Remove extra space
MichelHollands Nov 9, 2020
ee08cab
Address some of Peter's review comment
MichelHollands Nov 9, 2020
dd18b80
Add special case for rf=2
MichelHollands Nov 9, 2020
97ea59b
Address review comments
MichelHollands Nov 9, 2020
474168f
Fix comment
MichelHollands Nov 9, 2020
1e50276
Update docs with review comments
MichelHollands Nov 10, 2020
af5ff2a
Set maxUnavailableZones and change tests
MichelHollands Nov 10, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## master / unreleased

* [CHANGE] Querier: deprecated `-store.max-look-back-period`. You should use `-querier.max-query-lookback` instead. #3452
* [ENHANCEMENT] Added zone-awareness support on queries. When zone-awareness is enabled, queries will still succeed if all ingesters in a single zone fail. #3414
* [ENHANCEMENT] Blocks storage ingester: exported more TSDB-related metrics. #3412
- `cortex_ingester_tsdb_wal_corruptions_total`
- `cortex_ingester_tsdb_head_truncations_failed_total`
4 changes: 4 additions & 0 deletions docs/guides/zone-replication.md
@@ -11,6 +11,8 @@ It is completely possible that all the replicas for the given data are held with

For this reason, Cortex optionally supports zone-aware replication. When zone-aware replication is **enabled**, replicas for the given data are guaranteed to span across different availability zones. This requires the Cortex cluster to run in at least as many zones as the configured replication factor.

Reads from a Cortex cluster with zone-aware replication enabled can withstand zone failures as long as more than `replication factor / 2` zones are available and have no failing instances.

The Cortex services supporting **zone-aware replication** are:

- **[Distributors and Ingesters](#distributors-and-ingesters-time-series-replication)**
@@ -26,6 +28,8 @@ The Cortex time-series replication is used to hold multiple (typically 3) replic
2. Rollout ingesters to apply the configured zone
3. Enable time-series zone-aware replication via the `-distributor.zone-awareness-enabled` CLI flag (or its respective YAML config option). Please be aware this configuration option should be set on distributors, queriers and rulers.

The `-distributor.shard-by-all-labels` setting also has an impact on zone replication. When set to `true`, series are spread across more instances, so fewer series are affected when a single instance goes down.

## Store-gateways: blocks replication

The Cortex [store-gateway](../blocks-storage/store-gateway.md) (used only when Cortex is running with the [blocks storage](../blocks-storage/_index.md)) supports blocks sharding, used to horizontally scale blocks in a large cluster without hitting any vertical scalability limit.
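As a quick illustration of the quorum rule stated in the documentation change above ("more than `replication factor / 2` zones available"), here is a minimal, self-contained Go sketch; the function name and layout are purely illustrative and are not taken from the Cortex code base:

```go
package main

import "fmt"

// maxUnavailableZones returns how many zones may be completely unavailable
// while reads can still reach quorum, assuming zone-aware replication with
// one replica per zone and the usual quorum of (replicationFactor/2)+1.
// This is an illustrative sketch, not the Cortex implementation.
func maxUnavailableZones(replicationFactor int) int {
	quorum := replicationFactor/2 + 1
	return replicationFactor - quorum
}

func main() {
	for _, rf := range []int{2, 3, 5} {
		fmt.Printf("replication factor %d: reads tolerate %d unavailable zone(s)\n",
			rf, maxUnavailableZones(rf))
	}
}
```

For the default replication factor of 3 this yields one tolerable zone outage, which is the scenario the new integration test below exercises.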
7 changes: 6 additions & 1 deletion integration/e2e/service.go
@@ -168,10 +168,15 @@ func (s *ConcreteService) Kill() error {

logger.Log("Killing", s.name)

if out, err := RunCommandAndGetOutput("docker", "stop", "--time=0", s.containerName()); err != nil {
if out, err := RunCommandAndGetOutput("docker", "kill", s.containerName()); err != nil {
Contributor

Why is this changing from stop to kill?

Contributor

The Kill() function, used by end-to-end tests, is expected to send a SIGKILL. We were using docker stop --time=0, which is expected to send a SIGTERM and then a SIGKILL after time 0, but we wondered if there could be some timing issues and a graceful shutdown of the stopped process could actually happen (or at least start). To make it more obvious we're going to send a SIGKILL, we decided to just use docker kill.

logger.Log(string(out))
return err
}

// Wait until the container actually stopped. However, this could fail if
// the container already exited, so we just ignore the error.
_, _ = RunCommandAndGetOutput("docker", "wait", s.containerName())

s.usedNetworkName = ""

return nil
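The review exchange above boils down to: send a real SIGKILL and then wait for the container to actually exit. Below is a standalone sketch of that same pattern using only the standard library; it is not the repository's `RunCommandAndGetOutput` helper, and the container name is a placeholder:

```go
package main

import (
	"fmt"
	"log"
	"os/exec"
)

// killContainer sends a SIGKILL via `docker kill` and then blocks on
// `docker wait` until the container has exited. The wait error is
// deliberately ignored because the container may already be gone.
func killContainer(name string) error {
	if out, err := exec.Command("docker", "kill", name).CombinedOutput(); err != nil {
		fmt.Println(string(out))
		return err
	}

	_, _ = exec.Command("docker", "wait", name).CombinedOutput()
	return nil
}

func main() {
	// "ingester-1" is just an example container name.
	if err := killContainer("ingester-1"); err != nil {
		log.Fatal(err)
	}
}
```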
1 change: 0 additions & 1 deletion integration/e2ecortex/client.go
@@ -153,7 +153,6 @@ func (c *Client) QueryRaw(query string) (*http.Response, []byte, error) {
}

func (c *Client) query(addr string) (*http.Response, []byte, error) {

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

150 changes: 150 additions & 0 deletions integration/zone_aware_test.go
@@ -0,0 +1,150 @@
package integration

import (
"context"
"fmt"
"testing"
"time"

"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
)

func TestZoneAwareReplication(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

flags := BlocksStorageFlags()
flags["-distributor.shard-by-all-labels"] = "true"
flags["-distributor.replication-factor"] = "3"
flags["-distributor.zone-awareness-enabled"] = "true"

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Start Cortex components.
ingesterFlags := func(zone string) map[string]string {
return mergeFlags(flags, map[string]string{
"-ingester.availability-zone": zone,
})
}

ingester1 := e2ecortex.NewIngesterWithConfigFile("ingester-1", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-a"), "")
ingester2 := e2ecortex.NewIngesterWithConfigFile("ingester-2", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-a"), "")
ingester3 := e2ecortex.NewIngesterWithConfigFile("ingester-3", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-b"), "")
ingester4 := e2ecortex.NewIngesterWithConfigFile("ingester-4", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-b"), "")
ingester5 := e2ecortex.NewIngesterWithConfigFile("ingester-5", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-c"), "")
ingester6 := e2ecortex.NewIngesterWithConfigFile("ingester-6", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-c"), "")
require.NoError(t, s.StartAndWaitReady(ingester1, ingester2, ingester3, ingester4, ingester5, ingester6))

distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, querier))

// Wait until distributor and querier have updated the ring.
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

// Push some series
now := time.Now()
numSeries := 100
expectedVectors := map[string]model.Vector{}

for i := 1; i <= numSeries; i++ {
metricName := fmt.Sprintf("series_%d", i)
series, expectedVector := generateSeries(metricName, now)
res, err := client.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

expectedVectors[metricName] = expectedVector
}

// Query back series => all good
for metricName, expectedVector := range expectedVectors {
result, err := client.Query(metricName, now)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))
}

// SIGKILL 1 ingester in 1st zone
require.NoError(t, ingester1.Kill())

// Push 1 more series => all good
numSeries++
metricName := fmt.Sprintf("series_%d", numSeries)
series, expectedVector := generateSeries(metricName, now)
res, err := client.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

expectedVectors[metricName] = expectedVector

// Query back series => all good
for metricName, expectedVector := range expectedVectors {
result, err := client.Query(metricName, now)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))
}

// SIGKILL 1 more ingester in the 1st zone (all ingesters in 1st zone have been killed)
require.NoError(t, ingester2.Kill())

// Push 1 more series => all good
numSeries++
metricName = fmt.Sprintf("series_%d", numSeries)
series, expectedVector = generateSeries(metricName, now)
res, err = client.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

expectedVectors[metricName] = expectedVector

// Query back series => all good
for metricName, expectedVector := range expectedVectors {
result, err := client.Query(metricName, now)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))
}

// SIGKILL 1 ingester in the 2nd zone
require.NoError(t, ingester3.Kill())

// Query back any series => fail (either because of a timeout or 500)
result, _, err := client.QueryRaw("series_1")
if !errors.Is(err, context.DeadlineExceeded) {
require.NoError(t, err)
require.Equal(t, 500, result.StatusCode)
}

// SIGKILL 1 more ingester in the 2nd zone (all ingesters in 2nd zone have been killed)
require.NoError(t, ingester4.Kill())

// Push 1 more series => fail
series, _ = generateSeries("series_last", now)
res, err = client.Push(series)
require.NoError(t, err)
require.Equal(t, 500, res.StatusCode)

}
3 changes: 3 additions & 0 deletions pkg/distributor/distributor_test.go
@@ -1283,6 +1283,9 @@ func (i *mockIngester) series() map[uint32]*client.PreallocTimeseries {
}

func (i *mockIngester) Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
i.Lock()
defer i.Unlock()

i.trackCall("Check")

return &grpc_health_v1.HealthCheckResponse{}, nil
72 changes: 46 additions & 26 deletions pkg/ring/replication_set.go
@@ -10,25 +10,45 @@ import (
// many errors to tolerate.
type ReplicationSet struct {
Ingesters []IngesterDesc

// Maximum number of tolerated failing instances. Max errors and max unavailable zones are
// mutually exclusive.
MaxErrors int

// Maximum number of different zones in which instances can fail. Max unavailable zones and
// max errors are mutually exclusive.
MaxUnavailableZones int
}

// Do function f in parallel for all replicas in the set, erroring if we exceed
// MaxErrors and returning early otherwise.
func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(context.Context, *IngesterDesc) (interface{}, error)) ([]interface{}, error) {
type instanceResult struct {
res interface{}
err error
instance *IngesterDesc
}

// Initialise the result tracker, which is used to keep track of successes and failures.
var tracker replicationSetResultTracker
if r.MaxUnavailableZones > 0 {
tracker = newZoneAwareResultTracker(r.Ingesters, r.MaxUnavailableZones)
} else {
tracker = newDefaultResultTracker(r.Ingesters, r.MaxErrors)
}

var (
errs = make(chan error, len(r.Ingesters))
resultsChan = make(chan interface{}, len(r.Ingesters))
minSuccess = len(r.Ingesters) - r.MaxErrors
forceStart = make(chan struct{}, r.MaxErrors)
ch = make(chan instanceResult, len(r.Ingesters))
forceStart = make(chan struct{}, r.MaxErrors)
)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Spawn a goroutine for each instance.
for i := range r.Ingesters {
go func(i int, ing *IngesterDesc) {
// wait to send extra requests
if i >= minSuccess && delay > 0 {
// Wait to send extra requests. Works only when zone-awareness is disabled.
if delay > 0 && r.MaxUnavailableZones == 0 && i >= len(r.Ingesters)-r.MaxErrors {
after := time.NewTimer(delay)
defer after.Stop()
select {
@@ -39,32 +59,32 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(cont
}
}
result, err := f(ctx, ing)
if err != nil {
errs <- err
} else {
resultsChan <- result
ch <- instanceResult{
res: result,
err: err,
instance: ing,
}
}(i, &r.Ingesters[i])
}

var (
numErrs int
numSuccess int
results = make([]interface{}, 0, len(r.Ingesters))
)
for numSuccess < minSuccess {
results := make([]interface{}, 0, len(r.Ingesters))

for !tracker.succeeded() {
select {
case err := <-errs:
numErrs++
if numErrs > r.MaxErrors {
return nil, err
}
// force one of the delayed requests to start
forceStart <- struct{}{}
case res := <-ch:
tracker.done(res.instance, res.err)
if res.err != nil {
if tracker.failed() {
return nil, res.err
}

case result := <-resultsChan:
numSuccess++
results = append(results, result)
// force one of the delayed requests to start
if delay > 0 && r.MaxUnavailableZones == 0 {
forceStart <- struct{}{}
}
} else {
results = append(results, res.res)
}

case <-ctx.Done():
return nil, ctx.Err()
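The `replicationSetResultTracker` used by `Do()` above is added elsewhere in this PR and is not part of the hunk shown here. As a rough illustration of the zone-aware variant's bookkeeping, the sketch below infers the interface shape from the calls made in `Do()`; the struct name `zoneAwareTrackerSketch` and its fields are hypothetical, and the code assumes it would live in `pkg/ring` next to `IngesterDesc` rather than matching the actual tracker file:

```go
// Interface shape inferred from the calls made in ReplicationSet.Do() above.
type replicationSetResultTracker interface {
	// done records the outcome of the call to a single instance.
	done(instance *IngesterDesc, err error)
	// succeeded returns true once enough calls have completed successfully.
	succeeded() bool
	// failed returns true once too many calls have failed to ever succeed.
	failed() bool
}

// zoneAwareTrackerSketch is an illustrative implementation: a zone is
// considered failed as soon as any instance in it returns an error, and the
// overall request fails once more than maxUnavailableZones zones have failed.
type zoneAwareTrackerSketch struct {
	waitingByZone       map[string]int // outstanding calls per zone
	failuresByZone      map[string]int // zones with at least one error
	minSuccessfulZones  int
	maxUnavailableZones int
}

func newZoneAwareTrackerSketch(instances []IngesterDesc, maxUnavailableZones int) *zoneAwareTrackerSketch {
	t := &zoneAwareTrackerSketch{
		waitingByZone:       map[string]int{},
		failuresByZone:      map[string]int{},
		maxUnavailableZones: maxUnavailableZones,
	}
	for _, instance := range instances {
		t.waitingByZone[instance.Zone]++
	}
	t.minSuccessfulZones = len(t.waitingByZone) - maxUnavailableZones
	return t
}

func (t *zoneAwareTrackerSketch) done(instance *IngesterDesc, err error) {
	t.waitingByZone[instance.Zone]--
	if err != nil {
		t.failuresByZone[instance.Zone]++
	}
}

func (t *zoneAwareTrackerSketch) succeeded() bool {
	// Succeed once "all zones - max unavailable zones" zones have fully
	// completed without any error.
	successfulZones := 0
	for zone, waiting := range t.waitingByZone {
		if waiting == 0 && t.failuresByZone[zone] == 0 {
			successfulZones++
		}
	}
	return successfulZones >= t.minSuccessfulZones
}

func (t *zoneAwareTrackerSketch) failed() bool {
	return len(t.failuresByZone) > t.maxUnavailableZones
}
```

The default (non-zone-aware) tracker referenced in the same hunk presumably just counts successes and failures against `MaxErrors`, which matches what the replaced inline counters in `Do()` used to do.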