1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -25,6 +25,7 @@
* [BUGFIX] Querier: Merge results from chunks and blocks ingesters when using streaming of results. #3013
* [BUGFIX] Querier: query /series from ingesters regardless the `-querier.query-ingesters-within` setting. #3035
* [BUGFIX] Experimental blocks storage: Ingester is less likely to hit gRPC message size limit when streaming data to queriers. #3015
* [BUGFIX] Experimental blocks storage: fixed memberlist support for the store-gateways ring used when blocks sharding is enabled. #3058
* [BUGFIX] Fix configuration for TLS server validation, TLS skip verify was hardcoded to true for all TLS configurations and prevented validation of server certificates. #3030
* [BUGFIX] Fixes the Alertmanager panicking when no `-alertmanager.web.external-url` is provided. #3017

6 changes: 6 additions & 0 deletions docs/configuration/single-process-config-blocks-gossip-1.yaml
@@ -78,6 +78,12 @@ blocks_storage:
filesystem:
dir: /tmp/cortex/storage

store_gateway:
sharding_enabled: true
sharding_ring:
kvstore:
store: memberlist

frontend_worker:
match_max_concurrent: true

6 changes: 6 additions & 0 deletions docs/configuration/single-process-config-blocks-gossip-2.yaml
@@ -77,6 +77,12 @@ blocks_storage:
filesystem:
dir: /tmp/cortex/storage

store_gateway:
sharding_enabled: true
sharding_ring:
kvstore:
store: memberlist

frontend_worker:
match_max_concurrent: true

3 changes: 2 additions & 1 deletion integration/e2e/service.go
@@ -394,7 +394,8 @@ func (p *HTTPReadinessProbe) Ready(service *ConcreteService) (err error) {
return nil
}

return fmt.Errorf("got status code: %v, expected code in range: [%v, %v]", res.StatusCode, p.expectedStatusRangeStart, p.expectedStatusRangeEnd)
body, _ := ioutil.ReadAll(res.Body)
return fmt.Errorf("expected code in range: [%v, %v], got status code: %v and body: %v", p.expectedStatusRangeStart, p.expectedStatusRangeEnd, res.StatusCode, string(body))
}

// TCPReadinessProbe checks readiness by ensuring a TCP connection can be established.
48 changes: 41 additions & 7 deletions integration/getting_started_with_gossiped_ring_test.go
@@ -3,6 +3,7 @@
package main

import (
"fmt"
"testing"
"time"

@@ -12,6 +13,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
)

@@ -20,15 +22,26 @@ func TestGettingStartedWithGossipedRing(t *testing.T) {
require.NoError(t, err)
defer s.Close()

// Start dependencies.
minio := e2edb.NewMinio(9000, bucketName)
require.NoError(t, s.StartAndWaitReady(minio))

// Start Cortex components.
require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks-gossip-1.yaml", "config1.yaml"))
require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks-gossip-2.yaml", "config2.yaml"))

// We don't care too much about the storage part here. Both Cortex instances will write new blocks to /tmp, but that's fine.
flags := map[string]string{
// decrease timeouts to make test faster. should still be fine with two instances only
"-ingester.join-after": "0s", // join quickly
"-ingester.observe-period": "5s", // to avoid conflicts in tokens
"-ingester.join-after": "0s", // join quickly
"-ingester.observe-period": "5s", // to avoid conflicts in tokens
"-experimental.blocks-storage.bucket-store.sync-interval": "1s", // sync continuously
"-experimental.blocks-storage.backend": "s3",
"-experimental.blocks-storage.s3.bucket-name": bucketName,
"-experimental.blocks-storage.s3.access-key-id": e2edb.MinioAccessKey,
"-experimental.blocks-storage.s3.secret-access-key": e2edb.MinioSecretKey,
"-experimental.blocks-storage.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName),
"-experimental.blocks-storage.s3.insecure": "true",
}

// This cortex will fail to join the cluster configured in yaml file. That's fine.
@@ -44,13 +57,17 @@ func TestGettingStartedWithGossipedRing(t *testing.T) {
require.NoError(t, s.StartAndWaitReady(cortex1))
require.NoError(t, s.StartAndWaitReady(cortex2))

// Both Cortex serves should see each other.
// Both Cortex servers should see each other.
require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(2), "memberlist_client_cluster_members_count"))
require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(2), "memberlist_client_cluster_members_count"))

// Both Cortex servers should have 512 tokens
require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(2*512), "cortex_ring_tokens_total"))
require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(2*512), "cortex_ring_tokens_total"))
// Both Cortex servers should have 512 tokens for ingesters ring and 512 tokens for store-gateways ring.
for _, ringName := range []string{"ingester", "store-gateway"} {
ringMatcher := labels.MustNewMatcher(labels.MatchEqual, "name", ringName)

require.NoError(t, cortex1.WaitSumMetricsWithOptions(e2e.Equals(2*512), []string{"cortex_ring_tokens_total"}, e2e.WithLabelMatchers(ringMatcher)))
require.NoError(t, cortex2.WaitSumMetricsWithOptions(e2e.Equals(2*512), []string{"cortex_ring_tokens_total"}, e2e.WithLabelMatchers(ringMatcher)))
}

// We need two "ring members" visible from both Cortex instances
require.NoError(t, cortex1.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
@@ -67,7 +84,7 @@ func TestGettingStartedWithGossipedRing(t *testing.T) {
c2, err := e2ecortex.NewClient(cortex2.HTTPEndpoint(), cortex2.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

// Push some series to Cortex2 (Cortex1 may not yet see Cortex2 as ACTIVE due to gossip, so we play it safe by pushing to Cortex2)
// Push some series to Cortex2
now := time.Now()
series, expectedVector := generateSeries("series_1", now)

@@ -80,4 +97,21 @@ func TestGettingStartedWithGossipedRing(t *testing.T) {
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))

// Before flushing the blocks we expect no store-gateway has loaded any block.
require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(0), "cortex_bucket_store_blocks_loaded"))
require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(0), "cortex_bucket_store_blocks_loaded"))

// Flush blocks from ingesters to the store.
for _, instance := range []*e2ecortex.CortexService{cortex1, cortex2} {
res, err = e2e.GetRequest("http://" + instance.HTTPEndpoint() + "/flush")
require.NoError(t, err)
require.Equal(t, 204, res.StatusCode)
}

// Given store-gateway blocks sharding is enabled with the default replication factor of 3,
// and the ingestion replication factor is 1, we expect the series to have been ingested by a
// single ingester, so there is 1 block shipped from the ingesters and loaded by both store-gateways.
require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(1), "cortex_bucket_store_blocks_loaded"))
require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(1), "cortex_bucket_store_blocks_loaded"))
}
18 changes: 10 additions & 8 deletions pkg/cortex/modules.go
@@ -112,7 +112,6 @@ func (t *Cortex) initServer() (services.Service, error) {

func (t *Cortex) initRing() (serv services.Service, err error) {
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig)
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Ring, err = ring.New(t.Cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
@@ -153,7 +152,6 @@ func (t *Cortex) initOverrides() (serv services.Service, err error) {

func (t *Cortex) initDistributor() (serv services.Service, err error) {
t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV

// Check whether the distributor can join the distributors ring, which is
// whenever it's not running as an internal dependency (ie. querier or
@@ -297,7 +295,6 @@ func (t *Cortex) tsdbIngesterConfig() {

func (t *Cortex) initIngester() (serv services.Service, err error) {
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig)
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Ingester.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Ingester.ShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels
t.tsdbIngesterConfig()
@@ -504,7 +501,6 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
}

t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
queryable, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, rulerRegisterer)

@@ -557,7 +553,6 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) {

func (t *Cortex) initCompactor() (serv services.Service, err error) {
t.Cfg.Compactor.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Compactor.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV

t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, util.Logger, prometheus.DefaultRegisterer)
if err != nil {
@@ -575,7 +570,6 @@ func (t *Cortex) initStoreGateway() (serv services.Service, err error) {
}

t.Cfg.StoreGateway.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.StoreGateway.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV

t.StoreGateway, err = storegateway.NewStoreGateway(t.Cfg.StoreGateway, t.Cfg.BlocksStorage, t.Overrides, t.Cfg.Server.LogLevel, util.Logger, prometheus.DefaultRegisterer)
if err != nil {
@@ -594,6 +588,14 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) {
ring.GetCodec(),
}
t.MemberlistKV = memberlist.NewKVInitService(&t.Cfg.MemberlistKV)

// Update the config.
t.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.StoreGateway.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Compactor.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV

return t.MemberlistKV, nil
}

@@ -655,15 +657,15 @@ func (t *Cortex) setupModuleManager() error {
Store: {Overrides, DeleteRequestsStore},
Ingester: {Overrides, Store, API, RuntimeConfig, MemberlistKV},
Flusher: {Store, API},
Querier: {Overrides, Distributor, Store, Ring, API, StoreQueryable},
Querier: {Overrides, Distributor, Store, Ring, API, StoreQueryable, MemberlistKV},
StoreQueryable: {Overrides, Store},
QueryFrontend: {API, Overrides, DeleteRequestsStore},
TableManager: {API},
Ruler: {Overrides, Distributor, Store, StoreQueryable, RulerStorage},
Configs: {API},
AlertManager: {API},
Compactor: {API},
StoreGateway: {API, Overrides},
StoreGateway: {API, Overrides, MemberlistKV},
Purger: {Store, DeleteRequestsStore, API},
All: {QueryFrontend, Querier, Ingester, Distributor, TableManager, Purger, StoreGateway, Ruler},
}
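The modules.go changes above centralize the memberlist wiring: every ring KV config now receives the GetMemberlistKV provider inside initMemberlistKV, and the modules that build rings but previously lacked the dependency (Querier, StoreGateway) now list MemberlistKV explicitly, so that the memberlist KV service is initialized before their rings are constructed. A minimal sketch of this lazy-provider pattern, using hypothetical types rather than Cortex's real ones:

```go
package main

import "fmt"

// KVInitService stands in for the memberlist KV service (hypothetical type).
type KVInitService struct{ name string }

// KVConfig holds a provider function instead of the service itself, so the
// config can be copied into ring configs before the service exists.
type KVConfig struct {
	GetMemberlistKV func() *KVInitService
}

type App struct {
	memberlistKV     *KVInitService
	ingesterRing     KVConfig
	storeGatewayRing KVConfig
}

// initMemberlistKV creates the service once and hands the same provider to
// every ring config, mirroring what initMemberlistKV now does in modules.go.
func (a *App) initMemberlistKV() {
	a.memberlistKV = &KVInitService{name: "memberlist"}
	provider := func() *KVInitService { return a.memberlistKV }
	a.ingesterRing.GetMemberlistKV = provider
	a.storeGatewayRing.GetMemberlistKV = provider
}

func main() {
	app := &App{}
	app.initMemberlistKV()
	// A ring built later resolves the KV service through the provider.
	fmt.Println(app.storeGatewayRing.GetMemberlistKV().name)
}
```

Storing a function rather than the service itself lets the same config be copied into several ring configurations early on, while the actual memberlist service is resolved only when a ring client is built.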
7 changes: 7 additions & 0 deletions pkg/ring/basic_lifecycler.go
@@ -381,11 +381,18 @@ func (l *BasicLifecycler) updateInstance(ctx context.Context, update func(*Desc,
instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState())
}

prevTimestamp := instanceDesc.Timestamp
changed := update(ringDesc, &instanceDesc)
if ok && !changed {
return nil, false, nil
}

// Memberlist requires that the timestamp always change, so we update it unless it
// was already updated in the callback function.
if instanceDesc.Timestamp == prevTimestamp {
instanceDesc.Timestamp = time.Now().Unix()
}

ringDesc.Ingesters[l.cfg.ID] = instanceDesc
return ringDesc, true, nil
})
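The timestamp handling added to updateInstance above matters because a gossip-based KV store decides what to re-broadcast by comparing entries, typically keeping the one with the newer timestamp; a heartbeat that leaves the timestamp untouched can therefore look like a no-op to peers. A rough illustration of that merge rule, assuming a simplified descriptor rather than Cortex's real ring.Desc merge logic:

```go
package main

import (
	"fmt"
	"time"
)

// InstanceDesc is a simplified stand-in for a ring entry (hypothetical type).
type InstanceDesc struct {
	Addr      string
	Timestamp int64
}

// mergeInstance mimics timestamp-based conflict resolution in a gossiped KV
// store: the incoming entry wins only if its timestamp is newer.
func mergeInstance(local, incoming InstanceDesc) (InstanceDesc, bool) {
	if incoming.Timestamp > local.Timestamp {
		return incoming, true // changed: this update gets re-gossiped
	}
	return local, false // unchanged: nothing to propagate to other peers
}

func main() {
	local := InstanceDesc{Addr: "cortex-1", Timestamp: time.Now().Unix()}

	// A heartbeat that forgets to bump the timestamp is silently dropped...
	stale := local
	_, changed := mergeInstance(local, stale)
	fmt.Println("propagated without timestamp bump:", changed) // false

	// ...while bumping the timestamp (as updateInstance now does) propagates it.
	fresh := local
	fresh.Timestamp = time.Now().Unix() + 1
	_, changed = mergeInstance(local, fresh)
	fmt.Println("propagated with timestamp bump:", changed) // true
}
```

This is only a sketch of the general idea; the actual fix simply bumps instanceDesc.Timestamp whenever the update callback didn't already change it, so every heartbeat produces an observable change.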
7 changes: 4 additions & 3 deletions pkg/ring/lifecycler.go
@@ -227,9 +227,10 @@ func (i *Lifecycler) CheckReady(ctx context.Context) error {
}

if err := ringDesc.Ready(time.Now(), i.cfg.RingConfig.HeartbeatTimeout); err != nil {
level.Warn(util.Logger).Log("msg", "found an existing ingester(s) with a problem in the ring, "+
"this ingester cannot complete joining and become ready until this problem is resolved. "+
"The /ring http endpoint on the distributor (or single binary) provides visibility into the ring.", "err", err)
level.Warn(util.Logger).Log("msg", "found an existing instance(s) with a problem in the ring, "+
"this instance cannot complete joining and become ready until this problem is resolved. "+
"The /ring http endpoint on the distributor (or single binary) provides visibility into the ring.",
"ring", i.RingName, "err", err)
return err
}

6 changes: 3 additions & 3 deletions pkg/ring/model.go
@@ -102,15 +102,15 @@ func (d *Desc) Ready(now time.Time, heartbeatTimeout time.Duration) error {
numTokens := 0
for id, ingester := range d.Ingesters {
if now.Sub(time.Unix(ingester.Timestamp, 0)) > heartbeatTimeout {
return fmt.Errorf("ingester %s past heartbeat timeout", id)
return fmt.Errorf("instance %s past heartbeat timeout", id)
} else if ingester.State != ACTIVE {
return fmt.Errorf("ingester %s in state %v", id, ingester.State)
return fmt.Errorf("instance %s in state %v", id, ingester.State)
}
numTokens += len(ingester.Tokens)
}

if numTokens == 0 {
return fmt.Errorf("Not ready: no tokens in ring")
return fmt.Errorf("no tokens in ring")
}
return nil
}