Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: memberlist delete operation #644

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 14 additions & 23 deletions kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,7 +1052,6 @@ func (m *KV) Delete(key string) error {
if err != nil {
return err
}

if newver > 0 {
m.notifyWatchers(key)
m.broadcastNewValue(key, change, newver, c, false, deleted, updated)
Expand Down Expand Up @@ -1171,14 +1170,13 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec
level.Warn(m.logger).Log("msg", "skipped broadcasting of locally-generated update because memberlist KV is shutting down", "key", key)
return
}
data, err := codec.Encode(change)

data, err := handlePossibleNilEncode(codec, change)
if err != nil {
level.Error(m.logger).Log("msg", "failed to encode change", "key", key, "version", version, "err", err)
m.numberOfBroadcastMessagesDropped.Inc()
return
}

kvPair := KeyValuePair{Key: key, Value: data, Codec: codec.CodecID(), Deleted: deleted, UpdateTimeMillis: updateTimeMillis(updateTime)}
pairData, err := kvPair.Marshal()
if err != nil {
Expand All @@ -1187,7 +1185,7 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec
return
}

mergedChanges := handlePossibleNilMergeContent(change)
mergedChanges := change.MergeContent()
m.addSentMessage(Message{
Time: time.Now(),
Size: len(pairData),
Expand Down Expand Up @@ -1285,7 +1283,6 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) {
case update := <-workerCh:
// we have a value update! Let's merge it with our current version for given key
mod, version, deleted, updated, err := m.mergeBytesValueForKey(key, update.value, update.codec, update.deleted, update.updateTime)

changes := []string(nil)
if mod != nil {
changes = mod.MergeContent()
Expand Down Expand Up @@ -1522,6 +1519,12 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue
// This is safe because the entire function runs under the store lock; we do not return
// the full state anywhere as is done elsewhere (i.e. Get/WatchKey/CAS).
curr := m.store[key]

// If the current entry is nil and the incoming value for that key is marked as deleted, return no change: we do not want to revive the entry.
if curr.value == nil && deleted {
return nil, 0, false, time.Time{}, err
}

// if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set.
if casVersion > 0 && curr.Version != casVersion {
return nil, 0, false, time.Time{}, errVersionMismatch
Expand All @@ -1530,8 +1533,6 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue
if err != nil {
return nil, 0, false, time.Time{}, err
}

newVersion = curr.Version + 1
newUpdated = curr.UpdateTime
newDeleted = curr.Deleted

Expand Down Expand Up @@ -1566,6 +1567,12 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, incomingValue
}
}

if change == nil && curr.Deleted != newDeleted {
// return result as change if the only thing that changes is the Delete state of the entry.
change = result
}

newVersion = curr.Version + 1
m.store[key] = ValueDesc{
value: result,
Version: newVersion,
Expand Down Expand Up @@ -1695,19 +1702,3 @@ func updateTimeMillis(ts time.Time) int64 {
}
return ts.UnixMilli()
}

func handlePossibleNilEncode(codec codec.Codec, change Mergeable) ([]byte, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided that the handle* functions are not needed, since we now handle change=nil at the mergeValueForKey level; that was the main reason they were needed in the first place. I validated that the empty []byte{} was what caused the error "failed to decode value: snappy: corrupt input" in the first place. Please let me know what you think.

if change == nil {
return []byte{}, nil
}

return codec.Encode(change)
}

// handlePossibleNilMergeContent returns change.MergeContent() when change is a
// non-nil interface value. For a nil change it returns an empty, non-nil slice
// instead of calling a method on the nil interface.
func handlePossibleNilMergeContent(change Mergeable) []string {
	if change != nil {
		return change.MergeContent()
	}

	// Deliberately an allocated empty slice rather than nil, so callers always
	// receive a non-nil result.
	return []string{}
}
41 changes: 26 additions & 15 deletions kv/memberlist/memberlist_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"testing"
"time"

"github.com/grafana/dskit/test"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
Expand Down Expand Up @@ -252,6 +254,16 @@ func getLocalhostAddrs() []string {
return []string{localhostIP}
}

// checkMemberlistEntry waits, via test.Poll, for the value stored under key in
// kv to read back as nil, allowing up to duration for that to happen.
//
// The polled function returns nil once the entry is gone and a descriptive
// error while a value is still present; test.Poll is given nil as the expected
// result.
func checkMemberlistEntry(t *testing.T, kv *Client, key string, duration time.Duration) {
	// Mark this function as a test helper so that a failure is attributed to
	// the calling test's line rather than to this shared helper.
	t.Helper()

	test.Poll(t, duration, nil, func() interface{} {
		val := get(t, kv, key)
		if val != nil {
			return fmt.Errorf("expected nil, got: %v", val)
		}
		return nil
	})
}

func TestBasicGetAndCas(t *testing.T) {
c := dataCodec{}

Expand Down Expand Up @@ -580,6 +592,8 @@ func TestDelete(t *testing.T) {

c := dataCodec{}

reg := prometheus.NewRegistry()

var cfg KVConfig
flagext.DefaultValues(&cfg)
cfg.TCPTransport = TCPTransportConfig{
Expand All @@ -588,11 +602,11 @@ func TestDelete(t *testing.T) {
}
cfg.GossipNodes = 1
cfg.GossipInterval = 100 * time.Millisecond
cfg.ObsoleteEntriesTimeout = 1 * time.Second
cfg.ObsoleteEntriesTimeout = 500 * time.Millisecond
cfg.ClusterLabelVerificationDisabled = true
cfg.Codecs = []codec.Codec{c}

mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())
mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, reg)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv))
defer services.StopAndAwaitTerminated(context.Background(), mkv) //nolint:errcheck

Expand All @@ -614,12 +628,14 @@ func TestDelete(t *testing.T) {
t.Fatalf("Failed to delete key %s: %v", key, err)
}

time.Sleep(2 * time.Second) // wait for obsolete entries to be removed
val = get(t, kv, key)
checkMemberlistEntry(t, kv, key, 2*time.Second)

if val != nil {
t.Errorf("Expected nil, got: %v", val)
}
// Validate that there are no encoding errors during the Delete flow.
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP memberlist_client_messages_to_broadcast_dropped_total Number of broadcast messages intended to be sent but were dropped due to encoding errors or for being too big
# TYPE memberlist_client_messages_to_broadcast_dropped_total counter
memberlist_client_messages_to_broadcast_dropped_total 0
`), "memberlist_client_messages_to_broadcast_dropped_total"))
}

func TestDeleteMultipleClients(t *testing.T) {
Expand Down Expand Up @@ -676,14 +692,9 @@ func TestDeleteMultipleClients(t *testing.T) {
t.Fatalf("Failed to delete key %s: %v", key, err)
}

time.Sleep(5 * deleteTime) // wait for obsolete entries to be removed

val, err = kv1.Get(context.Background(), key)
require.NoError(t, err)
require.Nil(t, val)
val, err = kv2.Get(context.Background(), key)
require.NoError(t, err)
require.Nil(t, val)
// Wait for the obsolete entries to be removed.
checkMemberlistEntry(t, kv1, key, 10*deleteTime)
checkMemberlistEntry(t, kv2, key, 10*deleteTime)
}

func TestMultipleClients(t *testing.T) {
Expand Down