Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/server/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2444,6 +2444,19 @@ func TestDecommissionEnqueueReplicas(t *testing.T) {

// Ensure that the scratch range's replica was proactively enqueued.
require.Equal(t, <-enqueuedRangeIDs, tc.LookupRangeOrFatal(t, scratchKey).RangeID)

// Check that the node was marked as decommissioning in each of the nodes'
// decommissioningNodeMap. This needs to be wrapped in a SucceedsSoon to
// deal with gossip propagation delays.
testutils.SucceedsSoon(t, func() error {
for i := 0; i < tc.NumServers(); i++ {
srv := tc.Server(i)
if _, exists := srv.DecommissioningNodeMap()[decommissioningSrv.NodeID()]; !exists {
return errors.Newf("node %d not detected to be decommissioning", decommissioningSrv.NodeID())
}
}
return nil
})
}

decommissionAndCheck(2 /* decommissioningSrvIdx */)
Expand Down
13 changes: 13 additions & 0 deletions pkg/server/decommission.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (t *decommissioningNodeMap) makeOnNodeDecommissioningCallback(
// Nothing more to do.
return
}
t.nodes[decommissioningNodeID] = struct{}{}

logLimiter := log.Every(5 * time.Second) // avoid log spam
if err := stores.VisitStores(func(store *kvserver.Store) error {
Expand Down Expand Up @@ -216,3 +217,15 @@ func (s *Server) Decommission(
}
return nil
}

// DecommissioningNodeMap returns the set of node IDs that are decommissioning
// from the perspective of the server.
//
// The result is a fresh copy built while holding the read lock on
// s.decomNodeMap, so adding or removing entries in the returned map does not
// affect the server's own map, and later changes on the server are not
// reflected in a previously returned map.
func (s *Server) DecommissioningNodeMap() map[roachpb.NodeID]interface{} {
	s.decomNodeMap.RLock()
	defer s.decomNodeMap.RUnlock()
	// The final size is known up front, so pre-size the copy to avoid
	// incremental map growth while filling it.
	nodes := make(map[roachpb.NodeID]interface{}, len(s.decomNodeMap.nodes))
	for id, marker := range s.decomNodeMap.nodes {
		nodes[id] = marker
	}
	return nodes
}
2 changes: 2 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type Server struct {
admin *adminServer
status *statusServer
drain *drainServer
decomNodeMap *decommissioningNodeMap
authentication *authenticationServer
migrationServer *migrationServer
tsDB *ts.DB
Expand Down Expand Up @@ -844,6 +845,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
admin: sAdmin,
status: sStatus,
drain: drain,
decomNodeMap: decomNodeMap,
authentication: sAuth,
tsDB: tsDB,
tsServer: &sTS,
Expand Down
4 changes: 4 additions & 0 deletions pkg/testutils/serverutils/test_server_shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ type TestServerInterface interface {
// Decommission idempotently sets the decommissioning flag for specified nodes.
Decommission(ctx context.Context, targetStatus livenesspb.MembershipStatus, nodeIDs []roachpb.NodeID) error

// DecommissioningNodeMap returns a map of nodeIDs that are known to the
// server to be decommissioning.
DecommissioningNodeMap() map[roachpb.NodeID]interface{}

// SplitRange splits the range containing splitKey.
SplitRange(splitKey roachpb.Key) (left roachpb.RangeDescriptor, right roachpb.RangeDescriptor, err error)

Expand Down