From a1ae772256a3afa23ccf32980514868f2f4141f9 Mon Sep 17 00:00:00 2001 From: Edoardo Spadolini Date: Fri, 24 Oct 2025 11:54:21 +0200 Subject: [PATCH 1/2] MergeStreamsWithPriority --- lib/itertools/stream/stream.go | 76 ++++++++ lib/itertools/stream/stream_test.go | 262 ++++++++++++++++++++++++++++ 2 files changed, 338 insertions(+) diff --git a/lib/itertools/stream/stream.go b/lib/itertools/stream/stream.go index 855abd91f7316..17d6ff8105bf4 100644 --- a/lib/itertools/stream/stream.go +++ b/lib/itertools/stream/stream.go @@ -405,6 +405,82 @@ func MergeStreams[T any]( } } +// MergeStreamsWithPriority merges two sorted streams prioritizing streamA when compare indicates items are equal. +func MergeStreamsWithPriority[T any]( + streamA Stream[T], + streamB Stream[T], + compare func(a, b T) int, +) Stream[T] { + return func(yield func(T, error) bool) { + var itemA, itemB T + + nextA, stopA := iter.Pull2(streamA) + defer stopA() + nextB, stopB := iter.Pull2(streamB) + defer stopB() + + itemA, errA, okA := nextA() + itemB, errB, okB := nextB() + + // Both streams have items, merge with priority for A + for okA && okB { + if errA != nil { + yield(*new(T), errA) + return + } + if errB != nil { + yield(*new(T), errB) + return + } + + switch cmp := compare(itemA, itemB); { + case cmp < 0: + if !yield(itemA, nil) { + return + } + itemA, errA, okA = nextA() + + case cmp > 0: + if !yield(itemB, nil) { + return + } + itemB, errB, okB = nextB() + + default: // cmp == 0 + // Yield A and skip B. + if !yield(itemA, nil) { + return + } + itemA, errA, okA = nextA() + itemB, errB, okB = nextB() + } + } + + // Drain + for okA { + if errA != nil { + yield(*new(T), errA) + return + } + if !yield(itemA, nil) { + return + } + itemA, errA, okA = nextA() + } + + for okB { + if errB != nil { + yield(*new(T), errB) + return + } + if !yield(itemB, nil) { + return + } + itemB, errB, okB = nextB() + } + } +} + // TakeWhile iterates the stream taking items while predicate returns true func TakeWhile[T any](stream Stream[T], predicate func(T) bool) Stream[T] { return func(yield func(T, error) bool) { diff --git a/lib/itertools/stream/stream_test.go b/lib/itertools/stream/stream_test.go index 7dc9dcfc0545c..7ab3e10013ba2 100644 --- a/lib/itertools/stream/stream_test.go +++ b/lib/itertools/stream/stream_test.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "io" + "strings" "testing" "time" @@ -885,6 +886,267 @@ func TestMergeStreams(t *testing.T) { }) } +func TestMergeStreamsWithPriority(t *testing.T) { + t.Parallel() + + type Item struct { + name string + stream string + } + + newItem := func(stream string, name string) Item { + return Item{name: name, stream: stream} + } + + newItemSlice := func(stream string, names ...string) []Item { + out := make([]Item, 0, len(names)) + for _, name := range names { + out = append(out, newItem(stream, name)) + } + return out + } + + compareFunc := func(a, b Item) int { + return strings.Compare(a.name, b.name) + } + + // Test the case where the streams should have interlaced values. + t.Run("interlaced streams", func(t *testing.T) { + want := []Item{ + newItem("streamA", "a"), + newItem("streamB", "b"), + newItem("streamA", "c"), + newItem("streamB", "d"), + newItem("streamA", "e"), + newItem("streamB", "f"), + } + streamA := Slice(newItemSlice("streamA", "a", "c", "e")) + streamB := Slice(newItemSlice("streamB", "b", "d", "f")) + + got, err := Collect(MergeStreamsWithPriority(streamA, streamB, compareFunc)) + require.NoError(t, err) + require.Equal(t, want, got) + }) + + // Test the case where streamA is empty. + t.Run("stream A empty", func(t *testing.T) { + want := []Item{ + newItem("streamB", "b"), + newItem("streamB", "d"), + newItem("streamB", "f"), + } + streamA := Empty[Item]() + streamB := Slice(newItemSlice("streamB", "b", "d", "f")) + + got, err := Collect(MergeStreamsWithPriority(streamA, streamB, compareFunc)) + + require.NoError(t, err) + require.Equal(t, want, got) + + }) + + // Test the case where streamB is empty. + t.Run("stream B empty", func(t *testing.T) { + want := []Item{ + newItem("streamA", "a"), + newItem("streamA", "c"), + newItem("streamA", "e"), + } + streamA := Slice(newItemSlice("streamA", "a", "c", "e")) + streamB := Empty[Item]() + + resultStream := MergeStreamsWithPriority(streamA, streamB, compareFunc) + got, err := Collect(resultStream) + + require.NoError(t, err) + require.Equal(t, want, got) + }) + + // Test the case where both streams are empty. + t.Run("both streams empty", func(t *testing.T) { + streamA := Empty[Item]() + streamB := Empty[Item]() + + resultStream := MergeStreamsWithPriority(streamA, streamB, compareFunc) + got, err := Collect(resultStream) + + require.NoError(t, err) + require.Empty(t, got) + }) + + // Test the case where every value in streamA is lower than every value in streamB. + t.Run("compare always favors A", func(t *testing.T) { + want := []Item{ + newItem("streamA", "a"), + newItem("streamB", "b"), + newItem("streamA", "c"), + newItem("streamB", "d"), + newItem("streamA", "e"), + newItem("streamB", "f"), + } + streamA := Slice(newItemSlice("streamA", "a", "c", "e")) + streamB := Slice(newItemSlice("streamB", "b", "d", "f")) + + resultStream := MergeStreamsWithPriority(streamA, streamB, compareFunc) + got, err := Collect(resultStream) + + require.NoError(t, err) + require.Equal(t, want, got) + }) + + // Test the case where every value in streamB is lower than every value in streamA. + t.Run("compare always favors B", func(t *testing.T) { + want := []Item{ + newItem("streamA", "a"), + newItem("streamB", "b"), + newItem("streamA", "c"), + newItem("streamB", "d"), + newItem("streamA", "e"), + newItem("streamB", "f"), + } + streamA := Slice(newItemSlice("streamA", "a", "c", "e")) + streamB := Slice(newItemSlice("streamB", "b", "d", "f")) + + resultStream := MergeStreamsWithPriority(streamA, streamB, compareFunc) + got, err := Collect(resultStream) + + require.NoError(t, err) + require.Equal(t, want, got) + }) + + t.Run("compare always favors A with clashing item yielding A", func(t *testing.T) { + want := []Item{ + newItem("streamA", "a"), + newItem("streamB", "b"), + newItem("streamA", "c"), + newItem("streamA", "e"), + newItem("streamB", "f"), + } + streamA := Slice(newItemSlice("streamA", "a", "c", "e")) + streamB := Slice(newItemSlice("streamB", "b", "c", "f")) + + resultStream := MergeStreamsWithPriority(streamA, streamB, compareFunc) + got, err := Collect(resultStream) + + require.NoError(t, err) + require.Equal(t, want, got) + }) + + t.Run("compare always favors B with clashing item yielding A", func(t *testing.T) { + want := []Item{ + newItem("streamA", "a"), + newItem("streamB", "b"), + newItem("streamA", "c"), + newItem("streamA", "e"), + newItem("streamB", "f"), + } + streamA := Slice(newItemSlice("streamA", "a", "c", "e")) + streamB := Slice(newItemSlice("streamB", "b", "c", "f")) + + resultStream := MergeStreamsWithPriority(streamA, streamB, compareFunc) + got, err := Collect(resultStream) + + require.NoError(t, err) + require.Equal(t, want, got) + }) + + t.Run("compare clashing keys always favors A", func(t *testing.T) { + want := []Item{ + newItem("streamA", "a"), + newItem("streamA", "b"), + newItem("streamA", "c"), + } + streamA := Slice(newItemSlice("streamA", "a", "b", "c")) + streamB := Slice(newItemSlice("streamB", "a", "b", "c")) + + got, err := Collect(MergeStreamsWithPriority(streamA, streamB, compareFunc)) + require.NoError(t, err) + require.Equal(t, want, got) + }) + + t.Run("mid stream fail in A", func(t *testing.T) { + want := []Item{ + newItem("streamA", "a"), + } + + streamA := Chain( + Slice(newItemSlice("streamA", "a")), + Fail[Item](fmt.Errorf("some error")), + Slice(newItemSlice("streamA", "c", "e")), + ) + streamB := Slice(newItemSlice("streamB", "b", "d", "f")) + + resultStream := MergeStreamsWithPriority(streamA, streamB, compareFunc) + got, err := Collect(resultStream) + + require.Error(t, err) + require.Equal(t, want, got) + }) + + t.Run("mid stream B with Fail and key clash", func(t *testing.T) { + want := []Item{ + newItem("streamA", "a"), + newItem("streamA", "b"), + } + + streamA := Slice(newItemSlice("streamA", "a", "b", "c")) + streamB := Chain( + Slice(newItemSlice("streamB", "b")), + Fail[Item](fmt.Errorf("some error")), + Slice(newItemSlice("streamB", "d", "f")), + ) + + resultStream := MergeStreamsWithPriority(streamA, streamB, compareFunc) + got, err := Collect(resultStream) + + require.Error(t, err) + require.Equal(t, want, got) + }) + + t.Run("tail stream B fails", func(t *testing.T) { + want := []Item{ + newItem("streamA", "a"), + newItem("streamA", "b"), + newItem("streamA", "c"), + newItem("streamB", "d"), + } + + streamA := Slice(newItemSlice("streamA", "a", "b", "c")) + streamB := Chain( + Slice(newItemSlice("streamB", "d")), + Fail[Item](fmt.Errorf("some error")), + ) + + resultStream := MergeStreamsWithPriority(streamA, streamB, compareFunc) + got, err := Collect(resultStream) + + require.Error(t, err) + require.Equal(t, want, got) + }) + + t.Run("tail stream A fails", func(t *testing.T) { + want := []Item{ + newItem("streamB", "a"), + newItem("streamB", "b"), + newItem("streamB", "c"), + newItem("streamA", "d"), + } + + streamA := Chain( + Slice(newItemSlice("streamA", "d")), + Fail[Item](fmt.Errorf("some error")), + ) + streamB := Slice(newItemSlice("streamB", "a", "b", "c")) + + resultStream := MergeStreamsWithPriority(streamA, streamB, compareFunc) + got, err := Collect(resultStream) + + require.Error(t, err) + require.Equal(t, want, got) + }) + +} + func TestTakeWhile(t *testing.T) { t.Parallel() From d8bb637fd2933b2cca532000d3930b45a8a0f693 Mon Sep 17 00:00:00 2001 From: Edoardo Spadolini Date: Fri, 24 Oct 2025 12:24:43 +0200 Subject: [PATCH 2/2] Virtual resources for health_check_config --- constants.go | 8 +- .../healthcheckconfigv1/service_test.go | 148 +++++++++++++++++- lib/auth/init.go | 12 +- lib/services/local/events.go | 14 +- lib/services/local/health_check_config.go | 115 ++++++++++++-- .../local/health_check_config_test.go | 17 +- lib/services/presets.go | 33 +++- 7 files changed, 314 insertions(+), 33 deletions(-) diff --git a/constants.go b/constants.go index fc0d4f891a1ef..db811fd6496db 100644 --- a/constants.go +++ b/constants.go @@ -776,8 +776,14 @@ var PresetRoles = []string{PresetEditorRoleName, PresetAccessRoleName, PresetAud const ( // PresetDefaultHealthCheckConfigName is the name of a preset - // default health_check_config that enables health checks for all resources. + // health_check_config that enables health checks for all database + // resources. For historical reasons, it's named "default" even though it + // applies to databases only. PresetDefaultHealthCheckConfigName = "default" + // VirtualDefaultHealthCheckConfigKubeName is the name of a virtual + // health_check_config that enables health checks for all Kubernetes + // resources. + VirtualDefaultHealthCheckConfigKubeName = "default_kube" ) const ( diff --git a/lib/auth/healthcheckconfig/healthcheckconfigv1/service_test.go b/lib/auth/healthcheckconfig/healthcheckconfigv1/service_test.go index 03f80aba67079..b1b523c80720d 100644 --- a/lib/auth/healthcheckconfig/healthcheckconfigv1/service_test.go +++ b/lib/auth/healthcheckconfig/healthcheckconfigv1/service_test.go @@ -23,11 +23,16 @@ import ( "fmt" "os" "testing" + "time" + "github.com/google/go-cmp/cmp" "github.com/google/uuid" "github.com/gravitational/trace" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/durationpb" + "github.com/gravitational/teleport" healthcheckconfigv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/healthcheckconfig/v1" labelv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/label/v1" "github.com/gravitational/teleport/api/types" @@ -98,7 +103,7 @@ func TestHealthCheckConfigCRUD(t *testing.T) { resp, err := clt.ServiceUnderTest.ListHealthCheckConfigs(ctx, &healthcheckconfigv1.ListHealthCheckConfigsRequest{}) if err == nil { require.NotNil(t, resp) - require.Len(t, resp.Configs, 2, "the test bootstrapped exactly 2 health_check_config resources") + require.Len(t, resp.Configs, 4, "expected 2 inserted, and 2 virtual presets") } return err }, @@ -199,7 +204,7 @@ func (c *accessTest) run(t *testing.T) { ctx, clt := c.setup(t, spec) err := c.actionFn(t, ctx, clt) require.Error(t, err) - require.IsType(t, trace.AccessDenied(""), err) + require.True(t, trace.IsAccessDenied(err)) }) t.Run(fmt.Sprintf("%s is denied", c.name), func(t *testing.T) { @@ -210,7 +215,7 @@ func (c *accessTest) run(t *testing.T) { ctx, clt := c.setup(t, spec) err := c.actionFn(t, ctx, clt) require.Error(t, err) - require.IsType(t, trace.AccessDenied(""), err) + require.True(t, trace.IsAccessDenied(err)) }) } @@ -315,3 +320,140 @@ func authorizerForDummyUser(t *testing.T, clt testClient, roleSpecs ...types.Rol }, }) } + +func newPresetService(t *testing.T, verbs ...string) (testClient, context.Context) { + t.Helper() + clt := newService(t, t.Context()) + return clt, authorizerForDummyUser(t, clt, types.RoleSpecV6{ + Allow: types.RoleConditions{ + Rules: []types.Rule{healthCheckConfigRule(verbs...)}, + }, + }) +} + +type presetTest struct { + name string + verbs []string + run func(t *testing.T, ctx context.Context, clt testClient) +} + +func TestHealthCheckConfigPresets(t *testing.T) { + t.Parallel() + + tests := []presetTest{ + { + name: "Get", + verbs: []string{types.VerbRead}, + run: func(t *testing.T, ctx context.Context, clt testClient) { + // Ensure that virtual presets can be retrieved without being + // explicitly created. + + kube, err := clt.ServiceUnderTest.GetHealthCheckConfig(ctx, &healthcheckconfigv1.GetHealthCheckConfigRequest{ + Name: teleport.VirtualDefaultHealthCheckConfigKubeName, + }) + require.NoError(t, err) + require.Equal(t, services.VirtualDefaultHealthCheckConfigKube(), kube) + }, + }, + { + name: "Create And Get", + verbs: []string{types.VerbCreate, types.VerbRead}, + run: func(t *testing.T, ctx context.Context, clt testClient) { + // Ensure that an explicitly created preset can be retrieved + // without being overwritten by the virtual preset. + + // Kube preset + kube1 := services.VirtualDefaultHealthCheckConfigKube() + kube1.Spec.Interval = durationpb.New(99 * time.Second) + _, err := clt.ServiceUnderTest.CreateHealthCheckConfig(ctx, &healthcheckconfigv1.CreateHealthCheckConfigRequest{ + Config: kube1, + }) + require.NoError(t, err) + kube2, err := clt.ServiceUnderTest.GetHealthCheckConfig(ctx, &healthcheckconfigv1.GetHealthCheckConfigRequest{ + Name: teleport.VirtualDefaultHealthCheckConfigKubeName, + }) + require.NoError(t, err) + require.Equal(t, kube1.Spec.Interval.AsDuration(), kube2.Spec.Interval.AsDuration()) + }, + }, + { + name: "List", + verbs: []string{types.VerbRead, types.VerbList}, + run: func(t *testing.T, ctx context.Context, clt testClient) { + // Ensure that presets can be listed without being + // explicitly created. + resp, err := clt.ServiceUnderTest.ListHealthCheckConfigs(ctx, &healthcheckconfigv1.ListHealthCheckConfigsRequest{}) + require.NoError(t, err) + require.NotNil(t, resp) + require.Len(t, resp.Configs, 1, "expected 1 virtual preset") + for _, actual := range resp.Configs { + switch actual.GetMetadata().GetName() { + case teleport.VirtualDefaultHealthCheckConfigKubeName: + require.Empty(t, cmp.Diff(services.VirtualDefaultHealthCheckConfigKube(), actual, protocmp.Transform())) + } + } + }, + }, + { + name: "Create And List", + verbs: []string{types.VerbCreate, types.VerbRead, types.VerbList}, + run: func(t *testing.T, ctx context.Context, clt testClient) { + // Ensure that explicitly created presets can be listed + // without being overwritten by the virtual presets. + + kube := services.VirtualDefaultHealthCheckConfigKube() + kube.Spec.Interval = durationpb.New(99 * time.Second) + _, err := clt.ServiceUnderTest.CreateHealthCheckConfig(ctx, &healthcheckconfigv1.CreateHealthCheckConfigRequest{ + Config: kube, + }) + require.NoError(t, err) + + resp, err := clt.ServiceUnderTest.ListHealthCheckConfigs(ctx, &healthcheckconfigv1.ListHealthCheckConfigsRequest{}) + require.NoError(t, err) + require.NotNil(t, resp) + require.Len(t, resp.Configs, 1, "expected 1 virtual preset") + for _, actual := range resp.Configs { + switch actual.GetMetadata().GetName() { + case teleport.VirtualDefaultHealthCheckConfigKubeName: + require.Equal(t, kube.Spec.Interval.AsDuration(), actual.Spec.Interval.AsDuration()) + } + } + }, + }, + { + name: "Get And Upsert", + verbs: []string{types.VerbCreate, types.VerbRead, types.VerbUpdate}, + run: func(t *testing.T, ctx context.Context, clt testClient) { + // Ensure that virtual presets can be retrieved then upserted. + + // Kube preset + kube1, err := clt.ServiceUnderTest.GetHealthCheckConfig(ctx, &healthcheckconfigv1.GetHealthCheckConfigRequest{ + Name: teleport.VirtualDefaultHealthCheckConfigKubeName, + }) + require.NoError(t, err) + require.Equal(t, services.VirtualDefaultHealthCheckConfigKube(), kube1) + kube1.Spec.Interval = durationpb.New(99 * time.Second) + kube2, err := clt.ServiceUnderTest.UpsertHealthCheckConfig(ctx, &healthcheckconfigv1.UpsertHealthCheckConfigRequest{ + Config: kube1, + }) + require.NoError(t, err) + require.Equal(t, kube1.Spec.Interval.AsDuration(), kube2.Spec.Interval.AsDuration()) + }, + }, + { + name: "Delete", + verbs: []string{types.VerbDelete}, + run: func(t *testing.T, ctx context.Context, clt testClient) { + // Ensure that virtual presets can be deleted without error. + _, err := clt.ServiceUnderTest.DeleteHealthCheckConfig(ctx, &healthcheckconfigv1.DeleteHealthCheckConfigRequest{ + Name: teleport.VirtualDefaultHealthCheckConfigKubeName, + }) + require.NoError(t, err) + }, + }, + } + for _, test := range tests { + clt, ctx := newPresetService(t, test.verbs...) + test.run(t, ctx, clt) + } +} diff --git a/lib/auth/init.go b/lib/auth/init.go index 676d0c08b99b9..de6ccb639ddc5 100644 --- a/lib/auth/init.go +++ b/lib/auth/init.go @@ -1492,18 +1492,10 @@ func createPresetDatabaseObjectImportRule(ctx context.Context, rules services.Da // createPresetHealthCheckConfig creates a default preset health check config // resource that enables health checks on all resources. func createPresetHealthCheckConfig(ctx context.Context, svc services.HealthCheckConfig) error { - page, _, err := svc.ListHealthCheckConfigs(ctx, 0, "") - if err != nil { - return trace.Wrap(err, "failed listing available health check configs") - } - if len(page) > 0 { - return nil - } preset := services.NewPresetHealthCheckConfig() - _, err = svc.CreateHealthCheckConfig(ctx, preset) - if err != nil && !trace.IsAlreadyExists(err) { + if _, err := svc.CreateHealthCheckConfig(ctx, preset); err != nil && !trace.IsAlreadyExists(err) { return trace.Wrap(err, - "failed creating preset health_check_config %s", + "failed creating preset "+types.KindHealthCheckConfig+" %+q", preset.GetMetadata().GetName(), ) } diff --git a/lib/services/local/events.go b/lib/services/local/events.go index 97403f9c0d2a2..69f60720e55b2 100644 --- a/lib/services/local/events.go +++ b/lib/services/local/events.go @@ -20,6 +20,7 @@ package local import ( "context" + "errors" "log/slog" "slices" "strings" @@ -344,6 +345,10 @@ func (w *watcher) parseEvent(e backend.Event) ([]types.Event, []error) { if p.match(e.Item.Key) { resource, err := p.parse(e) if err != nil { + if eo := parseEventOverrideError(nil); errors.As(err, &eo) { + events = append(events, eo...) + continue + } errs = append(errs, trace.Wrap(err)) continue } @@ -403,7 +408,10 @@ func (w *watcher) Close() error { // resourceParser is an interface // for parsing resource from backend byte event stream type resourceParser interface { - // parse parses resource from the backend event + // parse parses resource from the backend event; if the returned error is a + // parseEventOverrideError then the returned resource is ignored and the + // events in the parseEventOverrideError will be added to the generated + // events instead. parse(event backend.Event) (types.Resource, error) // match returns true if event key matches match(key backend.Key) bool @@ -411,6 +419,10 @@ type resourceParser interface { prefixes() []backend.Key } +type parseEventOverrideError []types.Event + +func (p parseEventOverrideError) Error() string { return "parseEventOverrideError" } + // baseParser is a partial implementation of resourceParser for the most common // resource types (stored under a static prefix). type baseParser struct { diff --git a/lib/services/local/health_check_config.go b/lib/services/local/health_check_config.go index 869dec40f7532..63c193d684596 100644 --- a/lib/services/local/health_check_config.go +++ b/lib/services/local/health_check_config.go @@ -20,13 +20,17 @@ package local import ( "context" + "iter" + "strings" "github.com/gravitational/trace" + "github.com/gravitational/teleport" apidefaults "github.com/gravitational/teleport/api/defaults" healthcheckconfigv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/healthcheckconfig/v1" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/lib/backend" + iterstream "github.com/gravitational/teleport/lib/itertools/stream" "github.com/gravitational/teleport/lib/services" "github.com/gravitational/teleport/lib/services/local/generic" ) @@ -58,8 +62,42 @@ func NewHealthCheckConfigService(b backend.Backend) (*HealthCheckConfigService, }, nil } +func (*HealthCheckConfigService) hasVirtualResource(name string) bool { + switch name { + case teleport.VirtualDefaultHealthCheckConfigKubeName: + return true + default: + return false + } +} + +func (*HealthCheckConfigService) getVirtualResource(name string) *healthcheckconfigv1.HealthCheckConfig { + switch name { + case teleport.VirtualDefaultHealthCheckConfigKubeName: + return services.VirtualDefaultHealthCheckConfigKube() + default: + return nil + } +} + +func (*HealthCheckConfigService) rangeVirtualResources(start string) iter.Seq2[*healthcheckconfigv1.HealthCheckConfig, error] { + return func(yield func(*healthcheckconfigv1.HealthCheckConfig, error) bool) { + switch { + case start <= teleport.VirtualDefaultHealthCheckConfigKubeName: + if !yield(services.VirtualDefaultHealthCheckConfigKube(), nil) { + return + } + fallthrough + default: + } + } +} + // CreateHealthCheckConfig creates a new HealthCheckConfig resource. func (s *HealthCheckConfigService) CreateHealthCheckConfig(ctx context.Context, config *healthcheckconfigv1.HealthCheckConfig) (*healthcheckconfigv1.HealthCheckConfig, error) { + // we don't need to check if config refers to a potentially virtual resource + // because creating on top of a virtual resource is very convenient so it's + // a break from the semantics of Create that we want to allow created, err := s.svc.CreateResource(ctx, config) return created, trace.Wrap(err) } @@ -68,6 +106,11 @@ func (s *HealthCheckConfigService) CreateHealthCheckConfig(ctx context.Context, func (s *HealthCheckConfigService) GetHealthCheckConfig(ctx context.Context, name string) (*healthcheckconfigv1.HealthCheckConfig, error) { item, err := s.svc.GetResource(ctx, name) if err != nil { + if trace.IsNotFound(err) { + if virtualResource := s.getVirtualResource(name); virtualResource != nil { + return virtualResource, nil + } + } return nil, trace.Wrap(err) } return item, nil @@ -75,16 +118,44 @@ func (s *HealthCheckConfigService) GetHealthCheckConfig(ctx context.Context, nam // ListHealthCheckConfigs returns a paginated list of HealthCheckConfig resources. func (s *HealthCheckConfigService) ListHealthCheckConfigs(ctx context.Context, pageSize int, pageToken string) ([]*healthcheckconfigv1.HealthCheckConfig, string, error) { - items, nextKey, err := s.svc.ListResources(ctx, pageSize, pageToken) + items, nextPageToken, err := generic.CollectPageAndCursor( + iterstream.MergeStreamsWithPriority( + s.svc.Resources(ctx, pageToken, ""), + s.rangeVirtualResources(pageToken), + func(a, b *healthcheckconfigv1.HealthCheckConfig) int { + return strings.Compare(a.GetMetadata().GetName(), b.GetMetadata().GetName()) + }, + ), + pageSize, + func(v *healthcheckconfigv1.HealthCheckConfig) string { + return v.GetMetadata().GetName() + }, + ) if err != nil { return nil, "", trace.Wrap(err) } - return items, nextKey, nil + return items, nextPageToken, nil } // UpdateHealthCheckConfig updates an existing HealthCheckConfig resource. func (s *HealthCheckConfigService) UpdateHealthCheckConfig(ctx context.Context, config *healthcheckconfigv1.HealthCheckConfig) (*healthcheckconfigv1.HealthCheckConfig, error) { + if virtualResource := s.getVirtualResource(config.GetMetadata().GetName()); virtualResource != nil && config.GetMetadata().GetRevision() == virtualResource.GetMetadata().GetRevision() { + // a (successful) conditional update on a virtual resource is a create + // in storage; no real storage item is going to have the same revision + // as the virtual resource, so this must be a conditional update on the + // virtual resource that we know of + created, err := s.svc.CreateResource(ctx, config) + if err != nil { + if trace.IsAlreadyExists(err) { + return nil, trace.Wrap(backend.ErrIncorrectRevision) + } + return nil, trace.Wrap(err) + } + return created, nil + } + // if this was intended to be a conditional update on a virtual resource it + // was not a successful one, but we can let the storage deal with it updated, err := s.svc.ConditionalUpdateResource(ctx, config) return updated, trace.Wrap(err) } @@ -97,7 +168,13 @@ func (s *HealthCheckConfigService) UpsertHealthCheckConfig(ctx context.Context, // DeleteHealthCheckConfig removes the specified HealthCheckConfig resource. func (s *HealthCheckConfigService) DeleteHealthCheckConfig(ctx context.Context, name string) error { - return trace.Wrap(s.svc.DeleteResource(ctx, name)) + err := s.svc.DeleteResource(ctx, name) + if trace.IsNotFound(err) && s.hasVirtualResource(name) { + // we want to allow deleting virtual resources as a noop even if it's a + // little break from the standard semantics of Delete + return nil + } + return trace.Wrap(err) } // DeleteAllHealthCheckConfigs removes all HealthCheckConfig resources. @@ -105,29 +182,43 @@ func (s *HealthCheckConfigService) DeleteAllHealthCheckConfigs(ctx context.Conte return trace.Wrap(s.svc.DeleteAllResources(ctx)) } -func newHealthCheckConfigParser() *healthCheckConfigParser { - return &healthCheckConfigParser{ - baseParser: newBaseParser(backend.NewKey(healthCheckConfigPrefix)), +func newHealthCheckConfigParser() resourceParser { + return healthCheckConfigParser{} +} + +type healthCheckConfigParser struct{} + +func (healthCheckConfigParser) prefixes() []backend.Key { + return []backend.Key{ + backend.ExactKey(healthCheckConfigPrefix), } } -type healthCheckConfigParser struct { - baseParser +func (healthCheckConfigParser) match(key backend.Key) bool { + return strings.HasPrefix(key.String(), backend.SeparatorString+healthCheckConfigPrefix+backend.SeparatorString) } func (healthCheckConfigParser) parse(event backend.Event) (types.Resource, error) { switch event.Type { case types.OpDelete: - components := event.Item.Key.Components() - if len(components) < 2 { - return nil, trace.NotFound("failed parsing %s", event.Item.Key) + key := event.Item.Key.String() + name, found := strings.CutPrefix(key, backend.SeparatorString+healthCheckConfigPrefix+backend.SeparatorString) + if !found { + return nil, trace.NotFound("failed parsing "+types.KindHealthCheckConfig+" key %+q", key) + } + + if virtualResource := (*HealthCheckConfigService)(nil).getVirtualResource(name); virtualResource != nil { + return nil, parseEventOverrideError{{ + Type: types.OpPut, + Resource: types.Resource153ToLegacy(virtualResource), + }} } return &types.ResourceHeader{ Kind: types.KindHealthCheckConfig, Version: types.V1, Metadata: types.Metadata{ - Name: components[1], + Name: name, Namespace: apidefaults.Namespace, }, }, nil diff --git a/lib/services/local/health_check_config_test.go b/lib/services/local/health_check_config_test.go index b3c3e9c11cf31..712604e2dcd77 100644 --- a/lib/services/local/health_check_config_test.go +++ b/lib/services/local/health_check_config_test.go @@ -17,7 +17,6 @@ package local import ( - "context" "slices" "strings" "testing" @@ -35,12 +34,13 @@ import ( "github.com/gravitational/teleport/api/types/healthcheckconfig" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/backend/memory" + iterstream "github.com/gravitational/teleport/lib/itertools/stream" ) func TestHealthCheckConfigService(t *testing.T) { t.Parallel() - ctx := context.Background() + ctx := t.Context() mem, err := memory.New(memory.Config{ Context: ctx, Clock: clockwork.NewFakeClock(), @@ -106,7 +106,7 @@ func TestHealthCheckConfigService(t *testing.T) { t.Run("delete not found", func(t *testing.T) { err := service.DeleteHealthCheckConfig(ctx, "asdf") - require.IsType(t, trace.NotFound(""), err) + require.ErrorIs(t, trace.NotFound("health_check_config \"asdf\" doesn't exist"), err) }) t.Run("delete", func(t *testing.T) { @@ -131,6 +131,17 @@ func TestHealthCheckConfigService(t *testing.T) { }) } +func TestHealthCheckConfigService_VirtualResources(t *testing.T) { + t.Parallel() + + virtualResources, err := iterstream.Collect((*HealthCheckConfigService)(nil).rangeVirtualResources("")) + require.NoError(t, err) + + require.True(t, slices.IsSortedFunc(virtualResources, func(a, b *healthcheckconfigv1.HealthCheckConfig) int { + return strings.Compare(a.GetMetadata().GetName(), b.GetMetadata().GetName()) + }), "expected virtual resources to be sorted") +} + func newHealthCheckConfig(t *testing.T, name string) *healthcheckconfigv1.HealthCheckConfig { t.Helper() cfg, err := healthcheckconfig.NewHealthCheckConfig(name, diff --git a/lib/services/presets.go b/lib/services/presets.go index cf1c3460fd1e1..8276648f3bf7a 100644 --- a/lib/services/presets.go +++ b/lib/services/presets.go @@ -855,15 +855,16 @@ func NewPresetMCPUserRole() types.Role { return role } -// NewPresetHealthCheckConfig returns a preset default health_check_config that -// enables health checks for all resources. +// NewPresetHealthCheckConfig returns a preset health_check_config enabling +// health checks for all databases resources. It's called "default" for +// historical reasons. func NewPresetHealthCheckConfig() *healthcheckconfigv1.HealthCheckConfig { return &healthcheckconfigv1.HealthCheckConfig{ Kind: types.KindHealthCheckConfig, Version: types.V1, Metadata: &headerv1.Metadata{ Name: teleport.PresetDefaultHealthCheckConfigName, - Description: "Enables all health checks by default", + Description: "Enables health checks for all databases by default", Namespace: apidefaults.Namespace, Labels: map[string]string{ types.TeleportInternalResourceType: types.PresetResource, @@ -881,6 +882,32 @@ func NewPresetHealthCheckConfig() *healthcheckconfigv1.HealthCheckConfig { } } +// VirtualDefaultHealthCheckConfigKube returns a health_check_config enabling +// health checks for all Kubernetes resources, intended to be used as a virtual +// default resource. +func VirtualDefaultHealthCheckConfigKube() *healthcheckconfigv1.HealthCheckConfig { + return &healthcheckconfigv1.HealthCheckConfig{ + Kind: types.KindHealthCheckConfig, + Version: types.V1, + Metadata: &headerv1.Metadata{ + Name: teleport.VirtualDefaultHealthCheckConfigKubeName, + Description: "Enables health checks for all Kubernetes clusters by default.", + // this revision MUST be changed every time we change the contents + // of the preset so that conditional updates can check against it + Revision: "d796f007-e60c-4747-8dde-f479aff6b743", + }, + Spec: &healthcheckconfigv1.HealthCheckConfigSpec{ + Match: &healthcheckconfigv1.Matcher{ + // match all kubernetes clusters + KubernetesLabels: []*labelv1.Label{{ + Name: types.Wildcard, + Values: []string{types.Wildcard}, + }}, + }, + }, + } +} + // bootstrapRoleMetadataLabels are metadata labels that will be applied to each role. // These are intended to add labels for older roles that didn't previously have them. func bootstrapRoleMetadataLabels() map[string]map[string]string {