Skip to content

Commit

Permalink
Merge pull request #394 from kubescape/feature/processtree
Browse files Browse the repository at this point in the history
Feature/processtree
  • Loading branch information
amitschendel authored Oct 31, 2024
2 parents d23c7db + e62f268 commit 2b2bc24
Show file tree
Hide file tree
Showing 12 changed files with 1,722 additions and 163 deletions.
16 changes: 10 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/kubescape/node-agent/pkg/objectcache/k8scache"
"github.com/kubescape/node-agent/pkg/objectcache/networkneighborhoodcache"
objectcachev1 "github.com/kubescape/node-agent/pkg/objectcache/v1"
"github.com/kubescape/node-agent/pkg/processmanager"
processmanagerv1 "github.com/kubescape/node-agent/pkg/processmanager/v1"
"github.com/kubescape/node-agent/pkg/relevancymanager"
relevancymanagerv1 "github.com/kubescape/node-agent/pkg/relevancymanager/v1"
rulebinding "github.com/kubescape/node-agent/pkg/rulebindingmanager"
Expand Down Expand Up @@ -193,26 +195,27 @@ func main() {
var networkManagerClient networkmanager.NetworkManagerClient
var dnsManagerClient dnsmanager.DNSManagerClient
var dnsResolver dnsmanager.DNSResolver
if cfg.EnableNetworkTracing {
if cfg.EnableNetworkTracing || cfg.EnableRuntimeDetection {
dnsManager := dnsmanager.CreateDNSManager()
dnsManagerClient = dnsManager
// NOTE: dnsResolver is set for threat detection.
dnsResolver = dnsManager
networkManagerClient = networkmanagerv2.CreateNetworkManager(ctx, cfg, clusterData.ClusterName, k8sClient, storageClient, dnsManager, preRunningContainersIDs, k8sObjectCache)
} else {
if cfg.EnableRuntimeDetection {
logger.L().Ctx(ctx).Fatal("Network tracing is disabled, but runtime detection is enabled. Network tracing is required for runtime detection.")
}
dnsManagerClient = dnsmanager.CreateDNSManagerMock()
dnsResolver = dnsmanager.CreateDNSManagerMock()
networkManagerClient = networkmanager.CreateNetworkManagerMock()
}

var ruleManager rulemanager.RuleManagerClient
var processManager processmanager.ProcessManagerClient
var objCache objectcache.ObjectCache
var ruleBindingNotify chan rulebinding.RuleBindingNotify

if cfg.EnableRuntimeDetection {
// create the process manager
processManager = processmanagerv1.CreateProcessManager(ctx)

// create ruleBinding cache
ruleBindingCache := rulebindingcachev1.NewCache(nodeName, k8sClient)
dWatcher.AddAdaptor(ruleBindingCache)
Expand All @@ -235,7 +238,7 @@ func main() {
exporter := exporters.InitExporters(cfg.Exporters, clusterData.ClusterName, nodeName)

// create runtimeDetection managers
ruleManager, err = rulemanagerv1.CreateRuleManager(ctx, cfg, k8sClient, ruleBindingCache, objCache, exporter, prometheusExporter, nodeName, clusterData.ClusterName)
ruleManager, err = rulemanagerv1.CreateRuleManager(ctx, cfg, k8sClient, ruleBindingCache, objCache, exporter, prometheusExporter, nodeName, clusterData.ClusterName, processManager)
if err != nil {
logger.L().Ctx(ctx).Fatal("error creating RuleManager", helpers.Error(err))
}
Expand All @@ -244,6 +247,7 @@ func main() {
ruleManager = rulemanager.CreateRuleManagerMock()
objCache = objectcache.NewObjectCacheMock()
ruleBindingNotify = make(chan rulebinding.RuleBindingNotify, 1)
processManager = processmanager.CreateProcessManagerMock()
}

// Create the node profile manager
Expand All @@ -269,7 +273,7 @@ func main() {
}

// Create the container handler
mainHandler, err := containerwatcher.CreateIGContainerWatcher(cfg, applicationProfileManager, k8sClient, relevancyManager, networkManagerClient, dnsManagerClient, prometheusExporter, ruleManager, malwareManager, preRunningContainersIDs, &ruleBindingNotify, containerRuntime, nil)
mainHandler, err := containerwatcher.CreateIGContainerWatcher(cfg, applicationProfileManager, k8sClient, relevancyManager, networkManagerClient, dnsManagerClient, prometheusExporter, ruleManager, malwareManager, preRunningContainersIDs, &ruleBindingNotify, containerRuntime, nil, processManager)
if err != nil {
logger.L().Ctx(ctx).Fatal("error creating the container watcher", helpers.Error(err))
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/containerwatcher/v1/container_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
tracersshtype "github.com/kubescape/node-agent/pkg/ebpf/gadgets/ssh/types"
tracersymlink "github.com/kubescape/node-agent/pkg/ebpf/gadgets/symlink/tracer"
tracersymlinktype "github.com/kubescape/node-agent/pkg/ebpf/gadgets/symlink/types"
"github.com/kubescape/node-agent/pkg/processmanager"

"github.com/kubescape/node-agent/pkg/malwaremanager"
"github.com/kubescape/node-agent/pkg/metricsmanager"
Expand Down Expand Up @@ -153,11 +154,13 @@ type IGContainerWatcher struct {
ruleBindingPodNotify *chan rulebinding.RuleBindingNotify
// container runtime
runtime *containerutilsTypes.RuntimeConfig
// process manager
processManager processmanager.ProcessManagerClient
}

var _ containerwatcher.ContainerWatcher = (*IGContainerWatcher)(nil)

func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager applicationprofilemanager.ApplicationProfileManagerClient, k8sClient *k8sinterface.KubernetesApi, relevancyManager relevancymanager.RelevancyManagerClient, networkManagerClient networkmanager.NetworkManagerClient, dnsManagerClient dnsmanager.DNSManagerClient, metrics metricsmanager.MetricsManager, ruleManager rulemanager.RuleManagerClient, malwareManager malwaremanager.MalwareManagerClient, preRunningContainers mapset.Set[string], ruleBindingPodNotify *chan rulebinding.RuleBindingNotify, runtime *containerutilsTypes.RuntimeConfig, thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.EventReceiver]]) (*IGContainerWatcher, error) {
func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager applicationprofilemanager.ApplicationProfileManagerClient, k8sClient *k8sinterface.KubernetesApi, relevancyManager relevancymanager.RelevancyManagerClient, networkManagerClient networkmanager.NetworkManagerClient, dnsManagerClient dnsmanager.DNSManagerClient, metrics metricsmanager.MetricsManager, ruleManager rulemanager.RuleManagerClient, malwareManager malwaremanager.MalwareManagerClient, preRunningContainers mapset.Set[string], ruleBindingPodNotify *chan rulebinding.RuleBindingNotify, runtime *containerutilsTypes.RuntimeConfig, thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.EventReceiver]], processManager processmanager.ProcessManagerClient) (*IGContainerWatcher, error) {
// Use container collection to get notified for new containers
containerCollection := &containercollection.ContainerCollection{}
// Create a tracer collection instance
Expand Down Expand Up @@ -203,6 +206,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
path = event.Args[0]
}
metrics.ReportEvent(utils.ExecveEventType)
processManager.ReportEvent(utils.ExecveEventType, &event)
applicationProfileManager.ReportFileExec(k8sContainerID, path, event.Args)
relevancyManager.ReportFileExec(event.Runtime.ContainerID, k8sContainerID, path)
ruleManager.ReportEvent(utils.ExecveEventType, &event)
Expand Down Expand Up @@ -449,6 +453,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
runtime: runtime,
thirdPartyTracers: mapset.NewSet[containerwatcher.CustomTracer](),
thirdPartyContainerReceivers: mapset.NewSet[containerwatcher.ContainerReceiver](),
processManager: processManager,
}, nil
}

Expand Down Expand Up @@ -494,11 +499,16 @@ func (ch *IGContainerWatcher) UnregisterContainerReceiver(receiver containerwatc

func (ch *IGContainerWatcher) Start(ctx context.Context) error {
if !ch.running {

if err := ch.startContainerCollection(ctx); err != nil {
return fmt.Errorf("setting up container collection: %w", err)
}

// We want to populate the initial processes before starting the tracers but after retrieving the shims.
if err := ch.processManager.PopulateInitialProcesses(); err != nil {
ch.stopContainerCollection()
return fmt.Errorf("populating initial processes: %w", err)
}

if err := ch.startTracers(); err != nil {
ch.stopContainerCollection()
return fmt.Errorf("starting app behavior tracing: %w", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/containerwatcher/v1/container_watcher_private.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (ch *IGContainerWatcher) startContainerCollection(ctx context.Context) erro
ch.networkManager.ContainerCallback,
ch.malwareManager.ContainerCallback,
ch.ruleManager.ContainerCallback,
ch.processManager.ContainerCallback,
}

for receiver := range ch.thirdPartyContainerReceivers.Iter() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/containerwatcher/v1/open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func BenchmarkIGContainerWatcher_openEventCallback(b *testing.B) {
assert.NoError(b, err)
mockExporter := metricsmanager.NewMetricsMock()

mainHandler, err := CreateIGContainerWatcher(cfg, nil, nil, relevancyManager, nil, nil, mockExporter, nil, nil, nil, nil, nil, nil)
mainHandler, err := CreateIGContainerWatcher(cfg, nil, nil, relevancyManager, nil, nil, mockExporter, nil, nil, nil, nil, nil, nil, nil)
assert.NoError(b, err)
event := &traceropentype.Event{
Event: types.Event{
Expand Down
20 changes: 20 additions & 0 deletions pkg/processmanager/process_manager_interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package processmanager

import (
apitypes "github.com/armosec/armoapi-go/armotypes"
containercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/container-collection"
"github.com/kubescape/node-agent/pkg/utils"
)

// ProcessManagerClient is the interface for the process manager client.
// It provides methods to get process tree for a container or a PID.
// The manager is responsible for maintaining the process tree for all containers.
type ProcessManagerClient interface {
	// GetProcessTreeForPID returns the process tree for the given PID in the
	// given container. On error, the returned Process is the zero value.
	GetProcessTreeForPID(containerID string, pid int) (apitypes.Process, error)
	// PopulateInitialProcesses is called to populate the initial process tree (parsed from /proc) for all containers.
	PopulateInitialProcesses() error

	// ReportEvent will be called to report new exec events to the process manager.
	ReportEvent(eventType utils.EventType, event utils.K8sEvent)
	// ContainerCallback receives container add/remove notifications from the
	// inspektor-gadget container collection so the manager can track
	// per-container process trees.
	ContainerCallback(notif containercollection.PubSubEvent)
}
32 changes: 32 additions & 0 deletions pkg/processmanager/process_manager_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package processmanager

import (
apitypes "github.com/armosec/armoapi-go/armotypes"
containercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/container-collection"
"github.com/kubescape/node-agent/pkg/utils"
)

// ProcessManagerMock is a no-op implementation of ProcessManagerClient.
// It is used when the process manager feature is not enabled.
type ProcessManagerMock struct{}

// Compile-time check that the mock satisfies ProcessManagerClient.
var _ ProcessManagerClient = (*ProcessManagerMock)(nil)

// CreateProcessManagerMock returns a new no-op process manager.
func CreateProcessManagerMock() *ProcessManagerMock {
	return &ProcessManagerMock{}
}

// GetProcessTreeForPID always returns an empty Process and a nil error.
func (m *ProcessManagerMock) GetProcessTreeForPID(containerID string, pid int) (apitypes.Process, error) {
	return apitypes.Process{}, nil
}

// PopulateInitialProcesses is a no-op and always succeeds.
func (m *ProcessManagerMock) PopulateInitialProcesses() error {
	return nil
}

// ReportEvent discards the reported event.
func (m *ProcessManagerMock) ReportEvent(eventType utils.EventType, event utils.K8sEvent) {
	// no-op
}

// ContainerCallback discards the container notification.
func (m *ProcessManagerMock) ContainerCallback(notif containercollection.PubSubEvent) {
	// no-op
}
Loading

0 comments on commit 2b2bc24

Please sign in to comment.