Skip to content

Commit

Permalink
Merge pull request #478 from kubescape/panic
Browse files (browse the repository at this point in the history)
avoid panic on shared container data by getting it once
  • Loading branch information
matthyx authored Feb 9, 2025
2 parents e03574f + a3f297f commit ee88fa6
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 94 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/armosec/utils-k8s-go v0.0.30
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.3.0
github.com/cenkalti/backoff/v5 v5.0.1
github.com/cilium/ebpf v0.17.2
github.com/crewjam/rfc5424 v0.1.0
github.com/cyphar/filepath-securejoin v0.4.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEe
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cenkalti/backoff/v5 v5.0.1 h1:kGZdCHH1+eW+Yd0wftimjMuhg9zidDvNF5aGdnkkb+U=
github.com/cenkalti/backoff/v5 v5.0.1/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
Expand Down
52 changes: 27 additions & 25 deletions pkg/applicationprofilemanager/v1/applicationprofile_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"strings"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/cenkalti/backoff/v5"
mapset "github.com/deckarep/golang-set/v2"
"github.com/goradd/maps"
containercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/container-collection"
Expand Down Expand Up @@ -611,14 +611,15 @@ func (am *ApplicationProfileManager) startApplicationProfiling(ctx context.Conte
ctx, span := otel.Tracer("").Start(ctx, "ApplicationProfileManager.startApplicationProfiling")
defer span.End()

if err := am.waitForSharedContainerData(container.Runtime.ContainerID); err != nil {
sharedData, err := am.waitForSharedContainerData(container.Runtime.ContainerID)
if err != nil {
logger.L().Error("ApplicationProfileManager - container not found in shared data",
helpers.String("container ID", container.Runtime.ContainerID),
helpers.String("k8s workload", k8sContainerID))
return
}

if !am.cfg.EnableRuntimeDetection && am.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).PreRunningContainer {
if !am.cfg.EnableRuntimeDetection && sharedData.PreRunningContainer {
logger.L().Debug("ApplicationProfileManager - skip container", helpers.String("reason", "preRunning container"),
helpers.String("container ID", container.Runtime.ContainerID),
helpers.String("k8s workload", k8sContainerID))
Expand All @@ -630,22 +631,22 @@ func (am *ApplicationProfileManager) startApplicationProfiling(ctx context.Conte

watchedContainer := &utils.WatchedContainerData{
ContainerID: container.Runtime.ContainerID,
ImageID: am.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).ImageID,
ImageTag: am.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).ImageTag,
ImageID: sharedData.ImageID,
ImageTag: sharedData.ImageTag,
UpdateDataTicker: time.NewTicker(utils.AddJitter(am.cfg.InitialDelay, am.cfg.MaxJitterPercentage)),
SyncChannel: syncChannel,
K8sContainerID: k8sContainerID,
NsMntId: container.Mntns,
InstanceID: am.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).InstanceID,
TemplateHash: am.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).TemplateHash,
Wlid: am.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).Wlid,
ParentResourceVersion: am.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).ParentResourceVersion,
ContainerInfos: am.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).ContainerInfos,
ParentWorkloadSelector: am.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).ParentWorkloadSelector,
SeccompProfilePath: am.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).SeccompProfilePath,
ContainerType: am.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).ContainerType,
ContainerIndex: am.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).ContainerIndex,
PreRunningContainer: am.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).PreRunningContainer,
InstanceID: sharedData.InstanceID,
TemplateHash: sharedData.TemplateHash,
Wlid: sharedData.Wlid,
ParentResourceVersion: sharedData.ParentResourceVersion,
ContainerInfos: sharedData.ContainerInfos,
ParentWorkloadSelector: sharedData.ParentWorkloadSelector,
SeccompProfilePath: sharedData.SeccompProfilePath,
ContainerType: sharedData.ContainerType,
ContainerIndex: sharedData.ContainerIndex,
PreRunningContainer: sharedData.PreRunningContainer,
}

if err := am.monitorContainer(ctx, container, watchedContainer); err != nil {
Expand All @@ -662,21 +663,22 @@ func (am *ApplicationProfileManager) waitForContainer(k8sContainerID string) err
if am.removedContainers.Contains(k8sContainerID) {
return fmt.Errorf("container %s has been removed", k8sContainerID)
}
return backoff.Retry(func() error {
_, err := backoff.Retry(context.Background(), func() (any, error) {
if am.trackedContainers.Contains(k8sContainerID) {
return nil
return nil, nil
}
return fmt.Errorf("container %s not found", k8sContainerID)
}, backoff.NewExponentialBackOff())
return nil, fmt.Errorf("container %s not found", k8sContainerID)
}, backoff.WithBackOff(backoff.NewExponentialBackOff()))
return err
}

func (am *ApplicationProfileManager) waitForSharedContainerData(containerID string) error {
return backoff.Retry(func() error {
if am.k8sObjectCache.GetSharedContainerData(containerID) != nil {
return nil
func (am *ApplicationProfileManager) waitForSharedContainerData(containerID string) (*utils.WatchedContainerData, error) {
return backoff.Retry(context.Background(), func() (*utils.WatchedContainerData, error) {
if sharedData := am.k8sObjectCache.GetSharedContainerData(containerID); sharedData != nil {
return sharedData, nil
}
return fmt.Errorf("container %s not found in shared data", containerID)
}, backoff.NewExponentialBackOff())
return nil, fmt.Errorf("container %s not found in shared data", containerID)
}, backoff.WithBackOff(backoff.NewExponentialBackOff()))
}

func (am *ApplicationProfileManager) ContainerCallback(notif containercollection.PubSubEvent) {
Expand Down
39 changes: 20 additions & 19 deletions pkg/malwaremanager/v1/malware_manager.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
package malwaremanager

import (
"context"
"fmt"
"os"
"path/filepath"
"time"

"github.com/cenkalti/backoff"
"github.com/kubescape/node-agent/pkg/config"
"github.com/kubescape/node-agent/pkg/exporters"
"github.com/kubescape/node-agent/pkg/k8sclient"
"github.com/kubescape/node-agent/pkg/malwaremanager"
clamavv1 "github.com/kubescape/node-agent/pkg/malwaremanager/v1/clamav"
"github.com/kubescape/node-agent/pkg/metricsmanager"
"github.com/kubescape/node-agent/pkg/objectcache"
"github.com/kubescape/node-agent/pkg/utils"

"github.com/cenkalti/backoff/v5"
mapset "github.com/deckarep/golang-set/v2"
"github.com/dustin/go-humanize"
"github.com/goradd/maps"
Expand All @@ -24,7 +16,15 @@ import (
traceropentype "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/open/types"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
events "github.com/kubescape/node-agent/pkg/ebpf/events"
"github.com/kubescape/node-agent/pkg/config"
"github.com/kubescape/node-agent/pkg/ebpf/events"
"github.com/kubescape/node-agent/pkg/exporters"
"github.com/kubescape/node-agent/pkg/k8sclient"
"github.com/kubescape/node-agent/pkg/malwaremanager"
clamavv1 "github.com/kubescape/node-agent/pkg/malwaremanager/v1/clamav"
"github.com/kubescape/node-agent/pkg/metricsmanager"
"github.com/kubescape/node-agent/pkg/objectcache"
"github.com/kubescape/node-agent/pkg/utils"
)

const (
Expand Down Expand Up @@ -114,13 +114,14 @@ func (mm *MalwareManager) ContainerCallback(notif containercollection.PubSubEven

func (mm *MalwareManager) startMalwareManager(container *containercollection.Container) {
// Wait for the shared container data to be available
if err := mm.waitForSharedContainerData(container.Runtime.ContainerID); err != nil {
sharedData, err := mm.waitForSharedContainerData(container.Runtime.ContainerID)
if err != nil {
logger.L().Error("MalwareManager - failed to get shared container data", helpers.Error(err))
return
}
podID := utils.CreateK8sPodID(container.K8s.Namespace, container.K8s.PodName)
if !mm.podToWlid.Has(podID) {
w := mm.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).Wlid
w := sharedData.Wlid
if w != "" {
mm.podToWlid.Set(podID, w)
} else {
Expand All @@ -129,13 +130,13 @@ func (mm *MalwareManager) startMalwareManager(container *containercollection.Con
}
}

func (mm *MalwareManager) waitForSharedContainerData(containerID string) error {
return backoff.Retry(func() error {
if mm.k8sObjectCache.GetSharedContainerData(containerID) != nil {
return nil
func (mm *MalwareManager) waitForSharedContainerData(containerID string) (*utils.WatchedContainerData, error) {
return backoff.Retry(context.Background(), func() (*utils.WatchedContainerData, error) {
if sharedData := mm.k8sObjectCache.GetSharedContainerData(containerID); sharedData != nil {
return sharedData, nil
}
return fmt.Errorf("container %s not found in shared data", containerID)
}, backoff.NewExponentialBackOff())
return nil, fmt.Errorf("container %s not found in shared data", containerID)
}, backoff.WithBackOff(backoff.NewExponentialBackOff()))
}

func (mm *MalwareManager) ReportEvent(eventType utils.EventType, event utils.K8sEvent) {
Expand Down
58 changes: 30 additions & 28 deletions pkg/networkmanager/v2/network_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"fmt"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/cenkalti/backoff/v5"
mapset "github.com/deckarep/golang-set/v2"
"github.com/google/uuid"
"github.com/goradd/maps"
Expand Down Expand Up @@ -68,15 +68,6 @@ func CreateNetworkManager(ctx context.Context, cfg config.Config, clusterName st
}
}

func (nm *NetworkManager) waitForSharedContainerData(containerID string) error {
return backoff.Retry(func() error {
if nm.k8sObjectCache.GetSharedContainerData(containerID) != nil {
return nil
}
return fmt.Errorf("container %s not found in shared data", containerID)
}, backoff.NewExponentialBackOff())
}

func (nm *NetworkManager) deleteResources(watchedContainer *utils.WatchedContainerData) {
// make sure we don't run deleteResources and saveProfile at the same time
nm.containerMutexes.Lock(watchedContainer.K8sContainerID)
Expand Down Expand Up @@ -389,14 +380,15 @@ func (nm *NetworkManager) startNetworkMonitoring(ctx context.Context, container
ctx, span := otel.Tracer("").Start(ctx, "NetworkManager.startNetworkMonitoring")
defer span.End()

if err := nm.waitForSharedContainerData(container.Runtime.ContainerID); err != nil {
sharedData, err := nm.waitForSharedContainerData(container.Runtime.ContainerID)
if err != nil {
logger.L().Error("NetworkManager - failed to get shared container data", helpers.Error(err),
helpers.String("container ID", container.Runtime.ContainerID),
helpers.String("k8s workload", k8sContainerID))
return
}

if !nm.cfg.EnableRuntimeDetection && nm.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).PreRunningContainer {
if !nm.cfg.EnableRuntimeDetection && sharedData.PreRunningContainer {
logger.L().Debug("NetworkManager - skip container", helpers.String("reason", "preRunning container"),
helpers.String("container ID", container.Runtime.ContainerID),
helpers.String("k8s workload", k8sContainerID))
Expand All @@ -408,22 +400,22 @@ func (nm *NetworkManager) startNetworkMonitoring(ctx context.Context, container

watchedContainer := &utils.WatchedContainerData{
ContainerID: container.Runtime.ContainerID,
ImageID: nm.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).ImageID,
ImageTag: nm.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).ImageTag,
ImageID: sharedData.ImageID,
ImageTag: sharedData.ImageTag,
UpdateDataTicker: time.NewTicker(utils.AddJitter(nm.cfg.InitialDelay, nm.cfg.MaxJitterPercentage)),
SyncChannel: syncChannel,
K8sContainerID: k8sContainerID,
NsMntId: container.Mntns,
InstanceID: nm.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).InstanceID,
TemplateHash: nm.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).TemplateHash,
Wlid: nm.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).Wlid,
ParentResourceVersion: nm.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).ParentResourceVersion,
ContainerInfos: nm.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).ContainerInfos,
ParentWorkloadSelector: nm.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).ParentWorkloadSelector,
SeccompProfilePath: nm.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).SeccompProfilePath,
ContainerType: nm.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).ContainerType,
ContainerIndex: nm.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).ContainerIndex,
PreRunningContainer: nm.k8sObjectCache.GetSharedContainerData(container.Runtime.ContainerID).PreRunningContainer,
InstanceID: sharedData.InstanceID,
TemplateHash: sharedData.TemplateHash,
Wlid: sharedData.Wlid,
ParentResourceVersion: sharedData.ParentResourceVersion,
ContainerInfos: sharedData.ContainerInfos,
ParentWorkloadSelector: sharedData.ParentWorkloadSelector,
SeccompProfilePath: sharedData.SeccompProfilePath,
ContainerType: sharedData.ContainerType,
ContainerIndex: sharedData.ContainerIndex,
PreRunningContainer: sharedData.PreRunningContainer,
}

if err := nm.monitorContainer(ctx, container, watchedContainer); err != nil {
Expand All @@ -440,12 +432,22 @@ func (nm *NetworkManager) waitForContainer(k8sContainerID string) error {
if nm.removedContainers.Contains(k8sContainerID) {
return fmt.Errorf("container %s has been removed", k8sContainerID)
}
return backoff.Retry(func() error {
_, err := backoff.Retry(context.Background(), func() (any, error) {
if nm.trackedContainers.Contains(k8sContainerID) {
return nil
return nil, nil
}
return nil, fmt.Errorf("container %s not found", k8sContainerID)
}, backoff.WithBackOff(backoff.NewExponentialBackOff()))
return err
}

func (nm *NetworkManager) waitForSharedContainerData(containerID string) (*utils.WatchedContainerData, error) {
return backoff.Retry(context.Background(), func() (*utils.WatchedContainerData, error) {
if sharedData := nm.k8sObjectCache.GetSharedContainerData(containerID); sharedData != nil {
return sharedData, nil
}
return fmt.Errorf("container %s not found", k8sContainerID)
}, backoff.NewExponentialBackOff())
return nil, fmt.Errorf("container %s not found in shared data", containerID)
}, backoff.WithBackOff(backoff.NewExponentialBackOff()))
}

func (nm *NetworkManager) ContainerCallback(notif containercollection.PubSubEvent) {
Expand Down
18 changes: 10 additions & 8 deletions pkg/rulemanager/v1/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
backoffv5 "github.com/cenkalti/backoff/v5"
mapset "github.com/deckarep/golang-set/v2"
"github.com/dustin/go-humanize"
"github.com/goradd/maps"
Expand Down Expand Up @@ -154,14 +155,15 @@ func (rm *RuleManager) monitorContainer(container *containercollection.Container
}

func (rm *RuleManager) startRuleManager(container *containercollection.Container, k8sContainerID string) {
if err := rm.waitForSharedContainerData(container.Runtime.ContainerID); err != nil {
sharedData, err := rm.waitForSharedContainerData(container.Runtime.ContainerID)
if err != nil {
logger.L().Error("RuleManager - failed to get shared container data", helpers.Error(err))
return
}

podID := utils.CreateK8sPodID(container.K8s.Namespace, container.K8s.PodName)
if !rm.podToWlid.Has(podID) {
w := rm.objectCache.K8sObjectCache().GetSharedContainerData(container.Runtime.ContainerID).Wlid
w := sharedData.Wlid
if w != "" {
rm.podToWlid.Set(podID, w)
} else {
Expand Down Expand Up @@ -210,13 +212,13 @@ func (rm *RuleManager) ContainerCallback(notif containercollection.PubSubEvent)
}
}

func (rm *RuleManager) waitForSharedContainerData(containerID string) error {
return backoff.Retry(func() error {
if rm.objectCache.K8sObjectCache().GetSharedContainerData(containerID) != nil {
return nil
func (rm *RuleManager) waitForSharedContainerData(containerID string) (*utils.WatchedContainerData, error) {
return backoffv5.Retry(context.Background(), func() (*utils.WatchedContainerData, error) {
if sharedData := rm.objectCache.K8sObjectCache().GetSharedContainerData(containerID); sharedData != nil {
return sharedData, nil
}
return fmt.Errorf("container %s not found in shared data", containerID)
}, backoff.NewExponentialBackOff())
return nil, fmt.Errorf("container %s not found in shared data", containerID)
}, backoffv5.WithBackOff(backoffv5.NewExponentialBackOff()))
}

func (rm *RuleManager) RegisterPeekFunc(peek func(mntns uint64) ([]string, error)) {
Expand Down
Loading

0 comments on commit ee88fa6

Please sign in to comment.