-
Notifications
You must be signed in to change notification settings - Fork 75
Support ring handler on lifecycler #112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
0cbe964
816b94b
edf4b07
13169e4
61ccda2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ import ( | |
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/go-kit/log" | ||
| "github.com/go-kit/log/level" | ||
| ) | ||
|
|
||
|
|
@@ -90,7 +91,7 @@ func init() { | |
| pageTemplate = template.Must(t.Parse(pageContent)) | ||
| } | ||
|
|
||
| func (r *Ring) forget(ctx context.Context, id string) error { | ||
| func (h *ringPageHandler) forget(ctx context.Context, id string) error { | ||
| unregister := func(in interface{}) (out interface{}, retry bool, err error) { | ||
| if in == nil { | ||
| return nil, false, fmt.Errorf("found empty ring when trying to unregister") | ||
|
|
@@ -100,7 +101,7 @@ func (r *Ring) forget(ctx context.Context, id string) error { | |
| ringDesc.RemoveIngester(id) | ||
| return ringDesc, true, nil | ||
| } | ||
| return r.KVClient.CAS(ctx, r.key, unregister) | ||
| return h.r.casRing(ctx, unregister) | ||
| } | ||
|
|
||
| type ingesterDesc struct { | ||
|
|
@@ -121,11 +122,30 @@ type httpResponse struct { | |
| ShowTokens bool `json:"-"` | ||
| } | ||
|
|
||
| func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { | ||
| type ringObserver interface { | ||
simonswine marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error | ||
| getRing(context.Context) (*Desc, map[string]uint32, error) | ||
| } | ||
|
|
||
| type ringPageHandler struct { | ||
| logger log.Logger | ||
| r ringObserver | ||
| heartbeatPeriod time.Duration | ||
| } | ||
|
|
||
| func newRingPageHandler(logger log.Logger, r ringObserver, heartbeatPeriod time.Duration) *ringPageHandler { | ||
| return &ringPageHandler{ | ||
| logger: logger, | ||
| r: r, | ||
| heartbeatPeriod: heartbeatPeriod, | ||
| } | ||
| } | ||
|
|
||
| func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) { | ||
| if req.Method == http.MethodPost { | ||
| ingesterID := req.FormValue("forget") | ||
| if err := r.forget(req.Context(), ingesterID); err != nil { | ||
| level.Error(r.logger).Log("msg", "error forgetting instance", "err", err) | ||
| if err := h.forget(req.Context(), ingesterID); err != nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion: it would be nice if this error was reported to the user, instead of (or in addition to) being logged. (If we removed logging here, we could remove logger completely)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. I have removed it in 13169e4. I guess there is no riks of exposing additional information to the HTTP caller, as the caller has to have quite a lot of access to the cluster anyhow, if they are able to remove ingesters. |
||
| level.Error(h.logger).Log("msg", "error forgetting instance", "err", err) | ||
| } | ||
|
|
||
| // Implement PRG pattern to prevent double-POST and work with CSRF middleware. | ||
|
|
@@ -140,23 +160,25 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { | |
| return | ||
| } | ||
|
|
||
| r.mtx.RLock() | ||
| defer r.mtx.RUnlock() | ||
| ringDesc, ownedTokens, err := h.r.getRing(req.Context()) | ||
| if err != nil { | ||
| http.Error(w, err.Error(), http.StatusInternalServerError) | ||
| return | ||
| } | ||
|
|
||
| ingesterIDs := []string{} | ||
| for id := range r.ringDesc.Ingesters { | ||
| for id := range ringDesc.Ingesters { | ||
| ingesterIDs = append(ingesterIDs, id) | ||
| } | ||
| sort.Strings(ingesterIDs) | ||
|
|
||
| now := time.Now() | ||
| var ingesters []ingesterDesc | ||
| _, owned := r.countTokens() | ||
| for _, id := range ingesterIDs { | ||
| ing := r.ringDesc.Ingesters[id] | ||
| ing := ringDesc.Ingesters[id] | ||
| heartbeatTimestamp := time.Unix(ing.Timestamp, 0) | ||
| state := ing.State.String() | ||
| if !r.IsHealthy(&ing, Reporting, now) { | ||
| if !ing.IsHealthy(Reporting, h.heartbeatPeriod, now) { | ||
| state = unhealthy | ||
| } | ||
|
|
||
|
|
@@ -175,7 +197,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { | |
| Tokens: ing.Tokens, | ||
| Zone: ing.Zone, | ||
| NumTokens: len(ing.Tokens), | ||
| Ownership: (float64(owned[id]) / float64(math.MaxUint32)) * 100, | ||
| Ownership: (float64(ownedTokens[id]) / float64(math.MaxUint32)) * 100, | ||
| }) | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.