Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/21909.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
state: ensure that identical manual virtual IP updates result in not bumping the modify indexes
```
32 changes: 28 additions & 4 deletions agent/consul/internal_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@ import (
"fmt"
"net"

hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
"golang.org/x/exp/maps"

"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/serf"
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/stringslice"
)

const MaximumManualVIPsPerService = 8
Expand Down Expand Up @@ -782,17 +785,38 @@ func (m *Internal) AssignManualServiceVIPs(args *structs.AssignServiceManualVIPs
return fmt.Errorf("cannot associate more than %d manual virtual IPs with the same service", MaximumManualVIPsPerService)
}

vipMap := make(map[string]struct{})
for _, ip := range args.ManualVIPs {
parsedIP := net.ParseIP(ip)
if parsedIP == nil || parsedIP.To4() == nil {
return fmt.Errorf("%q is not a valid IPv4 address", parsedIP.String())
}
vipMap[ip] = struct{}{}
}
// Silently ignore duplicates.
args.ManualVIPs = maps.Keys(vipMap)

psn := structs.PeeredServiceName{
ServiceName: structs.NewServiceName(args.Service, &args.EnterpriseMeta),
}

// Check to see if we can skip the raft apply entirely.
{
existingIPs, err := m.srv.fsm.State().ServiceManualVIPs(psn)
if err != nil {
return fmt.Errorf("error checking for existing manual ips for service: %w", err)
}
if existingIPs != nil && stringslice.EqualMapKeys(existingIPs.ManualIPs, vipMap) {
*reply = structs.AssignServiceManualVIPsResponse{
Found: true,
UnassignedFrom: nil,
}
return nil
}
}

req := state.ServiceVirtualIP{
Service: structs.PeeredServiceName{
ServiceName: structs.NewServiceName(args.Service, &args.EnterpriseMeta),
},
Service: psn,
ManualIPs: args.ManualVIPs,
}
resp, err := m.srv.raftApplyMsgpack(structs.UpdateVirtualIPRequestType, req)
Expand Down
64 changes: 51 additions & 13 deletions agent/consul/internal_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
"testing"
"time"

"github.com/hashicorp/consul-net-rpc/net/rpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/consul-net-rpc/net/rpc"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
Expand Down Expand Up @@ -3885,36 +3885,66 @@ func TestInternal_AssignManualServiceVIPs(t *testing.T) {
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.AssignManualServiceVIPs", req, &resp))

type testcase struct {
name string
req structs.AssignServiceManualVIPsRequest
expect structs.AssignServiceManualVIPsResponse
expectErr string
name string
req structs.AssignServiceManualVIPsRequest
expect structs.AssignServiceManualVIPsResponse
expectAgain structs.AssignServiceManualVIPsResponse
expectErr string
expectIPs []string
}
run := func(t *testing.T, tc testcase) {

run := func(t *testing.T, tc testcase, again bool) {
if tc.expectErr != "" && again {
return // we don't retest known errors
}

var resp structs.AssignServiceManualVIPsResponse
err := msgpackrpc.CallWithCodec(codec, "Internal.AssignManualServiceVIPs", tc.req, &resp)
if tc.expectErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expectErr)
return
testutil.RequireErrorContains(t, err, tc.expectErr)
} else {
if again {
require.Equal(t, tc.expectAgain, resp)
} else {
require.Equal(t, tc.expect, resp)
}

psn := structs.PeeredServiceName{ServiceName: structs.NewServiceName(tc.req.Service, nil)}
got, err := s1.fsm.State().ServiceManualVIPs(psn)
require.NoError(t, err)
require.NotNil(t, got)
require.Equal(t, tc.expectIPs, got.ManualIPs)
}
require.Equal(t, tc.expect, resp)
}

tcs := []testcase{
{
name: "successful manual ip assignment",
req: structs.AssignServiceManualVIPsRequest{
Service: "web",
ManualVIPs: []string{"1.1.1.1", "2.2.2.2"},
},
expect: structs.AssignServiceManualVIPsResponse{Found: true},
expectIPs: []string{"1.1.1.1", "2.2.2.2"},
expect: structs.AssignServiceManualVIPsResponse{Found: true},
expectAgain: structs.AssignServiceManualVIPsResponse{Found: true},
},
{
name: "successfully ignoring duplicates",
req: structs.AssignServiceManualVIPsRequest{
Service: "web",
ManualVIPs: []string{"1.2.3.4", "5.6.7.8", "1.2.3.4", "5.6.7.8"},
},
expectIPs: []string{"1.2.3.4", "5.6.7.8"},
expect: structs.AssignServiceManualVIPsResponse{Found: true},
expectAgain: structs.AssignServiceManualVIPsResponse{Found: true},
},
{
name: "reassign existing ip",
req: structs.AssignServiceManualVIPsRequest{
Service: "web",
ManualVIPs: []string{"8.8.8.8"},
},
expectIPs: []string{"8.8.8.8"},
expect: structs.AssignServiceManualVIPsResponse{
Found: true,
UnassignedFrom: []structs.PeeredServiceName{
Expand All @@ -3923,20 +3953,28 @@ func TestInternal_AssignManualServiceVIPs(t *testing.T) {
},
},
},
// When we repeat this operation the second time it's a no-op.
expectAgain: structs.AssignServiceManualVIPsResponse{Found: true},
},
{
name: "invalid ip",
req: structs.AssignServiceManualVIPsRequest{
Service: "web",
ManualVIPs: []string{"3.3.3.3", "invalid"},
},
expect: structs.AssignServiceManualVIPsResponse{},
expectErr: "not a valid",
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
run(t, tc)
t.Run("initial", func(t *testing.T) {
run(t, tc, false)
})
if tc.expectErr == "" {
t.Run("repeat", func(t *testing.T) {
run(t, tc, true) // only repeat a write if it isn't an known error
})
}
})
}
}
54 changes: 44 additions & 10 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"fmt"
"net"
"reflect"
"slices"
"sort"
"strings"

"github.com/hashicorp/go-memdb"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/maps"
"github.com/hashicorp/consul/lib/stringslice"
"github.com/hashicorp/consul/types"
)

Expand Down Expand Up @@ -1106,6 +1109,9 @@ func (s *Store) AssignManualServiceVIPs(idx uint64, psn structs.PeeredServiceNam
for _, ip := range ips {
assignedIPs[ip] = struct{}{}
}

txnNeedsCommit := false

modifiedEntries := make(map[structs.PeeredServiceName]struct{})
for ip := range assignedIPs {
entry, err := tx.First(tableServiceVirtualIPs, indexManualVIPs, psn.ServiceName.PartitionOrDefault(), ip)
Expand All @@ -1118,7 +1124,13 @@ func (s *Store) AssignManualServiceVIPs(idx uint64, psn structs.PeeredServiceNam
}

newEntry := entry.(ServiceVirtualIP)
if newEntry.Service.ServiceName.Matches(psn.ServiceName) {

var (
thisServiceName = newEntry.Service.ServiceName
thisPeer = newEntry.Service.Peer
)

if thisServiceName.Matches(psn.ServiceName) && thisPeer == psn.Peer {
continue
}

Expand All @@ -1130,13 +1142,20 @@ func (s *Store) AssignManualServiceVIPs(idx uint64, psn structs.PeeredServiceNam
filteredIPs = append(filteredIPs, existingIP)
}
}
sort.Strings(filteredIPs)

newEntry.ManualIPs = filteredIPs
newEntry.ModifyIndex = idx
if err := tx.Insert(tableServiceVirtualIPs, newEntry); err != nil {
return false, nil, fmt.Errorf("failed inserting service virtual IP entry: %s", err)
}
modifiedEntries[newEntry.Service] = struct{}{}

if err := updateVirtualIPMaxIndexes(tx, idx, thisServiceName.PartitionOrDefault(), thisPeer); err != nil {
return false, nil, err
}

txnNeedsCommit = true
}

entry, err := tx.First(tableServiceVirtualIPs, indexID, psn)
Expand All @@ -1149,23 +1168,37 @@ func (s *Store) AssignManualServiceVIPs(idx uint64, psn structs.PeeredServiceNam
}

newEntry := entry.(ServiceVirtualIP)
newEntry.ManualIPs = ips
newEntry.ModifyIndex = idx

if err := tx.Insert(tableServiceVirtualIPs, newEntry); err != nil {
return false, nil, fmt.Errorf("failed inserting service virtual IP entry: %s", err)
}
if err := updateVirtualIPMaxIndexes(tx, idx, psn.ServiceName.PartitionOrDefault(), psn.Peer); err != nil {
return false, nil, err
// Check to see if the slice already contains the same ips.
if !stringslice.EqualMapKeys(newEntry.ManualIPs, assignedIPs) {
newEntry.ManualIPs = slices.Clone(ips)
newEntry.ModifyIndex = idx

sort.Strings(newEntry.ManualIPs)

if err := tx.Insert(tableServiceVirtualIPs, newEntry); err != nil {
return false, nil, fmt.Errorf("failed inserting service virtual IP entry: %s", err)
}
if err := updateVirtualIPMaxIndexes(tx, idx, psn.ServiceName.PartitionOrDefault(), psn.Peer); err != nil {
return false, nil, err
}
txnNeedsCommit = true
}
if err = tx.Commit(); err != nil {
return false, nil, err

if txnNeedsCommit {
if err = tx.Commit(); err != nil {
return false, nil, err
}
}

return true, maps.SliceOfKeys(modifiedEntries), nil
}

func updateVirtualIPMaxIndexes(txn WriteTxn, idx uint64, partition, peerName string) error {
// update global max index (for snapshots)
if err := indexUpdateMaxTxn(txn, idx, tableServiceVirtualIPs); err != nil {
return fmt.Errorf("failed while updating index: %w", err)
}
// update per-partition max index
if err := indexUpdateMaxTxn(txn, idx, partitionedIndexEntryName(tableServiceVirtualIPs, partition)); err != nil {
return fmt.Errorf("failed while updating partitioned index: %w", err)
Expand Down Expand Up @@ -3086,6 +3119,7 @@ func servicesVirtualIPsTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, []ServiceVirt
vips = append(vips, vip)
}

// Pull from the global one
idx := maxIndexWatchTxn(tx, nil, tableServiceVirtualIPs)

return idx, vips, nil
Expand Down
Loading