From f5761e9dec83ba28f1abbd2837afec1ce3d0c1c2 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 14 May 2024 16:52:39 -0400 Subject: [PATCH] services: fix data integrity errors for Nomad native services This changeset fixes three potential data integrity issues between allocations and their Nomad native service registrations. * When a node is marked down because it missed heartbeats, we remove Vault and Consul tokens (for the pre-Workload Identity workflows) after we've written the node update to Raft. This is unavoidably non-transactional because the Consul and Vault servers aren't in the same Raft cluster as Nomad itself. But we've unnecessarily mirrored this same behavior to deregister Nomad services. This makes it possible for the leader to successfully write the node update to Raft without removing services. To address this, move the delete into the same Raft transaction. One minor caveat with this approach is the upgrade path: if the leader is upgraded first and a node is marked down during this window, older followers will have stale information until they are also upgraded. This is unavoidable without requiring the leader to unconditionally make an extra Raft write for every down node until 2 LTS versions after Nomad 1.8.0. This temporary reduction in data integrity for stale reads seems like a reasonable tradeoff. * When an allocation is marked client-terminal from the client in `UpdateAllocsFromClient`, we have an opportunity to ensure data integrity by deregistering services for that allocation. * When an allocation is deleted during eval garbage collection, we have an opportunity to ensure data integrity by deregistering services for that allocation. This is a cheap no-op if the allocation has been previously marked client-terminal. This changeset does not address client-side retries for the originally reported issue, which will be done in a separate PR. Ref: https://github.com/hashicorp/nomad/issues/16616 --- .changelog/20590.txt | 3 + nomad/core_sched_test.go | 63 +++--- nomad/node_endpoint.go | 25 --- nomad/node_endpoint_test.go | 204 ++++++++++++------ nomad/state/state_store.go | 17 ++ .../state/state_store_service_registration.go | 41 +++- 6 files changed, 233 insertions(+), 120 deletions(-) create mode 100644 .changelog/20590.txt diff --git a/.changelog/20590.txt b/.changelog/20590.txt new file mode 100644 index 000000000000..4406f131d281 --- /dev/null +++ b/.changelog/20590.txt @@ -0,0 +1,3 @@ +```release-note:bug +services: Fixed bug where Nomad services might not be deregistered when nodes are marked down or allocations are terminal +``` diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index bac54499720e..e23d410ed3c9 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -38,8 +38,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) { eval := mock.Eval() eval.Status = structs.EvalStatusFailed store.UpsertJobSummary(999, mock.JobSummary(eval.JobID)) - err := store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval}) - require.Nil(t, err) + must.NoError(t, store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval})) // Insert mock job with rescheduling disabled job := mock.Job() @@ -48,8 +47,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) { Attempts: 0, Interval: 0 * time.Second, } - err = store.UpsertJob(structs.MsgTypeTestSetup, 1001, nil, job) - require.Nil(t, err) + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 1001, nil, job)) // Insert "dead" alloc alloc := mock.Alloc() @@ -65,10 +63,22 @@ func TestCoreScheduler_EvalGC(t *testing.T) { alloc2.ClientStatus = structs.AllocClientStatusLost alloc2.JobID = eval.JobID alloc2.TaskGroup = job.TaskGroups[0].Name - err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc, alloc2}) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc, alloc2})) + + // Insert service for "dead" alloc + service := &structs.ServiceRegistration{ + ID: fmt.Sprintf("_nomad-task-%s-group-api-countdash-api-http", alloc.ID), + ServiceName: "countdash-api", + Namespace: eval.Namespace, + NodeID: alloc.NodeID, + Datacenter: "dc1", + JobID: eval.JobID, + AllocID: alloc.ID, + Address: "192.168.200.200", + Port: 29001, + } + must.NoError(t, store.UpsertServiceRegistrations( + structs.MsgTypeTestSetup, 1002, []*structs.ServiceRegistration{service})) // Update the time tables to make this work tt := s1.fsm.TimeTable() @@ -76,43 +86,30 @@ func TestCoreScheduler_EvalGC(t *testing.T) { // Create a core scheduler snap, err := store.Snapshot() - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, err) core := NewCoreScheduler(s1, snap) // Attempt the GC gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000) - err = core.Process(gc) - if err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, core.Process(gc)) // Should be gone ws := memdb.NewWatchSet() out, err := store.EvalByID(ws, eval.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != nil { - t.Fatalf("bad: %v", out) - } + must.NoError(t, err) + must.Nil(t, out, must.Sprint("expected eval to be GC'd")) outA, err := store.AllocByID(ws, alloc.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if outA != nil { - t.Fatalf("bad: %v", outA) - } + must.NoError(t, err) + must.Nil(t, outA, must.Sprint("expected alloc to be GC'd")) outA2, err := store.AllocByID(ws, alloc2.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if outA2 != nil { - t.Fatalf("bad: %v", outA2) - } + must.NoError(t, err) + must.Nil(t, outA2, must.Sprint("expected alloc to be GC'd")) + + services, err := store.GetServiceRegistrationsByNodeID(nil, alloc.NodeID) + must.NoError(t, err) + must.Len(t, 0, services) } // Tests GC behavior on allocations being rescheduled diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 3ac97d9a66b9..c2f2914d190e 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -692,31 +692,6 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct _ = n.srv.consulACLs.RevokeTokens(context.Background(), accessors, true) } - // Identify the service registrations current placed on the downed - // node. - serviceRegistrations, err := n.srv.State().GetServiceRegistrationsByNodeID(ws, args.NodeID) - if err != nil { - n.logger.Error("looking up service registrations for node failed", - "node_id", args.NodeID, "error", err) - return err - } - - // If the node has service registrations assigned to it, delete these - // via Raft. - if l := len(serviceRegistrations); l > 0 { - n.logger.Debug("deleting service registrations on node due to down state", - "num_service_registrations", l, "node_id", args.NodeID) - - deleteRegReq := structs.ServiceRegistrationDeleteByNodeIDRequest{NodeID: args.NodeID} - - _, index, err = n.srv.raftApply(structs.ServiceRegistrationDeleteByNodeIDRequestType, &deleteRegReq) - if err != nil { - n.logger.Error("failed to delete service registrations for node", - "node_id", args.NodeID, "error", err) - return err - } - } - default: ttl, err := n.srv.resetHeartbeatTimer(args.NodeID) if err != nil { diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 9c01ec10780c..b8b0af658886 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1487,7 +1487,7 @@ func TestNode_UpdateStatus_ServiceRegistrations(t *testing.T) { // Create a node and upsert this into state. node := mock.Node() - require.NoError(t, testServer.State().UpsertNode(structs.MsgTypeTestSetup, 10, node)) + must.NoError(t, testServer.State().UpsertNode(structs.MsgTypeTestSetup, 10, node)) // Generate service registrations, ensuring the nodeID is set to the // generated node from above. @@ -1498,16 +1498,16 @@ func TestNode_UpdateStatus_ServiceRegistrations(t *testing.T) { } // Upsert the service registrations into state. - require.NoError(t, testServer.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services)) + must.NoError(t, testServer.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services)) // Check the service registrations are in state as we expect, so we can // have confidence in the rest of the test. ws := memdb.NewWatchSet() nodeRegs, err := testServer.State().GetServiceRegistrationsByNodeID(ws, node.ID) - require.NoError(t, err) - require.Len(t, nodeRegs, 2) - require.Equal(t, nodeRegs[0].NodeID, node.ID) - require.Equal(t, nodeRegs[1].NodeID, node.ID) + must.NoError(t, err) + must.Len(t, 2, nodeRegs) + must.Eq(t, nodeRegs[0].NodeID, node.ID) + must.Eq(t, nodeRegs[1].NodeID, node.ID) // Generate and trigger a node down status update. This mimics what happens // when the node fails its heart-beating. @@ -1520,13 +1520,17 @@ func TestNode_UpdateStatus_ServiceRegistrations(t *testing.T) { var reply structs.NodeUpdateResponse nodeEndpoint := NewNodeEndpoint(testServer, nil) - require.NoError(t, nodeEndpoint.UpdateStatus(&args, &reply)) + must.NoError(t, nodeEndpoint.UpdateStatus(&args, &reply)) // Query our state, to ensure the node service registrations have been // removed. nodeRegs, err = testServer.State().GetServiceRegistrationsByNodeID(ws, node.ID) - require.NoError(t, err) - require.Len(t, nodeRegs, 0) + must.NoError(t, err) + must.Len(t, 0, nodeRegs) + + // Re-send the status update, to ensure we get no error if service + // registrations have already been removed + must.NoError(t, nodeEndpoint.UpdateStatus(&args, &reply)) } // TestClientEndpoint_UpdateDrain asserts the ability to initiate drain @@ -3044,7 +3048,7 @@ func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) { } } -func TestClientEndpoint_UpdateAlloc(t *testing.T) { +func TestNode_UpdateAlloc(t *testing.T) { ci.Parallel(t) s1, cleanupS1 := TestServer(t, func(c *Config) { @@ -3057,7 +3061,6 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) { defer cleanupS1() codec := rpcClient(t, s1) testutil.WaitForLeader(t, s1.RPC) - require := require.New(t) // Create the register request node := mock.Node() @@ -3068,34 +3071,28 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) { // Fetch the response var resp structs.GenericResponse - if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { - t.Fatalf("err: %v", err) - } + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)) - state := s1.fsm.State() + store := s1.fsm.State() // Inject mock job job := mock.Job() job.ID = "mytestjob" - err := state.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job) - require.Nil(err) + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job)) // Inject fake allocations alloc := mock.Alloc() alloc.JobID = job.ID alloc.NodeID = node.ID - err = state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) - require.Nil(err) + must.NoError(t, store.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))) alloc.TaskGroup = job.TaskGroups[0].Name alloc2 := mock.Alloc() alloc2.JobID = job.ID alloc2.NodeID = node.ID - err = state.UpsertJobSummary(99, mock.JobSummary(alloc2.JobID)) - require.Nil(err) + must.NoError(t, store.UpsertJobSummary(99, mock.JobSummary(alloc2.JobID))) alloc2.TaskGroup = job.TaskGroups[0].Name - err = state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc, alloc2}) - require.Nil(err) + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc, alloc2})) // Attempt updates of more than one alloc for the same job clientAlloc1 := new(structs.Allocation) @@ -3113,36 +3110,31 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) { } var resp2 structs.NodeAllocsResponse start := time.Now() - err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp2) - require.Nil(err) - require.NotEqual(uint64(0), resp2.Index) - - if diff := time.Since(start); diff < batchUpdateInterval { - t.Fatalf("too fast: %v", diff) - } + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp2)) + must.Greater(t, 0, resp2.Index) + must.GreaterEq(t, batchUpdateInterval, time.Since(start)) // Lookup the alloc ws := memdb.NewWatchSet() - out, err := state.AllocByID(ws, alloc.ID) - require.Nil(err) - require.Equal(structs.AllocClientStatusFailed, out.ClientStatus) - require.True(out.ModifyTime > 0) + out, err := store.AllocByID(ws, alloc.ID) + must.NoError(t, err) + must.Eq(t, structs.AllocClientStatusFailed, out.ClientStatus) + must.Greater(t, 0, out.ModifyTime) // Assert that exactly one eval with TriggeredBy EvalTriggerRetryFailedAlloc exists - evaluations, err := state.EvalsByJob(ws, job.Namespace, job.ID) - require.Nil(err) - require.True(len(evaluations) != 0) + evaluations, err := store.EvalsByJob(ws, job.Namespace, job.ID) + must.NoError(t, err) + must.Greater(t, 0, len(evaluations)) foundCount := 0 for _, resultEval := range evaluations { if resultEval.TriggeredBy == structs.EvalTriggerRetryFailedAlloc && resultEval.WaitUntil.IsZero() { foundCount++ } } - require.Equal(1, foundCount, "Should create exactly one eval for failed allocs") - + must.Eq(t, 1, foundCount, must.Sprint("Should create exactly one eval for failed allocs")) } -func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) { +func TestNode_UpdateAlloc_NodeNotReady(t *testing.T) { ci.Parallel(t) s1, cleanupS1 := TestServer(t, nil) @@ -3157,15 +3149,13 @@ func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var resp structs.GenericResponse - err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp) - require.NoError(t, err) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)) // Inject mock job and allocation. - state := s1.fsm.State() + store := s1.fsm.State() job := mock.Job() - err = state.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job) - require.NoError(t, err) + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job)) alloc := mock.Alloc() alloc.JobID = job.ID @@ -3173,14 +3163,12 @@ func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) { alloc.TaskGroup = job.TaskGroups[0].Name alloc.ClientStatus = structs.AllocClientStatusRunning - err = state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) - require.NoError(t, err) - err = state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc}) - require.NoError(t, err) + must.NoError(t, store.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))) + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc})) // Mark node as down. - err = state.UpdateNodeStatus(structs.MsgTypeTestSetup, 101, node.ID, structs.NodeStatusDown, time.Now().UnixNano(), nil) - require.NoError(t, err) + must.NoError(t, store.UpdateNodeStatus( + structs.MsgTypeTestSetup, 101, node.ID, structs.NodeStatusDown, time.Now().UnixNano(), nil)) // Try to update alloc. updatedAlloc := new(structs.Allocation) @@ -3192,31 +3180,127 @@ func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var allocUpdateResp structs.NodeAllocsResponse - err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp) - require.ErrorContains(t, err, "not allowed to update allocs") + err := msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp) + must.ErrorContains(t, err, "not allowed to update allocs") // Send request without an explicit node ID. updatedAlloc.NodeID = "" err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp) - require.ErrorContains(t, err, "missing node ID") + must.ErrorContains(t, err, "missing node ID") // Send request with invalid node ID. updatedAlloc.NodeID = "not-valid" err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp) - require.ErrorContains(t, err, "node lookup failed") + must.ErrorContains(t, err, "node lookup failed") // Send request with non-existing node ID. updatedAlloc.NodeID = uuid.Generate() err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp) - require.ErrorContains(t, err, "not found") + must.ErrorContains(t, err, "not found") // Mark node as ready and try again. - err = state.UpdateNodeStatus(structs.MsgTypeTestSetup, 102, node.ID, structs.NodeStatusReady, time.Now().UnixNano(), nil) - require.NoError(t, err) + must.NoError(t, store.UpdateNodeStatus( + structs.MsgTypeTestSetup, 102, node.ID, structs.NodeStatusReady, time.Now().UnixNano(), nil)) updatedAlloc.NodeID = node.ID - err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp) - require.NoError(t, err) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)) +} + +func TestNode_UpdateAllocServiceRegistrations(t *testing.T) { + ci.Parallel(t) + + srv, cleanup := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + + defer cleanup() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + + store := srv.fsm.State() + index := uint64(100) + + // Inject mock node, job, allocations for that job, and service + // registrations for those allocs + node := mock.Node() + index++ + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index, node)) + + job := mock.Job() + job.ID = "mytestjob" + index++ + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, job)) + + alloc0 := mock.Alloc() + alloc0.JobID = job.ID + alloc0.NodeID = node.ID + index++ + must.NoError(t, store.UpsertJobSummary(index, mock.JobSummary(alloc0.JobID))) + alloc0.TaskGroup = job.TaskGroups[0].Name + + alloc1 := mock.Alloc() + alloc1.JobID = job.ID + alloc1.NodeID = node.ID + index++ + must.NoError(t, store.UpsertJobSummary(index, mock.JobSummary(alloc1.JobID))) + alloc1.TaskGroup = job.TaskGroups[0].Name + + alloc2 := mock.Alloc() // will have no service registration + alloc2.JobID = job.ID + alloc2.NodeID = node.ID + index++ + must.NoError(t, store.UpsertJobSummary(index, mock.JobSummary(alloc2.JobID))) + alloc2.TaskGroup = job.TaskGroups[0].Name + + index++ + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc0, alloc1, alloc2})) + + serviceFor := func(allocID string, port int) *structs.ServiceRegistration { + return &structs.ServiceRegistration{ + ID: fmt.Sprintf("_nomad-task-%s-group-api-countdash-api-http", allocID), + ServiceName: "countdash-api", + Namespace: job.Namespace, + NodeID: node.ID, + Datacenter: node.Datacenter, + JobID: job.ID, + AllocID: allocID, + Tags: []string{"bar"}, + Address: "192.168.200.200", + Port: port, + } + } + + service0 := serviceFor(alloc0.ID, 29001) + service1 := serviceFor(alloc1.ID, 29002) + index++ + must.NoError(t, store.UpsertServiceRegistrations( + structs.MsgTypeTestSetup, index, []*structs.ServiceRegistration{service0, service1})) + + // no-op + update := &structs.AllocUpdateRequest{ + Alloc: []*structs.Allocation{alloc0, alloc1, alloc2}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.NodeAllocsResponse + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp)) + + services, err := store.GetServiceRegistrationsByNodeID(nil, node.ID) + must.NoError(t, err) + must.Len(t, 2, services, must.Sprint("no-op update should not have deleted services")) + + // fail one allocation + alloc0 = alloc0.Copy() + alloc0.ClientStatus = structs.AllocClientStatusFailed + update = &structs.AllocUpdateRequest{ + Alloc: []*structs.Allocation{alloc0, alloc1, alloc2}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp)) + + services, err = store.GetServiceRegistrationsByNodeID(nil, node.ID) + must.NoError(t, err) + must.Eq(t, []*structs.ServiceRegistration{service1}, services, + must.Sprint("failing an allocation should result in its service being deleted")) } func TestClientEndpoint_BatchUpdate(t *testing.T) { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 277f393ae047..a77ab60c2fa0 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1131,6 +1131,12 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update if err := txn.Insert("index", &IndexEntry{"nodes", txn.Index}); err != nil { return fmt.Errorf("index update failed: %v", err) } + + // Deregister any services on the node in the same transaction + if copyNode.Status == structs.NodeStatusDown { + s.deleteServiceRegistrationByNodeIDTxn(txn, txn.Index, copyNode.ID) + } + return nil } @@ -3629,6 +3635,10 @@ func (s *StateStore) DeleteEval(index uint64, evals, allocs []string, userInitia // Mark that we have made a successful modification to the allocs // table. allocsTableUpdated = true + + if err := s.deleteServiceRegistrationByAllocIDTxn(txn, index, alloc); err != nil { + return fmt.Errorf("service registration delete for alloc failed: %v", err) + } } // Update the indexes @@ -3975,6 +3985,13 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc * if err := s.setJobStatuses(index, txn, jobs, false); err != nil { return fmt.Errorf("setting job status failed: %v", err) } + + if copyAlloc.ClientTerminalStatus() { + if err := s.deleteServiceRegistrationByAllocIDTxn(txn, index, copyAlloc.ID); err != nil { + return err + } + } + return nil } diff --git a/nomad/state/state_store_service_registration.go b/nomad/state/state_store_service_registration.go index f4f7beae6516..72390e420387 100644 --- a/nomad/state/state_store_service_registration.go +++ b/nomad/state/state_store_service_registration.go @@ -129,11 +129,25 @@ func (s *StateStore) DeleteServiceRegistrationByNodeID( txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() - num, err := txn.DeleteAll(TableServiceRegistrations, indexNodeID, nodeID) + err := s.deleteServiceRegistrationByNodeIDTxn(txn, index, nodeID) if err != nil { return fmt.Errorf("deleting service registrations failed: %v", err) } + return txn.Commit() +} + +// deleteServiceRegistrationByNodeIDTxn deletes all service registrations that +// belong on a single node, in an existing transaction. If there are no +// registrations tied to the nodeID, the call will noop without an error. +func (s *StateStore) deleteServiceRegistrationByNodeIDTxn( + txn *txn, index uint64, nodeID string) error { + + num, err := txn.DeleteAll(TableServiceRegistrations, indexNodeID, nodeID) + if err != nil { + return err + } + // If we did not delete any entries, do not update the index table. // Otherwise, update the table with the latest index. switch num { @@ -144,8 +158,31 @@ func (s *StateStore) DeleteServiceRegistrationByNodeID( return fmt.Errorf("index update failed: %v", err) } } + return nil +} - return txn.Commit() +// deleteServiceRegistrationByAllocIDTxn deletes all service registrations that +// belong to an allocation, in an existing transaction. If there are no +// registrations tied to the alloc ID, the call will noop without an error. +func (s *StateStore) deleteServiceRegistrationByAllocIDTxn( + txn *txn, index uint64, allocID string) error { + + num, err := txn.DeleteAll(TableServiceRegistrations, indexAllocID, allocID) + if err != nil { + return err + } + + // If we did not delete any entries, do not update the index table. + // Otherwise, update the table with the latest index. + switch num { + case 0: + return nil + default: + if err := txn.Insert(tableIndex, &IndexEntry{TableServiceRegistrations, index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + } + return nil } // GetServiceRegistrations returns an iterator that contains all service