Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/processtree #394

Merged
merged 15 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/kubescape/node-agent/pkg/objectcache/k8scache"
"github.com/kubescape/node-agent/pkg/objectcache/networkneighborhoodcache"
objectcachev1 "github.com/kubescape/node-agent/pkg/objectcache/v1"
"github.com/kubescape/node-agent/pkg/processmanager"
processmanagerv1 "github.com/kubescape/node-agent/pkg/processmanager/v1"
"github.com/kubescape/node-agent/pkg/relevancymanager"
relevancymanagerv1 "github.com/kubescape/node-agent/pkg/relevancymanager/v1"
rulebinding "github.com/kubescape/node-agent/pkg/rulebindingmanager"
Expand Down Expand Up @@ -193,26 +195,27 @@ func main() {
var networkManagerClient networkmanager.NetworkManagerClient
var dnsManagerClient dnsmanager.DNSManagerClient
var dnsResolver dnsmanager.DNSResolver
if cfg.EnableNetworkTracing {
if cfg.EnableNetworkTracing || cfg.EnableRuntimeDetection {
dnsManager := dnsmanager.CreateDNSManager()
dnsManagerClient = dnsManager
// NOTE: dnsResolver is set for threat detection.
dnsResolver = dnsManager
networkManagerClient = networkmanagerv2.CreateNetworkManager(ctx, cfg, clusterData.ClusterName, k8sClient, storageClient, dnsManager, preRunningContainersIDs, k8sObjectCache)
} else {
if cfg.EnableRuntimeDetection {
logger.L().Ctx(ctx).Fatal("Network tracing is disabled, but runtime detection is enabled. Network tracing is required for runtime detection.")
}
dnsManagerClient = dnsmanager.CreateDNSManagerMock()
dnsResolver = dnsmanager.CreateDNSManagerMock()
networkManagerClient = networkmanager.CreateNetworkManagerMock()
}

var ruleManager rulemanager.RuleManagerClient
var processManager processmanager.ProcessManagerClient
var objCache objectcache.ObjectCache
var ruleBindingNotify chan rulebinding.RuleBindingNotify

if cfg.EnableRuntimeDetection {
// create the process manager
processManager = processmanagerv1.CreateProcessManager(ctx)

// create ruleBinding cache
ruleBindingCache := rulebindingcachev1.NewCache(nodeName, k8sClient)
dWatcher.AddAdaptor(ruleBindingCache)
Expand All @@ -235,7 +238,7 @@ func main() {
exporter := exporters.InitExporters(cfg.Exporters, clusterData.ClusterName, nodeName)

// create runtimeDetection managers
ruleManager, err = rulemanagerv1.CreateRuleManager(ctx, cfg, k8sClient, ruleBindingCache, objCache, exporter, prometheusExporter, nodeName, clusterData.ClusterName)
ruleManager, err = rulemanagerv1.CreateRuleManager(ctx, cfg, k8sClient, ruleBindingCache, objCache, exporter, prometheusExporter, nodeName, clusterData.ClusterName, processManager)
if err != nil {
logger.L().Ctx(ctx).Fatal("error creating RuleManager", helpers.Error(err))
}
Expand All @@ -244,6 +247,7 @@ func main() {
ruleManager = rulemanager.CreateRuleManagerMock()
objCache = objectcache.NewObjectCacheMock()
ruleBindingNotify = make(chan rulebinding.RuleBindingNotify, 1)
processManager = processmanager.CreateProcessManagerMock()
}

// Create the node profile manager
Expand All @@ -269,7 +273,7 @@ func main() {
}

// Create the container handler
mainHandler, err := containerwatcher.CreateIGContainerWatcher(cfg, applicationProfileManager, k8sClient, relevancyManager, networkManagerClient, dnsManagerClient, prometheusExporter, ruleManager, malwareManager, preRunningContainersIDs, &ruleBindingNotify, containerRuntime, nil)
mainHandler, err := containerwatcher.CreateIGContainerWatcher(cfg, applicationProfileManager, k8sClient, relevancyManager, networkManagerClient, dnsManagerClient, prometheusExporter, ruleManager, malwareManager, preRunningContainersIDs, &ruleBindingNotify, containerRuntime, nil, processManager)
if err != nil {
logger.L().Ctx(ctx).Fatal("error creating the container watcher", helpers.Error(err))
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/containerwatcher/v1/container_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
tracersshtype "github.com/kubescape/node-agent/pkg/ebpf/gadgets/ssh/types"
tracersymlink "github.com/kubescape/node-agent/pkg/ebpf/gadgets/symlink/tracer"
tracersymlinktype "github.com/kubescape/node-agent/pkg/ebpf/gadgets/symlink/types"
"github.com/kubescape/node-agent/pkg/processmanager"

"github.com/kubescape/node-agent/pkg/malwaremanager"
"github.com/kubescape/node-agent/pkg/metricsmanager"
Expand Down Expand Up @@ -153,11 +154,13 @@ type IGContainerWatcher struct {
ruleBindingPodNotify *chan rulebinding.RuleBindingNotify
// container runtime
runtime *containerutilsTypes.RuntimeConfig
// process manager
processManager processmanager.ProcessManagerClient
}

var _ containerwatcher.ContainerWatcher = (*IGContainerWatcher)(nil)

func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager applicationprofilemanager.ApplicationProfileManagerClient, k8sClient *k8sinterface.KubernetesApi, relevancyManager relevancymanager.RelevancyManagerClient, networkManagerClient networkmanager.NetworkManagerClient, dnsManagerClient dnsmanager.DNSManagerClient, metrics metricsmanager.MetricsManager, ruleManager rulemanager.RuleManagerClient, malwareManager malwaremanager.MalwareManagerClient, preRunningContainers mapset.Set[string], ruleBindingPodNotify *chan rulebinding.RuleBindingNotify, runtime *containerutilsTypes.RuntimeConfig, thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.EventReceiver]]) (*IGContainerWatcher, error) {
func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager applicationprofilemanager.ApplicationProfileManagerClient, k8sClient *k8sinterface.KubernetesApi, relevancyManager relevancymanager.RelevancyManagerClient, networkManagerClient networkmanager.NetworkManagerClient, dnsManagerClient dnsmanager.DNSManagerClient, metrics metricsmanager.MetricsManager, ruleManager rulemanager.RuleManagerClient, malwareManager malwaremanager.MalwareManagerClient, preRunningContainers mapset.Set[string], ruleBindingPodNotify *chan rulebinding.RuleBindingNotify, runtime *containerutilsTypes.RuntimeConfig, thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.EventReceiver]], processManager processmanager.ProcessManagerClient) (*IGContainerWatcher, error) {
// Use container collection to get notified for new containers
containerCollection := &containercollection.ContainerCollection{}
// Create a tracer collection instance
Expand Down Expand Up @@ -203,6 +206,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
path = event.Args[0]
}
metrics.ReportEvent(utils.ExecveEventType)
processManager.ReportEvent(utils.ExecveEventType, &event)
applicationProfileManager.ReportFileExec(k8sContainerID, path, event.Args)
relevancyManager.ReportFileExec(event.Runtime.ContainerID, k8sContainerID, path)
ruleManager.ReportEvent(utils.ExecveEventType, &event)
Expand Down Expand Up @@ -449,6 +453,7 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
runtime: runtime,
thirdPartyTracers: mapset.NewSet[containerwatcher.CustomTracer](),
thirdPartyContainerReceivers: mapset.NewSet[containerwatcher.ContainerReceiver](),
processManager: processManager,
}, nil
}

Expand Down Expand Up @@ -494,11 +499,16 @@ func (ch *IGContainerWatcher) UnregisterContainerReceiver(receiver containerwatc

func (ch *IGContainerWatcher) Start(ctx context.Context) error {
if !ch.running {

if err := ch.startContainerCollection(ctx); err != nil {
return fmt.Errorf("setting up container collection: %w", err)
}

// We want to populate the initial processes before starting the tracers but after retrieving the shims.
if err := ch.processManager.PopulateInitialProcesses(); err != nil {
ch.stopContainerCollection()
return fmt.Errorf("populating initial processes: %w", err)
}

if err := ch.startTracers(); err != nil {
ch.stopContainerCollection()
return fmt.Errorf("starting app behavior tracing: %w", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/containerwatcher/v1/container_watcher_private.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (ch *IGContainerWatcher) startContainerCollection(ctx context.Context) erro
ch.networkManager.ContainerCallback,
ch.malwareManager.ContainerCallback,
ch.ruleManager.ContainerCallback,
ch.processManager.ContainerCallback,
}

for receiver := range ch.thirdPartyContainerReceivers.Iter() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/containerwatcher/v1/open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func BenchmarkIGContainerWatcher_openEventCallback(b *testing.B) {
assert.NoError(b, err)
mockExporter := metricsmanager.NewMetricsMock()

mainHandler, err := CreateIGContainerWatcher(cfg, nil, nil, relevancyManager, nil, nil, mockExporter, nil, nil, nil, nil, nil, nil)
mainHandler, err := CreateIGContainerWatcher(cfg, nil, nil, relevancyManager, nil, nil, mockExporter, nil, nil, nil, nil, nil, nil, nil)
assert.NoError(b, err)
event := &traceropentype.Event{
Event: types.Event{
Expand Down
20 changes: 20 additions & 0 deletions pkg/processmanager/process_manager_interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package processmanager

import (
apitypes "github.com/armosec/armoapi-go/armotypes"
containercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/container-collection"
"github.com/kubescape/node-agent/pkg/utils"
)

// ProcessManagerClient is the interface for the process manager client.
// It provides methods to get process tree for a container or a PID.
// The manager is responsible for maintaining the process tree for all containers.
type ProcessManagerClient interface {
GetProcessTreeForPID(containerID string, pid int) (apitypes.Process, error)
// PopulateInitialProcesses is called to populate the initial process tree (parsed from /proc) for all containers.
PopulateInitialProcesses() error

// ReportEvent will be called to report new exec events to the process manager.
ReportEvent(eventType utils.EventType, event utils.K8sEvent)
ContainerCallback(notif containercollection.PubSubEvent)
}
32 changes: 32 additions & 0 deletions pkg/processmanager/process_manager_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package processmanager

import (
apitypes "github.com/armosec/armoapi-go/armotypes"
containercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/container-collection"
"github.com/kubescape/node-agent/pkg/utils"
)

type ProcessManagerMock struct {
}

var _ ProcessManagerClient = (*ProcessManagerMock)(nil)

func CreateProcessManagerMock() *ProcessManagerMock {
return &ProcessManagerMock{}
}

func (p *ProcessManagerMock) GetProcessTreeForPID(containerID string, pid int) (apitypes.Process, error) {
return apitypes.Process{}, nil
}

func (p *ProcessManagerMock) PopulateInitialProcesses() error {
return nil
}

func (p *ProcessManagerMock) ReportEvent(eventType utils.EventType, event utils.K8sEvent) {
// no-op
}

func (p *ProcessManagerMock) ContainerCallback(notif containercollection.PubSubEvent) {
// no-op
}
Loading
Loading