Binary file added docs/dedup-testing/after_cpu_usage.png
Binary file added docs/dedup-testing/after_memory_usage.png
Binary file added docs/dedup-testing/before_cpu_usage.png
Binary file added docs/dedup-testing/before_memory_usage.png
91 changes: 91 additions & 0 deletions docs/dedup-testing/benchmark-results.md
@@ -0,0 +1,91 @@
# eBPF Event Dedup Benchmark Results

Benchmark comparing `node-agent:v0.3.71` (baseline) vs the local build with the dedup cache (`feature/ebpf-event-dedup`).

## Setup

- **Cluster**: kind (2 nodes: control-plane + worker)
- **Prometheus**: kube-prometheus-stack with 10s scrape interval
- **Kubescape**: kubescape-operator chart with runtimeDetection + runtimeObservability enabled
- **Load simulator**: DaemonSet generating events at configurable rates
- **Duration**: 2 min warmup + 10 min load per run
- **CPU rate window**: `rate(...[1m])` for responsive measurement

### Load Simulator Config

| Parameter | Value |
|-----------|-------|
| openRate | 1000/sec |
| httpRate | 100/sec |
| execRate | 10/sec |
| networkRate | 10/sec |
| dnsRate | 2/sec |
| hardlinkRate | 10/sec |
| symlinkRate | 10/sec |
| cpuLoadMs | 500 |
| numberParallelCPUs | 2 |

## Resource Usage

| Metric | BEFORE (v0.3.71) | AFTER (dedup) | Delta |
|--------|-------------------|---------------|-------|
| Avg CPU (cores) | 0.178 | 0.150 | **-15.9%** |
| Peak CPU (cores) | 0.220 | 0.156 | **-29.1%** |
| Avg Memory (MiB) | 339.5 | 335.9 | -1.1% |
| Peak Memory (MiB) | 345.5 | 338.4 | -2.1% |

### CPU Usage

| BEFORE (v0.3.71) | AFTER (dedup) |
|---|---|
| ![before cpu](before_cpu_usage.png) | ![after cpu](after_cpu_usage.png) |

### Memory Usage

| BEFORE (v0.3.71) | AFTER (dedup) |
|---|---|
| ![before memory](before_memory_usage.png) | ![after memory](after_memory_usage.png) |

## Dedup Effectiveness

Events processed by the dedup cache during the AFTER run:

| Event Type | Passed | Deduped | Dedup Ratio |
|------------|--------|---------|-------------|
| http | 1,701 | 119,453 | **98.6%** |
| network | 900 | 77,968 | **98.9%** |
| open | 59,569 | 626,133 | **91.3%** |
| syscall | 998 | 1,967 | **66.3%** |
| dns | 1,197 | 0 | 0.0% |
| hardlink | 6,000 | 0 | 0.0% |
| symlink | 6,000 | 0 | 0.0% |
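
The Dedup Ratio column is consistent with `deduped / (passed + deduped)`. The formula is inferred from the numbers in the table rather than stated in the benchmark script, so the following is a minimal sketch for re-checking the rows; `dedupRatio` is a hypothetical helper name.

```go
package main

import "fmt"

// dedupRatio returns the percentage of events suppressed by the cache.
// Assumption: ratio = deduped / (passed + deduped), inferred from the table.
func dedupRatio(passed, deduped uint64) float64 {
	total := passed + deduped
	if total == 0 {
		return 0
	}
	return 100 * float64(deduped) / float64(total)
}

func main() {
	// Passed and deduped counts copied from the table above.
	fmt.Printf("http:    %.1f%%\n", dedupRatio(1701, 119453))
	fmt.Printf("network: %.1f%%\n", dedupRatio(900, 77968))
	fmt.Printf("open:    %.1f%%\n", dedupRatio(59569, 626133))
	fmt.Printf("syscall: %.1f%%\n", dedupRatio(998, 1967))
}
```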

## Event Counters (cumulative, both runs)

| Metric | BEFORE | AFTER |
|--------|--------|-------|
| open_counter | 801,868 | 816,637 |
| network_counter | 92,197 | 93,735 |
| exec_counter | 7,009 | 7,130 |
| syscall_counter | 3,628 | 3,735 |
| dns_counter | 1,401 | 1,422 |
| capability_counter | 9 | 9 |

Event counters are consistent between runs, confirming the load simulator produced comparable workloads.

## Analysis

- The dedup cache reduces **avg CPU by ~16%** and **peak CPU by ~29%** under sustained load (~1,100 events/sec).
- Memory impact is negligible (~1%) since the dedup cache uses a fixed-size, lock-free array (2 MiB for 2^18 slots at 8 bytes each).
- High-frequency event types benefit most, with dedup ratios of **98.9%** (network), **98.6%** (http), and **91.3%** (open).
- Events with unique keys per occurrence (dns, hardlink, symlink) show 0% dedup, which is expected.
- The CPU savings come from skipping CEL rule evaluation on deduplicated events. The eBPF ingestion and event enrichment cost (which dominates baseline CPU) is unchanged.
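
To make the "fixed-size, lock-free array" point concrete, here is a minimal sketch of one way such a cache could work. It is not the `dedupcache` implementation, which is not part of this diff. The slot layout (upper 48 bits of the key plus a 16-bit expiry bucket) and the names `sketchCache` and `checkAndSet` are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// sketchCache is a hypothetical stand-in for the PR's dedup cache:
// one 8-byte word per slot, so 1<<18 slots occupy 2 MiB.
type sketchCache struct {
	slots []uint64
	mask  uint64
}

func newSketchCache(slotsExponent uint8) *sketchCache {
	n := uint64(1) << slotsExponent
	return &sketchCache{slots: make([]uint64, n), mask: n - 1}
}

// checkAndSet reports whether key was already seen and has not expired.
// Assumed layout: upper 48 bits of the key, lower 16 bits the expiry bucket.
func (c *sketchCache) checkAndSet(key uint64, ttl, now uint16) bool {
	slot := &c.slots[key&c.mask]
	old := atomic.LoadUint64(slot)
	sameKey := old != 0 && old>>16 == key>>16
	// The int16 cast keeps the comparison correct across uint16 wraparound.
	if sameKey && int16(uint16(old)-now) > 0 {
		return true
	}
	atomic.StoreUint64(slot, key&^0xFFFF|uint64(now+ttl))
	return false
}

func main() {
	c := newSketchCache(18)
	fmt.Println("bytes:", len(c.slots)*8)
	fmt.Println(c.checkAndSet(42<<20, 156, 1000)) // first sighting
	fmt.Println(c.checkAndSet(42<<20, 156, 1100)) // inside the TTL window
	fmt.Println(c.checkAndSet(42<<20, 156, 1200)) // window expired
}
```

The load and store are individually atomic but not a single compare-and-swap, so two goroutines racing on the same key may both see it as new. For a best-effort filter that only costs one extra rule evaluation.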

## Reproducing

```bash
cd performance
./dedup-bench.sh quay.io/kubescape/node-agent:v0.3.71 quay.io/kubescape/node-agent:test
```

Requires: kind, helm, kubectl, docker, python3. Estimated runtime: ~35 minutes.
4 changes: 2 additions & 2 deletions go.mod
@@ -12,6 +12,7 @@ require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.3.0
github.com/cenkalti/backoff/v5 v5.0.3
github.com/cespare/xxhash/v2 v2.3.0
github.com/cilium/ebpf v0.20.0
github.com/crewjam/rfc5424 v0.1.0
github.com/cyphar/filepath-securejoin v0.6.0
@@ -56,6 +57,7 @@ require (
golang.org/x/sys v0.42.0
gonum.org/v1/plot v0.14.0
google.golang.org/grpc v1.79.3
google.golang.org/protobuf v1.36.11
gopkg.in/mcuadros/go-syslog.v2 v2.3.0
istio.io/pkg v0.0.0-20231221211216-7635388a563e
k8s.io/api v0.35.0
@@ -155,7 +157,6 @@ require (
github.com/bodgit/windows v1.0.1 // indirect
github.com/briandowns/spinner v1.23.2 // indirect
github.com/campoy/embedmd v1.0.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/charmbracelet/colorprofile v0.3.1 // indirect
github.com/charmbracelet/lipgloss v1.1.0 // indirect
github.com/charmbracelet/x/ansi v0.9.3 // indirect
@@ -433,7 +434,6 @@ require (
google.golang.org/genproto v0.0.0-20250715232539-7130f93afb79 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 // indirect
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
9 changes: 9 additions & 0 deletions pkg/config/config.go
@@ -25,6 +25,12 @@ const NodeNameEnvVar = "NODE_NAME"
const PodNameEnvVar = "POD_NAME"
const NamespaceEnvVar = "NAMESPACE_NAME"

// EventDedupConfig controls eBPF event deduplication before CEL rule evaluation.
type EventDedupConfig struct {
	Enabled       bool  `mapstructure:"enabled"`
	SlotsExponent uint8 `mapstructure:"slotsExponent"`
}

type Config struct {
BlockEvents bool `mapstructure:"blockEvents"`
CelConfigCache cache.FunctionCacheConfig `mapstructure:"celConfigCache"`
@@ -70,6 +76,7 @@ type Config struct {
StandaloneMonitoringEnabled bool `mapstructure:"standaloneMonitoringEnabled"`
SeccompProfileBackend string `mapstructure:"seccompProfileBackend"`
EventBatchSize int `mapstructure:"eventBatchSize"`
EventDedup EventDedupConfig `mapstructure:"eventDedup"`
ExcludeJsonPaths []string `mapstructure:"excludeJsonPaths"`
ExcludeLabels map[string][]string `mapstructure:"excludeLabels"`
ExcludeNamespaces []string `mapstructure:"excludeNamespaces"`
@@ -183,6 +190,8 @@ func LoadConfig(path string) (Config, error) {
viper.SetDefault("celConfigCache::ttl", 1*time.Minute)
viper.SetDefault("ignoreRuleBindings", false)

viper.SetDefault("eventDedup::enabled", true)
viper.SetDefault("eventDedup::slotsExponent", 18)
viper.SetDefault("dnsCacheSize", 50000)
viper.SetDefault("seccompProfileBackend", "storage") // "storage" or "crd"
viper.SetDefault("containerEolNotificationBuffer", 100)
1 change: 1 addition & 0 deletions pkg/config/config_test.go
@@ -80,6 +80,7 @@ func TestLoadConfig(t *testing.T) {
},
WorkerPoolSize: 3000,
EventBatchSize: 15000,
EventDedup: EventDedupConfig{Enabled: true, SlotsExponent: 18},
WorkerChannelSize: 750000,
BlockEvents: false,
ProcfsScanInterval: 30 * time.Second,
9 changes: 9 additions & 0 deletions pkg/containerwatcher/v2/container_watcher.go
@@ -19,6 +19,7 @@ import (
"github.com/kubescape/node-agent/pkg/config"
"github.com/kubescape/node-agent/pkg/containerprofilemanager"
"github.com/kubescape/node-agent/pkg/containerwatcher"
"github.com/kubescape/node-agent/pkg/containerwatcher/v2/tracers"
"github.com/kubescape/node-agent/pkg/dedupcache"
"github.com/kubescape/node-agent/pkg/dnsmanager"
"github.com/kubescape/node-agent/pkg/ebpf/events"
@@ -137,6 +138,12 @@ func CreateContainerWatcher(

rulePolicyReporter := rulepolicy.NewRulePolicyReporter(ruleManager, containerProfileManager)

// Create dedup cache if enabled
var dedupCache *dedupcache.DedupCache
if cfg.EventDedup.Enabled {
dedupCache = dedupcache.NewDedupCache(cfg.EventDedup.SlotsExponent)
}

// Create event handler factory
eventHandlerFactory := NewEventHandlerFactory(
cfg,
@@ -150,6 +157,7 @@ func CreateContainerWatcher(
thirdPartyTracers.ThirdPartyEventReceivers,
thirdPartyEnricher,
rulePolicyReporter,
dedupCache,
)

// Create event enricher
@@ -462,6 +470,7 @@ func (cw *ContainerWatcher) processQueueBatch() {

func (cw *ContainerWatcher) enrichAndProcess(entry EventEntry) {
enrichedEvent := cw.eventEnricher.EnrichEvents(entry)
enrichedEvent.DedupBucket = uint16(time.Now().UnixNano() / (64 * 1_000_000))

select {
case cw.workerChan <- enrichedEvent:
138 changes: 138 additions & 0 deletions pkg/containerwatcher/v2/event_handler_factory.go
@@ -7,6 +7,7 @@ import (
"github.com/kubescape/node-agent/pkg/config"
"github.com/kubescape/node-agent/pkg/containerprofilemanager"
"github.com/kubescape/node-agent/pkg/containerwatcher"
"github.com/kubescape/node-agent/pkg/dedupcache"
"github.com/kubescape/node-agent/pkg/dnsmanager"
"github.com/kubescape/node-agent/pkg/ebpf/events"
"github.com/kubescape/node-agent/pkg/eventreporters/rulepolicy"
@@ -41,6 +42,20 @@ func (ma *ManagerAdapter) ReportEvent(eventType utils.EventType, event utils.K8s
ma.reportEventFunc(eventType, event)
}

// TTL constants for dedup windows in 64ms buckets.
const (
	dedupTTLOpen         uint16 = 156 // 10s
	dedupTTLNetwork      uint16 = 78  // 5s
	dedupTTLDNS          uint16 = 156 // 10s
	dedupTTLCapabilities uint16 = 156 // 10s
	dedupTTLHTTP         uint16 = 31  // 2s
	dedupTTLSSH          uint16 = 156 // 10s
	dedupTTLSymlink      uint16 = 156 // 10s
	dedupTTLHardlink     uint16 = 156 // 10s
	dedupTTLPtrace       uint16 = 156 // 10s
	dedupTTLSyscall      uint16 = 78  // 5s
)

// EventHandlerFactory manages the mapping of event types to their managers
type EventHandlerFactory struct {
handlers map[utils.EventType][]Manager
@@ -49,6 +64,9 @@ type EventHandlerFactory struct {
cfg config.Config
containerCollection *containercollection.ContainerCollection
containerCache *maps.SafeMap[string, *containercollection.Container] // Cache for container lookups
dedupCache *dedupcache.DedupCache
metrics metricsmanager.MetricsManager
dedupSkipSet map[Manager]struct{} // Managers to skip when event is duplicate
}

// NewEventHandlerFactory creates a new event handler factory
@@ -64,6 +82,7 @@ func NewEventHandlerFactory(
thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.GenericEventReceiver]],
thirdPartyEnricher containerwatcher.TaskBasedEnricher,
rulePolicyReporter *rulepolicy.RulePolicyReporter,
dedupCache *dedupcache.DedupCache,
) *EventHandlerFactory {
factory := &EventHandlerFactory{
handlers: make(map[utils.EventType][]Manager),
@@ -72,6 +91,9 @@ func NewEventHandlerFactory(
cfg: cfg,
containerCollection: containerCollection,
containerCache: &maps.SafeMap[string, *containercollection.Container]{},
dedupCache: dedupCache,
metrics: metrics,
dedupSkipSet: make(map[Manager]struct{}),
}

// Create adapters for managers that don't implement the Manager interface directly
@@ -168,9 +190,108 @@ func NewEventHandlerFactory(
rulePolicyAdapter,
)

// Populate dedupSkipSet: managers that skip processing when event is duplicate.
// RuleManager checks enrichedEvent.Duplicate internally.
factory.dedupSkipSet[containerProfileAdapter] = struct{}{}
factory.dedupSkipSet[malwareManager] = struct{}{}

return factory
}
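
The skip set is keyed by interface values, which works as long as each manager's dynamic type is hashable (pointers are). A Go map panics at runtime on an unhashable key type. The sketch below reduces the dispatch rule to its essentials. The `handler` interface, `stubManager`, and `dispatch` are hypothetical simplifications, not the types used in this package.

```go
package main

import "fmt"

// handler is a hypothetical, simplified stand-in for the Manager interface.
type handler interface{ Name() string }

type stubManager struct{ name string }

func (m *stubManager) Name() string { return m.name }

// dispatch returns the names of handlers that would run for an event.
// Handlers in skip are bypassed only when the event is a duplicate.
func dispatch(handlers []handler, skip map[handler]struct{}, duplicate bool) []string {
	var ran []string
	for _, h := range handlers {
		if duplicate {
			if _, ok := skip[h]; ok {
				continue
			}
		}
		ran = append(ran, h.Name())
	}
	return ran
}

func main() {
	profile := &stubManager{"containerProfile"}
	rules := &stubManager{"ruleManager"}
	handlers := []handler{profile, rules}
	skip := map[handler]struct{}{profile: {}}

	fmt.Println(dispatch(handlers, skip, false))
	fmt.Println(dispatch(handlers, skip, true))
}
```

Handlers left out of the skip set still receive duplicates, which matches the comment above: such a handler is expected to inspect the duplicate flag itself.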

// computeEventDedupKey computes a dedup key and TTL for the given event.
// Returns shouldDedup=false for event types that must not be deduplicated.
func computeEventDedupKey(enrichedEvent *events.EnrichedEvent) (key uint64, ttl uint16, shouldDedup bool) {
event := enrichedEvent.Event
mntns := enrichedEvent.MountNamespaceID
if mntns == 0 {
if ee, ok := event.(utils.EnrichEvent); ok {
mntns = ee.GetMountNsID()
}
}

switch event.GetEventType() {
case utils.OpenEventType:
if e, ok := event.(utils.OpenEvent); ok {
pid := uint32(0)
if ee, ok := event.(utils.EnrichEvent); ok {
pid = ee.GetPID()
}
return dedupcache.ComputeOpenKey(mntns, pid, e.GetPath(), e.GetFlagsRaw()), dedupTTLOpen, true
}
case utils.NetworkEventType:
if e, ok := event.(utils.NetworkEvent); ok {
pid := uint32(0)
if ee, ok := event.(utils.EnrichEvent); ok {
pid = ee.GetPID()
}
dst := e.GetDstEndpoint()
return dedupcache.ComputeNetworkKey(mntns, pid, dst.Addr, e.GetDstPort(), e.GetProto()), dedupTTLNetwork, true
}
case utils.DnsEventType:
if e, ok := event.(utils.DNSEvent); ok {
return dedupcache.ComputeDNSKey(mntns, e.GetDNSName()), dedupTTLDNS, true
}
case utils.CapabilitiesEventType:
if e, ok := event.(utils.CapabilitiesEvent); ok {
pid := uint32(0)
if ee, ok := event.(utils.EnrichEvent); ok {
pid = ee.GetPID()
}
return dedupcache.ComputeCapabilitiesKey(mntns, pid, e.GetCapability(), e.GetSyscall()), dedupTTLCapabilities, true
}
case utils.HTTPEventType:
if e, ok := event.(utils.HttpEvent); ok {
pid := uint32(0)
if ee, ok := event.(utils.EnrichEvent); ok {
pid = ee.GetPID()
}
req := e.GetRequest()
if req == nil {
return 0, 0, false
}
return dedupcache.ComputeHTTPKey(mntns, pid, string(e.GetDirection()), req.Method, req.Host, req.URL.Path, req.URL.RawQuery), dedupTTLHTTP, true
}
case utils.SSHEventType:
if e, ok := event.(utils.SshEvent); ok {
return dedupcache.ComputeSSHKey(mntns, e.GetDstIP(), e.GetDstPort()), dedupTTLSSH, true
}
case utils.SymlinkEventType:
if e, ok := event.(utils.LinkEvent); ok {
pid := uint32(0)
if ee, ok := event.(utils.EnrichEvent); ok {
pid = ee.GetPID()
}
return dedupcache.ComputeSymlinkKey(mntns, pid, e.GetOldPath(), e.GetNewPath()), dedupTTLSymlink, true
}
case utils.HardlinkEventType:
if e, ok := event.(utils.LinkEvent); ok {
pid := uint32(0)
if ee, ok := event.(utils.EnrichEvent); ok {
pid = ee.GetPID()
}
return dedupcache.ComputeHardlinkKey(mntns, pid, e.GetOldPath(), e.GetNewPath()), dedupTTLHardlink, true
}
case utils.PtraceEventType:
if e, ok := event.(utils.PtraceEvent); ok {
pid := uint32(0)
if ee, ok := event.(utils.EnrichEvent); ok {
pid = ee.GetPID()
}
return dedupcache.ComputePtraceKey(mntns, pid, e.GetExePath()), dedupTTLPtrace, true
}
case utils.SyscallEventType:
if e, ok := event.(utils.SyscallEvent); ok {
pid := uint32(0)
if ee, ok := event.(utils.EnrichEvent); ok {
pid = ee.GetPID()
}
return dedupcache.ComputeSyscallKey(mntns, pid, e.GetSyscall()), dedupTTLSyscall, true
}
}
// exec, exit, fork, randomx, kmod, bpf, unshare, iouring — no dedup
return 0, 0, false
}
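
The `dedupcache.Compute*Key` helpers are not shown in this diff. As a rough illustration of the idea, the sketch below hashes the same fields that the open-event case passes in. It uses FNV-1a from the standard library purely to stay self-contained, whereas the go.mod change suggests the PR itself relies on xxhash. The function name, field order, and the `uint32` type for flags are assumptions.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// sketchOpenKey is a hypothetical analogue of dedupcache.ComputeOpenKey.
// FNV-1a stands in for whatever hash the real package uses.
func sketchOpenKey(mntns uint64, pid uint32, path string, flags uint32) uint64 {
	h := fnv.New64a()
	var buf [16]byte
	binary.LittleEndian.PutUint64(buf[0:8], mntns)
	binary.LittleEndian.PutUint32(buf[8:12], pid)
	binary.LittleEndian.PutUint32(buf[12:16], flags)
	// Fixed-width fields go first so they cannot be confused with path bytes.
	h.Write(buf[:])
	h.Write([]byte(path))
	return h.Sum64()
}

func main() {
	a := sketchOpenKey(4026531840, 1234, "/etc/passwd", 0)
	b := sketchOpenKey(4026531840, 1234, "/etc/passwd", 0)
	c := sketchOpenKey(4026531840, 1235, "/etc/passwd", 0)
	fmt.Println("same inputs, same key:", a == b)
	fmt.Println("different pid, different key:", a != c)
}
```

Including the PID in the key means the same file opened by two processes in one container is not deduplicated, which is consistent with the per-PID keys used for most event types above.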

// ProcessEvent processes an event through all registered handlers
func (ehf *EventHandlerFactory) ProcessEvent(enrichedEvent *events.EnrichedEvent) {
if enrichedEvent.ContainerID == "" {
@@ -187,6 +308,18 @@ func (ehf *EventHandlerFactory) ProcessEvent(enrichedEvent *events.EnrichedEvent
return
}

// Dedup check: compute key and check cache before dispatching to handlers
if ehf.dedupCache != nil {
key, ttl, shouldDedup := computeEventDedupKey(enrichedEvent)
if shouldDedup {
duplicate := ehf.dedupCache.CheckAndSet(key, ttl, enrichedEvent.DedupBucket)
if duplicate {
enrichedEvent.Duplicate = true
}
ehf.metrics.ReportDedupEvent(enrichedEvent.Event.GetEventType(), duplicate)
}
}

// Get handlers for this event type
eventType := enrichedEvent.Event.GetEventType()
handlers, exists := ehf.handlers[eventType]
@@ -196,6 +329,11 @@ func (ehf *EventHandlerFactory) ProcessEvent(enrichedEvent *events.EnrichedEvent

// Process event through each handler
for _, handler := range handlers {
if enrichedEvent.Duplicate {
if _, skip := ehf.dedupSkipSet[handler]; skip {
continue
}
}
if enrichedHandler, ok := handler.(containerwatcher.EnrichedEventReceiver); ok {
enrichedHandler.ReportEnrichedEvent(enrichedEvent)
} else if handler, ok := handler.(containerwatcher.EventReceiver); ok {