Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions agent/consul/fsm/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
StorageBackend: storageBackend,
})

fsm.state.SystemMetadataSet(10, &structs.SystemMetadataEntry{Key: structs.SystemMetadataVirtualIPsEnabled, Value: "true"})

// Add some state
node1 := &structs.Node{
ID: "610918a6-464f-fa9b-1a95-03bd6e88ed92",
Expand Down Expand Up @@ -79,8 +81,14 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
Connect: connectConf,
})

psn := structs.PeeredServiceName{ServiceName: structs.NewServiceName("web", nil)}
vip, err := fsm.state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.1")

fsm.state.EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000})
fsm.state.EnsureService(5, "baz", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.2", Port: 80})

fsm.state.EnsureService(6, "baz", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.2", Port: 5000})
fsm.state.EnsureCheck(7, &structs.HealthCheck{
Node: "foo",
Expand Down Expand Up @@ -442,6 +450,10 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
},
}
require.NoError(t, fsm.state.EnsureConfigEntry(26, serviceIxn))
psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("foo", nil)}
vip, err = fsm.state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.2")

// mesh config entry
meshConfig := &structs.MeshConfigEntry{
Expand All @@ -465,10 +477,10 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
Port: 8000,
Connect: connectConf,
})
psn := structs.PeeredServiceName{ServiceName: structs.NewServiceName("frontend", nil)}
vip, err := fsm.state.VirtualIPForService(psn)
psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("frontend", nil)}
vip, err = fsm.state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.1")
require.Equal(t, vip, "240.0.0.3")

fsm.state.EnsureService(30, "foo", &structs.NodeService{
ID: "backend",
Expand All @@ -480,7 +492,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("backend", nil)}
vip, err = fsm.state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.2")
require.Equal(t, vip, "240.0.0.4")

_, serviceNames, err := fsm.state.ServiceNamesOfKind(nil, structs.ServiceKindTypical)
require.NoError(t, err)
Expand Down Expand Up @@ -534,15 +546,15 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
},
}))

// Add a service-resolver entry to get a virtual IP for service foo
// Add a service-resolver entry to get a virtual IP for service goo
resolverEntry := &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "foo",
Name: "goo",
}
require.NoError(t, fsm.state.EnsureConfigEntry(34, resolverEntry))
vip, err = fsm.state.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.NewServiceName("foo", nil)})
vip, err = fsm.state.VirtualIPForService(structs.PeeredServiceName{ServiceName: structs.NewServiceName("goo", nil)})
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.3")
require.Equal(t, vip, "240.0.0.5")

// Resources
resource, err := storageBackend.WriteCAS(context.Background(), &pbresource.Resource{
Expand Down Expand Up @@ -665,18 +677,26 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.Equal(t, uint64(25), checks[0].ModifyIndex)

// Verify virtual IPs are consistent.
psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("frontend", nil)}
psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("web", nil)}
vip, err = fsm2.state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.1")
psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("backend", nil)}
psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("foo", nil)}
vip, err = fsm2.state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.2")
psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("foo", nil)}
psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("frontend", nil)}
vip, err = fsm2.state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.3")
psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("backend", nil)}
vip, err = fsm2.state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.4")
psn = structs.PeeredServiceName{ServiceName: structs.NewServiceName("goo", nil)}
vip, err = fsm2.state.VirtualIPForService(psn)
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.5")

// Verify key is set
_, d, err := fsm2.state.KVSGet(nil, "/test", nil)
Expand Down
15 changes: 11 additions & 4 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
if err != nil {
return err
}
if supported {
if supported && sn.Name != "" {
psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: sn}
vip, err := assignServiceVirtualIP(tx, idx, psn)
if err != nil {
Expand Down Expand Up @@ -2110,7 +2110,13 @@ func freeServiceVirtualIP(

// Don't deregister the virtual IP if at least one resolver/router/splitter config entry still
// references this service.
configEntryVIPKinds := []string{structs.ServiceResolver, structs.ServiceRouter, structs.ServiceSplitter}
configEntryVIPKinds := []string{
structs.ServiceResolver,
structs.ServiceRouter,
structs.ServiceSplitter,
structs.ServiceDefaults,
structs.ServiceIntentions,
}
for _, kind := range configEntryVIPKinds {
_, entry, err := configEntryTxn(tx, nil, kind, psn.ServiceName.Name, &psn.ServiceName.EnterpriseMeta)
if err != nil {
Expand Down Expand Up @@ -3051,14 +3057,15 @@ func (s *Store) ServiceVirtualIPs() (uint64, []ServiceVirtualIP, error) {
tx := s.db.Txn(false)
defer tx.Abort()

return servicesVirtualIPsTxn(tx)
return servicesVirtualIPsTxn(tx, nil)
}

func servicesVirtualIPsTxn(tx ReadTxn) (uint64, []ServiceVirtualIP, error) {
func servicesVirtualIPsTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, []ServiceVirtualIP, error) {
iter, err := tx.Get(tableServiceVirtualIPs, indexID)
if err != nil {
return 0, nil, err
}
ws.Add(iter.WatchCh())

var vips []ServiceVirtualIP
for raw := iter.Next(); raw != nil; raw = iter.Next() {
Expand Down
41 changes: 33 additions & 8 deletions agent/consul/state/config_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package state
import (
"errors"
"fmt"
"strings"

memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -465,9 +466,8 @@ func deleteConfigEntryTxn(tx WriteTxn, idx uint64, kind, name string, entMeta *a
return fmt.Errorf("failed updating index: %s", err)
}

// If this is a resolver/router/splitter, attempt to delete the virtual IP associated
// with this service.
if kind == structs.ServiceResolver || kind == structs.ServiceRouter || kind == structs.ServiceSplitter {
// Attempt to delete the virtual IP associated with this service, if applicable.
if configEntryHasVirtualIP(c) {
psn := structs.PeeredServiceName{ServiceName: sn}
if err := freeServiceVirtualIP(tx, idx, psn, nil); err != nil {
return fmt.Errorf("failed to clean up virtual IP for %q: %v", psn.String(), err)
Expand Down Expand Up @@ -519,11 +519,14 @@ func insertConfigEntryWithTxn(tx WriteTxn, idx uint64, conf structs.ConfigEntry)
if err != nil {
return err
}
case structs.ServiceResolver:
fallthrough
case structs.ServiceRouter:
fallthrough
case structs.ServiceSplitter:
}

// Assign virtual-ips, if needed
supported, err := virtualIPsSupported(tx, nil)
if err != nil {
return err
}
if supported && configEntryHasVirtualIP(conf) {
psn := structs.PeeredServiceName{ServiceName: structs.NewServiceName(conf.GetName(), conf.GetEnterpriseMeta())}
if _, err := assignServiceVirtualIP(tx, idx, psn); err != nil {
return err
Expand All @@ -541,6 +544,28 @@ func insertConfigEntryWithTxn(tx WriteTxn, idx uint64, conf structs.ConfigEntry)
return nil
}

func configEntryHasVirtualIP(c structs.ConfigEntry) bool {
if c == nil || c.GetName() == "" {
return false
}
switch c.GetKind() {
case structs.ServiceRouter:
return true
case structs.ServiceResolver:
return true
case structs.ServiceSplitter:
return true
case structs.ServiceDefaults:
return true
case structs.ServiceIntentions:
entMeta := c.GetEnterpriseMeta()
return !strings.Contains(c.GetName(), "*") &&
!strings.Contains(entMeta.NamespaceOrDefault(), "*") &&
!strings.Contains(entMeta.PartitionOrDefault(), "*")
}
return false
}

// validateProposedConfigEntryInGraph can be used to verify graph integrity for
// a proposed graph create/update/delete.
//
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/state/intention.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,7 @@ func (s *Store) intentionTopologyTxn(
// We only need to do this for upstreams currently, so that tproxy can find which discovery chains should be
// contacted for failover scenarios. Virtual services technically don't need to be considered as downstreams,
// because they will take on the identity of the calling service, rather than the chain itself.
vipIndex, vipServices, err := servicesVirtualIPsTxn(tx)
vipIndex, vipServices, err := servicesVirtualIPsTxn(tx, ws)
if err != nil {
return index, nil, fmt.Errorf("failed to list service virtual IPs: %v", err)
}
Expand Down
8 changes: 5 additions & 3 deletions agent/consul/state/intention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2097,6 +2097,7 @@ func disableLegacyIntentions(s *Store) error {

func testConfigStateStore(t *testing.T) *Store {
s := testStateStore(t)
s.SystemMetadataSet(5, &structs.SystemMetadataEntry{Key: structs.SystemMetadataVirtualIPsEnabled, Value: "true"})
disableLegacyIntentions(s)
return s
}
Expand Down Expand Up @@ -2651,6 +2652,7 @@ func TestStore_IntentionTopology_Destination(t *testing.T) {

func TestStore_IntentionTopology_Watches(t *testing.T) {
s := testConfigStateStore(t)
s.SystemMetadataSet(10, &structs.SystemMetadataEntry{Key: structs.SystemMetadataVirtualIPsEnabled, Value: "true"})

var i uint64 = 1
require.NoError(t, s.EnsureNode(i, &structs.Node{
Expand Down Expand Up @@ -2687,7 +2689,8 @@ func TestStore_IntentionTopology_Watches(t *testing.T) {
index, got, err = s.IntentionTopology(ws, target, false, acl.Deny, structs.IntentionTargetService)
require.NoError(t, err)
require.Equal(t, uint64(2), index)
require.Empty(t, got)
// Because API is a virtual service, it is included in this output.
require.Equal(t, structs.ServiceList{structs.NewServiceName("api", nil)}, got)

// Watch should not fire after unrelated intention changes
require.NoError(t, s.EnsureConfigEntry(i, &structs.ServiceIntentionsConfigEntry{
Expand All @@ -2701,15 +2704,14 @@ func TestStore_IntentionTopology_Watches(t *testing.T) {
},
}))
i++

// TODO(freddy) Why is this firing?
// require.False(t, watchFired(ws))

// Result should not have changed
index, got, err = s.IntentionTopology(ws, target, false, acl.Deny, structs.IntentionTargetService)
require.NoError(t, err)
require.Equal(t, uint64(3), index)
require.Empty(t, got)
require.Equal(t, structs.ServiceList{structs.NewServiceName("api", nil)}, got)

// Watch should fire after service list changes
require.NoError(t, s.EnsureService(i, "foo", &structs.NodeService{
Expand Down