Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,8 +776,14 @@ var PresetRoles = []string{PresetEditorRoleName, PresetAccessRoleName, PresetAud

const (
// PresetDefaultHealthCheckConfigName is the name of a preset
// default health_check_config that enables health checks for all resources.
// health_check_config that enables health checks for all database
// resources. For historical reasons, it's named "default" even though it
// applies to databases only.
PresetDefaultHealthCheckConfigName = "default"
// VirtualDefaultHealthCheckConfigKubeName is the name of a virtual
// health_check_config that enables health checks for all Kubernetes
// resources.
VirtualDefaultHealthCheckConfigKubeName = "default_kube"
)

const (
Expand Down
148 changes: 145 additions & 3 deletions lib/auth/healthcheckconfig/healthcheckconfigv1/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ import (
"fmt"
"os"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/durationpb"

"github.com/gravitational/teleport"
healthcheckconfigv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/healthcheckconfig/v1"
labelv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/label/v1"
"github.com/gravitational/teleport/api/types"
Expand Down Expand Up @@ -98,7 +103,7 @@ func TestHealthCheckConfigCRUD(t *testing.T) {
resp, err := clt.ServiceUnderTest.ListHealthCheckConfigs(ctx, &healthcheckconfigv1.ListHealthCheckConfigsRequest{})
if err == nil {
require.NotNil(t, resp)
require.Len(t, resp.Configs, 2, "the test bootstrapped exactly 2 health_check_config resources")
require.Len(t, resp.Configs, 4, "expected 2 inserted, and 2 virtual presets")
}
return err
},
Expand Down Expand Up @@ -199,7 +204,7 @@ func (c *accessTest) run(t *testing.T) {
ctx, clt := c.setup(t, spec)
err := c.actionFn(t, ctx, clt)
require.Error(t, err)
require.IsType(t, trace.AccessDenied(""), err)
require.True(t, trace.IsAccessDenied(err))
})

t.Run(fmt.Sprintf("%s is denied", c.name), func(t *testing.T) {
Expand All @@ -210,7 +215,7 @@ func (c *accessTest) run(t *testing.T) {
ctx, clt := c.setup(t, spec)
err := c.actionFn(t, ctx, clt)
require.Error(t, err)
require.IsType(t, trace.AccessDenied(""), err)
require.True(t, trace.IsAccessDenied(err))
})
}

Expand Down Expand Up @@ -315,3 +320,140 @@ func authorizerForDummyUser(t *testing.T, clt testClient, roleSpecs ...types.Rol
},
})
}

func newPresetService(t *testing.T, verbs ...string) (testClient, context.Context) {
t.Helper()
clt := newService(t, t.Context())
return clt, authorizerForDummyUser(t, clt, types.RoleSpecV6{
Allow: types.RoleConditions{
Rules: []types.Rule{healthCheckConfigRule(verbs...)},
},
})
}

type presetTest struct {
name string
verbs []string
run func(t *testing.T, ctx context.Context, clt testClient)
}

func TestHealthCheckConfigPresets(t *testing.T) {
t.Parallel()

tests := []presetTest{
{
name: "Get",
verbs: []string{types.VerbRead},
run: func(t *testing.T, ctx context.Context, clt testClient) {
// Ensure that virtual presets can be retrieved without being
// explicitly created.

kube, err := clt.ServiceUnderTest.GetHealthCheckConfig(ctx, &healthcheckconfigv1.GetHealthCheckConfigRequest{
Name: teleport.VirtualDefaultHealthCheckConfigKubeName,
})
require.NoError(t, err)
require.Equal(t, services.VirtualDefaultHealthCheckConfigKube(), kube)
},
},
{
name: "Create And Get",
verbs: []string{types.VerbCreate, types.VerbRead},
run: func(t *testing.T, ctx context.Context, clt testClient) {
// Ensure that an explicitly created preset can be retrieved
// without being overwritten by the virtual preset.

// Kube preset
kube1 := services.VirtualDefaultHealthCheckConfigKube()
kube1.Spec.Interval = durationpb.New(99 * time.Second)
_, err := clt.ServiceUnderTest.CreateHealthCheckConfig(ctx, &healthcheckconfigv1.CreateHealthCheckConfigRequest{
Config: kube1,
})
require.NoError(t, err)
kube2, err := clt.ServiceUnderTest.GetHealthCheckConfig(ctx, &healthcheckconfigv1.GetHealthCheckConfigRequest{
Name: teleport.VirtualDefaultHealthCheckConfigKubeName,
})
require.NoError(t, err)
require.Equal(t, kube1.Spec.Interval.AsDuration(), kube2.Spec.Interval.AsDuration())
},
},
{
name: "List",
verbs: []string{types.VerbRead, types.VerbList},
run: func(t *testing.T, ctx context.Context, clt testClient) {
// Ensure that presets can be listed without being
// explicitly created.
resp, err := clt.ServiceUnderTest.ListHealthCheckConfigs(ctx, &healthcheckconfigv1.ListHealthCheckConfigsRequest{})
require.NoError(t, err)
require.NotNil(t, resp)
require.Len(t, resp.Configs, 1, "expected 1 virtual preset")
for _, actual := range resp.Configs {
switch actual.GetMetadata().GetName() {
case teleport.VirtualDefaultHealthCheckConfigKubeName:
require.Empty(t, cmp.Diff(services.VirtualDefaultHealthCheckConfigKube(), actual, protocmp.Transform()))
}
}
},
},
{
name: "Create And List",
verbs: []string{types.VerbCreate, types.VerbRead, types.VerbList},
run: func(t *testing.T, ctx context.Context, clt testClient) {
// Ensure that explicitly created presets can be listed
// without being overwritten by the virtual presets.

kube := services.VirtualDefaultHealthCheckConfigKube()
kube.Spec.Interval = durationpb.New(99 * time.Second)
_, err := clt.ServiceUnderTest.CreateHealthCheckConfig(ctx, &healthcheckconfigv1.CreateHealthCheckConfigRequest{
Config: kube,
})
require.NoError(t, err)

resp, err := clt.ServiceUnderTest.ListHealthCheckConfigs(ctx, &healthcheckconfigv1.ListHealthCheckConfigsRequest{})
require.NoError(t, err)
require.NotNil(t, resp)
require.Len(t, resp.Configs, 1, "expected 1 virtual preset")
for _, actual := range resp.Configs {
switch actual.GetMetadata().GetName() {
case teleport.VirtualDefaultHealthCheckConfigKubeName:
require.Equal(t, kube.Spec.Interval.AsDuration(), actual.Spec.Interval.AsDuration())
}
}
},
},
{
name: "Get And Upsert",
verbs: []string{types.VerbCreate, types.VerbRead, types.VerbUpdate},
run: func(t *testing.T, ctx context.Context, clt testClient) {
// Ensure that virtual presets can be retrieved then upserted.

// Kube preset
kube1, err := clt.ServiceUnderTest.GetHealthCheckConfig(ctx, &healthcheckconfigv1.GetHealthCheckConfigRequest{
Name: teleport.VirtualDefaultHealthCheckConfigKubeName,
})
require.NoError(t, err)
require.Equal(t, services.VirtualDefaultHealthCheckConfigKube(), kube1)
kube1.Spec.Interval = durationpb.New(99 * time.Second)
kube2, err := clt.ServiceUnderTest.UpsertHealthCheckConfig(ctx, &healthcheckconfigv1.UpsertHealthCheckConfigRequest{
Config: kube1,
})
require.NoError(t, err)
require.Equal(t, kube1.Spec.Interval.AsDuration(), kube2.Spec.Interval.AsDuration())
},
},
{
name: "Delete",
verbs: []string{types.VerbDelete},
run: func(t *testing.T, ctx context.Context, clt testClient) {
// Ensure that virtual presets can be deleted without error.
_, err := clt.ServiceUnderTest.DeleteHealthCheckConfig(ctx, &healthcheckconfigv1.DeleteHealthCheckConfigRequest{
Name: teleport.VirtualDefaultHealthCheckConfigKubeName,
})
require.NoError(t, err)
},
},
}
for _, test := range tests {
clt, ctx := newPresetService(t, test.verbs...)
test.run(t, ctx, clt)
}
}
12 changes: 2 additions & 10 deletions lib/auth/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -1492,18 +1492,10 @@ func createPresetDatabaseObjectImportRule(ctx context.Context, rules services.Da
// createPresetHealthCheckConfig creates a default preset health check config
// resource that enables health checks on all resources.
func createPresetHealthCheckConfig(ctx context.Context, svc services.HealthCheckConfig) error {
page, _, err := svc.ListHealthCheckConfigs(ctx, 0, "")
if err != nil {
return trace.Wrap(err, "failed listing available health check configs")
}
if len(page) > 0 {
return nil
}
preset := services.NewPresetHealthCheckConfig()
_, err = svc.CreateHealthCheckConfig(ctx, preset)
if err != nil && !trace.IsAlreadyExists(err) {
if _, err := svc.CreateHealthCheckConfig(ctx, preset); err != nil && !trace.IsAlreadyExists(err) {
return trace.Wrap(err,
"failed creating preset health_check_config %s",
"failed creating preset "+types.KindHealthCheckConfig+" %+q",
preset.GetMetadata().GetName(),
)
}
Expand Down
76 changes: 76 additions & 0 deletions lib/itertools/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,82 @@ func MergeStreams[T any](
}
}

// MergeStreamsWithPriority merges two sorted streams prioritizing streamA when compare indicates items are equal.
func MergeStreamsWithPriority[T any](
streamA Stream[T],
streamB Stream[T],
compare func(a, b T) int,
) Stream[T] {
return func(yield func(T, error) bool) {
var itemA, itemB T

nextA, stopA := iter.Pull2(streamA)
defer stopA()
nextB, stopB := iter.Pull2(streamB)
defer stopB()

itemA, errA, okA := nextA()
itemB, errB, okB := nextB()

// Both streams have items, merge with priority for A
for okA && okB {
if errA != nil {
yield(*new(T), errA)
return
}
if errB != nil {
yield(*new(T), errB)
return
}

switch cmp := compare(itemA, itemB); {
case cmp < 0:
if !yield(itemA, nil) {
return
}
itemA, errA, okA = nextA()

case cmp > 0:
if !yield(itemB, nil) {
return
}
itemB, errB, okB = nextB()

default: // cmp == 0
// Yield A and skip B.
if !yield(itemA, nil) {
return
}
itemA, errA, okA = nextA()
itemB, errB, okB = nextB()
}
}

// Drain
for okA {
if errA != nil {
yield(*new(T), errA)
return
}
if !yield(itemA, nil) {
return
}
itemA, errA, okA = nextA()
}

for okB {
if errB != nil {
yield(*new(T), errB)
return
}
if !yield(itemB, nil) {
return
}
itemB, errB, okB = nextB()
}
}
}

// TakeWhile iterates the stream taking items while predicate returns true
func TakeWhile[T any](stream Stream[T], predicate func(T) bool) Stream[T] {
return func(yield func(T, error) bool) {
Expand Down
Loading
Loading