diff --git a/CHANGELOG.md b/CHANGELOG.md
index a3a67982020..60e7b15ea10 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,11 @@
 
 ## master / unreleased
 
+* [ENHANCEMENT] Ruler: optimized `/api/v1/rules` and `/api/v1/alerts` when ruler sharding is enabled. #3916
+* [ENHANCEMENT] Ruler: added the following metrics when ruler sharding is enabled: #3916
+  * `cortex_ruler_clients`
+  * `cortex_ruler_client_request_duration_seconds`
+
 ## 1.8.0 in progress
 
 * [CHANGE] Alertmanager: Don't expose cluster information to tenants via the `/alertmanager/api/v1/status` API endpoint when operating with clustering enabled.
diff --git a/integration/configs.go b/integration/configs.go
index 9d2339e3bbc..cb1602ae7a1 100644
--- a/integration/configs.go
+++ b/integration/configs.go
@@ -168,6 +168,14 @@ var (
 		}
 	}
 
+	RulerShardingFlags = func(consulAddress string) map[string]string {
+		return map[string]string{
+			"-ruler.enable-sharding":      "true",
+			"-ruler.ring.store":           "consul",
+			"-ruler.ring.consul.hostname": consulAddress,
+		}
+	}
+
 	BlocksStorageFlags = func() map[string]string {
 		return map[string]string{
 			"-store.engine": blocksStorageEngine,
diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go
index b3033807c6a..f70e8cd2a0c 100644
--- a/integration/e2ecortex/client.go
+++ b/integration/e2ecortex/client.go
@@ -23,6 +23,8 @@ import (
 	"github.com/prometheus/prometheus/pkg/rulefmt"
 	"github.com/prometheus/prometheus/prompb"
 	yaml "gopkg.in/yaml.v3"
+
+	"github.com/cortexproject/cortex/pkg/ruler"
 )
 
 var (
@@ -213,7 +215,49 @@ type ServerStatus struct {
 	} `json:"data"`
 }
 
-// GetRuleGroups gets the status of an alertmanager instance
+// GetPrometheusRules fetches the rules from the Prometheus endpoint /api/v1/rules.
+func (c *Client) GetPrometheusRules() ([]*ruler.RuleGroup, error) {
+	// Create HTTP request
+	req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/api/prom/api/v1/rules", c.rulerAddress), nil)
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("X-Scope-OrgID", c.orgID)
+
+	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
+	defer cancel()
+
+	// Execute HTTP request
+	res, err := c.httpClient.Do(req.WithContext(ctx))
+	if err != nil {
+		return nil, err
+	}
+	defer res.Body.Close()
+
+	body, err := ioutil.ReadAll(res.Body)
+	if err != nil {
+		return nil, err
+	}
+
+	// Decode the response.
+	type response struct {
+		Status string              `json:"status"`
+		Data   ruler.RuleDiscovery `json:"data"`
+	}
+
+	decoded := &response{}
+	if err := json.Unmarshal(body, decoded); err != nil {
+		return nil, err
+	}
+
+	if decoded.Status != "success" {
+		return nil, fmt.Errorf("unexpected response status '%s'", decoded.Status)
+	}
+
+	return decoded.Data.RuleGroups, nil
+}
+
+// GetRuleGroups gets the configured rule groups from the ruler.
 func (c *Client) GetRuleGroups() (map[string][]rulefmt.RuleGroup, error) {
 	// Create HTTP request
 	req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/api/prom/rules", c.rulerAddress), nil)
@@ -247,7 +291,7 @@ func (c *Client) GetRuleGroups() (map[string][]rulefmt.RuleGroup, error) {
 	return rgs, nil
 }
 
-// SetRuleGroup gets the status of an alertmanager instance
+// SetRuleGroup creates or updates the provided rule group in the ruler.
 func (c *Client) SetRuleGroup(rulegroup rulefmt.RuleGroup, namespace string) error {
 	// Create write request
 	data, err := yaml.Marshal(rulegroup)
@@ -277,7 +321,7 @@ func (c *Client) SetRuleGroup(rulegroup rulefmt.RuleGroup, namespace string) err
 	return nil
 }
 
-// DeleteRuleGroup gets the status of an alertmanager instance
+// DeleteRuleGroup deletes a rule group.
 func (c *Client) DeleteRuleGroup(namespace string, groupName string) error {
 	// Create HTTP request
 	req, err := http.NewRequest("DELETE", fmt.Sprintf("http://%s/api/prom/rules/%s/%s", c.rulerAddress, url.PathEscape(namespace), url.PathEscape(groupName)), nil)
diff --git a/integration/ruler_test.go b/integration/ruler_test.go
index f07ac7fb3f5..3c6acb19484 100644
--- a/integration/ruler_test.go
+++ b/integration/ruler_test.go
@@ -11,6 +11,7 @@ import (
 	"net/http"
 	"os"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"testing"
 	"time"
@@ -20,6 +21,7 @@ import (
 	"github.com/prometheus/prometheus/pkg/rulefmt"
 	"github.com/prometheus/prometheus/pkg/value"
 	"github.com/prometheus/prometheus/prompb"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"gopkg.in/yaml.v3"
 
@@ -301,7 +303,87 @@ func TestRulerEvaluationDelay(t *testing.T) {
 		}
 	}
 	require.Equal(t, len(series.Samples), inputPos, "expect to have returned all evaluations")
+}
+
+func TestRulerSharding(t *testing.T) {
+	const numRulesGroups = 100
+
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	// Generate multiple rule groups, with 1 rule each.
+	ruleGroups := make([]rulefmt.RuleGroup, numRulesGroups)
+	expectedNames := make([]string, numRulesGroups)
+	for i := 0; i < numRulesGroups; i++ {
+		var recordNode yaml.Node
+		var exprNode yaml.Node
+
+		recordNode.SetString(fmt.Sprintf("rule_%d", i))
+		exprNode.SetString(strconv.Itoa(i))
+		ruleName := fmt.Sprintf("test_%d", i)
+
+		expectedNames[i] = ruleName
+		ruleGroups[i] = rulefmt.RuleGroup{
+			Name:     ruleName,
+			Interval: 60,
+			Rules: []rulefmt.RuleNode{{
+				Record: recordNode,
+				Expr:   exprNode,
+			}},
+		}
+	}
+
+	// Start dependencies.
+	consul := e2edb.NewConsul()
+	minio := e2edb.NewMinio(9000, rulestoreBucketName)
+	require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+	// Configure the ruler.
+	rulerFlags := mergeFlags(
+		BlocksStorageFlags(),
+		RulerFlags(false),
+		RulerShardingFlags(consul.NetworkHTTPEndpoint()),
+		map[string]string{
+			// Since we're not going to run any rules, we don't need the
+			// store-gateway to be configured with a valid address.
+			"-querier.store-gateway-addresses": "localhost:12345",
+			// Enable the bucket index so we can skip the initial bucket scan.
+			"-blocks-storage.bucket-store.bucket-index.enabled": "true",
+		},
+	)
+
+	// Start rulers.
+	ruler1 := e2ecortex.NewRuler("ruler-1", rulerFlags, "")
+	ruler2 := e2ecortex.NewRuler("ruler-2", rulerFlags, "")
+	rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2)
+	require.NoError(t, s.StartAndWaitReady(ruler1, ruler2))
+
+	// Upload rule groups to one of the rulers.
+	c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
+	require.NoError(t, err)
+
+	for _, ruleGroup := range ruleGroups {
+		require.NoError(t, c.SetRuleGroup(ruleGroup, "test"))
+	}
+
+	// Wait until rulers have loaded all rules.
+	require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))
+
+	// Since rulers have loaded all rules, we expect that rules have been sharded
+	// between the two rulers.
+	require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
+	require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
+
+	// Fetch the rules and ensure they match the configured ones.
+	actualGroups, err := c.GetPrometheusRules()
+	require.NoError(t, err)
+
+	var actualNames []string
+	for _, group := range actualGroups {
+		actualNames = append(actualNames, group.Name)
+	}
+	assert.ElementsMatch(t, expectedNames, actualNames)
 }
 
 func TestRulerAlertmanager(t *testing.T) {
diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go
index 2a26289a11b..8752f4f1009 100644
--- a/pkg/ruler/api_test.go
+++ b/pkg/ruler/api_test.go
@@ -4,7 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
-	io "io"
+	"io"
 	"io/ioutil"
 	"net/http"
 	"net/http/httptest"
diff --git a/pkg/ruler/client_pool.go b/pkg/ruler/client_pool.go
new file mode 100644
index 00000000000..a2d6241f708
--- /dev/null
+++ b/pkg/ruler/client_pool.go
@@ -0,0 +1,79 @@
+package ruler
+
+import (
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/health/grpc_health_v1"
+
+	"github.com/cortexproject/cortex/pkg/ring/client"
+	"github.com/cortexproject/cortex/pkg/util/grpcclient"
+)
+
+func newRulerClientPool(clientCfg grpcclient.Config, logger log.Logger, reg prometheus.Registerer) *client.Pool {
+	// We prefer sane defaults instead of exposing further config options.
+	poolCfg := client.PoolConfig{
+		CheckInterval:      time.Minute,
+		HealthCheckEnabled: true,
+		HealthCheckTimeout: 10 * time.Second,
+	}
+
+	clientsCount := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+		Name: "cortex_ruler_clients",
+		Help: "The current number of ruler clients in the pool.",
+	})
+
+	return client.NewPool("ruler", poolCfg, nil, newRulerClientFactory(clientCfg, reg), clientsCount, logger)
+}
+
+func newRulerClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory {
+	requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
+		Name:    "cortex_ruler_client_request_duration_seconds",
+		Help:    "Time spent executing requests to the ruler.",
+		Buckets: prometheus.ExponentialBuckets(0.008, 4, 7),
+	}, []string{"operation", "status_code"})
+
+	return func(addr string) (client.PoolClient, error) {
+		return dialRulerClient(clientCfg, addr, requestDuration)
+	}
+}
+
+func dialRulerClient(clientCfg grpcclient.Config, addr string, requestDuration *prometheus.HistogramVec) (*rulerExtendedClient, error) {
+	opts, err := clientCfg.DialOption(grpcclient.Instrument(requestDuration))
+	if err != nil {
+		return nil, err
+	}
+
+	conn, err := grpc.Dial(addr, opts...)
+	if err != nil {
+		return nil, errors.Wrapf(err, "failed to dial ruler %s", addr)
+	}
+
+	return &rulerExtendedClient{
+		RulerClient:  NewRulerClient(conn),
+		HealthClient: grpc_health_v1.NewHealthClient(conn),
+		conn:         conn,
+	}, nil
+}
+
+type rulerExtendedClient struct {
+	RulerClient
+	grpc_health_v1.HealthClient
+	conn *grpc.ClientConn
+}
+
+func (c *rulerExtendedClient) Close() error {
+	return c.conn.Close()
+}
+
+func (c *rulerExtendedClient) String() string {
+	return c.RemoteAddress()
+}
+
+func (c *rulerExtendedClient) RemoteAddress() string {
+	return c.conn.Target()
+}
diff --git a/pkg/ruler/client_pool_test.go b/pkg/ruler/client_pool_test.go
new file mode 100644
index 00000000000..11c2ce4c2bb
--- /dev/null
+++ b/pkg/ruler/client_pool_test.go
@@ -0,0 +1,68 @@
+package ruler
+
+import (
+	"context"
+	"net"
+	"testing"
+
+	"github.com/prometheus/client_golang/prometheus"
+	dto "github.com/prometheus/client_model/go"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/weaveworks/common/user"
+	"google.golang.org/grpc"
+
+	"github.com/cortexproject/cortex/pkg/util/flagext"
+	"github.com/cortexproject/cortex/pkg/util/grpcclient"
+)
+
+func Test_newRulerClientFactory(t *testing.T) {
+	// Create a GRPC server used to query the mocked service.
+	grpcServer := grpc.NewServer()
+	defer grpcServer.GracefulStop()
+
+	srv := &mockRulerServer{}
+	RegisterRulerServer(grpcServer, srv)
+
+	listener, err := net.Listen("tcp", "localhost:0")
+	require.NoError(t, err)
+
+	go func() {
+		require.NoError(t, grpcServer.Serve(listener))
+	}()
+
+	// Create a client factory and query back the mocked service
+	// with different clients.
+	cfg := grpcclient.Config{}
+	flagext.DefaultValues(&cfg)
+
+	reg := prometheus.NewPedanticRegistry()
+	factory := newRulerClientFactory(cfg, reg)
+
+	for i := 0; i < 2; i++ {
+		client, err := factory(listener.Addr().String())
+		require.NoError(t, err)
+		defer client.Close() //nolint:errcheck
+
+		ctx := user.InjectOrgID(context.Background(), "test")
+		_, err = client.(*rulerExtendedClient).Rules(ctx, &RulesRequest{})
+		assert.NoError(t, err)
+	}
+
+	// Assert on the request duration metric, but since it's a duration histogram and
+	// we can't predict the exact time it took, we need to work around it.
+	metrics, err := reg.Gather()
+	require.NoError(t, err)
+
+	assert.Len(t, metrics, 1)
+	assert.Equal(t, "cortex_ruler_client_request_duration_seconds", metrics[0].GetName())
+	assert.Equal(t, dto.MetricType_HISTOGRAM, metrics[0].GetType())
+	assert.Len(t, metrics[0].GetMetric(), 1)
+	assert.Equal(t, uint64(2), metrics[0].GetMetric()[0].GetHistogram().GetSampleCount())
+}
+
+type mockRulerServer struct{}
+
+func (m *mockRulerServer) Rules(context.Context, *RulesRequest) (*RulesResponse, error) {
+	return &RulesResponse{}, nil
+}
diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go
index ec3548a3aeb..2a0b1c08040 100644
--- a/pkg/ruler/ruler.go
+++ b/pkg/ruler/ruler.go
@@ -23,15 +23,16 @@ import (
 	"github.com/prometheus/prometheus/util/strutil"
 	"github.com/weaveworks/common/user"
 	"golang.org/x/sync/errgroup"
-	"google.golang.org/grpc"
 
 	"github.com/cortexproject/cortex/pkg/ingester/client"
 	"github.com/cortexproject/cortex/pkg/ring"
+	ring_client "github.com/cortexproject/cortex/pkg/ring/client"
 	"github.com/cortexproject/cortex/pkg/ring/kv"
 	"github.com/cortexproject/cortex/pkg/ruler/rulespb"
 	"github.com/cortexproject/cortex/pkg/ruler/rulestore"
 	"github.com/cortexproject/cortex/pkg/tenant"
 	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/cortexproject/cortex/pkg/util/concurrency"
 	"github.com/cortexproject/cortex/pkg/util/flagext"
 	"github.com/cortexproject/cortex/pkg/util/grpcclient"
 	util_log "github.com/cortexproject/cortex/pkg/util/log"
@@ -207,13 +208,18 @@ type MultiTenantManager interface {
 type Ruler struct {
 	services.Service
 
-	cfg         Config
-	lifecycler  *ring.BasicLifecycler
-	ring        *ring.Ring
-	subservices *services.Manager
-	store       rulestore.RuleStore
-	manager     MultiTenantManager
-	limits      RulesLimits
+	cfg        Config
+	lifecycler *ring.BasicLifecycler
+	ring       *ring.Ring
+	store      rulestore.RuleStore
+	manager    MultiTenantManager
+	limits     RulesLimits
+
+	subservices        *services.Manager
+	subservicesWatcher *services.FailureWatcher
+
+	// Pool of clients used to connect to other ruler replicas.
+	clientsPool *ring_client.Pool
 
 	ringCheckErrors prometheus.Counter
 	rulerSync       *prometheus.CounterVec
@@ -225,12 +231,13 @@ type Ruler struct {
 // NewRuler creates a new ruler from a distributor and chunk store.
 func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, ruleStore rulestore.RuleStore, limits RulesLimits) (*Ruler, error) {
 	ruler := &Ruler{
-		cfg:      cfg,
-		store:    ruleStore,
-		manager:  manager,
-		registry: reg,
-		logger:   logger,
-		limits:   limits,
+		cfg:         cfg,
+		store:       ruleStore,
+		manager:     manager,
+		registry:    reg,
+		logger:      logger,
+		limits:      limits,
+		clientsPool: newRulerClientPool(cfg.ClientTLSConfig, logger, reg),
 
 		ringCheckErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{
 			Name: "cortex_ruler_ring_check_errors_total",
@@ -293,14 +300,20 @@ func enableSharding(r *Ruler, ringStore kv.Client) error {
 }
 
 func (r *Ruler) starting(ctx context.Context) error {
-	// If sharding is enabled, start the ruler ring subservices
+	// If sharding is enabled, start the ruler's subservices.
 	if r.cfg.EnableSharding {
 		var err error
-		r.subservices, err = services.NewManager(r.lifecycler, r.ring)
-		if err == nil {
-			err = services.StartManagerAndAwaitHealthy(ctx, r.subservices)
+
+		if r.subservices, err = services.NewManager(r.lifecycler, r.ring, r.clientsPool); err != nil {
+			return errors.Wrap(err, "unable to start ruler subservices")
+		}
+
+		r.subservicesWatcher = services.NewFailureWatcher()
+		r.subservicesWatcher.WatchManager(r.subservices)
+
+		if err = services.StartManagerAndAwaitHealthy(ctx, r.subservices); err != nil {
+			return errors.Wrap(err, "unable to start ruler subservices")
 		}
-		return errors.Wrap(err, "failed to start ruler's services")
 	}
 
 	// TODO: ideally, ruler would wait until its queryable is finished starting.
@@ -313,7 +326,6 @@ func (r *Ruler) stopping(_ error) error {
 	r.manager.Stop()
 
 	if r.subservices != nil {
-		// subservices manages ring and lifecycler, if sharding was enabled.
 		_ = services.StopManagerAndAwaitStopped(context.Background(), r.subservices)
 	}
 	return nil
@@ -432,6 +444,8 @@ func (r *Ruler) run(ctx context.Context) error {
 				ringLastState = currRingState
 				r.syncRules(ctx, rulerSyncReasonRingChange)
 			}
+		case err := <-r.subservicesWatcher.Chan():
+			return errors.Wrap(err, "ruler subservice failed")
 		}
 	}
 }
@@ -699,32 +713,35 @@ func (r *Ruler) getShardedRules(ctx context.Context) ([]*GroupStateDesc, error)
 		return nil, fmt.Errorf("unable to inject user ID into grpc request, %v", err)
 	}
 
-	var rgs []*GroupStateDesc
+	var (
+		mergedMx sync.Mutex
+		merged   []*GroupStateDesc
+	)
+
+	// Concurrently fetch rules from all rulers. Since rules are not replicated,
+	// we need all requests to succeed.
+	jobs := concurrency.CreateJobsFromStrings(rulers.GetAddresses())
+	err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error {
+		addr := job.(string)
 
-	for _, rlr := range rulers.Ingesters {
-		dialOpts, err := r.cfg.ClientTLSConfig.DialOption(nil, nil)
+		grpcClient, err := r.clientsPool.GetClientFor(addr)
 		if err != nil {
-			return nil, err
+			return errors.Wrapf(err, "unable to get client for ruler %s", addr)
 		}
-		conn, err := grpc.DialContext(ctx, rlr.Addr, dialOpts...)
+
+		newGrps, err := grpcClient.(RulerClient).Rules(ctx, &RulesRequest{})
 		if err != nil {
-			return nil, err
+			return errors.Wrapf(err, "unable to retrieve rules from ruler %s", addr)
 		}
-		cc := NewRulerClient(conn)
-		newGrps, err := cc.Rules(ctx, &RulesRequest{})
-		// Close the gRPC connection regardless the RPC was successful or not.
-		if closeErr := conn.Close(); closeErr != nil {
-			level.Warn(r.logger).Log("msg", "failed to close gRPC connection to ruler", "remote", rlr.Addr, "err", closeErr)
-		}
+		mergedMx.Lock()
+		merged = append(merged, newGrps.Groups...)
+		mergedMx.Unlock()
 
-		if err != nil {
-			return nil, fmt.Errorf("unable to retrieve rules from other rulers, %v", err)
-		}
-		rgs = append(rgs, newGrps.Groups...)
-	}
+		return nil
+	})
 
-	return rgs, nil
+	return merged, err
 }
 
 // Rules implements the rules service