Skip to content

Commit

Permalink
Host hash sensor (#484)
Browse files Browse the repository at this point in the history
* First commit

Signed-off-by: Ben <[email protected]>

* Adding base host rules

Signed-off-by: Amit Schendel <[email protected]>

* Adding base code of host rule manager

Signed-off-by: Amit Schendel <[email protected]>

* Adding mock

Signed-off-by: Amit Schendel <[email protected]>

* fixing process details

Signed-off-by: Ben <[email protected]>

* Adding main support for rule manager

Signed-off-by: Amit Schendel <[email protected]>

* Removing import

Signed-off-by: Amit Schendel <[email protected]>

* Adding process tree support

Signed-off-by: Amit Schendel <[email protected]>

* Adding fixed config

Signed-off-by: Amit Schendel <[email protected]>

* Chaning log to debug to prevent spamming

Signed-off-by: Amit Schendel <[email protected]>

* Adding new code

Signed-off-by: Amit Schendel <[email protected]>

* Adding rules

Signed-off-by: Amit Schendel <[email protected]>

* Adding fixed rules

Signed-off-by: Amit Schendel <[email protected]>

* Adding path and rules

Signed-off-by: Amit Schendel <[email protected]>

* Commenting read env variable from proc

Signed-off-by: Amit Schendel <[email protected]>

* Added host agent rules (#485)

* Added host agent rules

Signed-off-by: Afek Berger <[email protected]>

* fixed tests & naming

Signed-off-by: Afek Berger <[email protected]>

---------

Signed-off-by: Afek Berger <[email protected]>

* Fixing rule name

Signed-off-by: Amit Schendel <[email protected]>

* Configuring event receiver export

Signed-off-by: Ben <[email protected]>

* fix

Signed-off-by: Ben <[email protected]>

* Fixing test

Signed-off-by: Amit Schendel <[email protected]>

* Changing unused params to _

Signed-off-by: Amit Schendel <[email protected]>

* Adding syscall peeking

Signed-off-by: Amit Schendel <[email protected]>

* Adding rule creation at constructor

Signed-off-by: Amit Schendel <[email protected]>

* Adding generic cooldown queue

Signed-off-by: Amit Schendel <[email protected]>

* implementing caching in the hosthashsensor

Signed-off-by: Ben <[email protected]>

* skip env check when running on host

Signed-off-by: Matthias Bertschy <[email protected]>

* fix kernel version parsing

Signed-off-by: Matthias Bertschy <[email protected]>

* Using temp k8s interface

Signed-off-by: Amit Schendel <[email protected]>

* Adding cloudmetadata to alerts

Signed-off-by: Amit Schendel <[email protected]>

* First commit

Signed-off-by: Ben <[email protected]>

* Adding base host rules

Signed-off-by: Amit Schendel <[email protected]>

* Adding base code of host rule manager

Signed-off-by: Amit Schendel <[email protected]>

* Adding mock

Signed-off-by: Amit Schendel <[email protected]>

* fixing process details

Signed-off-by: Ben <[email protected]>

* Adding main support for rule manager

Signed-off-by: Amit Schendel <[email protected]>

* Removing import

Signed-off-by: Amit Schendel <[email protected]>

* Adding process tree support

Signed-off-by: Amit Schendel <[email protected]>

* Adding fixed config

Signed-off-by: Amit Schendel <[email protected]>

* Chaning log to debug to prevent spamming

Signed-off-by: Amit Schendel <[email protected]>

* Adding new code

Signed-off-by: Amit Schendel <[email protected]>

* Adding rules

Signed-off-by: Amit Schendel <[email protected]>

* Adding fixed rules

Signed-off-by: Amit Schendel <[email protected]>

* Adding path and rules

Signed-off-by: Amit Schendel <[email protected]>

* Commenting read env variable from proc

Signed-off-by: Amit Schendel <[email protected]>

* Added host agent rules (#485)

* Added host agent rules

Signed-off-by: Afek Berger <[email protected]>

* fixed tests & naming

Signed-off-by: Afek Berger <[email protected]>

---------

Signed-off-by: Afek Berger <[email protected]>

* Fixing rule name

Signed-off-by: Amit Schendel <[email protected]>

* Configuring event receiver export

Signed-off-by: Ben <[email protected]>

* fix

Signed-off-by: Ben <[email protected]>

* Fixing test

Signed-off-by: Amit Schendel <[email protected]>

* Changing unused params to _

Signed-off-by: Amit Schendel <[email protected]>

* Adding syscall peeking

Signed-off-by: Amit Schendel <[email protected]>

* Adding rule creation at constructor

Signed-off-by: Amit Schendel <[email protected]>

* Adding generic cooldown queue

Signed-off-by: Amit Schendel <[email protected]>

* implementing caching in the hosthashsensor

Signed-off-by: Ben <[email protected]>

* Using temp k8s interface

Signed-off-by: Amit Schendel <[email protected]>

* Adding cloudmetadata to alerts

Signed-off-by: Amit Schendel <[email protected]>

* skip env check when running on host

Signed-off-by: Matthias Bertschy <[email protected]>

* fix kernel version parsing

Signed-off-by: Matthias Bertschy <[email protected]>

* split main into node and host agent

Signed-off-by: Matthias Bertschy <[email protected]>

* Ptracewatcher

Signed-off-by: Ben <[email protected]>

* Adding host network watcher (#486)

* Adding host network watcher

Signed-off-by: Amit Schendel <[email protected]>

* Fixing process tree

Signed-off-by: Amit Schendel <[email protected]>

* adding a goreleaser config

Signed-off-by: Matthias Bertschy <[email protected]>

---------

Signed-off-by: Amit Schendel <[email protected]>
Signed-off-by: Matthias Bertschy <[email protected]>
Co-authored-by: Matthias Bertschy <[email protected]>

* Fixing Dockerfile - @matthias blame

Signed-off-by: Amit Schendel <[email protected]>

* fix

Signed-off-by: Ben <[email protected]>

* Removing network on host

Signed-off-by: Amit Schendel <[email protected]>

* Clean unused code

Signed-off-by: Amit Schendel <[email protected]>

* Removing io_uring init on host

Signed-off-by: Amit Schendel <[email protected]>

* Adding reporting of open and exec to hash sensor

Signed-off-by: Amit Schendel <[email protected]>

* Adding injected rule creator

Signed-off-by: Amit Schendel <[email protected]>

* Public validate

Signed-off-by: Amit Schendel <[email protected]>

* Removing files

Signed-off-by: Amit Schendel <[email protected]>

* Removing host container

Signed-off-by: Amit Schendel <[email protected]>

* fixing nits before merging

Signed-off-by: Matthias Bertschy <[email protected]>

---------

Signed-off-by: Ben <[email protected]>
Signed-off-by: Amit Schendel <[email protected]>
Signed-off-by: Afek Berger <[email protected]>
Signed-off-by: Matthias Bertschy <[email protected]>
Co-authored-by: Amit Schendel <[email protected]>
Co-authored-by: Afek Berger <[email protected]>
Co-authored-by: Matthias Bertschy <[email protected]>
Co-authored-by: Amit Schendel <[email protected]>
  • Loading branch information
5 people authored Feb 20, 2025
1 parent e34aa76 commit 963c7ef
Show file tree
Hide file tree
Showing 37 changed files with 449 additions and 220 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ temp
.vscode
resources/ebpf/falco/*
node-agent
__pycache__
__pycache__
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ TAG?=test
# TAG?=v0.0.1

binary:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o $(BINARY_NAME)
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o $(BINARY_NAME) ./cmd/main.go

docker-build:
docker buildx build --platform linux/amd64 -t $(IMAGE):$(TAG) -f $(DOCKERFILE_PATH) --load .
Expand Down
2 changes: 1 addition & 1 deletion build/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ ARG TARGETOS TARGETARCH
RUN --mount=target=. \
--mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/go/pkg \
GOOS=$TARGETOS GOARCH=$TARGETARCH go build -o /out/node-agent .
GOOS=$TARGETOS GOARCH=$TARGETARCH go build -o /out/node-agent ./cmd/main.go

FROM gcr.io/distroless/static-debian12:latest

Expand Down
21 changes: 15 additions & 6 deletions main.go → cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,12 @@ import (
func main() {
ctx := context.Background()

cfg, err := config.LoadConfig("/etc/config")
configDir := "/etc/config"
if envPath := os.Getenv("CONFIG_DIR"); envPath != "" {
configDir = envPath
}

cfg, err := config.LoadConfig(configDir)
if err != nil {
logger.L().Ctx(ctx).Fatal("load config error", helpers.Error(err))
}
Expand Down Expand Up @@ -91,7 +96,7 @@ func main() {

// Check if we need to validate the kernel version.
if os.Getenv("SKIP_KERNEL_VERSION_CHECK") == "" {
err = validator.CheckPrerequisites()
err = validator.CheckPrerequisites(cfg)
if err != nil {
logger.L().Ctx(ctx).Error("error during kernel validation", helpers.Error(err))

Expand Down Expand Up @@ -143,6 +148,7 @@ func main() {
healthManager.Start(ctx)

// Create clients
logger.L().Info("Kubernetes mode is true")
k8sClient := k8sinterface.NewKubernetesApi()
storageClient, err := storage.CreateStorage(clusterData.Namespace, cfg.UpdateDataPeriod)
if err != nil {
Expand Down Expand Up @@ -226,6 +232,9 @@ func main() {
// create the process manager
processManager = processmanagerv1.CreateProcessManager(ctx)

// create exporter
exporter := exporters.InitExporters(cfg.Exporters, clusterData.ClusterName, cfg.NodeName, cloudMetadata)

// create ruleBinding cache
ruleBindingCache := rulebindingcachev1.NewCache(cfg.NodeName, k8sClient)
dWatcher.AddAdaptor(ruleBindingCache)
Expand All @@ -244,9 +253,6 @@ func main() {
// create object cache
objCache = objectcachev1.NewObjectCache(k8sObjectCache, apc, nnc, dc)

// create exporter
exporter := exporters.InitExporters(cfg.Exporters, clusterData.ClusterName, cfg.NodeName, cloudMetadata)

// create runtimeDetection managers
ruleManager, err = rulemanagerv1.CreateRuleManager(ctx, cfg, k8sClient, ruleBindingCache, objCache, exporter, prometheusExporter, cfg.NodeName, clusterData.ClusterName, processManager, dnsResolver, nil)
if err != nil {
Expand Down Expand Up @@ -309,7 +315,10 @@ func main() {
}

// Create the container handler
mainHandler, err := containerwatcher.CreateIGContainerWatcher(cfg, applicationProfileManager, k8sClient, igK8sClient, networkManagerClient, dnsManagerClient, prometheusExporter, ruleManager, malwareManager, sbomManager, &ruleBindingNotify, igK8sClient.RuntimeConfig, nil, nil, processManager, clusterData.ClusterName, objCache)
mainHandler, err := containerwatcher.CreateIGContainerWatcher(cfg, applicationProfileManager, k8sClient,
igK8sClient, networkManagerClient, dnsManagerClient, prometheusExporter, ruleManager,
malwareManager, sbomManager, &ruleBindingNotify, igK8sClient.RuntimeConfig, nil, nil,
processManager, clusterData.ClusterName, objCache)
if err != nil {
logger.L().Ctx(ctx).Fatal("error creating the container watcher", helpers.Error(err))
}
Expand Down
6 changes: 4 additions & 2 deletions demo/general_attack/webapp/ping-app.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ metadata:
name: ping-app
labels:
app: ping-app
kubescape.io/max-sniffing-time: "5m"
spec:

containers:
Expand All @@ -26,6 +27,7 @@ spec:
- protocol: TCP
port: 80
targetPort: 80
type: LoadBalancer
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
Expand All @@ -34,8 +36,8 @@ metadata:
name: ping-app-role
rules:
- apiGroups: [""]
resources: ["*"]
verbs: ["get", "list", "watch", "create", "update", "delete"]
resources: ["pods"]
verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DmitriyVTitov/size v1.5.0
github.com/anchore/syft v1.18.1
github.com/aquilax/truncate v1.0.0
github.com/armosec/armoapi-go v0.0.512
github.com/armosec/armoapi-go v0.0.527
github.com/armosec/utils-k8s-go v0.0.30
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.3.0
Expand All @@ -28,10 +28,11 @@ require (
github.com/goradd/maps v1.0.0
github.com/grafana/pyroscope-go v1.2.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/iceber/iouring-go v0.0.0-20230403020409-002cfd2e2a90
github.com/inspektor-gadget/inspektor-gadget v0.36.1
github.com/kubescape/backend v0.0.25
github.com/kubescape/go-logger v0.0.23
github.com/kubescape/k8s-interface v0.0.188
github.com/kubescape/k8s-interface v0.0.189
github.com/kubescape/storage v0.0.158
github.com/moby/sys/mountinfo v0.7.2
github.com/opencontainers/go-digest v1.0.0
Expand Down Expand Up @@ -197,7 +198,6 @@ require (
github.com/hashicorp/hcl v1.0.1-vault-7 // indirect
github.com/huandu/xstrings v1.5.0 // indirect
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/iceber/iouring-go v0.0.0-20230403020409-002cfd2e2a90 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/jinzhu/copier v0.4.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/armosec/armoapi-go v0.0.512 h1:ZuXlm63s5/O4rpZVHeKhGUvku23iE7biJq1g30M4j7o=
github.com/armosec/armoapi-go v0.0.512/go.mod h1:SYyDaNaZ7jc8D40dySr9Lz0hw+qkach+wt5G6lIqS7o=
github.com/armosec/armoapi-go v0.0.527 h1:9NDMycWleLQTGPhvL7xCL81+cgCOGTJiWlHKxGTvTE4=
github.com/armosec/armoapi-go v0.0.527/go.mod h1:SYyDaNaZ7jc8D40dySr9Lz0hw+qkach+wt5G6lIqS7o=
github.com/armosec/gojay v1.2.17 h1:VSkLBQzD1c2V+FMtlGFKqWXNsdNvIKygTKJI9ysY8eM=
github.com/armosec/gojay v1.2.17/go.mod h1:vuvX3DlY0nbVrJ0qCklSS733AWMoQboq3cFyuQW9ybc=
github.com/armosec/utils-go v0.0.58 h1:g9RnRkxZAmzTfPe2ruMo2OXSYLwVSegQSkSavOfmaIE=
Expand Down Expand Up @@ -700,8 +700,8 @@ github.com/kubescape/backend v0.0.25 h1:PLESA7KGJskebR5hiSqPeJ1cPQ8Ra+4yNYXKyIej
github.com/kubescape/backend v0.0.25/go.mod h1:FpazfN+c3Ucuvv4jZYCnk99moSBRNMVIxl5aWCZAEBo=
github.com/kubescape/go-logger v0.0.23 h1:5xh+Nm8eGImhFbtippRKLaFgsvlKE1ufvQhNM2P/570=
github.com/kubescape/go-logger v0.0.23/go.mod h1:Ayg7g769c7sXVB+P3fkJmbsJpoEmMmaUf9jeo+XuC3U=
github.com/kubescape/k8s-interface v0.0.188 h1:muG8qzXqA3dQ0myreg/V4DgxLx5A2S+Jj6Ur+OJCefc=
github.com/kubescape/k8s-interface v0.0.188/go.mod h1:j9snZbH+RxOaa1yG/bWgTClj90q7To0rGgQepxy4b+k=
github.com/kubescape/k8s-interface v0.0.189 h1:3rdato12XNkHh5z3VI/xrZQHu4MV7ANXErMnHZp7jGk=
github.com/kubescape/k8s-interface v0.0.189/go.mod h1:j9snZbH+RxOaa1yG/bWgTClj90q7To0rGgQepxy4b+k=
github.com/kubescape/storage v0.0.158 h1:TbI1/rrRq+0gNbbPl8Z1weik0ShTWPJfy8lXjnzaDjw=
github.com/kubescape/storage v0.0.158/go.mod h1:K3QWf+zcXmXxfeQ2HD0dd0bF4FJ5gbxLTRZ7nx4dHXw=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
Expand Down
12 changes: 12 additions & 0 deletions pkg/cloudmetadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,15 @@ func enrichCloudMetadataForAWS(ctx context.Context, client *k8sinterface.Kuberne

logger.L().Debug("enriched cloud metadata from aws-auth ConfigMap")
}

// GetCloudMetadataWithIMDS retrieves cloud metadata for a given node using IMDS
func GetCloudMetadataWithIMDS(ctx context.Context) (*apitypes.CloudMetadata, error) {
cMetadataClient := k8sInterfaceCloudMetadata.NewMetadataClient(true)

cMetadata, err := cMetadataClient.GetMetadata(ctx)
if err != nil {
return nil, err
}

return cMetadata, nil
}
6 changes: 6 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Config struct {
EnableHttpDetection bool `mapstructure:"httpDetectionEnabled"`
EnableNetworkTracing bool `mapstructure:"networkServiceEnabled"`
EnableNodeProfile bool `mapstructure:"nodeProfileServiceEnabled"`
EnableHostMalwareSensor bool `mapstructure:"hostMalwareSensorEnabled"`
EnableHostNetworkSensor bool `mapstructure:"hostNetworkSensorEnabled"`
NodeProfileInterval time.Duration `mapstructure:"nodeProfileInterval"`
EnableSeccomp bool `mapstructure:"seccompServiceEnabled"`
ExcludeNamespaces []string `mapstructure:"excludeNamespaces"`
Expand All @@ -38,6 +40,7 @@ type Config struct {
NamespaceName string `mapstructure:"namespaceName"`
NodeName string `mapstructure:"nodeName"`
PodName string `mapstructure:"podName"`
KubernetesMode bool `mapstructure:"kubernetesMode"`
}

// LoadConfig reads configuration from file or environment variables.
Expand All @@ -56,6 +59,9 @@ func LoadConfig(path string) (Config, error) {
viper.SetDefault("namespaceName", os.Getenv(NamespaceEnvVar))
viper.SetDefault("nodeName", os.Getenv(NodeNameEnvVar))
viper.SetDefault("podName", os.Getenv(PodNameEnvVar))
viper.SetDefault("hostMalwareSensorEnabled", false)
viper.SetDefault("hostNetworkSensorEnabled", false)
viper.SetDefault("kubernetesMode", true)

viper.AutomaticEnv()

Expand Down
3 changes: 3 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ func TestLoadConfig(t *testing.T) {
EnableNetworkTracing: true,
EnableNodeProfile: true,
EnableHttpDetection: true,
EnableHostMalwareSensor: false,
EnableHostNetworkSensor: false,
KubernetesMode: true,
InitialDelay: 2 * time.Minute,
MaxSniffingTime: 6 * time.Hour,
UpdateDataPeriod: 1 * time.Minute,
Expand Down
33 changes: 23 additions & 10 deletions pkg/containerwatcher/v1/container_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type IGContainerWatcher struct {
ruleManager rulemanager.RuleManagerClient
malwareManager malwaremanager.MalwareManagerClient
sbomManager sbommanager.SbomManagerClient

// IG Collections
containerCollection *containercollection.ContainerCollection
tracerCollection *tracercollection.TracerCollection
Expand Down Expand Up @@ -171,8 +172,18 @@ type IGContainerWatcher struct {

var _ containerwatcher.ContainerWatcher = (*IGContainerWatcher)(nil)

func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager applicationprofilemanager.ApplicationProfileManagerClient, k8sClient *k8sinterface.KubernetesApi, igK8sClient *containercollection.K8sClient, networkManagerClient networkmanager.NetworkManagerClient, dnsManagerClient dnsmanager.DNSManagerClient, metrics metricsmanager.MetricsManager, ruleManager rulemanager.RuleManagerClient, malwareManager malwaremanager.MalwareManagerClient, sbomManager sbommanager.SbomManagerClient, ruleBindingPodNotify *chan rulebinding.RuleBindingNotify, runtime *containerutilsTypes.RuntimeConfig, thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.EventReceiver]], thirdPartyEnricher containerwatcher.ThirdPartyEnricher, processManager processmanager.ProcessManagerClient, clusterName string, objectCache objectcache.ObjectCache) (*IGContainerWatcher, error) { // Use container collection to get notified for new containers
func CreateIGContainerWatcher(cfg config.Config,
applicationProfileManager applicationprofilemanager.ApplicationProfileManagerClient, k8sClient *k8sinterface.KubernetesApi,
igK8sClient *containercollection.K8sClient, networkManagerClient networkmanager.NetworkManagerClient,
dnsManagerClient dnsmanager.DNSManagerClient, metrics metricsmanager.MetricsManager, ruleManager rulemanager.RuleManagerClient,
malwareManager malwaremanager.MalwareManagerClient, sbomManager sbommanager.SbomManagerClient,
ruleBindingPodNotify *chan rulebinding.RuleBindingNotify, runtime *containerutilsTypes.RuntimeConfig,
thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.EventReceiver]],
thirdPartyEnricher containerwatcher.ThirdPartyEnricher, processManager processmanager.ProcessManagerClient,
clusterName string, objectCache objectcache.ObjectCache) (*IGContainerWatcher, error) { // Use container collection to get notified for new containers

containerCollection := &containercollection.ContainerCollection{}

// Create a tracer collection instance
tracerCollection, err := tracercollection.NewTracerCollection(containerCollection)
if err != nil {
Expand Down Expand Up @@ -203,6 +214,16 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
// Create an exec worker pool
execWorkerPool, err := ants.NewPoolWithFunc(execWorkerPoolSize, func(i interface{}) {
event := i.(events.ExecEvent)

path := event.Comm
if len(event.Args) > 0 {
path = event.Args[0]
}

if path == "" {
return
}

// ignore events with empty container name
if event.K8s.ContainerName == "" {
return
Expand All @@ -215,15 +236,6 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
return
}

path := event.Comm
if len(event.Args) > 0 {
path = event.Args[0]
}

if path == "" {
return
}

ruleManager.ReportEvent(utils.ExecveEventType, &event)
malwareManager.ReportEvent(utils.ExecveEventType, &event)

Expand Down Expand Up @@ -270,6 +282,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
// Create a network worker pool
networkWorkerPool, err := ants.NewPoolWithFunc(networkWorkerPoolSize, func(i interface{}) {
event := i.(tracernetworktype.Event)

// ignore events with empty container name
if event.K8s.ContainerName == "" {
return
Expand Down
4 changes: 4 additions & 0 deletions pkg/containerwatcher/v1/container_watcher_private.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ const (
)

func (ch *IGContainerWatcher) containerCallback(notif containercollection.PubSubEvent) {
logger.L().Debug("IGContainerWatcher.containerCallback - received container event", helpers.String("event", fmt.Sprintf("%+v", notif)), helpers.String("container", fmt.Sprintf("%+v", notif.Container)))
if notif.Container == nil || notif.Container.Runtime.ContainerID == "" {
return
}
// check if the container should be ignored
if ch.ignoreContainer(notif.Container.K8s.Namespace, notif.Container.K8s.PodName) {
// avoid loops when the container is being removed
Expand Down
61 changes: 61 additions & 0 deletions pkg/cooldownqueue/cooldownqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package cooldownqueue

import (
"time"

"istio.io/pkg/cache"
)

const (
DefaultExpiration = 5 * time.Second
EvictionInterval = 1 * time.Second
)

// CooldownQueue is a queue that lets clients put events into it with a cooldown
//
// When a client puts an event into a queue, it waits for a cooldown period before
// the event is forwarded to the consumer. If and event for the same key is put into the queue
// again before the cooldown period is over, the event is overridden and the cooldown period is reset.
type CooldownQueue[T any] struct {
closed bool
seenEvents cache.ExpiringCache
innerChan chan T // Private channel
resultChan <-chan T // Read-only public channel
}

// NewCooldownQueue returns a new Cooldown Queue
func NewCooldownQueue[T any](cooldown time.Duration, evictionInterval time.Duration) *CooldownQueue[T] {
events := make(chan T)
callback := func(key, value any) {
events <- value.(T)
}
c := cache.NewTTLWithCallback(cooldown, evictionInterval, callback)
return &CooldownQueue[T]{
seenEvents: c,
innerChan: events,
resultChan: events,
}
}

func (q *CooldownQueue[T]) Closed() bool {
return q.closed
}

// Enqueue enqueues an event in the Cooldown Queue
func (q *CooldownQueue[T]) Enqueue(e T, key string) {
if q.closed {
return
}

q.seenEvents.Set(key, e)
}

func (q *CooldownQueue[T]) Stop() {
q.closed = true
close(q.innerChan)
}

// ResultChan returns a read-only channel for consuming events.
func (q *CooldownQueue[T]) ResultChan() <-chan T {
return q.resultChan
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/kubescape/node-agent/pkg/watcher"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
Expand Down Expand Up @@ -33,17 +34,17 @@ func TestCooldownQueue_Enqueue(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
q := NewCooldownQueue()
q := NewCooldownQueue[watch.Event](DefaultExpiration, EvictionInterval)
go func() {
time.Sleep(10 * time.Second)
q.Stop()
}()
for _, e := range tt.inEvents {
time.Sleep(50 * time.Millisecond) // need to sleep to preserve order since the insertion is async
q.Enqueue(e)
q.Enqueue(e, watcher.MakeEventKey(e))
}
var outEvents []watch.Event
for e := range q.ResultChan {
for e := range q.ResultChan() {
outEvents = append(outEvents, e)
}
// sort outEvents to make the comparison easier
Expand Down Expand Up @@ -91,7 +92,7 @@ func Test_makeEventKey(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := makeEventKey(tt.e)
got := watcher.MakeEventKey(tt.e)
assert.Equal(t, tt.want, got)
})
}
Expand Down
Loading

0 comments on commit 963c7ef

Please sign in to comment.