Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
sniffer
temp
.vscode
.vscode
resources/ebpf/falco/*
7 changes: 4 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@ import (
)

const (
FALCO_EBPF_ENGINE = "falco"
CILIUM_EBPF_ENGINE = "cilium"
EBPFEngineFalco = "falco"
Comment thread
rcohencyberarmor marked this conversation as resolved.
EBPFEngineCilium = "cilium"
CONFIG_ENV_VAR = "CONFIG_ENV_VAR"
Comment thread
rcohencyberarmor marked this conversation as resolved.
)

type Config struct {
data ConfigDataInterface
}

func (cfg *Config) getConfigFilePath() (string, bool) {
return os.LookupEnv("SNIFFER_CONFIG")
return os.LookupEnv(CONFIG_ENV_VAR)
}

func (cfg *Config) GetConfigurationReader() (io.Reader, error) {
Expand Down
7 changes: 5 additions & 2 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package config

import (
"os"
"path"
v1 "sniffer/pkg/config/v1"
"sniffer/pkg/utils"
"testing"
)

func TestConfig(t *testing.T) {
err := os.Setenv("SNIFFER_CONFIG", "../../configuration/ConfigurationFile.json")
configPath := path.Join(utils.CurrentDir(), "..", "..", "configuration", "ConfigurationFile.json")
err := os.Setenv(CONFIG_ENV_VAR, configPath)
if err != nil {
t.Fatalf("failed to set env SNIFFER_CONFIG with err %v", err)
t.Fatalf("failed to set env %s with err %v", CONFIG_ENV_VAR, err)
}

cfg := GetConfigurationConfigContext()
Expand Down
35,394 changes: 35,394 additions & 0 deletions pkg/config/testdata/mock_falco_ebpf_engine/kernel_obj_mock.o

Large diffs are not rendered by default.

Binary file not shown.
13 changes: 11 additions & 2 deletions pkg/config/v1/config_data_falco_mock.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
package config

import (
"path"
"sniffer/pkg/utils"
)

type ConfigDataFalcoMock struct {
}

func CreateFalcoMockConfigData() *ConfigDataFalcoMock {
return &ConfigDataFalcoMock{}
}

func (c *ConfigDataFalcoMock) IsFalcoEbpfEngine() bool {
return true
}
Expand All @@ -12,9 +21,9 @@ func (c *ConfigDataFalcoMock) GetFalcoSyscallFilter() []string {
}

func (c *ConfigDataFalcoMock) GetFalcoKernelObjPath() string {
return "./../../resources/ebpf/mock_falco_ebpf_engine/kernel_obj_mock.o"
return path.Join(utils.CurrentDir(), "..", "testdata", "mock_falco_ebpf_engine", "kernel_obj_mock.o")
}

func (c *ConfigDataFalcoMock) GetEbpfEngineLoaderPath() string {
return "./../../resources/ebpf/mock_falco_ebpf_engine/userspace_app_mock"
return path.Join(utils.CurrentDir(), "..", "testdata", "mock_falco_ebpf_engine", "userspace_app_mock")
}
265 changes: 265 additions & 0 deletions pkg/event_data_storage/accumulator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
package accumulator

import (
"fmt"
"strings"
"sync"
"time"

"sniffer/pkg/config"
"sniffer/pkg/ebpfeng"
evData "sniffer/pkg/ebpfev/v1"

logger "github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
)

const (
AccumulatorSize = 10
DROP_EVENT_OCCURRED = "drop event occurred\n"
Comment thread
rcohencyberarmor marked this conversation as resolved.
Outdated
)

type containersEventStreamer struct {
streamDataChannelForContainerID map[string]chan evData.EventData
registerMutex sync.Mutex
}

type Accumulator struct {
data []map[string][]evData.EventData
startStatus bool
syncReaderWriterData *sync.RWMutex
Comment thread
rcohencyberarmor marked this conversation as resolved.
Outdated
listOfFirstKeysInsertInEachSlot []string
cacheSize int
eventChannel chan *evData.EventData
containersData containersEventStreamer
ebpfEngine ebpfeng.EbpfEngineClient
}

type ContainerAccumulator struct {
dataChannel chan evData.EventData
containerID string
}

var nodeAgentContainerID string
var accumulatorInstance *Accumulator
var accumulatorInstanceLock = &sync.Mutex{}

func newAccumulator() *Accumulator {
accumulatorInstance = &Accumulator{
cacheSize: AccumulatorSize,
startStatus: false,
syncReaderWriterData: &sync.RWMutex{},
data: make([]map[string][]evData.EventData, AccumulatorSize),
listOfFirstKeysInsertInEachSlot: make([]string, AccumulatorSize),
eventChannel: make(chan *evData.EventData),
containersData: containersEventStreamer{
streamDataChannelForContainerID: make(map[string]chan evData.EventData),
},
}

return accumulatorInstance
}

func GetAccumulator() *Accumulator {
if accumulatorInstance == nil {
accumulatorInstanceLock.Lock()
defer accumulatorInstanceLock.Unlock()
if accumulatorInstance == nil {
logger.L().Debug("Creating accumulatorInstance now.")
accumulatorInstance = newAccumulator()
}
}

return accumulatorInstance
}

func CreateContainerAccumulator(containerID string, dataChannel chan evData.EventData) *ContainerAccumulator {
return &ContainerAccumulator{
dataChannel: dataChannel,
containerID: containerID,
}
}

func (acc *Accumulator) createNewSlotInIndex(event *evData.EventData, index int) {
Comment thread
rcohencyberarmor marked this conversation as resolved.
acc.syncReaderWriterData.Lock()
defer acc.syncReaderWriterData.Unlock()
slice := make([]evData.EventData, 0)
m := make(map[string][]evData.EventData)
m[event.GetEventContainerID()] = slice
acc.data[index] = m
acc.listOfFirstKeysInsertInEachSlot[index] = event.GetEventContainerID()
}

func (acc *Accumulator) getFirstTimestamp() (time.Time, error) {
if len(acc.data) < 1 {
return time.Time{}, fmt.Errorf("getFirstTimestamp failed the slice data has no members")
}
if len(acc.listOfFirstKeysInsertInEachSlot) < 1 {
return time.Time{}, fmt.Errorf("getFirstTimestamp failed the slice that store the first data key in the accumulator has no members")
}
if len(acc.data[0][acc.listOfFirstKeysInsertInEachSlot[0]]) < 1 {
return time.Time{}, fmt.Errorf("getFirstTimestamp failed the slice of events in the accumulator has no members")
}
return acc.data[0][acc.listOfFirstKeysInsertInEachSlot[0]][0].GetEventTimestamp(), nil
}

func (acc *Accumulator) findIndexByTimestampWhenAccumulatorDataIsFull(event *evData.EventData) (int, bool, error) {
index := 0
minTimestamp, err := acc.getFirstTimestamp()
if err != nil {
return -1, false, err
}
for i := 1; i < len(acc.data); i++ {
if acc.data[i][acc.listOfFirstKeysInsertInEachSlot[i]][0].GetEventTimestamp().Before(minTimestamp) {
minTimestamp = acc.data[i][acc.listOfFirstKeysInsertInEachSlot[i]][0].GetEventTimestamp()
index = i
}
}
return index, true, nil
}

func (acc *Accumulator) findIndexByTimestamp(event *evData.EventData) (int, bool, error) {
for i := range acc.data {
if len(acc.data[i]) == 0 {
return i, true, nil
}
if i < len(acc.listOfFirstKeysInsertInEachSlot) {
firstKey := acc.listOfFirstKeysInsertInEachSlot[i]
if event.GetEventTimestamp().Sub((acc.data[i])[firstKey][0].GetEventTimestamp()) < time.Second {
return i, false, nil
}
} else {
return -1, false, fmt.Errorf("findIndexByTimestamp: trying to access slice of first accumulator keys to index out of range")
}
}
return acc.findIndexByTimestampWhenAccumulatorDataIsFull(event)
}

func (acc *Accumulator) removeAllStreamedContainers(event *evData.EventData) {
acc.containersData.registerMutex.Lock()
defer acc.containersData.registerMutex.Unlock()
if len(acc.containersData.streamDataChannelForContainerID) > 0 {
for contID := range acc.containersData.streamDataChannelForContainerID {
acc.containersData.streamDataChannelForContainerID[contID] <- *event
}
}
}

func (acc *Accumulator) addEventToCache(event *evData.EventData, index int) {
acc.syncReaderWriterData.Lock()
defer acc.syncReaderWriterData.Unlock()
acc.data[index][event.GetEventContainerID()] = append(acc.data[index][event.GetEventContainerID()], *event)
}

func (acc *Accumulator) streamEventToRegisterContainer(event *evData.EventData) {
acc.containersData.registerMutex.Lock()
defer acc.containersData.registerMutex.Unlock()
if containerAccumulatorChan, exist := acc.containersData.streamDataChannelForContainerID[event.GetEventContainerID()]; exist {
containerAccumulatorChan <- *event
}
}

/*
accumulateEbpfEngineData get events from the ebpf engine and insert them into 2 place in memory:
1. store event in the accumulator (the accumulator has a memory of the last 10 seconds of events - order'd by containerIDs)
2. stream the event into channel of any new container
*/

func (acc *Accumulator) accumulateEbpfEngineData() {
for {
event := <-acc.eventChannel
if nodeAgentContainerID != "" && strings.Contains(event.GetEventContainerID(), nodeAgentContainerID) {
continue
}
if event != nil {
if event.GetEventCMD() == DROP_EVENT_OCCURRED {
acc.removeAllStreamedContainers(event)
} else {
index, newSlotIsNeeded, err := acc.findIndexByTimestamp(event)
if err != nil {
logger.L().Warning("findIndexByTimestamp fail to find the index to insert the event, fail with error", helpers.Error(err))
logger.L().Warning("event that didn't store ", helpers.String("", fmt.Sprintf("%v", event)))
continue
}
if newSlotIsNeeded {
acc.createNewSlotInIndex(event, index)
}
acc.addEventToCache(event, index)
acc.streamEventToRegisterContainer(event)
}
}
}
}

func (acc *Accumulator) getEbpfEngineData() {
acc.ebpfEngine.GetData(acc.eventChannel)
}

func (acc *Accumulator) getEbpfEngineError(errChan chan error) {
errChan <- acc.ebpfEngine.GetEbpfEngineError()
}

func (acc *Accumulator) StartAccumulator(errChan chan error) error {
if !accumulatorInstance.startStatus {
accumulatorInstanceLock.Lock()
defer accumulatorInstanceLock.Unlock()
if !accumulatorInstance.startStatus {
accumulatorInstance.startStatus = true
falcoEbpfEngine := ebpfeng.CreateFalcoEbpfEngine(config.GetConfigurationConfigContext().GetSyscallFilter(), false, false, "")
acc.ebpfEngine = falcoEbpfEngine

err := acc.ebpfEngine.StartEbpfEngine()
if err != nil {
logger.L().Error("fail to create ebpf engine %v", helpers.Error(err))
return err
}

go acc.accumulateEbpfEngineData()
go acc.getEbpfEngineData()
go acc.getEbpfEngineError(errChan)
}
}
return nil
}

func (acc *ContainerAccumulator) registerContainerAccumulator() {
accumulatorInstance.containersData.registerMutex.Lock()
defer accumulatorInstance.containersData.registerMutex.Unlock()
accumulatorInstance.containersData.streamDataChannelForContainerID[acc.containerID] = acc.dataChannel
}

func (acc *ContainerAccumulator) unregisterContainerAccumulator() {
accumulatorInstance.containersData.registerMutex.Lock()
defer accumulatorInstance.containersData.registerMutex.Unlock()
delete(accumulatorInstance.containersData.streamDataChannelForContainerID, acc.containerID)
}

func (acc *ContainerAccumulator) StartContainerAccumulator() {
acc.registerContainerAccumulator()
}

func (acc *ContainerAccumulator) StopContainerAccumulator() {
acc.unregisterContainerAccumulator()
}

func GetCacheAccumulator() *Accumulator {
return accumulatorInstance
}

func AccumulatorByContainerID(aggregationData *[]evData.EventData, containerID string) {
accumulatorInstance.syncReaderWriterData.Lock()
defer accumulatorInstance.syncReaderWriterData.Unlock()
for i := range accumulatorInstance.data {
logger.L().Debug("", helpers.String("data in index ", fmt.Sprintf("%d:%v", i, accumulatorInstance.data[i])))
}
for i := range accumulatorInstance.data {
for j := range accumulatorInstance.data[i][containerID] {
*aggregationData = append(*aggregationData, accumulatorInstance.data[i][containerID][j])
}
}
logger.L().Debug("full aggregation data ", helpers.String("of containerID ", fmt.Sprintf("%s is: : aggregationData %v ", containerID, aggregationData)))
}

func SetMyContainerID(mycid string) {
nodeAgentContainerID = mycid
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package accumulator

Comment thread
rcohencyberarmor marked this conversation as resolved.
type AcccumulatorClient interface {
type AccumulatorClient interface {
// this function StartAccumulator will store the data from the ebpf engine
StartAccumulator() error
GetAccumulator() error
StartContainerAccumulator() error
StopContainerAccumulator() error
}
Loading