Skip to content

Commit

Permalink
Merge pull request #474 from kubescape/watch
Browse files Browse the repository at this point in the history
use fullSpec to simplify List() and Watch() for storage
  • Loading branch information
matthyx authored Feb 7, 2025
2 parents b178bb4 + 70d653b commit 3c6256e
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 72 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/kubescape/backend v0.0.25
github.com/kubescape/go-logger v0.0.23
github.com/kubescape/k8s-interface v0.0.183
github.com/kubescape/storage v0.0.141
github.com/kubescape/storage v0.0.156
github.com/moby/sys/mountinfo v0.7.2
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.1.0
Expand Down Expand Up @@ -295,6 +295,7 @@ require (
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
github.com/yl2chen/cidranger v1.0.2 // indirect
go.mongodb.org/mongo-driver v1.17.1 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -692,8 +692,8 @@ github.com/kubescape/go-logger v0.0.23 h1:5xh+Nm8eGImhFbtippRKLaFgsvlKE1ufvQhNM2
github.com/kubescape/go-logger v0.0.23/go.mod h1:Ayg7g769c7sXVB+P3fkJmbsJpoEmMmaUf9jeo+XuC3U=
github.com/kubescape/k8s-interface v0.0.183 h1:eTuHlKJkBYYA03AR/YGr4KUC+xnbV6SG0/8+yrt9Yrs=
github.com/kubescape/k8s-interface v0.0.183/go.mod h1:YjIAQtrK4nCy+XQ/6jwo+BqlLyJk7DN2Mx4pUcbzq10=
github.com/kubescape/storage v0.0.141 h1:dck2qWHtlui6T4rUoV3U9O/BNZybxlDN7Vrlnow4CSg=
github.com/kubescape/storage v0.0.141/go.mod h1:oqdNN8gANL58jagGDsEbRiWskvKK0s/ckdqrHZnG+Vw=
github.com/kubescape/storage v0.0.156 h1:5ioAOufZuSev6s6yLbFoyJjVrOIV6ckYHM15lLyba+E=
github.com/kubescape/storage v0.0.156/go.mod h1:7ai5ePqTXdSTCGjpEHVFXKggrbey/guM5e13w7Y3yMw=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs=
Expand Down Expand Up @@ -1077,6 +1077,8 @@ github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavM
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
github.com/yl2chen/cidranger v1.0.2 h1:lbOWZVCG1tCRX4u24kuM1Tb4nHqWkDxwLdoS+SevawU=
github.com/yl2chen/cidranger v1.0.2/go.mod h1:9U1yz7WPYDwf0vpNWFaeRh0bjwz5RVgRy/9UEQfHl0g=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
21 changes: 4 additions & 17 deletions pkg/objectcache/applicationprofilecache/applicationprofilecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,8 @@ func (ap *ApplicationProfileCacheImpl) handleUserManagedProfile(appProfile *v1be
baseProfileName := strings.TrimPrefix(appProfile.GetName(), "ug-")
baseProfileUniqueName := objectcache.UniqueName(appProfile.GetNamespace(), baseProfileName)

// Get the full user managed profile from the storage
userManagedProfile, err := ap.getApplicationProfile(appProfile.GetNamespace(), appProfile.GetName())
if err != nil {
logger.L().Warning("ApplicationProfileCacheImpl - failed to get full application profile", helpers.Error(err))
return
}

// Store the user-managed profile temporarily
ap.userManagedProfiles.Set(baseProfileUniqueName, userManagedProfile)
ap.userManagedProfiles.Set(baseProfileUniqueName, appProfile)

// If we have the base profile cached, fetch a fresh copy and merge.
// If the base profile is not cached yet, the merge will be attempted when it's added.
Expand All @@ -102,7 +95,7 @@ func (ap *ApplicationProfileCacheImpl) handleUserManagedProfile(appProfile *v1be
return
}

mergedProfile := ap.performMerge(freshBaseProfile, userManagedProfile)
mergedProfile := ap.performMerge(freshBaseProfile, appProfile)
ap.slugToAppProfile.Set(baseProfileUniqueName, mergedProfile)

// Clean up the user-managed profile after successful merge
Expand Down Expand Up @@ -288,22 +281,16 @@ func (ap *ApplicationProfileCacheImpl) removeContainer(containerID string) {
// ------------------ watch application profile methods -----------------------

func (ap *ApplicationProfileCacheImpl) addFullApplicationProfile(appProfile *v1beta1.ApplicationProfile, apName string) {
fullAP, err := ap.getApplicationProfile(appProfile.GetNamespace(), appProfile.GetName())
if err != nil {
logger.L().Warning("ApplicationProfileCacheImpl - failed to get full application profile", helpers.Error(err))
return
}

// Check if there's a pending user-managed profile to merge
if ap.userManagedProfiles.Has(apName) {
userManagedProfile := ap.userManagedProfiles.Get(apName)
fullAP = ap.performMerge(fullAP, userManagedProfile)
appProfile = ap.performMerge(appProfile, userManagedProfile)
// Clean up the user-managed profile after successful merge
ap.userManagedProfiles.Delete(apName)
logger.L().Debug("ApplicationProfileCacheImpl - merged pending user-managed profile", helpers.String("name", apName))
}

ap.slugToAppProfile.Set(apName, fullAP)
ap.slugToAppProfile.Set(apName, appProfile)
for _, i := range ap.slugToContainers.Get(apName).ToSlice() {
ap.containerToSlug.Set(i, apName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,8 @@ func (nn *NetworkNeighborhoodCacheImpl) handleUserManagedNN(netNeighborhood *v1b
baseNNName := strings.TrimPrefix(netNeighborhood.GetName(), "ug-")
baseNNUniqueName := objectcache.UniqueName(netNeighborhood.GetNamespace(), baseNNName)

// Get the full user managed network neighborhood from the storage
userManagedNN, err := nn.getNetworkNeighborhood(netNeighborhood.GetNamespace(), netNeighborhood.GetName())
if err != nil {
logger.L().Warning("NetworkNeighborhoodCacheImpl - failed to get full network neighborhood", helpers.Error(err))
return
}

// Store the user-managed network neighborhood temporarily
nn.userManagedNetworkNeighborhood.Set(baseNNUniqueName, userManagedNN)
nn.userManagedNetworkNeighborhood.Set(baseNNUniqueName, netNeighborhood)

// If we have the base network neighborhood cached, fetch a fresh copy and merge.
// If the base network neighborhood is not cached yet, the merge will be attempted when it's added.
Expand All @@ -100,7 +93,7 @@ func (nn *NetworkNeighborhoodCacheImpl) handleUserManagedNN(netNeighborhood *v1b
return
}

mergedNN := nn.performMerge(freshBaseNN, userManagedNN)
mergedNN := nn.performMerge(freshBaseNN, netNeighborhood)
nn.slugToNetworkNeighborhood.Set(baseNNUniqueName, mergedNN)

// Clean up the user-managed network neighborhood after successful merge
Expand Down Expand Up @@ -285,22 +278,16 @@ func (nn *NetworkNeighborhoodCacheImpl) removeContainer(containerID string) {
// ------------------ watch network neighborhood methods -----------------------

func (nn *NetworkNeighborhoodCacheImpl) addFullNetworkNeighborhood(netNeighborhood *v1beta1.NetworkNeighborhood, nnName string) {
fullNN, err := nn.getNetworkNeighborhood(netNeighborhood.GetNamespace(), netNeighborhood.GetName())
if err != nil {
logger.L().Warning("NetworkNeighborhoodCacheImpl - failed to get full network neighborhood", helpers.Error(err))
return
}

// Check if there's a pending user-managed network neighborhood to merge
if nn.userManagedNetworkNeighborhood.Has(nnName) {
userManagedNN := nn.userManagedNetworkNeighborhood.Get(nnName)
fullNN = nn.performMerge(fullNN, userManagedNN)
netNeighborhood = nn.performMerge(netNeighborhood, userManagedNN)
// Clean up the user-managed network neighborhood after successful merge
nn.userManagedNetworkNeighborhood.Delete(nnName)
logger.L().Debug("NetworkNeighborhoodCacheImpl - merged pending user-managed network neighborhood", helpers.String("name", nnName))
}

nn.slugToNetworkNeighborhood.Set(nnName, fullNN)
nn.slugToNetworkNeighborhood.Set(nnName, netNeighborhood)
for _, i := range nn.slugToContainers.Get(nnName).ToSlice() {
nn.containerToSlug.Set(i, nnName)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/v1/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/node-agent/pkg/config"
"github.com/kubescape/node-agent/pkg/storage"
"github.com/kubescape/storage/pkg/apis/softwarecomposition"
"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
"github.com/kubescape/storage/pkg/generated/clientset/versioned"
"github.com/kubescape/storage/pkg/generated/clientset/versioned/fake"
Expand Down Expand Up @@ -155,7 +156,7 @@ func (sc Storage) GetSBOM(name string) (*v1beta1.SBOMSyft, error) {
}

func (sc Storage) GetSBOMMeta(name string) (*v1beta1.SBOMSyft, error) {
return sc.StorageClient.SBOMSyfts(sc.namespace).Get(context.Background(), name, metav1.GetOptions{ResourceVersion: "metadata"})
return sc.StorageClient.SBOMSyfts(sc.namespace).Get(context.Background(), name, metav1.GetOptions{ResourceVersion: softwarecomposition.ResourceVersionMetadata})
}

func (sc Storage) ReplaceSBOM(SBOM *v1beta1.SBOMSyft) (*v1beta1.SBOMSyft, error) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/watcher/dynamicwatcher/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/kubescape/node-agent/pkg/k8sclient"
"github.com/kubescape/node-agent/pkg/watcher"
"github.com/kubescape/node-agent/pkg/watcher/cooldownqueue"
"github.com/kubescape/storage/pkg/apis/softwarecomposition"
spdxv1beta1 "github.com/kubescape/storage/pkg/generated/clientset/versioned/typed/softwarecomposition/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/pager"
Expand Down Expand Up @@ -134,14 +135,17 @@ func (wh *WatchHandler) Stop(_ context.Context) {
func (wh *WatchHandler) chooseWatcher(res schema.GroupVersionResource, opts metav1.ListOptions) (watch.Interface, error) {
switch res.Resource {
case "applicationprofiles":
opts.ResourceVersion = softwarecomposition.ResourceVersionFullSpec
return wh.storageClient.ApplicationProfiles("").Watch(context.Background(), opts)
case "networkneighborhoods":
opts.ResourceVersion = softwarecomposition.ResourceVersionFullSpec
return wh.storageClient.NetworkNeighborhoods("").Watch(context.Background(), opts)
case "pods":
return wh.k8sClient.GetKubernetesClient().CoreV1().Pods("").Watch(context.Background(), opts)
case "runtimerulealertbindings":
return wh.k8sClient.GetDynamicClient().Resource(res).Namespace("").Watch(context.Background(), opts)
case "seccompprofiles":
opts.ResourceVersion = softwarecomposition.ResourceVersionFullSpec
return wh.storageClient.SeccompProfiles("").Watch(context.Background(), opts)
case "operatorcommands":
return wh.k8sClient.GetDynamicClient().Resource(res).Namespace("").Watch(context.Background(), opts)
Expand Down Expand Up @@ -212,14 +216,17 @@ func (wh *WatchHandler) watchRetry(ctx context.Context, res schema.GroupVersionR
func (wh *WatchHandler) chooseLister(res schema.GroupVersionResource, opts metav1.ListOptions) (runtime.Object, error) {
switch res.Resource {
case "applicationprofiles":
opts.ResourceVersion = softwarecomposition.ResourceVersionFullSpec
return wh.storageClient.ApplicationProfiles("").List(context.Background(), opts)
case "networkneighborhoods":
opts.ResourceVersion = softwarecomposition.ResourceVersionFullSpec
return wh.storageClient.NetworkNeighborhoods("").List(context.Background(), opts)
case "pods":
return wh.k8sClient.GetKubernetesClient().CoreV1().Pods("").List(context.Background(), opts)
case "runtimerulealertbindings":
return wh.k8sClient.GetDynamicClient().Resource(res).Namespace("").List(context.Background(), opts)
case "seccompprofiles":
opts.ResourceVersion = softwarecomposition.ResourceVersionFullSpec
return wh.storageClient.SeccompProfiles("").List(context.Background(), opts)
case "operatorcommands":
return wh.k8sClient.GetDynamicClient().Resource(res).Namespace("").List(context.Background(), opts)
Expand Down
24 changes: 2 additions & 22 deletions pkg/watcher/seccompprofilewatcher/seccompprofilewatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package seccompprofilewatcher

import (
"context"
"fmt"

"github.com/kubescape/node-agent/pkg/seccompmanager"
"github.com/kubescape/node-agent/pkg/watcher"
Expand Down Expand Up @@ -48,25 +47,15 @@ func (sp *SeccompProfileWatcherImpl) WatchResources() []watcher.WatchResource {
// ------------------ watcher.Watcher methods -----------------------

func (sp *SeccompProfileWatcherImpl) AddHandler(ctx context.Context, obj runtime.Object) {
if _, ok := obj.(*v1beta1api.SeccompProfile); ok {
fullObj, err := sp.getFullSeccompProfile(obj)
if err != nil {
logger.L().Ctx(ctx).Warning("SeccompProfileWatcherImpl - failed to get full seccomp profile", helpers.Error(err))
return
}
if fullObj, ok := obj.(*v1beta1api.SeccompProfile); ok {
if err := sp.seccompManager.AddSeccompProfile(fullObj); err != nil {
logger.L().Ctx(ctx).Warning("SeccompProfileWatcherImpl - failed to add seccomp profile", helpers.Error(err))
}
}
}

func (sp *SeccompProfileWatcherImpl) ModifyHandler(ctx context.Context, obj runtime.Object) {
if _, ok := obj.(*v1beta1api.SeccompProfile); ok {
fullObj, err := sp.getFullSeccompProfile(obj)
if err != nil {
logger.L().Ctx(ctx).Warning("SeccompProfileWatcherImpl - failed to get full seccomp profile", helpers.Error(err))
return
}
if fullObj, ok := obj.(*v1beta1api.SeccompProfile); ok {
if err := sp.seccompManager.AddSeccompProfile(fullObj); err != nil {
logger.L().Ctx(ctx).Warning("SeccompProfileWatcherImpl - failed to modify seccomp profile", helpers.Error(err))
}
Expand All @@ -80,12 +69,3 @@ func (sp *SeccompProfileWatcherImpl) DeleteHandler(ctx context.Context, obj runt
}
}
}

func (sp *SeccompProfileWatcherImpl) getFullSeccompProfile(obj runtime.Object) (*v1beta1api.SeccompProfile, error) {
meta := obj.(metav1.Object)
fullObj, err := sp.storageClient.SeccompProfiles(meta.GetNamespace()).Get(context.Background(), meta.GetName(), metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get full seccomp profile: %w", err)
}
return fullObj, nil
}
2 changes: 1 addition & 1 deletion tests/chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ storage:
name: "storage"
image:
repository: quay.io/kubescape/storage
tag: v0.0.127
tag: v0.0.156
pullPolicy: IfNotPresent
cleanupInterval: "6h"
labels:
Expand Down
53 changes: 42 additions & 11 deletions tests/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

helpersv1 "github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers"
"github.com/kubescape/k8s-interface/k8sinterface"
"github.com/kubescape/node-agent/pkg/ruleengine/v1"
"github.com/kubescape/node-agent/pkg/utils"
Expand All @@ -24,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
)

func tearDownTest(t *testing.T, startTime time.Time) {
Expand Down Expand Up @@ -413,11 +415,6 @@ func Test_07_RuleBindingApplyTest(t *testing.T) {
assert.NotEqualf(t, 0, exitCode, "Expected error when applying rule binding '%s'", file)
}

// TODO: create a test with an existing app profile and check if the alerts are generated
//func Test_08_BasicAlertTestExistingProfile(t *testing.T) {
//
//}

func Test_08_ApplicationProfilePatching(t *testing.T) {
k8sClient := k8sinterface.NewKubernetesApi()
storageclient := spdxv1beta1client.NewForConfigOrDie(k8sClient.K8SConfig)
Expand Down Expand Up @@ -981,12 +978,12 @@ func Test_13_MergingNetworkNeighborhoodTest(t *testing.T) {
{
Name: "TCP-80",
Protocol: "TCP",
Port: ptr(int32(80)),
Port: ptr.To(int32(80)),
},
{
Name: "TCP-443",
Protocol: "TCP",
Port: ptr(int32(443)),
Port: ptr.To(int32(443)),
},
},
},
Expand All @@ -1003,12 +1000,12 @@ func Test_13_MergingNetworkNeighborhoodTest(t *testing.T) {
{
Name: "TCP-80",
Protocol: "TCP",
Port: ptr(int32(80)),
Port: ptr.To(int32(80)),
},
{
Name: "TCP-443",
Protocol: "TCP",
Port: ptr(int32(443)),
Port: ptr.To(int32(443)),
},
},
},
Expand Down Expand Up @@ -1183,6 +1180,40 @@ func Test_14_RulePoliciesTest(t *testing.T) {
testutils.AssertNotContains(t, alerts, "Symlink Created Over Sensitive File", "ln", "endpoint-traffic")
}

func ptr(i int32) *int32 {
return &i
func Test_15_CompletedApCannotBecomeReadyAgain(t *testing.T) {
k8sClient := k8sinterface.NewKubernetesApi()
storageclient := spdxv1beta1client.NewForConfigOrDie(k8sClient.K8SConfig)

ns := testutils.NewRandomNamespace()
defer func() {
_ = k8sClient.KubernetesClient.CoreV1().Namespaces().Delete(context.Background(), ns.Name, v1.DeleteOptions{})
}()

// create an application profile with completed status
name := "test"
ap1, err := storageclient.ApplicationProfiles(ns.Name).Create(context.TODO(), &v1beta1.ApplicationProfile{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Annotations: map[string]string{
helpersv1.CompletionMetadataKey: helpersv1.Complete,
helpersv1.StatusMetadataKey: helpersv1.Completed,
},
},
}, v1.CreateOptions{})
require.NoError(t, err)
require.Equal(t, helpersv1.Completed, ap1.Annotations[helpersv1.StatusMetadataKey])

// patch the application profile with ready status
patchOperations := []utils.PatchOperation{
{
Op: "replace",
Path: "/metadata/annotations/" + utils.EscapeJSONPointerElement(helpersv1.StatusMetadataKey),
Value: helpersv1.Ready,
},
}
patch, err := json.Marshal(patchOperations)
require.NoError(t, err)
ap2, err := storageclient.ApplicationProfiles(ns.Name).Patch(context.Background(), name, types.JSONPatchType, patch, v1.PatchOptions{})
assert.NoError(t, err) // patch should succeed
assert.Equal(t, helpersv1.Completed, ap2.Annotations[helpersv1.StatusMetadataKey]) // but the status should not change
}

0 comments on commit 3c6256e

Please sign in to comment.