Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
* [ENHANCEMENT] Lifecycler: It's now possible to change default value of lifecycler's `final-sleep`. #121
* [ENHANCEMENT] Memberlist: Update to latest fork of memberlist. #160
* [ENHANCEMENT] Memberlist: extracted HTTP status page handler to `memberlist.HTTPStatusHandler` which now can be instantiated with a custom template. #163
* [ENHANCEMENT] Ring: extracted HTTP status page handler to `ring.HTTPStatusHandler` which now can be instantiated with a custom template. #166
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
Expand Down
16 changes: 10 additions & 6 deletions ring/basic_lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,11 +498,7 @@ func (l *BasicLifecycler) run(fn func() error) error {
}
}

func (l *BasicLifecycler) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error {
return l.store.CAS(ctx, l.ringKey, f)
}

func (l *BasicLifecycler) getRing(ctx context.Context) (*Desc, error) {
func (l *BasicLifecycler) Describe(ctx context.Context) (*Desc, error) {
obj, err := l.store.Get(ctx, l.ringKey)
if err != nil {
return nil, err
Expand All @@ -511,6 +507,14 @@ func (l *BasicLifecycler) getRing(ctx context.Context) (*Desc, error) {
return GetOrCreateRingDesc(obj), nil
}

func (l *BasicLifecycler) Forget(ctx context.Context, id string) error {
return forget(ctx, l.store, l.ringKey, id)
}

func (l *BasicLifecycler) IsHealthy(instance *InstanceDesc, op Operation, now time.Time) bool {
return instance.IsHealthy(op, l.cfg.HeartbeatPeriod, now)
}

func (l *BasicLifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newRingPageHandler(l, l.cfg.HeartbeatPeriod).handle(w, req)
NewHTTPStatusHandler(l, defaultPageTemplate).ServeHTTP(w, req)
}
116 changes: 58 additions & 58 deletions ring/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,61 +13,58 @@ import (
"time"
)

//go:embed status.gohtml
var defaultPageContent string
var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.FuncMap{
"mod": func(i, j int) bool { return i%j == 0 },
"humanFloat": func(f float64) string {
return fmt.Sprintf("%.2g", f)
},
"timeOrEmptyString": func(t time.Time) string {
if t.IsZero() {
return ""
}
return t.Format(time.RFC3339Nano)
},
"durationSince": func(t time.Time) string { return time.Since(t).Truncate(time.Millisecond).String() },
}).Parse(defaultPageContent))

type httpResponse struct {
Ingesters []ingesterDesc `json:"shards"`
Now time.Time `json:"now"`
ShowTokens bool `json:"-"`
type StatusPageData struct {
// Ingesters is the list of the ingesters found in the ring.
Ingesters []IngesterDesc `json:"shards"`
// ShowTokens is true if user requested to see show the tokens.
// Tokens are always provided in the IngesterDesc struct, regardless of this param.
ShowTokens bool `json:"-"`
// Now is the current time (time when template was rendered)
Now time.Time `json:"now"`
}

type ingesterDesc struct {
ID string `json:"id"`
type IngesterDesc struct {
ID string `json:"id"`
// State can be: "ACTIVE", "LEAVING", "PENDING", "JOINING", "LEFT" or "UNHEALTHY"
State string `json:"state"`
Address string `json:"address"`
HeartbeatTimestamp time.Time `json:"timestamp"`
RegisteredTimestamp time.Time `json:"registered_timestamp"`
Zone string `json:"zone"`
Tokens []uint32 `json:"tokens"`
NumTokens int `json:"-"`
Ownership float64 `json:"-"`
// Ownership represents the percentage (0-100) of the tokens owned by this instance.
Ownership float64 `json:"-"`
}

type ringAccess interface {
casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error
getRing(context.Context) (*Desc, error)
}
// Operator allows external entities to perform generic operations on the ring,
// like describing it or force-forgetting one of the members.
type Operator interface {
Describe(ctx context.Context) (*Desc, error)
Forget(ctx context.Context, id string) error

type ringPageHandler struct {
r ringAccess
heartbeatPeriod time.Duration
IsHealthy(instance *InstanceDesc, op Operation, now time.Time) bool
}

func newRingPageHandler(r ringAccess, heartbeatPeriod time.Duration) *ringPageHandler {
return &ringPageHandler{
r: r,
heartbeatPeriod: heartbeatPeriod,
// NewHTTPStatusHandler will use the provided Operator to build an http.Handler to inspect the ring status over http.
// It will render the provided template (unless Accept: application/json header is provided, in which case it will return a JSON response).
// The handler provided also can force forgetting members of the ring by sending a POST request with the ID in the `forget` field.
func NewHTTPStatusHandler(r Operator, tpl *template.Template) HTTPStatusHandler {
return HTTPStatusHandler{
r: r,
template: tpl,
}
}

func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) {
type HTTPStatusHandler struct {
r Operator
template *template.Template
}

func (h HTTPStatusHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodPost {
ingesterID := req.FormValue("forget")
if err := h.forget(req.Context(), ingesterID); err != nil {
if err := h.r.Forget(req.Context(), ingesterID); err != nil {
http.Error(
w,
fmt.Errorf("error forgetting instance '%s': %w", ingesterID, err).Error(),
Expand All @@ -88,7 +85,7 @@ func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) {
return
}

ringDesc, err := h.r.getRing(req.Context())
ringDesc, err := h.r.Describe(req.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand All @@ -102,15 +99,15 @@ func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) {
sort.Strings(ingesterIDs)

now := time.Now()
var ingesters []ingesterDesc
var ingesters []IngesterDesc
for _, id := range ingesterIDs {
ing := ringDesc.Ingesters[id]
state := ing.State.String()
if !ing.IsHealthy(Reporting, h.heartbeatPeriod, now) {
if !h.r.IsHealthy(&ing, Reporting, now) {
state = "UNHEALTHY"
}

ingesters = append(ingesters, ingesterDesc{
ingesters = append(ingesters, IngesterDesc{
ID: id,
State: state,
Address: ing.Addr,
Expand All @@ -125,16 +122,16 @@ func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) {

tokensParam := req.URL.Query().Get("tokens")

renderHTTPResponse(w, httpResponse{
renderHTTPResponse(w, StatusPageData{
Ingesters: ingesters,
Now: now,
ShowTokens: tokensParam == "true",
}, defaultPageTemplate, req)
}, h.template, req)
}

// RenderHTTPResponse either responds with json or a rendered html page using the passed in template
// renderHTTPResponse either responds with json or a rendered html page using the passed in template
// by checking the Accepts header
func renderHTTPResponse(w http.ResponseWriter, v httpResponse, t *template.Template, r *http.Request) {
func renderHTTPResponse(w http.ResponseWriter, v StatusPageData, t *template.Template, r *http.Request) {
accept := r.Header.Get("Accept")
if strings.Contains(accept, "application/json") {
writeJSONResponse(w, v)
Expand All @@ -147,24 +144,27 @@ func renderHTTPResponse(w http.ResponseWriter, v httpResponse, t *template.Templ
}
}

func (h *ringPageHandler) forget(ctx context.Context, id string) error {
unregister := func(in interface{}) (out interface{}, retry bool, err error) {
if in == nil {
return nil, false, fmt.Errorf("found empty ring when trying to unregister")
}

ringDesc := in.(*Desc)
ringDesc.RemoveIngester(id)
return ringDesc, true, nil
}
return h.r.casRing(ctx, unregister)
}

// WriteJSONResponse writes some JSON as a HTTP response.
func writeJSONResponse(w http.ResponseWriter, v httpResponse) {
func writeJSONResponse(w http.ResponseWriter, v StatusPageData) {
w.Header().Set("Content-Type", "application/json")

if err := json.NewEncoder(w).Encode(v); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}

//go:embed status.gohtml
var defaultPageContent string
var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.FuncMap{
"mod": func(i, j int) bool { return i%j == 0 },
"humanFloat": func(f float64) string {
return fmt.Sprintf("%.2g", f)
},
"timeOrEmptyString": func(t time.Time) string {
if t.IsZero() {
return ""
}
return t.Format(time.RFC3339Nano)
},
"durationSince": func(t time.Time) string { return time.Since(t).Truncate(time.Millisecond).String() },
}).Parse(defaultPageContent))
16 changes: 10 additions & 6 deletions ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,11 +851,7 @@ func (i *Lifecycler) processShutdown(ctx context.Context) {
time.Sleep(i.cfg.FinalSleep)
}

func (i *Lifecycler) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error {
return i.KVStore.CAS(ctx, i.RingKey, f)
}

func (i *Lifecycler) getRing(ctx context.Context) (*Desc, error) {
func (i *Lifecycler) Describe(ctx context.Context) (*Desc, error) {
obj, err := i.KVStore.Get(ctx, i.RingKey)
if err != nil {
return nil, err
Expand All @@ -864,8 +860,16 @@ func (i *Lifecycler) getRing(ctx context.Context) (*Desc, error) {
return GetOrCreateRingDesc(obj), nil
}

func (i *Lifecycler) Forget(ctx context.Context, id string) error {
return forget(ctx, i.KVStore, i.RingKey, id)
}

func (i *Lifecycler) IsHealthy(instance *InstanceDesc, op Operation, now time.Time) bool {
return instance.IsHealthy(op, i.cfg.HeartbeatPeriod, now)
}

func (i *Lifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newRingPageHandler(i, i.cfg.HeartbeatPeriod).handle(w, req)
NewHTTPStatusHandler(i, defaultPageTemplate).ServeHTTP(w, req)
}

// unregister removes our entry from consul.
Expand Down
2 changes: 1 addition & 1 deletion ring/replication_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestReplicationSet_Do(t *testing.T) {
expectedError: errFailure,
},
{
name: "max errors = 1, should handle context canceled",
name: "max errors = 1, should ServeHTTP context canceled",
instances: []InstanceDesc{{}, {}, {}},
maxErrors: 1,
f: func(c context.Context, id *InstanceDesc) (interface{}, error) {
Expand Down
12 changes: 6 additions & 6 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,11 +815,7 @@ func (r *Ring) CleanupShuffleShardCache(identifier string) {
}
}

func (r *Ring) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error {
return r.KVClient.CAS(ctx, r.key, f)
}

func (r *Ring) getRing(ctx context.Context) (*Desc, error) {
func (r *Ring) Describe(ctx context.Context) (*Desc, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()

Expand All @@ -828,8 +824,12 @@ func (r *Ring) getRing(ctx context.Context) (*Desc, error) {
return ringDesc, nil
}

func (r *Ring) Forget(ctx context.Context, id string) error {
return forget(ctx, r.KVClient, r.key, id)
}

func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newRingPageHandler(r, r.cfg.HeartbeatTimeout).handle(w, req)
NewHTTPStatusHandler(r, defaultPageTemplate).ServeHTTP(w, req)
}

// Operation describes which instances can be included in the replica set, based on their state.
Expand Down
2 changes: 1 addition & 1 deletion ring/status.gohtml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{{- /*gotype: github.com/grafana/dskit/ring.httpResponse */ -}}
{{- /*gotype: github.com/grafana/dskit/ring.StatusPageData */ -}}
<!DOCTYPE html>
<html>
<head>
Expand Down
14 changes: 14 additions & 0 deletions ring/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/go-kit/log/level"

"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/kv"
)

// GenerateTokens make numTokens unique random tokens, none of which clash
Expand Down Expand Up @@ -222,3 +223,16 @@ func filterIPs(addrs []net.Addr) string {
}
return ipAddr
}

func forget(ctx context.Context, client kv.Client, ringKey, id string) error {
unregister := func(in interface{}) (out interface{}, retry bool, err error) {
if in == nil {
return nil, false, fmt.Errorf("found empty ring when trying to unregister")
}

ringDesc := in.(*Desc)
ringDesc.RemoveIngester(id)
return ringDesc, true, nil
}
return client.CAS(ctx, ringKey, unregister)
}