From 7bcb7625c31cfd7be0708307280d39554a06a198 Mon Sep 17 00:00:00 2001 From: Gabriel Corado Date: Mon, 7 Oct 2024 19:37:50 -0300 Subject: [PATCH 1/4] fix(player): use skip idle flag and adjust max value --- lib/player/player.go | 26 ++++++++++++++++---------- lib/player/player_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/lib/player/player.go b/lib/player/player.go index fa52f790d8611..65ed5b0422970 100644 --- a/lib/player/player.go +++ b/lib/player/player.go @@ -81,7 +81,12 @@ type Player struct { translator sessionPrintTranslator } -const normalPlayback = math.MinInt64 +const ( + normalPlayback = math.MinInt64 + // MaxIdleTimeMilliseconds defines the max idle time when skipping idle + // periods on the recording. + MaxIdleTimeMilliseconds = 500.0 +) // Streamer is the underlying streamer that provides // access to recorded session events. @@ -134,14 +139,15 @@ func New(cfg *Config) (*Player, error) { } p := &Player{ - clock: clk, - log: log, - sessionID: cfg.SessionID, - streamer: cfg.Streamer, - emit: make(chan events.AuditEvent, 1024), - playPause: make(chan chan struct{}, 1), - wake: make(chan int64), - done: make(chan struct{}), + clock: clk, + log: log, + sessionID: cfg.SessionID, + streamer: cfg.Streamer, + emit: make(chan events.AuditEvent, 1024), + playPause: make(chan chan struct{}, 1), + wake: make(chan int64), + done: make(chan struct{}), + skipIdleTime: cfg.SkipIdleTime, } p.speed.Store(float64(defaultPlaybackSpeed)) @@ -340,7 +346,7 @@ loop: speed := p.speed.Load().(float64) scaled := float64(currentDelay-lastDelay) / speed if p.skipIdleTime { - scaled = min(scaled, 500.0*float64(time.Millisecond)) + scaled = min(scaled, MaxIdleTimeMilliseconds) } timer := p.clock.NewTimer(time.Duration(scaled) * time.Millisecond) diff --git a/lib/player/player_test.go b/lib/player/player_test.go index 836b58a506f89..ff56f94aa0c38 100644 --- a/lib/player/player_test.go +++ b/lib/player/player_test.go @@ -26,6 +26,7 @@ import ( "time" "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" apievents "github.com/gravitational/teleport/api/types/events" @@ -321,6 +322,34 @@ func TestUseDatabaseTranslator(t *testing.T) { }) } +func TestSkipIdlePeriods(t *testing.T) { + eventCount := 3 + delayMilliseconds := 60000 + clk := clockwork.NewFakeClock() + p, err := player.New(&player.Config{ + Clock: clk, + SessionID: "test-session", + SkipIdleTime: true, + Streamer: &simpleStreamer{count: int64(eventCount), delay: int64(delayMilliseconds)}, + }) + require.NoError(t, err) + require.NoError(t, p.Play()) + + for i := range eventCount { + // Consume events in an eventually loop to avoid firing the clock + // events before the timer is set. + require.EventuallyWithT(t, func(t *assert.CollectT) { + clk.Advance(time.Duration(player.MaxIdleTimeMilliseconds) * time.Millisecond) + select { + case evt := <-p.C(): + assert.Equal(t, int64(i), evt.GetIndex()) + default: + assert.Fail(t, "expected to receive event after short period, but got nothing") + } + }, time.Second, 100*time.Millisecond) + } +} + // simpleStreamer streams a fake session that contains // count events, emitted at a particular interval type simpleStreamer struct { From 7c57a6efd128f8a467dde9ad3ac9f7dc84e96bca Mon Sep 17 00:00:00 2001 From: Gabriel Corado Date: Mon, 7 Oct 2024 20:40:34 -0300 Subject: [PATCH 2/4] test(player): increase timeout --- lib/player/player_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/player/player_test.go b/lib/player/player_test.go index ff56f94aa0c38..8290b2ad56f09 100644 --- a/lib/player/player_test.go +++ b/lib/player/player_test.go @@ -346,7 +346,7 @@ func TestSkipIdlePeriods(t *testing.T) { default: assert.Fail(t, "expected to receive event after short period, but got nothing") } - }, time.Second, 100*time.Millisecond) + }, 3*time.Second, 100*time.Millisecond) } } From 5915feac28b5f9f62edd16082354e6d0fe9c1061 Mon Sep 17 00:00:00 2001 From: Gabriel Corado Date: Mon, 4 Nov 2024 10:10:24 -0300 Subject: [PATCH 3/4] refactor(player): use time.Duration instead of float64 for timings --- lib/player/player.go | 44 +++++++++++++++++++-------------------- lib/player/player_test.go | 2 +- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/lib/player/player.go b/lib/player/player.go index 65ed5b0422970..5c8e875281bba 100644 --- a/lib/player/player.go +++ b/lib/player/player.go @@ -62,7 +62,7 @@ type Player struct { advanceTo atomic.Int64 emit chan events.AuditEvent - wake chan int64 + wake chan time.Duration done chan struct{} // playPause holds a channel to be closed when @@ -82,10 +82,10 @@ type Player struct { } const ( - normalPlayback = math.MinInt64 - // MaxIdleTimeMilliseconds defines the max idle time when skipping idle + normalPlayback = time.Duration(0) + // MaxIdleTime defines the max idle time when skipping idle // periods on the recording. - MaxIdleTimeMilliseconds = 500.0 + MaxIdleTime = 500 * time.Millisecond ) // Streamer is the underlying streamer that provides @@ -145,13 +145,13 @@ func New(cfg *Config) (*Player, error) { streamer: cfg.Streamer, emit: make(chan events.AuditEvent, 1024), playPause: make(chan chan struct{}, 1), - wake: make(chan int64), + wake: make(chan time.Duration), done: make(chan struct{}), skipIdleTime: cfg.SkipIdleTime, } p.speed.Store(float64(defaultPlaybackSpeed)) - p.advanceTo.Store(normalPlayback) + p.advanceTo.Store(normalPlayback.Milliseconds()) // start in a paused state p.playPause <- make(chan struct{}) @@ -189,7 +189,7 @@ func (p *Player) stream() { defer cancel() eventsC, errC := p.streamer.StreamSessionEvents(ctx, p.sessionID, 0) - lastDelay := int64(0) + lastDelay := time.Duration(0) for { select { case <-p.done: @@ -221,7 +221,7 @@ func (p *Player) stream() { currentDelay := getDelay(evt) if currentDelay > 0 && currentDelay >= lastDelay { - switch adv := p.advanceTo.Load(); { + switch adv := time.Duration(p.advanceTo.Load()) * time.Millisecond; { case adv >= currentDelay: // no timing delay necessary, we are fast forwarding break @@ -229,12 +229,12 @@ func (p *Player) stream() { // any negative value other than normalPlayback means // we rewind (by restarting the stream and seeking forward // to the rewind point) - p.advanceTo.Store(adv * -1) + p.advanceTo.Store(adv.Milliseconds() * -1) go p.stream() return default: if adv != normalPlayback { - p.advanceTo.Store(normalPlayback) + p.advanceTo.Store(normalPlayback.Milliseconds()) // we're catching back up to real time, so the delay // is calculated not from the last event but from the @@ -262,7 +262,7 @@ func (p *Player) stream() { // // TODO: consider a select with a timeout to detect blocked readers? p.emit <- evt - p.lastPlayed.Store(currentDelay) + p.lastPlayed.Store(currentDelay.Milliseconds()) } } } @@ -321,7 +321,7 @@ func (p *Player) SetPos(d time.Duration) error { // try to wake up the player if it's waiting to emit an event select { - case p.wake <- d.Milliseconds(): + case p.wake <- d: default: } @@ -338,18 +338,18 @@ func (p *Player) SetPos(d time.Duration) error { // // A nil return value indicates that the delay has elapsed and that // the next even can be emitted. -func (p *Player) applyDelay(lastDelay, currentDelay int64) error { +func (p *Player) applyDelay(lastDelay, currentDelay time.Duration) error { loop: for { // TODO(zmb3): changing play speed during a long sleep // will not apply until after the sleep completes speed := p.speed.Load().(float64) - scaled := float64(currentDelay-lastDelay) / speed + scaled := time.Duration(float64(currentDelay-lastDelay) / speed) if p.skipIdleTime { - scaled = min(scaled, MaxIdleTimeMilliseconds) + scaled = min(scaled, MaxIdleTime) } - timer := p.clock.NewTimer(time.Duration(scaled) * time.Millisecond) + timer := p.clock.NewTimer(scaled) defer timer.Stop() start := time.Now() @@ -363,7 +363,7 @@ loop: case newPos == interruptForPause: // the user paused playback while we were waiting to emit the next event: // 1) figure out much of the sleep we completed - dur := float64(time.Since(start).Milliseconds()) * speed + dur := time.Duration(float64(time.Since(start)) * speed) // 2) wait here until the user resumes playback if err := p.waitWhilePaused(); errors.Is(err, errSeekWhilePaused) { @@ -375,7 +375,7 @@ loop: // now that we're playing again, update our delay to account // for the portion that was already satisfied and apply the // remaining delay - lastDelay += int64(dur) + lastDelay += dur timer.Stop() continue loop case newPos > currentDelay: @@ -496,13 +496,13 @@ var databaseTranslators = map[string]newSessionPrintTranslatorFunc{ // player. var SupportedDatabaseProtocols = maps.Keys(databaseTranslators) -func getDelay(e events.AuditEvent) int64 { +func getDelay(e events.AuditEvent) time.Duration { switch x := e.(type) { case *events.DesktopRecording: - return x.DelayMilliseconds + return time.Duration(x.DelayMilliseconds) * time.Millisecond case *events.SessionPrint: - return x.DelayMilliseconds + return time.Duration(x.DelayMilliseconds) * time.Millisecond default: - return int64(0) + return time.Duration(0) } } diff --git a/lib/player/player_test.go b/lib/player/player_test.go index 8290b2ad56f09..8ede201a4c3f0 100644 --- a/lib/player/player_test.go +++ b/lib/player/player_test.go @@ -339,7 +339,7 @@ func TestSkipIdlePeriods(t *testing.T) { // Consume events in an eventually loop to avoid firing the clock // events before the timer is set. require.EventuallyWithT(t, func(t *assert.CollectT) { - clk.Advance(time.Duration(player.MaxIdleTimeMilliseconds) * time.Millisecond) + clk.Advance(player.MaxIdleTime) select { case evt := <-p.C(): assert.Equal(t, int64(i), evt.GetIndex()) From 3810f9a0c423316f5dc9799879abcaed21b5eef1 Mon Sep 17 00:00:00 2001 From: Gabriel Corado Date: Mon, 4 Nov 2024 12:05:06 -0300 Subject: [PATCH 4/4] refactor(player): store duration values in nanoseconds --- lib/client/api.go | 6 ++---- lib/player/player.go | 20 ++++++++++---------- lib/player/player_test.go | 2 +- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/lib/client/api.go b/lib/client/api.go index a3f33038d0f8d..c4e2d3f916c0c 100644 --- a/lib/client/api.go +++ b/lib/client/api.go @@ -2267,13 +2267,11 @@ func playSession(ctx context.Context, sessionID string, speed float64, streamer } playing = !playing case keyLeft, keyDown: - current := time.Duration(player.LastPlayed() * int64(time.Millisecond)) - player.SetPos(max(current-skipDuration, 0)) // rewind + player.SetPos(max(player.LastPlayed()-skipDuration, 0)) // rewind term.Clear() term.SetCursorPos(1, 1) case keyRight, keyUp: - current := time.Duration(player.LastPlayed() * int64(time.Millisecond)) - player.SetPos(current + skipDuration) // advance forward + player.SetPos(player.LastPlayed() + skipDuration) // advance forward } } }() diff --git a/lib/player/player.go b/lib/player/player.go index 5c8e875281bba..d29bacc17acbe 100644 --- a/lib/player/player.go +++ b/lib/player/player.go @@ -151,7 +151,7 @@ func New(cfg *Config) (*Player, error) { } p.speed.Store(float64(defaultPlaybackSpeed)) - p.advanceTo.Store(normalPlayback.Milliseconds()) + p.advanceTo.Store(int64(normalPlayback)) // start in a paused state p.playPause <- make(chan struct{}) @@ -189,7 +189,7 @@ func (p *Player) stream() { defer cancel() eventsC, errC := p.streamer.StreamSessionEvents(ctx, p.sessionID, 0) - lastDelay := time.Duration(0) + var lastDelay time.Duration for { select { case <-p.done: @@ -221,7 +221,7 @@ func (p *Player) stream() { currentDelay := getDelay(evt) if currentDelay > 0 && currentDelay >= lastDelay { - switch adv := time.Duration(p.advanceTo.Load()) * time.Millisecond; { + switch adv := time.Duration(p.advanceTo.Load()); { case adv >= currentDelay: // no timing delay necessary, we are fast forwarding break @@ -229,12 +229,12 @@ func (p *Player) stream() { // any negative value other than normalPlayback means // we rewind (by restarting the stream and seeking forward // to the rewind point) - p.advanceTo.Store(adv.Milliseconds() * -1) + p.advanceTo.Store(int64(adv) * -1) go p.stream() return default: if adv != normalPlayback { - p.advanceTo.Store(normalPlayback.Milliseconds()) + p.advanceTo.Store(int64(normalPlayback)) // we're catching back up to real time, so the delay // is calculated not from the last event but from the @@ -262,7 +262,7 @@ func (p *Player) stream() { // // TODO: consider a select with a timeout to detect blocked readers? p.emit <- evt - p.lastPlayed.Store(currentDelay.Milliseconds()) + p.lastPlayed.Store(int64(currentDelay)) } } } @@ -314,10 +314,10 @@ func (p *Player) SetPos(d time.Duration) error { if d == 0 { d = 1 * time.Millisecond } - if d.Milliseconds() < p.lastPlayed.Load() { + if d < time.Duration(p.lastPlayed.Load()) { d = -1 * d } - p.advanceTo.Store(d.Milliseconds()) + p.advanceTo.Store(int64(d)) // try to wake up the player if it's waiting to emit an event select { @@ -460,8 +460,8 @@ func (p *Player) waitWhilePaused() error { // LastPlayed returns the time of the last played event, // expressed as milliseconds since the start of the session. -func (p *Player) LastPlayed() int64 { - return p.lastPlayed.Load() +func (p *Player) LastPlayed() time.Duration { + return time.Duration(p.lastPlayed.Load()) } // translateEvent translates events if applicable and return if they should be diff --git a/lib/player/player_test.go b/lib/player/player_test.go index 8ede201a4c3f0..83fac3bb32d97 100644 --- a/lib/player/player_test.go +++ b/lib/player/player_test.go @@ -170,7 +170,7 @@ func TestClose(t *testing.T) { _, ok := <-p.C() require.False(t, ok, "player channel should have been closed") require.NoError(t, p.Err()) - require.Equal(t, int64(1000), p.LastPlayed()) + require.Equal(t, time.Second, p.LastPlayed()) } func TestSeekForward(t *testing.T) {