Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 16 additions & 27 deletions server/etcdserver/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ type Result struct {
Trace *traceutil.Trace
}

type applyFunc func(r *pb.InternalRaftRequest) *Result
type applyFunc func(*pb.InternalRaftRequest, membership.ShouldApplyV3) *Result

// applierV3 is the interface for processing V3 raft messages
type applierV3 interface {
// Apply executes the generic portion of application logic for the current applier, but
// delegates the actual execution to the applyFunc method.
Apply(r *pb.InternalRaftRequest, applyFunc applyFunc) *Result
Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc applyFunc) *Result

Put(p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error)
Expand Down Expand Up @@ -103,6 +103,9 @@ type applierV3 interface {
RoleDelete(ua *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
UserList(ua *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
RoleList(ua *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3)
ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3)
DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3)
}

type ApplierOptions struct {
Expand Down Expand Up @@ -135,8 +138,8 @@ func newApplierV3Backend(opts ApplierOptions) applierV3 {
}
}

func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc) *Result {
return applyFunc(r)
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc applyFunc) *Result {
return applyFunc(r, shouldApplyV3)
}

func (a *applierV3backend) Put(p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
Expand Down Expand Up @@ -384,39 +387,25 @@ func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleList
return resp, err
}

type ApplierMembership struct {
lg *zap.Logger
cluster *membership.RaftCluster
snapshotServer SnapshotServer
}

func NewApplierMembership(lg *zap.Logger, cluster *membership.RaftCluster, snapshotServer SnapshotServer) *ApplierMembership {
return &ApplierMembership{
lg: lg,
cluster: cluster,
snapshotServer: snapshotServer,
}
}

func (a *ApplierMembership) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
prevVersion := a.cluster.Version()
func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
prevVersion := a.options.Cluster.Version()
newVersion := semver.Must(semver.NewVersion(r.Ver))
a.cluster.SetVersion(newVersion, api.UpdateCapability, shouldApplyV3)
a.options.Cluster.SetVersion(newVersion, api.UpdateCapability, shouldApplyV3)
// Force snapshot after cluster version downgrade.
if prevVersion != nil && newVersion.LessThan(*prevVersion) {
lg := a.lg
lg := a.options.Logger
if lg != nil {
lg.Info("Cluster version downgrade detected, forcing snapshot",
zap.String("prev-cluster-version", prevVersion.String()),
zap.String("new-cluster-version", newVersion.String()),
)
}
a.snapshotServer.ForceSnapshot()
a.options.SnapshotServer.ForceSnapshot()
}
}

func (a *ApplierMembership) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
a.cluster.UpdateAttributes(
func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
a.options.Cluster.UpdateAttributes(
types.ID(r.Member_ID),
membership.Attributes{
Name: r.MemberAttributes.Name,
Expand All @@ -426,12 +415,12 @@ func (a *ApplierMembership) ClusterMemberAttrSet(r *membershippb.ClusterMemberAt
)
}

func (a *ApplierMembership) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
d := version.DowngradeInfo{Enabled: false}
if r.Enabled {
d = version.DowngradeInfo{Enabled: true, TargetVersion: r.Ver}
}
a.cluster.SetDowngradeInfo(&d, shouldApplyV3)
a.options.Cluster.SetDowngradeInfo(&d, shouldApplyV3)
}

type quotaApplierV3 struct {
Expand Down
5 changes: 3 additions & 2 deletions server/etcdserver/apply/apply_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/auth"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/txn"
"go.etcd.io/etcd/server/v3/lease"
)
Expand All @@ -40,7 +41,7 @@ func newAuthApplierV3(as auth.AuthStore, base applierV3, lessor lease.Lessor) *a
return &authApplierV3{applierV3: base, as: as, lessor: lessor}
}

func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc) *Result {
func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc applyFunc) *Result {
aa.mu.Lock()
defer aa.mu.Unlock()
if r.Header != nil {
Expand All @@ -56,7 +57,7 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc) *
return &Result{Err: err}
}
}
ret := aa.applierV3.Apply(r, applyFunc)
ret := aa.applierV3.Apply(r, shouldApplyV3, applyFunc)
aa.authInfo.Username = ""
aa.authInfo.Revision = 0
return ret
Expand Down
6 changes: 3 additions & 3 deletions server/etcdserver/apply/apply_auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func dummyIndexWaiter(_ uint64) <-chan struct{} {
return ch
}

func dummyApplyFunc(_ *pb.InternalRaftRequest) *Result {
func dummyApplyFunc(_ *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
return &Result{}
}

Expand Down Expand Up @@ -217,7 +217,7 @@ func TestAuthApplierV3_Apply(t *testing.T) {
mustCreateRolesAndEnableAuth(t, authApplier)
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
result := authApplier.Apply(tc.request, dummyApplyFunc)
result := authApplier.Apply(tc.request, membership.ApplyBoth, dummyApplyFunc)
require.Equalf(t, result, tc.expectResult, "Apply: got %v, expect: %v", result, tc.expectResult)
})
}
Expand Down Expand Up @@ -384,7 +384,7 @@ func TestAuthApplierV3_AdminPermission(t *testing.T) {
if tc.adminPermissionNeeded {
tc.request.Header = &pb.RequestHeader{Username: userReadOnly}
}
result := authApplier.Apply(tc.request, dummyApplyFunc)
result := authApplier.Apply(tc.request, membership.ApplyBoth, dummyApplyFunc)
require.Equalf(t, errors.Is(result.Err, auth.ErrPermissionDenied), tc.adminPermissionNeeded, "Admin permission needed")
})
}
Expand Down
34 changes: 30 additions & 4 deletions server/etcdserver/apply/uber_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (
"go.uber.org/zap"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm"
"go.etcd.io/etcd/server/v3/etcdserver/txn"
"go.etcd.io/etcd/server/v3/storage/mvcc"
)

type UberApplier interface {
Apply(r *pb.InternalRaftRequest) *Result
Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result
}

type uberApplier struct {
Expand Down Expand Up @@ -78,18 +79,18 @@ func (a *uberApplier) restoreAlarms() {
}
}

func (a *uberApplier) Apply(r *pb.InternalRaftRequest) *Result {
func (a *uberApplier) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
// We first execute chain of Apply() calls down the hierarchy:
// (i.e. CorruptApplier -> CappedApplier -> Auth -> Quota -> Backend),
// then dispatch() unpacks the request to a specific method (like Put),
// that gets executed down the hierarchy again:
// i.e. CorruptApplier.Put(CappedApplier.Put(...(BackendApplier.Put(...)))).
return a.applyV3.Apply(r, a.dispatch)
return a.applyV3.Apply(r, shouldApplyV3, a.dispatch)
}

// dispatch translates the request (r) into appropriate call (like Put) on
// the underlying applyV3 object.
func (a *uberApplier) dispatch(r *pb.InternalRaftRequest) *Result {
func (a *uberApplier) dispatch(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
op := "unknown"
ar := &Result{}
defer func(start time.Time) {
Expand All @@ -101,6 +102,31 @@ func (a *uberApplier) dispatch(r *pb.InternalRaftRequest) *Result {
}
}(time.Now())

switch {
case r.ClusterVersionSet != nil:
op = "ClusterVersionSet" // Implemented in 3.5.x
a.applyV3.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)
return ar
case r.ClusterMemberAttrSet != nil:
op = "ClusterMemberAttrSet" // Implemented in 3.5.x
a.applyV3.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)
return ar
case r.DowngradeInfoSet != nil:
op = "DowngradeInfoSet" // Implemented in 3.5.x
a.applyV3.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)
return ar
case r.DowngradeVersionTest != nil:
op = "DowngradeVersionTest" // Implemented in 3.6 for test only
// do nothing, we are just to ensure etcdserver don't panic in case
// users(test cases) intentionally inject DowngradeVersionTestRequest
// into the WAL files.
return ar
default:
}
if !shouldApplyV3 {
return nil
}

switch {
case r.Range != nil:
op = "Range"
Expand Down
16 changes: 8 additions & 8 deletions server/etcdserver/apply/uber_applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,13 @@ func TestUberApplier_Alarm_Corrupt(t *testing.T) {
MemberID: memberID,
Alarm: pb.AlarmType_CORRUPT,
},
})
}, membership.ApplyBoth)
require.NotNil(t, result)
require.NoError(t, result.Err)

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
result = ua.Apply(tc.request)
result = ua.Apply(tc.request, membership.ApplyBoth)
require.NotNil(t, result)
require.Equalf(t, tc.expectError, result.Err, "Apply: got %v, expect: %v", result.Err, tc.expectError)
})
Expand Down Expand Up @@ -232,13 +232,13 @@ func TestUberApplier_Alarm_Quota(t *testing.T) {
MemberID: memberID,
Alarm: pb.AlarmType_NOSPACE,
},
})
}, membership.ApplyBoth)
require.NotNil(t, result)
require.NoError(t, result.Err)

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
result = ua.Apply(tc.request)
result = ua.Apply(tc.request, membership.ApplyBoth)
require.NotNil(t, result)
require.Equalf(t, tc.expectError, result.Err, "Apply: got %v, expect: %v", result.Err, tc.expectError)
})
Expand All @@ -255,11 +255,11 @@ func TestUberApplier_Alarm_Deactivate(t *testing.T) {
MemberID: memberID,
Alarm: pb.AlarmType_NOSPACE,
},
})
}, membership.ApplyBoth)
require.NotNil(t, result)
require.NoError(t, result.Err)

result = ua.Apply(&pb.InternalRaftRequest{Put: &pb.PutRequest{Key: []byte(key)}})
result = ua.Apply(&pb.InternalRaftRequest{Put: &pb.PutRequest{Key: []byte(key)}}, membership.ApplyBoth)
require.NotNil(t, result)
require.Equalf(t, errors.ErrNoSpace, result.Err, "Apply: got %v, expect: %v", result.Err, errors.ErrNoSpace)

Expand All @@ -270,11 +270,11 @@ func TestUberApplier_Alarm_Deactivate(t *testing.T) {
MemberID: memberID,
Alarm: pb.AlarmType_NOSPACE,
},
})
}, membership.ApplyBoth)
require.NotNil(t, result)
require.NoError(t, result.Err)

result = ua.Apply(&pb.InternalRaftRequest{Put: &pb.PutRequest{Key: []byte(key)}})
result = ua.Apply(&pb.InternalRaftRequest{Put: &pb.PutRequest{Key: []byte(key)}}, membership.ApplyBoth)
require.NotNil(t, result)
assert.NoError(t, result.Err)
}
39 changes: 1 addition & 38 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/apply"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/etcdserver/errors"
"go.etcd.io/etcd/server/v3/etcdserver/txn"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/features"
"go.etcd.io/etcd/server/v3/lease"
Expand Down Expand Up @@ -1985,7 +1984,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership.
if !needResult && raftReq.Txn != nil {
removeNeedlessRangeReqs(raftReq.Txn)
}
ar = s.applyInternalRaftRequest(&raftReq, shouldApplyV3)
ar = s.uberApply.Apply(&raftReq, shouldApplyV3)
}

// do not re-toApply applied entries.
Expand Down Expand Up @@ -2021,42 +2020,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership.
})
}

func (s *EtcdServer) applyInternalRaftRequest(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *apply.Result {
if r.ClusterVersionSet == nil && r.ClusterMemberAttrSet == nil && r.DowngradeInfoSet == nil && r.DowngradeVersionTest == nil {
if !shouldApplyV3 {
return nil
}
return s.uberApply.Apply(r)
}
membershipApplier := apply.NewApplierMembership(s.lg, s.cluster, s)
op := "unknown"
defer func(start time.Time) {
txn.ApplySecObserve("v3", op, true, time.Since(start))
txn.WarnOfExpensiveRequest(s.lg, s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, nil, nil)
}(time.Now())
switch {
case r.ClusterVersionSet != nil:
op = "ClusterVersionSet" // Implemented in 3.5.x
membershipApplier.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)
return &apply.Result{}
case r.ClusterMemberAttrSet != nil:
op = "ClusterMemberAttrSet" // Implemented in 3.5.x
membershipApplier.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)
case r.DowngradeInfoSet != nil:
op = "DowngradeInfoSet" // Implemented in 3.5.x
membershipApplier.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)
case r.DowngradeVersionTest != nil:
op = "DowngradeVersionTest" // Implemented in 3.6 for test only
// do nothing, we are just to ensure etcdserver don't panic in case
// users(test cases) intentionally inject DowngradeVersionTestRequest
// into the WAL files.
default:
s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
return nil
}
return &apply.Result{}
}

func noSideEffect(r *pb.InternalRaftRequest) bool {
return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil || r.AuthStatus != nil
}
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestApplyRepeat(t *testing.T) {

type uberApplierMock struct{}

func (uberApplierMock) Apply(r *pb.InternalRaftRequest) *apply2.Result {
func (uberApplierMock) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *apply2.Result {
return &apply2.Result{}
}

Expand Down