Skip to content

Commit

Permalink
server: v2store deprecation: Use publishV3 instead of publish V2.
Browse files Browse the repository at this point in the history
Additionally, the etcdctl backup command had to be fixed so that it
scrubs the 'edit membership' commands from the WAL log
(it already did so, but it ignored the v3 update_members_attr message).
  • Loading branch information
ptabor committed Mar 27, 2021
1 parent 8ee1dd9 commit c6e88d1
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 94 deletions.
6 changes: 6 additions & 0 deletions etcdctl/ctlv2/command/backup_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metad
continue
}

if raftReq.ClusterMemberAttrSet != nil {
log.Println("ignoring cluster_member_attr_set")
remove()
continue
}

if v3 || raftReq.Header == nil {
continue
}
Expand Down
67 changes: 1 addition & 66 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ func (s *EtcdServer) adjustTicks() {
func (s *EtcdServer) Start() {
s.start()
s.GoAttach(func() { s.adjustTicks() })
s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
s.GoAttach(func() { s.publishV3(s.Cfg.ReqTimeout()) })
s.GoAttach(s.purgeFile)
s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) })
s.GoAttach(s.monitorVersions)
Expand Down Expand Up @@ -1855,7 +1855,6 @@ func (s *EtcdServer) sync(timeout time.Duration) {
// with the static clientURLs of the server.
// The function keeps attempting to register until it succeeds,
// or its server is stopped.
// TODO: replace publish() in 3.6
func (s *EtcdServer) publishV3(timeout time.Duration) {
req := &membershippb.ClusterMemberAttrSetRequest{
Member_ID: uint64(s.id),
Expand Down Expand Up @@ -1906,70 +1905,6 @@ func (s *EtcdServer) publishV3(timeout time.Duration) {
}
}

// publish registers this server's attributes with the cluster.
//
// s.attributes is JSON-encoded and submitted as a "PUT" request against the
// member-attributes store path of s.id. Each attempt is bounded by timeout;
// attempts are repeated until one succeeds or the call fails with ErrStopped.
// On success s.readych is closed.
//
// The attributes are encoded for the v2 store and applied through Raft, but
// the request does not go through the v2 API endpoint. This means that even
// with the v2 client handler disabled (e.g. --enable-v2=false), the cluster
// can still process publish requests through rafthttp.
// TODO: Deprecate v2 store in 3.6
func (s *EtcdServer) publish(timeout time.Duration) {
	lg := s.Logger()

	encoded, marshalErr := json.Marshal(s.attributes)
	if marshalErr != nil {
		lg.Panic("failed to marshal JSON", zap.Error(marshalErr))
		// NOTE(review): presumably unreachable if Panic unwinds the stack — confirm.
		return
	}

	req := pb.Request{
		Method: "PUT",
		Path:   membership.MemberAttributesStorePath(s.id),
		Val:    string(encoded),
	}

	for {
		// Every attempt gets its own deadline derived from the server context.
		attemptCtx, cancelAttempt := context.WithTimeout(s.ctx, timeout)
		_, doErr := s.Do(attemptCtx, req)
		cancelAttempt()

		if doErr == nil {
			// Published: signal readiness before logging.
			close(s.readych)
			lg.Info(
				"published local member to cluster through raft",
				zap.String("local-member-id", s.ID().String()),
				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
				zap.String("request-path", req.Path),
				zap.String("cluster-id", s.cluster.ID().String()),
				zap.Duration("publish-timeout", timeout),
			)
			return
		}

		if doErr == ErrStopped {
			// The server is shutting down; give up instead of retrying.
			lg.Warn(
				"stopped publish because server is stopped",
				zap.String("local-member-id", s.ID().String()),
				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
				zap.Duration("publish-timeout", timeout),
				zap.Error(doErr),
			)
			return
		}

		// Any other failure is logged and the request is retried.
		lg.Warn(
			"failed to publish local member to cluster through raft",
			zap.String("local-member-id", s.ID().String()),
			zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
			zap.String("request-path", req.Path),
			zap.Duration("publish-timeout", timeout),
			zap.Error(doErr),
		)
	}
}

func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
atomic.AddInt64(&s.inflightSnapshots, 1)

Expand Down
61 changes: 33 additions & 28 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/membershippb"
"go.etcd.io/etcd/pkg/v3/fileutil"
"go.etcd.io/etcd/pkg/v3/idutil"
"go.etcd.io/etcd/pkg/v3/pbutil"
Expand All @@ -36,6 +38,7 @@ import (
"go.etcd.io/etcd/pkg/v3/wait"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
Expand All @@ -49,6 +52,7 @@ import (
"go.etcd.io/etcd/server/v3/mvcc"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)

// TestDoLocalAction tests requests which do not need to go through raft to be applied,
Expand Down Expand Up @@ -1424,26 +1428,29 @@ func TestPublish(t *testing.T) {
n := newNodeRecorder()
ch := make(chan interface{}, 1)
// simulate that request has gone through consensus
ch <- Response{}
ch <- &applyResult{}
w := wait.NewWithResponse(ch)
ctx, cancel := context.WithCancel(context.TODO())
lg := zaptest.NewLogger(t)
be, _ := backend.NewDefaultTmpBackend()
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
lg: lg,
readych: make(chan struct{}),
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
id: 1,
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
cluster: &membership.RaftCluster{},
w: w,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},

ctx: ctx,
cancel: cancel,
authStore: auth.NewAuthStore(lg, be, nil, nil, 0),
be: be,
ctx: ctx,
cancel: cancel,
}
srv.publish(time.Hour)
srv.publishV3(time.Hour)

action := n.Action()
if len(action) != 1 {
Expand All @@ -1453,24 +1460,12 @@ func TestPublish(t *testing.T) {
t.Fatalf("action = %s, want Propose", action[0].Name)
}
data := action[0].Params[0].([]byte)
var r pb.Request
var r pb.InternalRaftRequest
if err := r.Unmarshal(data); err != nil {
t.Fatalf("unmarshal request error: %v", err)
}
if r.Method != "PUT" {
t.Errorf("method = %s, want PUT", r.Method)
}
wm := membership.Member{ID: 1, Attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}}
if wpath := membership.MemberAttributesStorePath(wm.ID); r.Path != wpath {
t.Errorf("path = %s, want %s", r.Path, wpath)
}
var gattr membership.Attributes
if err := json.Unmarshal([]byte(r.Val), &gattr); err != nil {
t.Fatalf("unmarshal val error: %v", err)
}
if !reflect.DeepEqual(gattr, wm.Attributes) {
t.Errorf("member = %v, want %v", gattr, wm.Attributes)
}
assert.Equal(t, &membershippb.ClusterMemberAttrSetRequest{Member_ID: 0x1, MemberAttributes: &membershippb.Attributes{
Name: "node1", ClientUrls: []string{"http://a", "http://b"}}}, r.ClusterMemberAttrSet)
}

// TestPublishStopped tests that publish will be stopped if server is stopped.
Expand Down Expand Up @@ -1498,25 +1493,35 @@ func TestPublishStopped(t *testing.T) {
cancel: cancel,
}
close(srv.stopping)
srv.publish(time.Hour)
srv.publishV3(time.Hour)
}

// TestPublishRetry tests that publish will keep retrying until it succeeds.
func TestPublishRetry(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
n := newNodeRecorderStream()

lg := zaptest.NewLogger(t)
be, _ := backend.NewDefaultTmpBackend()
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
Cfg: config.ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
lg: lg,
readych: make(chan struct{}),
Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000},
id: 1,
r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}),
w: mockwait.NewNop(),
stopping: make(chan struct{}),
attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
cluster: &membership.RaftCluster{},
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
authStore: auth.NewAuthStore(lg, be, nil, nil, 0),
be: be,
ctx: ctx,
cancel: cancel,
}

// expect multiple proposals from retrying
ch := make(chan struct{})
go func() {
Expand All @@ -1535,7 +1540,7 @@ func TestPublishRetry(t *testing.T) {
}
}
}()
srv.publish(10 * time.Nanosecond)
srv.publishV3(10 * time.Nanosecond)
ch <- struct{}{}
<-ch
}
Expand Down
1 change: 1 addition & 0 deletions server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/sirupsen/logrus v1.7.0 // indirect
github.com/soheilhy/cmux v0.1.5-0.20210205191134-5ec6847320e5
github.com/spf13/cobra v1.1.1
github.com/stretchr/testify v1.5.1
github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
go.etcd.io/bbolt v1.3.5
Expand Down

0 comments on commit c6e88d1

Please sign in to comment.