diff --git a/CHANGELOG.md b/CHANGELOG.md
index fb3c7d5499d..d2a08cfbbd9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,7 +12,7 @@
* `cortex_bucket_index_last_successful_update_timestamp_seconds`: Timestamp of the last successful update of a tenant's bucket index.
* [ENHANCEMENT] Ruler: Add `cortex_prometheus_last_evaluation_samples` to expose the number of samples generated by a rule group per tenant. #3582
* [ENHANCEMENT] Memberlist: add status page (/memberlist) with available details about memberlist-based KV store and memberlist cluster. It's also possible to view KV values in Go struct or JSON format, or download for inspection. #3575
-* [ENHANCEMENT] Memberlist: client can now keep a size-bounded buffer with sent and received messages and display them in the admin UI (/memberlist) for troubleshooting. #3581
+* [ENHANCEMENT] Memberlist: client can now keep a size-bounded buffer with sent and received messages and display them in the admin UI (/memberlist) for troubleshooting. #3581 #3602
* [BUGFIX] Allow `-querier.max-query-lookback` use `y|w|d` suffix like deprecated `-store.max-look-back-period`. #3598
* [BUGFIX] Query-Frontend: `cortex_query_seconds_total` now return seconds not nanoseconds. #3589
* [BUGFIX] Memberlist: Entry in the ring should now not appear again after using "Forget" feature (unless it's still heartbeating). #3603
diff --git a/pkg/ring/kv/memberlist/kv_init_service.go b/pkg/ring/kv/memberlist/kv_init_service.go
index 1400f233223..af79bb30c93 100644
--- a/pkg/ring/kv/memberlist/kv_init_service.go
+++ b/pkg/ring/kv/memberlist/kv_init_service.go
@@ -8,6 +8,7 @@ import (
"net/http"
"sort"
"strconv"
+ "strings"
"sync"
"time"
@@ -93,9 +94,10 @@ func (kvs *KVInitService) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
const (
- downloadKeyParam = "downloadKey"
- viewKeyParam = "viewKey"
- viewMsgParam = "viewMsg"
+ downloadKeyParam = "downloadKey"
+ viewKeyParam = "viewKey"
+ viewMsgParam = "viewMsg"
+ deleteMessagesParam = "deleteMessages"
)
if err := req.ParseForm(); err == nil {
@@ -128,6 +130,15 @@ func (kvs *KVInitService) ServeHTTP(w http.ResponseWriter, req *http.Request) {
http.Error(w, "message not found", http.StatusNotFound)
return
}
+
+ if len(req.Form[deleteMessagesParam]) > 0 && req.Form[deleteMessagesParam][0] == "true" {
+ kv.deleteSentReceivedMessages()
+
+ // Redirect back.
+ w.Header().Set("Location", "?"+deleteMessagesParam+"=false")
+ w.WriteHeader(http.StatusFound)
+ return
+ }
}
members := kv.memberlist.Members()
@@ -236,7 +247,9 @@ type pageData struct {
ReceivedMessages []message
}
-var pageTemplate = template.Must(template.New("webpage").Parse(pageContent))
+var pageTemplate = template.Must(template.New("webpage").Funcs(template.FuncMap{
+ "StringsJoin": strings.Join,
+}).Parse(pageContent))
const pageContent = `
@@ -309,6 +322,8 @@ const pageContent = `
@@ -317,6 +332,7 @@ const pageContent = `
Key |
Value in the Message |
Version After Update (0 = no change) |
+ Changes |
Actions |
@@ -329,6 +345,7 @@ const pageContent = `
{{ .Pair.Key }} |
size: {{ .Pair.Value | len }}, codec: {{ .Pair.Codec }} |
{{ .Version }} |
+ {{ StringsJoin .Changes ", " }} |
json
| json-pretty
@@ -341,6 +358,8 @@ const pageContent = `
Sent Messages
+ Delete All Messages (received and sent)
+
@@ -349,6 +368,7 @@ const pageContent = `
Key |
Value |
Version |
+ Changes |
Actions |
@@ -361,6 +381,7 @@ const pageContent = `
{{ .Pair.Key }} |
size: {{ .Pair.Value | len }}, codec: {{ .Pair.Codec }} |
{{ .Version }} |
+ {{ StringsJoin .Changes ", " }} |
json
| json-pretty
diff --git a/pkg/ring/kv/memberlist/kv_init_service_test.go b/pkg/ring/kv/memberlist/kv_init_service_test.go
index 9a2f678affd..629570c6006 100644
--- a/pkg/ring/kv/memberlist/kv_init_service_test.go
+++ b/pkg/ring/kv/memberlist/kv_init_service_test.go
@@ -30,9 +30,10 @@ func TestPage(t *testing.T) {
Pair: KeyValuePair{
Key: "hello",
Value: []byte("world"),
- Codec: "coded",
+ Codec: "codec",
},
Version: 20,
+ Changes: []string{"A", "B", "C"},
}},
SentMessages: []message{{
@@ -42,9 +43,10 @@ func TestPage(t *testing.T) {
Pair: KeyValuePair{
Key: "hello",
Value: []byte("world"),
- Codec: "coded",
+ Codec: "codec",
},
Version: 20,
+ Changes: []string{"A", "B", "C"},
}},
}))
}
diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go
index 48dc84a5124..26495ff243b 100644
--- a/pkg/ring/kv/memberlist/memberlist_client.go
+++ b/pkg/ring/kv/memberlist/memberlist_client.go
@@ -268,13 +268,17 @@ type KV struct {
maxCasRetries int
}
+// Message describes incoming or outgoing message, and local state after applying incoming message, or state when sending message.
// Fields are exported for templating to work.
type message struct {
- ID int // Unique local ID of the message.
- Time time.Time // Time when message was sent or received.
- Size int // Message size
- Pair KeyValuePair
- Version uint // For sent message, which version the message reflects. For received message, version after applying the message.
+ ID int // Unique local ID of the message.
+ Time time.Time // Time when message was sent or received.
+ Size int // Message size
+ Pair KeyValuePair
+
+ // Following values are computed on the receiving node, based on local state.
+ Version uint // For sent message, which version the message reflects. For received message, version after applying the message.
+ Changes []string // List of changes in this message (as computed by *this* node).
}
type valueDesc struct {
@@ -904,6 +908,7 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec
Size: len(pairData),
Pair: kvPair,
Version: version,
+ Changes: change.MergeContent(),
})
m.queueBroadcast(key, change.MergeContent(), version, pairData)
@@ -948,11 +953,17 @@ func (m *KV) NotifyMsg(msg []byte) {
// we have a ring update! Let's merge it with our version of the ring for given key
mod, version, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec)
+ changes := []string(nil)
+ if mod != nil {
+ changes = mod.MergeContent()
+ }
+
m.addReceivedMessage(message{
Time: time.Now(),
Size: len(msg),
Pair: kvPair,
Version: version,
+ Changes: changes,
})
if err != nil {
@@ -965,6 +976,7 @@ func (m *KV) NotifyMsg(msg []byte) {
Size: len(msg),
Pair: kvPair,
Version: version,
+ Changes: changes,
})
// Forward this message
@@ -1112,11 +1124,17 @@ func (m *KV) MergeRemoteState(data []byte, join bool) {
// we have both key and value, try to merge it with our state
change, newver, err := m.mergeBytesValueForKey(kvPair.Key, kvPair.Value, codec)
+ changes := []string(nil)
+ if change != nil {
+ changes = change.MergeContent()
+ }
+
m.addReceivedMessage(message{
Time: received,
Size: int(kvPairLength),
Pair: kvPair, // Makes a copy of kvPair.
Version: newver,
+ Changes: changes,
})
if err != nil {
@@ -1264,6 +1282,16 @@ func (m *KV) getSentAndReceivedMessages() (sent, received []message) {
return append([]message(nil), m.sentMessages...), append([]message(nil), m.receivedMessages...)
}
+func (m *KV) deleteSentReceivedMessages() {
+ m.messagesMu.Lock()
+ defer m.messagesMu.Unlock()
+
+ m.sentMessages = nil
+ m.sentMessagesSize = 0
+ m.receivedMessages = nil
+ m.receivedMessagesSize = 0
+}
+
func addMessageToBuffer(msgs []message, size int, limit int, msg message) ([]message, int) {
msgs = append(msgs, msg)
size += msg.Size
| |