services: fix data integrity errors for Nomad native services
This changeset fixes three potential data integrity issues between allocations
and their Nomad native service registrations.

* When a node is marked down because it missed heartbeats, we remove Vault and
  Consul tokens (for the pre-Workload Identity workflows) after we've written
  the node update to Raft. This is unavoidably non-transactional because the
  Consul and Vault servers aren't in the same Raft cluster as Nomad itself. But
  we've unnecessarily mirrored this same behavior to deregister Nomad
  services. This makes it possible for the leader to successfully write the node
  update to Raft without removing services.

  To address this, move the delete into the same Raft transaction (a sketch of
  the idea follows this commit message). One minor caveat with this approach is
  the upgrade path: if the leader is upgraded first and a node is marked down
  during this window, older followers will have stale information until they
  are also upgraded. This is unavoidable without requiring the leader to
  unconditionally make an extra Raft write for every down node until 2 LTS
  versions after Nomad 1.8.0. This temporary reduction in data integrity for
  stale reads seems like a reasonable tradeoff.

* When an allocation is marked client-terminal from the client in
  `UpdateAllocsFromClient`, we have an opportunity to ensure data integrity by
  deregistering services for that allocation.

* When an allocation is deleted during eval garbage collection, we have an
  opportunity to ensure data integrity by deregistering services for that
  allocation. This is a cheap no-op if the allocation has been previously
  marked client-terminal (a sketch of this alloc-scoped delete follows the
  core_sched_test.go diff below).

This changeset does not address client-side retries for the originally reported
issue, which will be done in a separate PR.

Ref: #16616
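
To make the first fix concrete, here is a minimal sketch of the "same transaction" idea, written against hashicorp/go-memdb, the in-memory database that backs Nomad's state store. The table names, schema, and Go types below are invented for illustration and do not mirror Nomad's real state store, and the Raft layer is omitted: the sketch shows only the single state-store transaction that one Raft apply would execute, so the node-status update and the service deregistration commit or abort together.

```go
package main

import (
	"fmt"

	memdb "github.com/hashicorp/go-memdb"
)

// Illustrative types only; Nomad's real state store uses different structs.
type Node struct {
	ID     string
	Status string
}

type ServiceRegistration struct {
	ID     string
	NodeID string
}

func main() {
	schema := &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"nodes": {
				Name: "nodes",
				Indexes: map[string]*memdb.IndexSchema{
					"id": {Name: "id", Unique: true, Indexer: &memdb.StringFieldIndex{Field: "ID"}},
				},
			},
			"services": {
				Name: "services",
				Indexes: map[string]*memdb.IndexSchema{
					"id":      {Name: "id", Unique: true, Indexer: &memdb.StringFieldIndex{Field: "ID"}},
					"node_id": {Name: "node_id", Indexer: &memdb.StringFieldIndex{Field: "NodeID"}},
				},
			},
		},
	}

	db, err := memdb.NewMemDB(schema)
	if err != nil {
		panic(err)
	}

	// Seed a node and one service registration placed on it.
	seed := db.Txn(true)
	if err := seed.Insert("nodes", &Node{ID: "node-1", Status: "ready"}); err != nil {
		panic(err)
	}
	if err := seed.Insert("services", &ServiceRegistration{ID: "svc-1", NodeID: "node-1"}); err != nil {
		panic(err)
	}
	seed.Commit()

	// Mark the node down AND delete its service registrations in one write
	// transaction, so a successful commit can never leave orphaned services.
	txn := db.Txn(true)
	defer txn.Abort() // no-op once Commit has run

	if err := txn.Insert("nodes", &Node{ID: "node-1", Status: "down"}); err != nil {
		panic(err)
	}
	deleted, err := txn.DeleteAll("services", "node_id", "node-1")
	if err != nil {
		panic(err)
	}
	txn.Commit()

	fmt.Printf("node marked down, %d service registration(s) deleted atomically\n", deleted)
}
```

Because both writes happen in one transaction, a follower that applies the corresponding Raft entry either sees the node down with its services gone, or sees neither change; there is no window where the node update lands but the deregistration does not.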
tgross committed May 15, 2024
1 parent c9fd93c commit f5761e9
Showing 6 changed files with 233 additions and 120 deletions.
3 changes: 3 additions & 0 deletions .changelog/20590.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+services: Fixed bug where Nomad services might not be deregistered when nodes are marked down or allocations are terminal
+```
63 changes: 30 additions & 33 deletions nomad/core_sched_test.go
@@ -38,8 +38,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
     eval := mock.Eval()
     eval.Status = structs.EvalStatusFailed
     store.UpsertJobSummary(999, mock.JobSummary(eval.JobID))
-    err := store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval})
-    require.Nil(t, err)
+    must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval}))
 
     // Insert mock job with rescheduling disabled
     job := mock.Job()
@@ -48,8 +47,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
         Attempts: 0,
         Interval: 0 * time.Second,
     }
-    err = store.UpsertJob(structs.MsgTypeTestSetup, 1001, nil, job)
-    require.Nil(t, err)
+    must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1001, nil, job))
 
     // Insert "dead" alloc
     alloc := mock.Alloc()
@@ -65,54 +63,53 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
     alloc2.ClientStatus = structs.AllocClientStatusLost
     alloc2.JobID = eval.JobID
     alloc2.TaskGroup = job.TaskGroups[0].Name
-    err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc, alloc2})
-    if err != nil {
-        t.Fatalf("err: %v", err)
-    }
+    must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc, alloc2}))
 
+    // Insert service for "dead" alloc
+    service := &structs.ServiceRegistration{
+        ID:          fmt.Sprintf("_nomad-task-%s-group-api-countdash-api-http", alloc.ID),
+        ServiceName: "countdash-api",
+        Namespace:   eval.Namespace,
+        NodeID:      alloc.NodeID,
+        Datacenter:  "dc1",
+        JobID:       eval.JobID,
+        AllocID:     alloc.ID,
+        Address:     "192.168.200.200",
+        Port:        29001,
+    }
+    must.NoError(t, store.UpsertServiceRegistrations(
+        structs.MsgTypeTestSetup, 1002, []*structs.ServiceRegistration{service}))
+
     // Update the time tables to make this work
     tt := s1.fsm.TimeTable()
     tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
 
     // Create a core scheduler
     snap, err := store.Snapshot()
-    if err != nil {
-        t.Fatalf("err: %v", err)
-    }
+    must.NoError(t, err)
     core := NewCoreScheduler(s1, snap)
 
     // Attempt the GC
     gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
-    err = core.Process(gc)
-    if err != nil {
-        t.Fatalf("err: %v", err)
-    }
+    must.NoError(t, core.Process(gc))
 
     // Should be gone
     ws := memdb.NewWatchSet()
     out, err := store.EvalByID(ws, eval.ID)
-    if err != nil {
-        t.Fatalf("err: %v", err)
-    }
-    if out != nil {
-        t.Fatalf("bad: %v", out)
-    }
+    must.NoError(t, err)
+    must.Nil(t, out, must.Sprint("expected eval to be GC'd"))
 
     outA, err := store.AllocByID(ws, alloc.ID)
-    if err != nil {
-        t.Fatalf("err: %v", err)
-    }
-    if outA != nil {
-        t.Fatalf("bad: %v", outA)
-    }
+    must.NoError(t, err)
+    must.Nil(t, outA, must.Sprint("expected alloc to be GC'd"))
 
     outA2, err := store.AllocByID(ws, alloc2.ID)
-    if err != nil {
-        t.Fatalf("err: %v", err)
-    }
-    if outA2 != nil {
-        t.Fatalf("bad: %v", outA2)
-    }
+    must.NoError(t, err)
+    must.Nil(t, outA2, must.Sprint("expected alloc to be GC'd"))
+
+    services, err := store.GetServiceRegistrationsByNodeID(nil, alloc.NodeID)
+    must.NoError(t, err)
+    must.Len(t, 0, services)
 }
 
 // Tests GC behavior on allocations being rescheduled
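
The converted test above now also asserts that the GC'd allocation's service registrations are gone. As a companion to the earlier sketch, here is a standalone illustration of the alloc-scoped delete described in the second and third bullets of the commit message, written in the same shoenig/test "must" assertion style the diff converts these tests to. The schema and types are again invented for the example and are not Nomad's; the point is that deleting by an alloc-ID index inside the same write transaction is a cheap no-op when the registrations are already gone.

```go
package example

import (
	"testing"

	memdb "github.com/hashicorp/go-memdb"
	"github.com/shoenig/test/must"
)

// Illustrative type only; not Nomad's structs.ServiceRegistration.
type ServiceRegistration struct {
	ID      string
	AllocID string
}

func newServiceDB(t *testing.T) *memdb.MemDB {
	schema := &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"services": {
				Name: "services",
				Indexes: map[string]*memdb.IndexSchema{
					"id":       {Name: "id", Unique: true, Indexer: &memdb.StringFieldIndex{Field: "ID"}},
					"alloc_id": {Name: "alloc_id", Indexer: &memdb.StringFieldIndex{Field: "AllocID"}},
				},
			},
		},
	}
	db, err := memdb.NewMemDB(schema)
	must.NoError(t, err)
	return db
}

func TestDeleteByAllocID_NoOpWhenAlreadyGone(t *testing.T) {
	db := newServiceDB(t)
	txn := db.Txn(true)
	defer txn.Abort()

	must.NoError(t, txn.Insert("services", &ServiceRegistration{ID: "svc-1", AllocID: "alloc-1"}))

	// Marking the alloc client-terminal deletes its registrations.
	n, err := txn.DeleteAll("services", "alloc_id", "alloc-1")
	must.NoError(t, err)
	must.Eq(t, 1, n)

	// A later delete for the same alloc (e.g. during eval GC) finds nothing.
	n, err = txn.DeleteAll("services", "alloc_id", "alloc-1")
	must.NoError(t, err)
	must.Eq(t, 0, n)

	txn.Commit()
}
```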
25 changes: 0 additions & 25 deletions nomad/node_endpoint.go
@@ -692,31 +692,6 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
             _ = n.srv.consulACLs.RevokeTokens(context.Background(), accessors, true)
         }
 
-        // Identify the service registrations current placed on the downed
-        // node.
-        serviceRegistrations, err := n.srv.State().GetServiceRegistrationsByNodeID(ws, args.NodeID)
-        if err != nil {
-            n.logger.Error("looking up service registrations for node failed",
-                "node_id", args.NodeID, "error", err)
-            return err
-        }
-
-        // If the node has service registrations assigned to it, delete these
-        // via Raft.
-        if l := len(serviceRegistrations); l > 0 {
-            n.logger.Debug("deleting service registrations on node due to down state",
-                "num_service_registrations", l, "node_id", args.NodeID)
-
-            deleteRegReq := structs.ServiceRegistrationDeleteByNodeIDRequest{NodeID: args.NodeID}
-
-            _, index, err = n.srv.raftApply(structs.ServiceRegistrationDeleteByNodeIDRequestType, &deleteRegReq)
-            if err != nil {
-                n.logger.Error("failed to delete service registrations for node",
-                    "node_id", args.NodeID, "error", err)
-                return err
-            }
-        }
-
     default:
         ttl, err := n.srv.resetHeartbeatTimer(args.NodeID)
         if err != nil {
(The diff for the remaining three changed files is not shown here.)
