Skip to content

Commit

Permalink
Replace bolt db (#104)
Browse files Browse the repository at this point in the history
* enable profiler

Signed-off-by: David Wertenteil <[email protected]>

* Update main.go

* modify gh actions

Signed-off-by: David Wertenteil <[email protected]>

* fixed wf

* ermove units

* fixed main

* add build wf

Signed-off-by: David Wertenteil <[email protected]>

* add pprof

Signed-off-by: David Wertenteil <[email protected]>

* remove env

* start profiler on 6060

* add simple db

* fixed lock

* adding mutex per bucket

* remove files after SBOM creation

* clear after filtered sbom creation

* release lock

* adding logs

* fixed lock

* change map to pointer

* add test flow

* commented out otel tracing

Signed-off-by: Amir Malka <[email protected]>

* adding e2e test

* add push

* build only on main

* update tests

* remove pointers

* fixed test file

* do not remove after 2 min

* fixed filepath

* add demo file

* update conf

* fixed container stopped logs

* cleanup code

* cleanup

* rename file handlers

* update logs

---------

Signed-off-by: David Wertenteil <[email protected]>
Signed-off-by: Amir Malka <[email protected]>
Co-authored-by: Amir Malka <[email protected]>
  • Loading branch information
David Wertenteil and amirmalka authored Jul 24, 2023
1 parent 7fcf782 commit 36512a2
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 91 deletions.
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ require (
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/trace v1.16.0
golang.org/x/sys v0.10.0
k8s.io/apimachinery v0.27.3
k8s.io/client-go v0.27.3
k8s.io/apimachinery v0.27.4
k8s.io/client-go v0.27.4
)

require (
Expand Down Expand Up @@ -133,8 +133,8 @@ require (
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.27.3 // indirect
k8s.io/cri-api v0.27.3 // indirect
k8s.io/api v0.27.4 // indirect
k8s.io/cri-api v0.27.4 // indirect
k8s.io/klog/v2 v2.90.1 // indirect
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5 // indirect
Expand Down
18 changes: 9 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/s3rj1k/go-fanotify/fanotify v0.0.0-20210917134616-9c00a300bb7a h1:np2nR32/A/VcOG9Hn+IOPA8kMk1gbBzK5LpSsgq5pJI=
github.com/s3rj1k/go-fanotify/fanotify v0.0.0-20210917134616-9c00a300bb7a/go.mod h1:wiP6GQ2T378F+YIyuNw7yXtBxJZR+fqrrn1Z6UHZi0Q=
Expand Down Expand Up @@ -888,14 +888,14 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.27.3 h1:yR6oQXXnUEBWEWcvPWS0jQL575KoAboQPfJAuKNrw5Y=
k8s.io/api v0.27.3/go.mod h1:C4BNvZnQOF7JA/0Xed2S+aUyJSfTGkGFxLXz9MnpIpg=
k8s.io/apimachinery v0.27.3 h1:Ubye8oBufD04l9QnNtW05idcOe9Z3GQN8+7PqmuVcUM=
k8s.io/apimachinery v0.27.3/go.mod h1:XNfZ6xklnMCOGGFNqXG7bUrQCoR04dh/E7FprV6pb+E=
k8s.io/client-go v0.27.3 h1:7dnEGHZEJld3lYwxvLl7WoehK6lAq7GvgjxpA3nv1E8=
k8s.io/client-go v0.27.3/go.mod h1:2MBEKuTo6V1lbKy3z1euEGnhPfGZLKTS9tiJ2xodM48=
k8s.io/cri-api v0.27.3 h1:MkUcz7FMDA/BVSoC0iWI9uFjYG0Pd//gOdPKb4pKasY=
k8s.io/cri-api v0.27.3/go.mod h1:+Ts/AVYbIo04S86XbTD73UPp/DkTiYxtsFeOFEu32L0=
k8s.io/api v0.27.4 h1:0pCo/AN9hONazBKlNUdhQymmnfLRbSZjd5H5H3f0bSs=
k8s.io/api v0.27.4/go.mod h1:O3smaaX15NfxjzILfiln1D8Z3+gEYpjEpiNA/1EVK1Y=
k8s.io/apimachinery v0.27.4 h1:CdxflD4AF61yewuid0fLl6bM4a3q04jWel0IlP+aYjs=
k8s.io/apimachinery v0.27.4/go.mod h1:XNfZ6xklnMCOGGFNqXG7bUrQCoR04dh/E7FprV6pb+E=
k8s.io/client-go v0.27.4 h1:vj2YTtSJ6J4KxaC88P4pMPEQECWMY8gqPqsTgUKzvjk=
k8s.io/client-go v0.27.4/go.mod h1:ragcly7lUlN0SRPk5/ZkGnDjPknzb37TICq07WhI6Xc=
k8s.io/cri-api v0.27.4 h1:OqLsrkRpiEieMcNNqf1WxoMQyzDjOd/zUISrwjS5zAw=
k8s.io/cri-api v0.27.4/go.mod h1:+Ts/AVYbIo04S86XbTD73UPp/DkTiYxtsFeOFEu32L0=
k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw=
k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f h1:2kWPakN3i/k81b0gvD5C5FJ2kxm1WrQFanWchyKuqGg=
Expand Down
12 changes: 9 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"context"
"log"
"net/http"
"net/url"
"node-agent/internal/validator"
"node-agent/pkg/config"
Expand All @@ -18,6 +18,8 @@ import (
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/k8s-interface/k8sinterface"
"github.com/spf13/afero"

_ "net/http/pprof"
)

func main() {
Expand Down Expand Up @@ -48,8 +50,13 @@ func main() {
logger.L().Ctx(ctx).Fatal("error during validation", helpers.Error(err))
}

if _, present := os.LookupEnv("ENABLE_PROFILER"); present {
logger.L().Info("Starting profiler on port 6060")
go http.ListenAndServe("localhost:6060", nil)
}

// Create the relevancy manager
fileHandler, err := filehandler.CreateBoltFileHandler()
fileHandler, err := filehandler.CreateInMemoryFileHandler()
if err != nil {
logger.L().Ctx(ctx).Fatal("failed to create fileDB", helpers.Error(err))
}
Expand Down Expand Up @@ -81,7 +88,6 @@ func main() {
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
<-shutdown
log.Println("Shutting down...")

// Exit with success
os.Exit(0)
Expand Down
9 changes: 2 additions & 7 deletions pkg/containerwatcher/v1/container_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/k8s-interface/k8sinterface"
"go.opentelemetry.io/otel"
)

const (
Expand Down Expand Up @@ -60,8 +59,6 @@ func CreateIGContainerWatcher(k8sClient *k8sinterface.KubernetesApi, relevancyMa
}

func (ch *IGContainerWatcher) Start(ctx context.Context) error {
ctx, span := otel.Tracer("").Start(ctx, "IGContainerWatcher.Start")
defer span.End()

ch.relevancyManager.SetContainerHandler(ch)
ch.relevancyManager.StartRelevancyManager(ctx)
Expand Down Expand Up @@ -139,7 +136,7 @@ func (ch *IGContainerWatcher) Start(ctx context.Context) error {
}
if event.Ret > -1 {
ch.workerPool.Submit(func() {
ch.relevancyManager.ReportFileAccess(ctx, event.Namespace, event.Pod, event.Container, event.Path)
ch.relevancyManager.ReportFileAccess(ctx, event.Namespace, event.Pod, event.Container, event.FullPath)
})
}
}
Expand All @@ -166,7 +163,7 @@ func (ch *IGContainerWatcher) Start(ctx context.Context) error {
}

// Create the exec tracer
ch.tracerOpen, err = traceropen.NewTracer(&traceropen.Config{MountnsMap: openMountnsmap}, ch.containerCollection, openEventCallback)
ch.tracerOpen, err = traceropen.NewTracer(&traceropen.Config{MountnsMap: openMountnsmap, FullPath: true}, ch.containerCollection, openEventCallback)
if err != nil {
return fmt.Errorf("error creating tracerOpen: %s\n", err)
}
Expand Down Expand Up @@ -199,8 +196,6 @@ func (ch *IGContainerWatcher) Stop() {
}

func (ch *IGContainerWatcher) UnregisterContainer(ctx context.Context, container *containercollection.Container) {
_, span := otel.Tracer("").Start(ctx, "IGContainerWatcher.UnregisterContainer")
defer span.End()

event := containercollection.PubSubEvent{
Timestamp: time.Now().Format(time.RFC3339),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package filehandler

import "context"
import (
"context"
)

type FileHandler interface {
AddFile(ctx context.Context, bucket, file string) error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
bolt "go.etcd.io/bbolt"
"go.opentelemetry.io/otel"
)

type BoltFileHandler struct {
Expand All @@ -26,8 +25,6 @@ func CreateBoltFileHandler() (*BoltFileHandler, error) {
}

func (b BoltFileHandler) AddFile(ctx context.Context, bucket, file string) error {
_, span := otel.Tracer("").Start(ctx, "BoltFileHandler.AddFile")
defer span.End()
return b.fileDB.Batch(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists([]byte(bucket))
if err != nil {
Expand All @@ -42,8 +39,6 @@ func (b BoltFileHandler) Close() {
}

func (b BoltFileHandler) GetFiles(ctx context.Context, container string) (map[string]bool, error) {
_, span := otel.Tracer("").Start(ctx, "BoltFileHandler.GetFiles")
defer span.End()
fileList := make(map[string]bool)
err := b.fileDB.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(container))
Expand All @@ -60,8 +55,6 @@ func (b BoltFileHandler) GetFiles(ctx context.Context, container string) (map[st
}

func (b BoltFileHandler) RemoveBucket(ctx context.Context, bucket string) error {
_, span := otel.Tracer("").Start(ctx, "BoltFileHandler.RemoveBucket")
defer span.End()
return b.fileDB.Update(func(tx *bolt.Tx) error {
err := tx.DeleteBucket([]byte(bucket))
if err != nil {
Expand Down
106 changes: 106 additions & 0 deletions pkg/filehandler/v1/inmemory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package filehandler

import (
"context"
"fmt"
"node-agent/pkg/filehandler"
"sync"
)

const initFileListLength = 5000

type InMemoryFileHandler struct {
mutex sync.RWMutex
m map[string]*sync.RWMutex
files map[string]map[string]bool
}

var _ filehandler.FileHandler = (*InMemoryFileHandler)(nil)

func CreateInMemoryFileHandler() (*InMemoryFileHandler, error) {
return &InMemoryFileHandler{
m: make(map[string]*sync.RWMutex),
files: make(map[string]map[string]bool, 20),
}, nil
}

func (s *InMemoryFileHandler) AddFile(ctx context.Context, bucket, file string) error {

// Acquire a read lock first
s.mutex.RLock()
bucketLock, ok := s.m[bucket]
bucketFiles, okF := s.files[bucket]
s.mutex.RUnlock()

// If the bucket doesn't exist, acquire a write lock to create the new bucket
if !ok || !okF {
s.mutex.Lock()
// Double-check the bucket's existence to ensure another goroutine didn't already create it
bucketLock, ok = s.m[bucket]
if !ok {
bucketLock = &sync.RWMutex{}
s.m[bucket] = bucketLock
}

bucketFiles, okF = s.files[bucket]
if !okF {
bucketFiles = make(map[string]bool, initFileListLength)
s.files[bucket] = bucketFiles
}
s.mutex.Unlock()
}

// Acquire a write lock if the bucket already exists
bucketLock.Lock()
defer bucketLock.Unlock()

bucketFiles[file] = true

return nil
}

func (s *InMemoryFileHandler) Close() {
// Nothing to do
}

func shallowCopyMapStringBool(m map[string]bool) map[string]bool {
if m == nil {
return nil
}
mCopy := make(map[string]bool, len(m))
for k, v := range m {
mCopy[k] = v
}
return mCopy
}

func (s *InMemoryFileHandler) GetFiles(ctx context.Context, bucket string) (map[string]bool, error) {
s.mutex.RLock()
bucketLock, ok := s.m[bucket]
bucketFiles, okFiles := s.files[bucket]
s.mutex.RUnlock()

if !ok || !okFiles {
return map[string]bool{}, fmt.Errorf("bucket does not exist for container %s", bucket)
}

bucketLock.RLock()
defer bucketLock.RUnlock()

return shallowCopyMapStringBool(bucketFiles), nil
}
func (s *InMemoryFileHandler) RemoveBucket(ctx context.Context, bucket string) error {

s.mutex.Lock()
bucketLock, ok := s.m[bucket]
if ok {
bucketLock.Lock()
defer bucketLock.Unlock()
}

delete(s.m, bucket)
delete(s.files, bucket)
s.mutex.Unlock()

return nil
}
Loading

0 comments on commit 36512a2

Please sign in to comment.