Add topology aware read #3414
Merged: pracucci merged 42 commits into cortexproject:master from MichelHollands:add_topology_aware_read on Nov 18, 2020 (+1,126 −43)
741f9b2 Add unit tests (MichelHollands)
91917fd Add lock to avoid race condition in test (MichelHollands)
ba3d860 Add zone aware to GetAll (MichelHollands)
90d9013 Remove debug print statements (MichelHollands)
440d0fc Add changelog entry (MichelHollands)
4362f5d Fix comment (MichelHollands)
d9ab63a Address replication set review comments (MichelHollands)
f752ce3 Reword and change changelong entry to enhancement (MichelHollands)
ff8ebcc Address review comments in ring code (MichelHollands)
6e217ef Do not return early and add more test cases (MichelHollands)
9cf6a92 Rename ingesters to instances (MichelHollands)
4b8e908 Add one more test (MichelHollands)
c4d560d Update pkg/ring/replication_set.go (MichelHollands)
3531844 Update pkg/ring/replication_set.go : add sign off (MichelHollands)
6edb2b4 Add integration test (MichelHollands)
e04db17 Fix imports as per goimports (MichelHollands)
fa1c3b5 Address review comments and add extra tests (MichelHollands)
c0d7be0 Fix rebase (MichelHollands)
9ffa08d Fix rebase in test (MichelHollands)
e10842f Add lock around mockIngester call (MichelHollands)
4385980 Add lock around mockIngester call at correct place (MichelHollands)
be94fff Handle nr of zones > replication factor (MichelHollands)
7f08749 Use util.Min instead of if statement (MichelHollands)
847dd71 Update pkg/ring/replication_set_test.go (MichelHollands)
7886450 Use atomic and sets (MichelHollands)
ccdb908 Fixed integration test and ReplicationSet.Do() (pracucci)
1259ed9 Added tracker unit tests (pracucci)
3facb35 Fixed TestReplicationSet_Do (pracucci)
c788f17 Commented ReplicationSet max errors and max unavailable zones (pracucci)
833eb89 Fixed GetReplicationSetForOperation() logic and improved unit tests (pracucci)
3351321 Improved tests (pracucci)
a87411b Fixed tests flakyness (pracucci)
9902101 Fixed test (pracucci)
c7d39e9 Update documentation (MichelHollands)
75a5dc6 Add note about reads from zone aware clusters (MichelHollands)
2647294 Remove extra space (MichelHollands)
ee08cab Address some of Peter's review comment (MichelHollands)
dd18b80 Add special case for rf=2 (MichelHollands)
97ea59b Address review comments (MichelHollands)
474168f Fix comment (MichelHollands)
1e50276 Update docs with review comments (MichelHollands)
af5ff2a Set maxUnavailableZones and change tests (MichelHollands)
@@ -0,0 +1,150 @@
package integration

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/pkg/errors"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/cortexproject/cortex/integration/e2e"
	e2edb "github.com/cortexproject/cortex/integration/e2e/db"
	"github.com/cortexproject/cortex/integration/e2ecortex"
)

func TestZoneAwareReplication(t *testing.T) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	flags := BlocksStorageFlags()
	flags["-distributor.shard-by-all-labels"] = "true"
	flags["-distributor.replication-factor"] = "3"
	flags["-distributor.zone-awareness-enabled"] = "true"

	// Start dependencies.
	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, s.StartAndWaitReady(consul, minio))

	// Start Cortex components.
	ingesterFlags := func(zone string) map[string]string {
		return mergeFlags(flags, map[string]string{
			"-ingester.availability-zone": zone,
		})
	}

	ingester1 := e2ecortex.NewIngesterWithConfigFile("ingester-1", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-a"), "")
	ingester2 := e2ecortex.NewIngesterWithConfigFile("ingester-2", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-a"), "")
	ingester3 := e2ecortex.NewIngesterWithConfigFile("ingester-3", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-b"), "")
	ingester4 := e2ecortex.NewIngesterWithConfigFile("ingester-4", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-b"), "")
	ingester5 := e2ecortex.NewIngesterWithConfigFile("ingester-5", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-c"), "")
	ingester6 := e2ecortex.NewIngesterWithConfigFile("ingester-6", consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-c"), "")
	require.NoError(t, s.StartAndWaitReady(ingester1, ingester2, ingester3, ingester4, ingester5, ingester6))

	distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
	querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")
	require.NoError(t, s.StartAndWaitReady(distributor, querier))

	// Wait until distributor and querier have updated the ring.
	require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

	require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

	client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
	require.NoError(t, err)

	// Push some series
	now := time.Now()
	numSeries := 100
	expectedVectors := map[string]model.Vector{}

	for i := 1; i <= numSeries; i++ {
		metricName := fmt.Sprintf("series_%d", i)
		series, expectedVector := generateSeries(metricName, now)
		res, err := client.Push(series)
		require.NoError(t, err)
		require.Equal(t, 200, res.StatusCode)

		expectedVectors[metricName] = expectedVector
	}

	// Query back series => all good
	for metricName, expectedVector := range expectedVectors {
		result, err := client.Query(metricName, now)
		require.NoError(t, err)
		require.Equal(t, model.ValVector, result.Type())
		assert.Equal(t, expectedVector, result.(model.Vector))
	}

	// SIGKILL 1 ingester in 1st zone
	require.NoError(t, ingester1.Kill())

	// Push 1 more series => all good
	numSeries++
	metricName := fmt.Sprintf("series_%d", numSeries)
	series, expectedVector := generateSeries(metricName, now)
	res, err := client.Push(series)
	require.NoError(t, err)
	require.Equal(t, 200, res.StatusCode)

	expectedVectors[metricName] = expectedVector

	// Query back series => all good
	for metricName, expectedVector := range expectedVectors {
		result, err := client.Query(metricName, now)
		require.NoError(t, err)
		require.Equal(t, model.ValVector, result.Type())
		assert.Equal(t, expectedVector, result.(model.Vector))
	}

	// SIGKILL 1 more ingester in the 1st zone (all ingesters in 1st zone have been killed)
	require.NoError(t, ingester2.Kill())

	// Push 1 more series => all good
	numSeries++
	metricName = fmt.Sprintf("series_%d", numSeries)
	series, expectedVector = generateSeries(metricName, now)
	res, err = client.Push(series)
	require.NoError(t, err)
	require.Equal(t, 200, res.StatusCode)

	expectedVectors[metricName] = expectedVector

	// Query back series => all good
	for metricName, expectedVector := range expectedVectors {
		result, err := client.Query(metricName, now)
		require.NoError(t, err)
		require.Equal(t, model.ValVector, result.Type())
		assert.Equal(t, expectedVector, result.(model.Vector))
	}

	// SIGKILL 1 ingester in the 2nd zone
	require.NoError(t, ingester3.Kill())

	// Query back any series => fail (either because of a timeout or 500)
	result, _, err := client.QueryRaw("series_1")
	if !errors.Is(err, context.DeadlineExceeded) {
		require.NoError(t, err)
		require.Equal(t, 500, result.StatusCode)
	}

	// SIGKILL 1 more ingester in the 2nd zone (all ingesters in 2nd zone have been killed)
	require.NoError(t, ingester4.Kill())

	// Push 1 more series => fail
	series, _ = generateSeries("series_last", now)
	res, err = client.Push(series)
	require.NoError(t, err)
	require.Equal(t, 500, res.StatusCode)

}
Why is this changing from stop to kill?
The `Kill()` function, used by end-to-end tests, is expected to send a `SIGKILL`. We were using `docker stop --time=0`, which is expected to send a SIGTERM and then a `SIGKILL` after time `0`, but we wondered if there could be some timing issues and a graceful shutdown of the stopped process could actually happen (or at least start). To make it more obvious we're going to send a `SIGKILL`, we decided to just use `docker kill`.
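The concern about a graceful shutdown starting comes down to SIGTERM being catchable by the target process, whereas SIGKILL cannot be caught or ignored. A minimal, Unix-only sketch of the catchable half: `sigtermIsCatchable` is a hypothetical helper written for this note and is unrelated to the e2e framework's `Kill()`.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// sigtermIsCatchable registers a handler for SIGTERM, sends SIGTERM to the
// current process, and reports whether the handler received it. A process that
// handles SIGTERM this way gets a chance to begin a graceful shutdown.
// SIGKILL offers no such chance: the kernel terminates the process directly.
func sigtermIsCatchable() bool {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGTERM)
	defer signal.Stop(ch)

	if err := syscall.Kill(os.Getpid(), syscall.SIGTERM); err != nil {
		return false
	}

	select {
	case <-ch:
		return true
	case <-time.After(2 * time.Second):
		return false
	}
}

func main() {
	fmt.Println("SIGTERM catchable:", sigtermIsCatchable())
}
```

Sending only `SIGKILL` removes that window, so the test exercises an abrupt ingester failure rather than one that may have started a clean shutdown.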