Skip to content

Commit

Permalink
tetragon: Factor grpc exec events test
Browse files Browse the repository at this point in the history
Replace sleeps with waiting that can be interrupted by event
delivery in notifier.

Adding bigger timeout to prevent flakes on slow machines.

Signed-off-by: Jiri Olsa <[email protected]>
  • Loading branch information
olsajiri committed Sep 26, 2024
1 parent 5c4652c commit a0b0c2a
Showing 1 changed file with 84 additions and 36 deletions.
120 changes: 84 additions & 36 deletions pkg/grpc/exec/exec_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,42 @@ var (
)

type DummyNotifier[EXEC notify.Message, EXIT notify.Message] struct {
t *testing.T
t *testing.T
ch chan bool
}

func NewDummyNotifier[EXEC notify.Message, EXIT notify.Message](t *testing.T) DummyNotifier[EXEC, EXIT] {
ch := make(chan bool)
return DummyNotifier[EXEC, EXIT]{t: t, ch: ch}
}

// Wait for specified number of events from notifier
func (n DummyNotifier[EXEC, EXIT]) WaitNotifier(events int) {
// Leave extra 100ms timeout for slow servers hiccups
ms := time.Duration((option.Config.EventCacheNumRetries + 100) * CacheTimerMs)

ticker := time.NewTicker(time.Millisecond * ms)
defer ticker.Stop()

for {
select {
case <-ticker.C:
return
case <-n.ch:
events--
if events == 0 {
return
}
}
}
}

// Kick from notifier that unblocks one event for WaitNotifier
func (n DummyNotifier[EXEC, EXIT]) KickNotifier() {
select {
case n.ch <- true:
default:
}
}

func (n DummyNotifier[EXEC, EXIT]) AddListener(_ server.Listener) {}
Expand All @@ -71,6 +106,7 @@ func (n DummyNotifier[EXEC, EXIT]) NotifyListener(original interface{}, processe
case EXEC, EXIT:
if processed != nil {
AllEvents = append(AllEvents, processed)
n.KickNotifier()
} else {
n.t.Fatalf("Processed arg is nil in NotifyListener with type %T", v)
}
Expand Down Expand Up @@ -277,21 +313,23 @@ func CreateCloneEvents[CLONE notify.Message, EXIT notify.Message](Pid uint32, Kt
return &cloneMsg, &exitMsg
}

func InitEnv[EXEC notify.Message, EXIT notify.Message](t *testing.T, cancelWg *sync.WaitGroup, watcher watcher.K8sResourceWatcher) context.CancelFunc {
func InitEnv[EXEC notify.Message, EXIT notify.Message](t *testing.T, cancelWg *sync.WaitGroup,
watcher watcher.K8sResourceWatcher) (context.CancelFunc, DummyNotifier[EXEC, EXIT]) {

ctx, cancel := context.WithCancel(context.Background())

if err := process.InitCache(watcher, 65536); err != nil {
t.Fatalf("failed to call process.InitCache %s", err)
}

dn := DummyNotifier[EXEC, EXIT]{t}
dn := NewDummyNotifier[EXEC, EXIT](t)
dr := rthooks.DummyHookRunner{}
lServer := server.NewServer(ctx, cancelWg, dn, &server.FakeObserver{}, dr)

// Exec cache is always needed to ensure events have an associated Process{}
eventcache.NewWithTimer(lServer, time.Millisecond*CacheTimerMs)

return cancel
return cancel, dn
}

func GetProcessRefcntFromCache(t *testing.T, Pid uint32, Ktime uint64) uint32 {
Expand Down Expand Up @@ -388,7 +426,7 @@ func GrpcExecOutOfOrder[EXEC notify.Message, EXIT notify.Message](t *testing.T)

AllEvents = nil
watcher := watcher.NewFakeK8sWatcher(nil)
cancel := InitEnv[EXEC, EXIT](t, &cancelWg, watcher)
cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, watcher)
defer func() {
cancel()
cancelWg.Wait()
Expand All @@ -415,7 +453,7 @@ func GrpcExecOutOfOrder[EXEC notify.Message, EXIT notify.Message](t *testing.T)
AllEvents = append(AllEvents, e)
}

time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work
dn.WaitNotifier(4)
CheckExecEvents(t, AllEvents, parentPid, currentPid)
}

Expand All @@ -424,7 +462,7 @@ func GrpcExecInOrder[EXEC notify.Message, EXIT notify.Message](t *testing.T) {

AllEvents = nil
watcher := watcher.NewFakeK8sWatcher(nil)
cancel := InitEnv[EXEC, EXIT](t, &cancelWg, watcher)
cancel, _ := InitEnv[EXEC, EXIT](t, &cancelWg, watcher)
defer func() {
cancel()
cancelWg.Wait()
Expand Down Expand Up @@ -459,7 +497,7 @@ func GrpcExecMisingParent[EXEC notify.Message, EXIT notify.Message](t *testing.T

AllEvents = nil
watcher := watcher.NewFakeK8sWatcher(nil)
cancel := InitEnv[EXEC, EXIT](t, &cancelWg, watcher)
cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, watcher)
defer func() {
cancel()
cancelWg.Wait()
Expand All @@ -474,9 +512,11 @@ func GrpcExecMisingParent[EXEC notify.Message, EXIT notify.Message](t *testing.T
AllEvents = append(AllEvents, e)
}

time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work
dn.WaitNotifier(1)

assert.Equal(t, len(AllEvents), 1)
if !assert.Equal(t, 1, len(AllEvents)) {
t.FailNow()
}
execEv := AllEvents[0].GetProcessExec()
assert.NotNil(t, execEv)
assert.Equal(t, GetProcessRefcntFromCache(t, currentPid, 21034975089403), uint32(1))
Expand All @@ -488,7 +528,7 @@ func GrpcMissingExec[EXEC notify.Message, EXIT notify.Message](t *testing.T) {

AllEvents = nil
watcher := watcher.NewFakeK8sWatcher(nil)
cancel := InitEnv[EXEC, EXIT](t, &cancelWg, watcher)
cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, watcher)
defer func() {
cancel()
cancelWg.Wait()
Expand All @@ -503,7 +543,7 @@ func GrpcMissingExec[EXEC notify.Message, EXIT notify.Message](t *testing.T) {
AllEvents = append(AllEvents, e)
}

time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work
dn.WaitNotifier(2)

assert.Equal(t, len(AllEvents), 1)
ev := AllEvents[0]
Expand All @@ -522,7 +562,7 @@ func GrpcExecParentOutOfOrder[EXEC notify.Message, EXIT notify.Message](t *testi

AllEvents = nil
watcher := watcher.NewFakeK8sWatcher(nil)
cancel := InitEnv[EXEC, EXIT](t, &cancelWg, watcher)
cancel, _ := InitEnv[EXEC, EXIT](t, &cancelWg, watcher)
defer func() {
cancel()
cancelWg.Wait()
Expand Down Expand Up @@ -589,7 +629,7 @@ func GrpcExecCloneInOrder[EXEC notify.Message, CLONE notify.Message, EXIT notify

AllEvents = nil
watcher := watcher.NewFakeK8sWatcher(nil)
cancel := InitEnv[EXEC, EXIT](t, &cancelWg, watcher)
cancel, _ := InitEnv[EXEC, EXIT](t, &cancelWg, watcher)
defer func() {
cancel()
cancelWg.Wait()
Expand Down Expand Up @@ -628,7 +668,7 @@ func GrpcExecCloneOutOfOrder[EXEC notify.Message, CLONE notify.Message, EXIT not

AllEvents = nil
watcher := watcher.NewFakeK8sWatcher(nil)
cancel := InitEnv[EXEC, EXIT](t, &cancelWg, watcher)
cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, watcher)
defer func() {
cancel()
cancelWg.Wait()
Expand Down Expand Up @@ -659,7 +699,7 @@ func GrpcExecCloneOutOfOrder[EXEC notify.Message, CLONE notify.Message, EXIT not
AllEvents = append(AllEvents, e)
}

time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work
dn.WaitNotifier(3)

CheckCloneEvents(t, AllEvents, currentPid, clonePid)
}
Expand All @@ -669,7 +709,7 @@ func GrpcParentInOrder[EXEC notify.Message, EXIT notify.Message](t *testing.T) {

AllEvents = nil
watcher := watcher.NewFakeK8sWatcher(nil)
cancel := InitEnv[EXEC, EXIT](t, &cancelWg, watcher)
cancel, _ := InitEnv[EXEC, EXIT](t, &cancelWg, watcher)
defer func() {
cancel()
cancelWg.Wait()
Expand Down Expand Up @@ -737,7 +777,9 @@ func GrpcParentInOrder[EXEC notify.Message, EXIT notify.Message](t *testing.T) {
}

func CheckPodEvents(t *testing.T, events []*tetragon.GetEventsResponse) {
assert.Equal(t, len(events), 2)
if !assert.Equal(t, 2, len(events)) {
t.FailNow()
}

execEv, exitEv := GetEvents(t, events)

Expand All @@ -764,7 +806,7 @@ func GrpcExecPodInfoInOrder[EXEC notify.Message, EXIT notify.Message](t *testing
AllEvents = nil
option.Config.EnableK8s = true // enable Kubernetes
fakeWatcher := watcher.NewFakeK8sWatcher(nil)
cancel := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher)
cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher)
defer func() {
cancel()
cancelWg.Wait()
Expand Down Expand Up @@ -793,8 +835,8 @@ func GrpcExecPodInfoInOrder[EXEC notify.Message, EXIT notify.Message](t *testing
AllEvents = append(AllEvents, e)
}

fakeWatcher.AddPod(dummyPod) // setup some dummy pod to return
time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work
fakeWatcher.AddPod(dummyPod) // setup some dummy pod to return
dn.WaitNotifier(2) // wait for cache to do it's work
CheckPodEvents(t, AllEvents)
}

Expand All @@ -808,7 +850,7 @@ func GrpcExecPodInfoOutOfOrder[EXEC notify.Message, EXIT notify.Message](t *test
AllEvents = nil
option.Config.EnableK8s = true // enable Kubernetes
fakeWatcher := watcher.NewFakeK8sWatcher(nil)
cancel := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher)
cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher)
defer func() {
cancel()
cancelWg.Wait()
Expand Down Expand Up @@ -838,7 +880,7 @@ func GrpcExecPodInfoOutOfOrder[EXEC notify.Message, EXIT notify.Message](t *test
}

fakeWatcher.AddPod(dummyPod)
time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work
dn.WaitNotifier(2) // wait for cache to do it's work
CheckPodEvents(t, AllEvents)
}

Expand All @@ -855,7 +897,7 @@ func GrpcExecPodInfoInOrderAfter[EXEC notify.Message, EXIT notify.Message](t *te
AllEvents = nil
option.Config.EnableK8s = true // enable Kubernetes
fakeWatcher := watcher.NewFakeK8sWatcher(nil)
cancel := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher)
cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher)
defer func() {
cancel()
cancelWg.Wait()
Expand Down Expand Up @@ -886,7 +928,7 @@ func GrpcExecPodInfoInOrderAfter[EXEC notify.Message, EXIT notify.Message](t *te
AllEvents = append(AllEvents, e)
}

time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work
dn.WaitNotifier(2) // wait for cache to do it's work
CheckPodEvents(t, AllEvents)
}

Expand All @@ -903,7 +945,7 @@ func GrpcExecPodInfoOutOfOrderAfter[EXEC notify.Message, EXIT notify.Message](t
AllEvents = nil
option.Config.EnableK8s = true // enable Kubernetes
fakeWatcher := watcher.NewFakeK8sWatcher(nil)
cancel := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher)
cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher)
defer func() {
cancel()
cancelWg.Wait()
Expand Down Expand Up @@ -933,7 +975,7 @@ func GrpcExecPodInfoOutOfOrderAfter[EXEC notify.Message, EXIT notify.Message](t
AllEvents = append(AllEvents, e)
}

time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work
dn.WaitNotifier(2) // wait for cache to do it's work
CheckPodEvents(t, AllEvents)
}

Expand All @@ -948,7 +990,7 @@ func GrpcExecPodInfoDelayedOutOfOrder[EXEC notify.Message, EXIT notify.Message](
AllEvents = nil
option.Config.EnableK8s = true // enable Kubernetes
fakeWatcher := watcher.NewFakeK8sWatcher(nil)
cancel := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher)
cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher)
defer func() {
cancel()
cancelWg.Wait()
Expand Down Expand Up @@ -979,11 +1021,13 @@ func GrpcExecPodInfoDelayedOutOfOrder[EXEC notify.Message, EXIT notify.Message](

time.Sleep(time.Millisecond * (5 * CacheTimerMs)) // wait for cache to do it's work (but less than eventcache.CacheStrikes iterations)

assert.Equal(t, len(AllEvents), 0) // here we should still not have any events as we don't have the podinfo yet
if !assert.Equal(t, 0, len(AllEvents)) { // here we should still not have any events as we don't have the podinfo yet
t.FailNow()
}

fakeWatcher.AddPod(dummyPod) // setup some dummy pod to return

time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work
dn.WaitNotifier(2) // wait for cache to do it's work

CheckPodEvents(t, AllEvents)
}
Expand All @@ -997,7 +1041,7 @@ func GrpcExecPodInfoDelayedInOrder[EXEC notify.Message, EXIT notify.Message](t *
AllEvents = nil
option.Config.EnableK8s = true // enable Kubernetes
fakeWatcher := watcher.NewFakeK8sWatcher(nil)
cancel := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher)
cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher)
defer func() {
cancel()
cancelWg.Wait()
Expand Down Expand Up @@ -1028,11 +1072,13 @@ func GrpcExecPodInfoDelayedInOrder[EXEC notify.Message, EXIT notify.Message](t *

time.Sleep(time.Millisecond * (5 * CacheTimerMs)) // wait for cache to do it's work (but less than eventcache.CacheStrikes iterations)

assert.Equal(t, len(AllEvents), 0) // here we should still not have any events as we don't have the podinfo yet
if !assert.Equal(t, 0, len(AllEvents)) { // here we should still not have any events as we don't have the podinfo yet
t.FailNow()
}

fakeWatcher.AddPod(dummyPod) // setup some dummy pod to return

time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work
dn.WaitNotifier(2) // wait for cache to do it's work

CheckPodEvents(t, AllEvents)
}
Expand All @@ -1045,7 +1091,7 @@ func GrpcDelayedExecK8sOutOfOrder[EXEC notify.Message, EXIT notify.Message](t *t
AllEvents = nil
option.Config.EnableK8s = true // enable Kubernetes
fakeWatcher := watcher.NewFakeK8sWatcher(nil)
cancel := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher)
cancel, dn := InitEnv[EXEC, EXIT](t, &cancelWg, fakeWatcher)
defer func() {
cancel()
cancelWg.Wait()
Expand Down Expand Up @@ -1073,13 +1119,15 @@ func GrpcDelayedExecK8sOutOfOrder[EXEC notify.Message, EXIT notify.Message](t *t
fakeWatcher.AddPod(dummyPod) // setup some dummy pod to return

time.Sleep(time.Millisecond * (5 * CacheTimerMs)) // wait for cache to do it's work (but less than eventcache.CacheStrikes iterations)
assert.Equal(t, len(AllEvents), 0) // here we should still not have any events as we don't have the podinfo yet
if !assert.Equal(t, len(AllEvents), 0) { // here we should still not have any events as we don't have the podinfo yet
t.FailNow()
}

if e := (*execMsg).HandleMessage(); e != nil {
AllEvents = append(AllEvents, e)
}

time.Sleep(time.Millisecond * time.Duration((option.Config.EventCacheNumRetries+4)*CacheTimerMs)) // wait for cache to do it's work
dn.WaitNotifier(2) // wait for cache to do it's work

CheckPodEvents(t, AllEvents)
}

0 comments on commit a0b0c2a

Please sign in to comment.