diff --git a/api/types/authority.go b/api/types/authority.go index 59f08e2c6c7c2..2058ab4cbc596 100644 --- a/api/types/authority.go +++ b/api/types/authority.go @@ -728,3 +728,39 @@ func (f *CertAuthorityFilter) FromMap(m map[string]string) { (*f)[CertAuthType(key)] = val } } + +// Contains checks if the CA filter contains another CA filter as a subset. +// Unlike other filters, a CA filter's scope becomes more broad as map keys +// are added to it. +// Therefore, to check if kind's filter contains the subset's filter, +// we should check that the subset's keys are all present in kind and as +// narrow or narrower. +// A special case is when kind's filter is either empty or specifies all +// authorities, in which case it is as broad as possible and subset's filter +// is always contained within it. +func (f CertAuthorityFilter) Contains(other CertAuthorityFilter) bool { + if len(f) == 0 { + // f has no filter, which is as broad as possible. + return true + } + + if len(other) == 0 { + // f has a filter, but other does not. + // treat this as "contained" if f's filter is for all authorities. + for _, caType := range CertAuthTypes { + clusterName, ok := f[caType] + if !ok || clusterName != Wildcard { + return false + } + } + return true + } + + for k, v := range other { + v2, ok := f[k] + if !ok || (v2 != Wildcard && v2 != v) { + return false + } + } + return true +} diff --git a/api/types/events.go b/api/types/events.go index 375680090d7ec..99a2aafaf53e9 100644 --- a/api/types/events.go +++ b/api/types/events.go @@ -176,6 +176,13 @@ func (kind WatchKind) Contains(subset WatchKind) bool { return false } + if kind.Kind == KindCertAuthority { + var a, b CertAuthorityFilter + a.FromMap(kind.Filter) + b.FromMap(subset.Filter) + return a.Contains(b) + } + for k, v := range kind.Filter { if subset.Filter[k] != v { return false diff --git a/api/types/events_test.go b/api/types/events_test.go index 95d87a454b3e5..1babc0ae42bb7 100644 --- a/api/types/events_test.go +++ b/api/types/events_test.go @@ -25,6 +25,10 @@ import ( // TestWatchKindContains tests that the WatchKind.Contains method correctly detects whether its receiver contains its // argument. func TestWatchKindContains(t *testing.T) { + allCAFilter := make(CertAuthorityFilter) + for _, caType := range CertAuthTypes { + allCAFilter[caType] = Wildcard + } testCases := []struct { name string kind WatchKind @@ -148,6 +152,109 @@ func TestWatchKindContains(t *testing.T) { }, assertion: require.False, }, + { + name: "yes: superset and subset have no CA filter", + kind: WatchKind{ + Kind: "cert_authority", + }, + other: WatchKind{ + Kind: "cert_authority", + }, + assertion: require.True, + }, + { + name: "yes: superset has no CA filter", + kind: WatchKind{ + Kind: "cert_authority", + }, + other: WatchKind{ + Kind: "cert_authority", + Filter: map[string]string{ + "a": "b", + "c": "d", + }, + }, + assertion: require.True, + }, + { + name: "yes: superset filter matches all, subset has no CA filter", + kind: WatchKind{ + Kind: "cert_authority", + Filter: allCAFilter.IntoMap(), + }, + other: WatchKind{ + Kind: "cert_authority", + }, + assertion: require.True, + }, + { + name: "yes: subset has narrower CA filter", + kind: WatchKind{ + Kind: "cert_authority", + Filter: map[string]string{ + "a": "b", + "c": Wildcard, + "e": "f", + }, + }, + other: WatchKind{ + Kind: "cert_authority", + Filter: map[string]string{ + "a": "b", + "c": "d", + }, + }, + assertion: require.True, + }, + { + name: "no: superset filter does not match all, subset has no CA filter", + kind: WatchKind{ + Kind: "cert_authority", + Filter: map[string]string{ + "a": "b", + "c": "d", + }, + }, + other: WatchKind{ + Kind: "cert_authority", + }, + assertion: require.False, + }, + { + name: "no: subset has wider CA filter", + kind: WatchKind{ + Kind: "cert_authority", + Filter: map[string]string{ + "a": "b", + "c": "d", + }, + }, + other: WatchKind{ + Kind: "cert_authority", + Filter: map[string]string{ + "a": "b", + "c": "d", + "e": "", + }, + }, + assertion: require.False, + }, + { + name: "no: subset filter does not match", + kind: WatchKind{ + Kind: "cert_authority", + Filter: map[string]string{ + "a": "b", + }, + }, + other: WatchKind{ + Kind: "cert_authority", + Filter: map[string]string{ + "a": "", + }, + }, + assertion: require.False, + }, } for _, tc := range testCases { diff --git a/lib/cache/cache.go b/lib/cache/cache.go index cd49afa2a87bd..0131d2bb8973a 100644 --- a/lib/cache/cache.go +++ b/lib/cache/cache.go @@ -96,6 +96,21 @@ func isHighVolumeResource(kind string) bool { return ok } +// makeAllKnownCAsFilter makes a filter that matches all known CA types. +// This should be installed by default on every CA watcher, unless a filter is +// otherwise specified, to avoid complicated server-side hacks if/when we add +// a new CA type. +// This is different from a nil/empty filter in that all the CA types that the +// client knows about will be returned rather than all the CA types that the +// server knows about. +func makeAllKnownCAsFilter() types.CertAuthorityFilter { + filter := make(types.CertAuthorityFilter, len(types.CertAuthTypes)) + for _, t := range types.CertAuthTypes { + filter[t] = types.Wildcard + } + return filter +} + // ForAuth sets up watch configuration for the auth server func ForAuth(cfg Config) Config { cfg.target = "auth" @@ -164,7 +179,7 @@ func ForAuth(cfg Config) Config { func ForProxy(cfg Config) Config { cfg.target = "proxy" cfg.Watches = []types.WatchKind{ - {Kind: types.KindCertAuthority, LoadSecrets: false}, + {Kind: types.KindCertAuthority, LoadSecrets: false, Filter: makeAllKnownCAsFilter().IntoMap()}, {Kind: types.KindClusterName}, {Kind: types.KindClusterAuditConfig}, {Kind: types.KindClusterNetworkingConfig}, @@ -221,7 +236,7 @@ func ForProxy(cfg Config) Config { func ForRemoteProxy(cfg Config) Config { cfg.target = "remote-proxy" cfg.Watches = []types.WatchKind{ - {Kind: types.KindCertAuthority, LoadSecrets: false}, + {Kind: types.KindCertAuthority, LoadSecrets: false, Filter: makeAllKnownCAsFilter().IntoMap()}, {Kind: types.KindClusterName}, {Kind: types.KindClusterAuditConfig}, {Kind: types.KindClusterNetworkingConfig}, @@ -253,7 +268,7 @@ func ForRemoteProxy(cfg Config) Config { func ForOldRemoteProxy(cfg Config) Config { cfg.target = "remote-proxy-old" cfg.Watches = []types.WatchKind{ - {Kind: types.KindCertAuthority, LoadSecrets: false}, + {Kind: types.KindCertAuthority, LoadSecrets: false, Filter: makeAllKnownCAsFilter().IntoMap()}, {Kind: types.KindClusterName}, {Kind: types.KindClusterAuditConfig}, {Kind: types.KindClusterNetworkingConfig}, @@ -313,7 +328,7 @@ func ForNode(cfg Config) Config { func ForKubernetes(cfg Config) Config { cfg.target = "kube" cfg.Watches = []types.WatchKind{ - {Kind: types.KindCertAuthority, LoadSecrets: false}, + {Kind: types.KindCertAuthority, LoadSecrets: false, Filter: makeAllKnownCAsFilter().IntoMap()}, {Kind: types.KindClusterName}, {Kind: types.KindClusterAuditConfig}, {Kind: types.KindClusterNetworkingConfig}, @@ -333,7 +348,7 @@ func ForKubernetes(cfg Config) Config { func ForApps(cfg Config) Config { cfg.target = "apps" cfg.Watches = []types.WatchKind{ - {Kind: types.KindCertAuthority, LoadSecrets: false}, + {Kind: types.KindCertAuthority, LoadSecrets: false, Filter: makeAllKnownCAsFilter().IntoMap()}, {Kind: types.KindClusterName}, {Kind: types.KindClusterAuditConfig}, {Kind: types.KindClusterNetworkingConfig}, @@ -355,7 +370,7 @@ func ForApps(cfg Config) Config { func ForDatabases(cfg Config) Config { cfg.target = "db" cfg.Watches = []types.WatchKind{ - {Kind: types.KindCertAuthority, LoadSecrets: false}, + {Kind: types.KindCertAuthority, LoadSecrets: false, Filter: makeAllKnownCAsFilter().IntoMap()}, {Kind: types.KindClusterName}, {Kind: types.KindClusterAuditConfig}, {Kind: types.KindClusterNetworkingConfig}, @@ -377,7 +392,7 @@ func ForDatabases(cfg Config) Config { func ForWindowsDesktop(cfg Config) Config { cfg.target = "windows_desktop" cfg.Watches = []types.WatchKind{ - {Kind: types.KindCertAuthority, LoadSecrets: false}, + {Kind: types.KindCertAuthority, LoadSecrets: false, Filter: makeAllKnownCAsFilter().IntoMap()}, {Kind: types.KindClusterName}, {Kind: types.KindClusterAuditConfig}, {Kind: types.KindClusterNetworkingConfig}, @@ -397,7 +412,7 @@ func ForWindowsDesktop(cfg Config) Config { func ForDiscovery(cfg Config) Config { cfg.target = "discovery" cfg.Watches = []types.WatchKind{ - {Kind: types.KindCertAuthority, LoadSecrets: false}, + {Kind: types.KindCertAuthority, LoadSecrets: false, Filter: makeAllKnownCAsFilter().IntoMap()}, {Kind: types.KindClusterName}, {Kind: types.KindNamespace, Name: apidefaults.Namespace}, {Kind: types.KindNode}, @@ -417,7 +432,7 @@ func ForOkta(cfg Config) Config { cfg.target = "okta" cfg.Watches = []types.WatchKind{ {Kind: types.KindClusterName}, - {Kind: types.KindCertAuthority, LoadSecrets: false}, + {Kind: types.KindCertAuthority, LoadSecrets: false, Filter: makeAllKnownCAsFilter().IntoMap()}, {Kind: types.KindUser}, {Kind: types.KindAppServer}, {Kind: types.KindClusterNetworkingConfig}, diff --git a/lib/cache/cache_test.go b/lib/cache/cache_test.go index c11f7e4f95d31..52491acf907b5 100644 --- a/lib/cache/cache_test.go +++ b/lib/cache/cache_test.go @@ -2702,6 +2702,25 @@ func TestCache_Backoff(t *testing.T) { // TestSetupConfigFns ensures that all WatchKinds used in setup config functions are present in ForAuth() as well. func TestSetupConfigFns(t *testing.T) { + t.Parallel() + ctx := context.Background() + bk, err := memory.New(memory.Config{ + Context: ctx, + Mirror: true, + }) + require.NoError(t, err) + defer bk.Close() + + clusterConfigCache, err := local.NewClusterConfigurationService(bk) + require.NoError(t, err) + + clusterName, err := services.NewClusterNameWithRandomID(types.ClusterNameSpecV2{ + ClusterName: "example.com", + }) + require.NoError(t, err) + err = clusterConfigCache.UpsertClusterName(clusterName) + require.NoError(t, err) + setupFuncs := map[string]SetupConfigFn{ "ForProxy": ForProxy, "ForRemoteProxy": ForRemoteProxy, @@ -2716,20 +2735,27 @@ func TestSetupConfigFns(t *testing.T) { } authKindMap := make(map[resourceKind]types.WatchKind) - for _, wk := range ForAuth(Config{}).Watches { + for _, wk := range ForAuth(Config{ClusterConfig: clusterConfigCache}).Watches { authKindMap[resourceKind{kind: wk.Kind, subkind: wk.SubKind}] = wk } for name, f := range setupFuncs { t.Run(name, func(t *testing.T) { - for _, wk := range f(Config{}).Watches { + for _, wk := range f(Config{ClusterConfig: clusterConfigCache}).Watches { authWK, ok := authKindMap[resourceKind{kind: wk.Kind, subkind: wk.SubKind}] if !ok || !authWK.Contains(wk) { t.Errorf("%s includes WatchKind %s that is missing from ForAuth", name, wk.String()) } + if wk.Kind == types.KindCertAuthority { + require.NotEmpty(t, wk.Filter, "every setup fn except auth should have a CA filter") + } } }) } + + authCAWatchKind, ok := authKindMap[resourceKind{kind: types.KindCertAuthority}] + require.True(t, ok) + require.Empty(t, authCAWatchKind.Filter, "auth should not use a CA filter") } type proxyEvents struct { @@ -3306,3 +3332,86 @@ const testEntityDescriptor = ` ` + +// TestCAWatcherFilters tests cache CA watchers with filters are not rejected +// by auth, even if a CA filter includes a "new" CA type. +func TestCAWatcherFilters(t *testing.T) { + t.Parallel() + ctx := context.Background() + p := newPackForAuth(t) + t.Cleanup(p.Close) + + allCAsAndNewCAFilter := makeAllKnownCAsFilter() + // auth will never send such an event, but it won't reject the watch request + // either since auth cache's confirmedKinds dont have a CA filter. + allCAsAndNewCAFilter["someBackportedCAType"] = "*" + + tests := []struct { + desc string + filter types.CertAuthorityFilter + watcher types.Watcher + }{ + { + desc: "empty filter", + }, + { + desc: "all CAs filter", + filter: makeAllKnownCAsFilter(), + }, + { + desc: "all CAs and a new CA filter", + filter: allCAsAndNewCAFilter, + }, + } + + // setup watchers for each test case before we generate events. + for i := range tests { + test := &tests[i] + w, err := p.cache.NewWatcher(ctx, types.Watch{Kinds: []types.WatchKind{ + { + Kind: types.KindCertAuthority, + Filter: test.filter.IntoMap(), + }, + }}) + require.NoError(t, err) + test.watcher = w + t.Cleanup(func() { + require.NoError(t, w.Close()) + }) + } + + // generate an OpPut event. + ca := suite.NewTestCA(types.UserCA, "example.com") + require.NoError(t, p.trustS.UpsertCertAuthority(ctx, ca)) + + const fetchTimeout = time.Second + for _, test := range tests { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + event := fetchEvent(t, test.watcher, fetchTimeout) + require.Equal(t, types.OpInit, event.Type) + + event = fetchEvent(t, test.watcher, fetchTimeout) + require.Equal(t, types.OpPut, event.Type) + require.Equal(t, types.KindCertAuthority, event.Resource.GetKind()) + gotCA, ok := event.Resource.(*types.CertAuthorityV2) + require.True(t, ok) + require.Equal(t, types.UserCA, gotCA.GetType()) + }) + } +} + +func fetchEvent(t *testing.T, w types.Watcher, timeout time.Duration) types.Event { + t.Helper() + timeoutC := time.After(timeout) + var ev types.Event + select { + case <-timeoutC: + require.Fail(t, "Timeout waiting for event", w.Error()) + case <-w.Done(): + require.Fail(t, "Watcher exited with error", w.Error()) + case ev = <-w.Events(): + } + return ev +} diff --git a/lib/services/local/events.go b/lib/services/local/events.go index e5ecebabe291c..3077a84583fca 100644 --- a/lib/services/local/events.go +++ b/lib/services/local/events.go @@ -66,7 +66,7 @@ func (e *EventsService) NewWatcher(ctx context.Context, watch types.Watch) (type var parser resourceParser switch kind.Kind { case types.KindCertAuthority: - parser = newCertAuthorityParser(kind.LoadSecrets) + parser = newCertAuthorityParser(kind.LoadSecrets, kind.Filter) case types.KindToken: parser = newProvisionTokenParser() case types.KindStaticTokens: @@ -349,16 +349,20 @@ func (p baseParser) match(key []byte) bool { return false } -func newCertAuthorityParser(loadSecrets bool) *certAuthorityParser { +func newCertAuthorityParser(loadSecrets bool, filter map[string]string) *certAuthorityParser { + var caFilter types.CertAuthorityFilter + caFilter.FromMap(filter) return &certAuthorityParser{ loadSecrets: loadSecrets, baseParser: newBaseParser(backend.Key(authoritiesPrefix)), + filter: caFilter, } } type certAuthorityParser struct { baseParser loadSecrets bool + filter types.CertAuthorityFilter } func (p *certAuthorityParser) parse(event backend.Event) (types.Resource, error) { @@ -383,6 +387,9 @@ func (p *certAuthorityParser) parse(event backend.Event) (types.Resource, error) if err != nil { return nil, trace.Wrap(err) } + if !p.filter.Match(ca) { + return nil, nil + } // never send private signing keys over event stream? // this might not be true setSigningKeys(ca, p.loadSecrets) diff --git a/lib/services/local/events_test.go b/lib/services/local/events_test.go new file mode 100644 index 0000000000000..1bfe8a086c1d6 --- /dev/null +++ b/lib/services/local/events_test.go @@ -0,0 +1,125 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package local + +import ( + "context" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/require" + + "github.com/gravitational/teleport/api/types" + "github.com/gravitational/teleport/lib/backend/memory" + "github.com/gravitational/teleport/lib/services/suite" +) + +func TestNewWatcher_CertAuthority(t *testing.T) { + t.Parallel() + + // setup backend and events service + clock := clockwork.NewFakeClock() + bk, err := memory.New(memory.Config{ + Clock: clock, + }) + require.NoError(t, err) + t.Cleanup(func() { bk.Close() }) + eventsSvc := NewEventsService(bk) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // setup watchers - one filtered the other not + filteredWatcher, err := eventsSvc.NewWatcher(ctx, types.Watch{Kinds: []types.WatchKind{{ + Kind: types.KindCertAuthority, + Filter: types.CertAuthorityFilter{ + types.HostCA: "example.com", + }.IntoMap(), + LoadSecrets: false, + }}}) + require.NoError(t, err) + + unfilteredWatcher, err := eventsSvc.NewWatcher(ctx, types.Watch{Kinds: []types.WatchKind{{ + Kind: types.KindCertAuthority, + LoadSecrets: false, + }}}) + require.NoError(t, err) + + // create some CAs to generate OpPut events. + userCA := suite.NewTestCA(types.UserCA, "example.com") + hostCA := suite.NewTestCA(types.HostCA, "example.com") + hostCARemote := suite.NewTestCA(types.HostCA, "remote.com") + err = CreateResources(ctx, bk, userCA, hostCA, hostCARemote) + require.NoError(t, err) + + const fetchTimeout = 3 * time.Second + t.Run("with filter", func(t *testing.T) { + event := fetchEvent(t, filteredWatcher, fetchTimeout) + require.Equal(t, types.OpInit, event.Type) + + event = fetchEvent(t, filteredWatcher, fetchTimeout) + require.Equal(t, types.OpPut, event.Type) + ca, ok := event.Resource.(*types.CertAuthorityV2) + require.True(t, ok) + require.Equal(t, types.CertAuthID{ + Type: types.HostCA, + DomainName: "example.com", + }, ca.GetID()) + }) + + t.Run("without filter", func(t *testing.T) { + event := fetchEvent(t, unfilteredWatcher, fetchTimeout) + require.Equal(t, types.OpInit, event.Type) + + var putEvents []types.Event + putEvents = append(putEvents, fetchEvent(t, unfilteredWatcher, fetchTimeout)) + putEvents = append(putEvents, fetchEvent(t, unfilteredWatcher, fetchTimeout)) + putEvents = append(putEvents, fetchEvent(t, unfilteredWatcher, fetchTimeout)) + + gotCertAuthIDSet := map[types.CertAuthID]struct{}{} + for _, event := range putEvents { + require.Equal(t, types.OpPut, event.Type) + ca, ok := event.Resource.(*types.CertAuthorityV2) + require.True(t, ok) + gotCertAuthIDSet[ca.GetID()] = struct{}{} + } + want := map[types.CertAuthID]struct{}{ + {Type: types.UserCA, DomainName: "example.com"}: {}, + {Type: types.HostCA, DomainName: "example.com"}: {}, + {Type: types.HostCA, DomainName: "remote.com"}: {}, + } + require.Empty(t, cmp.Diff(want, gotCertAuthIDSet)) + }) +} + +func fetchEvent(t *testing.T, w types.Watcher, timeout time.Duration) types.Event { + t.Helper() + timeoutC := time.After(timeout) + var ev types.Event + select { + case <-timeoutC: + require.Fail(t, "Timeout waiting for event", w.Error()) + case <-w.Done(): + require.Fail(t, "Watcher exited with error", w.Error()) + case ev = <-w.Events(): + } + return ev +} diff --git a/lib/services/watcher.go b/lib/services/watcher.go index 3553b9f18a607..9412d68a3ecae 100644 --- a/lib/services/watcher.go +++ b/lib/services/watcher.go @@ -1455,6 +1455,9 @@ func (cfg *CertAuthorityWatcherConfig) CheckAndSetDefaults() error { } cfg.AuthorityGetter = getter } + if len(cfg.Types) == 0 { + return trace.BadParameter("missing parameter Types") + } return nil } @@ -1468,11 +1471,13 @@ func NewCertAuthorityWatcher(ctx context.Context, cfg CertAuthorityWatcherConfig CertAuthorityWatcherConfig: cfg, fanout: NewFanout(), cas: make(map[types.CertAuthType]map[string]types.CertAuthority, len(cfg.Types)), + filter: make(types.CertAuthorityFilter, len(cfg.Types)), initializationC: make(chan struct{}), } for _, t := range cfg.Types { collector.cas[t] = make(map[string]types.CertAuthority) + collector.filter[t] = types.Wildcard } // Resource watcher require the fanout to be initialized before passing in. // Otherwise, Emit() may fail due to a race condition mentioned in https://github.com/gravitational/teleport/issues/19289 @@ -1503,10 +1508,14 @@ type caCollector struct { // initializationC is used to check whether the initial sync has completed initializationC chan struct{} once sync.Once + filter types.CertAuthorityFilter } // Subscribe is used to subscribe to the lock updates. func (c *caCollector) Subscribe(ctx context.Context, filter types.CertAuthorityFilter) (types.Watcher, error) { + if len(filter) == 0 { + filter = c.filter + } watch := types.Watch{ Kinds: []types.WatchKind{ { @@ -1532,7 +1541,7 @@ func (c *caCollector) Subscribe(ctx context.Context, filter types.CertAuthorityF // resourceKinds specifies the resource kind to watch. func (c *caCollector) resourceKinds() []types.WatchKind { - return []types.WatchKind{{Kind: types.KindCertAuthority}} + return []types.WatchKind{{Kind: types.KindCertAuthority, Filter: c.filter.IntoMap()}} } // isInitialized is used to check that the cache has done its initial @@ -1620,12 +1629,9 @@ func (c *caCollector) processEventAndUpdateCurrent(ctx context.Context, event ty } func (c *caCollector) watchingType(t types.CertAuthType) bool { - for _, caType := range c.Types { - if caType == t { - return true - } + if _, ok := c.cas[t]; ok { + return true } - return false }