diff --git a/CHANGELOG.md b/CHANGELOG.md index d52e3f1db..308e1dd4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ * [ENHANCEMENT] Lifecycler: It's now possible to change default value of lifecycler's `final-sleep`. #121 * [ENHANCEMENT] Memberlist: Update to latest fork of memberlist. #160 * [ENHANCEMENT] Memberlist: extracted HTTP status page handler to `memberlist.HTTPStatusHandler` which now can be instantiated with a custom template. #163 +* [ENHANCEMENT] Ring: extracted HTTP status page handler to `ring.HTTPStatusHandler` which now can be instantiated with a custom template. #166 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109 diff --git a/ring/basic_lifecycler.go b/ring/basic_lifecycler.go index 1bb95c083..807228a9e 100644 --- a/ring/basic_lifecycler.go +++ b/ring/basic_lifecycler.go @@ -498,11 +498,7 @@ func (l *BasicLifecycler) run(fn func() error) error { } } -func (l *BasicLifecycler) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error { - return l.store.CAS(ctx, l.ringKey, f) -} - -func (l *BasicLifecycler) getRing(ctx context.Context) (*Desc, error) { +func (l *BasicLifecycler) Describe(ctx context.Context) (*Desc, error) { obj, err := l.store.Get(ctx, l.ringKey) if err != nil { return nil, err @@ -511,6 +507,14 @@ func (l *BasicLifecycler) getRing(ctx context.Context) (*Desc, error) { return GetOrCreateRingDesc(obj), nil } +func (l *BasicLifecycler) Forget(ctx context.Context, id string) error { + return forget(ctx, l.store, l.ringKey, id) +} + +func (l *BasicLifecycler) IsHealthy(instance *InstanceDesc, op Operation, now time.Time) bool { + return instance.IsHealthy(op, l.cfg.HeartbeatPeriod, now) +} + func (l *BasicLifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - newRingPageHandler(l, l.cfg.HeartbeatPeriod).handle(w, req) + NewHTTPStatusHandler(l, defaultPageTemplate).ServeHTTP(w, req) } diff --git a/ring/http.go b/ring/http.go index bcf3d1cc8..ead7a8ae6 100644 --- a/ring/http.go +++ b/ring/http.go @@ -13,30 +13,19 @@ import ( "time" ) -//go:embed status.gohtml -var defaultPageContent string -var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.FuncMap{ - "mod": func(i, j int) bool { return i%j == 0 }, - "humanFloat": func(f float64) string { - return fmt.Sprintf("%.2g", f) - }, - "timeOrEmptyString": func(t time.Time) string { - if t.IsZero() { - return "" - } - return t.Format(time.RFC3339Nano) - }, - "durationSince": func(t time.Time) string { return time.Since(t).Truncate(time.Millisecond).String() }, -}).Parse(defaultPageContent)) - -type httpResponse struct { - Ingesters []ingesterDesc `json:"shards"` - Now time.Time `json:"now"` - ShowTokens bool `json:"-"` +type StatusPageData struct { + // Ingesters is the list of the ingesters found in the ring. + Ingesters []IngesterDesc `json:"shards"` + // ShowTokens is true if user requested to see show the tokens. + // Tokens are always provided in the IngesterDesc struct, regardless of this param. + ShowTokens bool `json:"-"` + // Now is the current time (time when template was rendered) + Now time.Time `json:"now"` } -type ingesterDesc struct { - ID string `json:"id"` +type IngesterDesc struct { + ID string `json:"id"` + // State can be: "ACTIVE", "LEAVING", "PENDING", "JOINING", "LEFT" or "UNHEALTHY" State string `json:"state"` Address string `json:"address"` HeartbeatTimestamp time.Time `json:"timestamp"` @@ -44,30 +33,38 @@ type ingesterDesc struct { Zone string `json:"zone"` Tokens []uint32 `json:"tokens"` NumTokens int `json:"-"` - Ownership float64 `json:"-"` + // Ownership represents the percentage (0-100) of the tokens owned by this instance. + Ownership float64 `json:"-"` } -type ringAccess interface { - casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error - getRing(context.Context) (*Desc, error) -} +// Operator allows external entities to perform generic operations on the ring, +// like describing it or force-forgetting one of the members. +type Operator interface { + Describe(ctx context.Context) (*Desc, error) + Forget(ctx context.Context, id string) error -type ringPageHandler struct { - r ringAccess - heartbeatPeriod time.Duration + IsHealthy(instance *InstanceDesc, op Operation, now time.Time) bool } -func newRingPageHandler(r ringAccess, heartbeatPeriod time.Duration) *ringPageHandler { - return &ringPageHandler{ - r: r, - heartbeatPeriod: heartbeatPeriod, +// NewHTTPStatusHandler will use the provided Operator to build an http.Handler to inspect the ring status over http. +// It will render the provided template (unless Accept: application/json header is provided, in which case it will return a JSON response). +// The handler provided also can force forgetting members of the ring by sending a POST request with the ID in the `forget` field. +func NewHTTPStatusHandler(r Operator, tpl *template.Template) HTTPStatusHandler { + return HTTPStatusHandler{ + r: r, + template: tpl, } } -func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) { +type HTTPStatusHandler struct { + r Operator + template *template.Template +} + +func (h HTTPStatusHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { if req.Method == http.MethodPost { ingesterID := req.FormValue("forget") - if err := h.forget(req.Context(), ingesterID); err != nil { + if err := h.r.Forget(req.Context(), ingesterID); err != nil { http.Error( w, fmt.Errorf("error forgetting instance '%s': %w", ingesterID, err).Error(), @@ -88,7 +85,7 @@ func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) { return } - ringDesc, err := h.r.getRing(req.Context()) + ringDesc, err := h.r.Describe(req.Context()) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -102,15 +99,15 @@ func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) { sort.Strings(ingesterIDs) now := time.Now() - var ingesters []ingesterDesc + var ingesters []IngesterDesc for _, id := range ingesterIDs { ing := ringDesc.Ingesters[id] state := ing.State.String() - if !ing.IsHealthy(Reporting, h.heartbeatPeriod, now) { + if !h.r.IsHealthy(&ing, Reporting, now) { state = "UNHEALTHY" } - ingesters = append(ingesters, ingesterDesc{ + ingesters = append(ingesters, IngesterDesc{ ID: id, State: state, Address: ing.Addr, @@ -125,16 +122,16 @@ func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) { tokensParam := req.URL.Query().Get("tokens") - renderHTTPResponse(w, httpResponse{ + renderHTTPResponse(w, StatusPageData{ Ingesters: ingesters, Now: now, ShowTokens: tokensParam == "true", - }, defaultPageTemplate, req) + }, h.template, req) } -// RenderHTTPResponse either responds with json or a rendered html page using the passed in template +// renderHTTPResponse either responds with json or a rendered html page using the passed in template // by checking the Accepts header -func renderHTTPResponse(w http.ResponseWriter, v httpResponse, t *template.Template, r *http.Request) { +func renderHTTPResponse(w http.ResponseWriter, v StatusPageData, t *template.Template, r *http.Request) { accept := r.Header.Get("Accept") if strings.Contains(accept, "application/json") { writeJSONResponse(w, v) @@ -147,24 +144,27 @@ func renderHTTPResponse(w http.ResponseWriter, v httpResponse, t *template.Templ } } -func (h *ringPageHandler) forget(ctx context.Context, id string) error { - unregister := func(in interface{}) (out interface{}, retry bool, err error) { - if in == nil { - return nil, false, fmt.Errorf("found empty ring when trying to unregister") - } - - ringDesc := in.(*Desc) - ringDesc.RemoveIngester(id) - return ringDesc, true, nil - } - return h.r.casRing(ctx, unregister) -} - // WriteJSONResponse writes some JSON as a HTTP response. -func writeJSONResponse(w http.ResponseWriter, v httpResponse) { +func writeJSONResponse(w http.ResponseWriter, v StatusPageData) { w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(v); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } } + +//go:embed status.gohtml +var defaultPageContent string +var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.FuncMap{ + "mod": func(i, j int) bool { return i%j == 0 }, + "humanFloat": func(f float64) string { + return fmt.Sprintf("%.2g", f) + }, + "timeOrEmptyString": func(t time.Time) string { + if t.IsZero() { + return "" + } + return t.Format(time.RFC3339Nano) + }, + "durationSince": func(t time.Time) string { return time.Since(t).Truncate(time.Millisecond).String() }, +}).Parse(defaultPageContent)) diff --git a/ring/lifecycler.go b/ring/lifecycler.go index 9db0a7e6b..d40e5074e 100644 --- a/ring/lifecycler.go +++ b/ring/lifecycler.go @@ -851,11 +851,7 @@ func (i *Lifecycler) processShutdown(ctx context.Context) { time.Sleep(i.cfg.FinalSleep) } -func (i *Lifecycler) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error { - return i.KVStore.CAS(ctx, i.RingKey, f) -} - -func (i *Lifecycler) getRing(ctx context.Context) (*Desc, error) { +func (i *Lifecycler) Describe(ctx context.Context) (*Desc, error) { obj, err := i.KVStore.Get(ctx, i.RingKey) if err != nil { return nil, err @@ -864,8 +860,16 @@ func (i *Lifecycler) getRing(ctx context.Context) (*Desc, error) { return GetOrCreateRingDesc(obj), nil } +func (i *Lifecycler) Forget(ctx context.Context, id string) error { + return forget(ctx, i.KVStore, i.RingKey, id) +} + +func (i *Lifecycler) IsHealthy(instance *InstanceDesc, op Operation, now time.Time) bool { + return instance.IsHealthy(op, i.cfg.HeartbeatPeriod, now) +} + func (i *Lifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - newRingPageHandler(i, i.cfg.HeartbeatPeriod).handle(w, req) + NewHTTPStatusHandler(i, defaultPageTemplate).ServeHTTP(w, req) } // unregister removes our entry from consul. diff --git a/ring/replication_set_test.go b/ring/replication_set_test.go index 42ecc0f11..2694dbae7 100644 --- a/ring/replication_set_test.go +++ b/ring/replication_set_test.go @@ -157,7 +157,7 @@ func TestReplicationSet_Do(t *testing.T) { expectedError: errFailure, }, { - name: "max errors = 1, should handle context canceled", + name: "max errors = 1, should ServeHTTP context canceled", instances: []InstanceDesc{{}, {}, {}}, maxErrors: 1, f: func(c context.Context, id *InstanceDesc) (interface{}, error) { diff --git a/ring/ring.go b/ring/ring.go index be78fee59..77e32d213 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -815,11 +815,7 @@ func (r *Ring) CleanupShuffleShardCache(identifier string) { } } -func (r *Ring) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error { - return r.KVClient.CAS(ctx, r.key, f) -} - -func (r *Ring) getRing(ctx context.Context) (*Desc, error) { +func (r *Ring) Describe(ctx context.Context) (*Desc, error) { r.mtx.RLock() defer r.mtx.RUnlock() @@ -828,8 +824,12 @@ func (r *Ring) getRing(ctx context.Context) (*Desc, error) { return ringDesc, nil } +func (r *Ring) Forget(ctx context.Context, id string) error { + return forget(ctx, r.KVClient, r.key, id) +} + func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { - newRingPageHandler(r, r.cfg.HeartbeatTimeout).handle(w, req) + NewHTTPStatusHandler(r, defaultPageTemplate).ServeHTTP(w, req) } // Operation describes which instances can be included in the replica set, based on their state. diff --git a/ring/status.gohtml b/ring/status.gohtml index 80e5b6a8f..00d852a92 100644 --- a/ring/status.gohtml +++ b/ring/status.gohtml @@ -1,4 +1,4 @@ -{{- /*gotype: github.com/grafana/dskit/ring.httpResponse */ -}} +{{- /*gotype: github.com/grafana/dskit/ring.StatusPageData */ -}} diff --git a/ring/util.go b/ring/util.go index b39f2f26e..04f351da3 100644 --- a/ring/util.go +++ b/ring/util.go @@ -13,6 +13,7 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/kv" ) // GenerateTokens make numTokens unique random tokens, none of which clash @@ -222,3 +223,16 @@ func filterIPs(addrs []net.Addr) string { } return ipAddr } + +func forget(ctx context.Context, client kv.Client, ringKey, id string) error { + unregister := func(in interface{}) (out interface{}, retry bool, err error) { + if in == nil { + return nil, false, fmt.Errorf("found empty ring when trying to unregister") + } + + ringDesc := in.(*Desc) + ringDesc.RemoveIngester(id) + return ringDesc, true, nil + } + return client.CAS(ctx, ringKey, unregister) +}