Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 123 additions & 20 deletions lib/player/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Player struct {
advanceTo atomic.Int64

emit chan events.AuditEvent
wake chan int64
done chan struct{}

// playPause holds a channel to be closed when
Expand Down Expand Up @@ -119,6 +120,7 @@ func New(cfg *Config) (*Player, error) {
streamer: cfg.Streamer,
emit: make(chan events.AuditEvent, 1024),
playPause: make(chan chan struct{}, 1),
wake: make(chan int64),
done: make(chan struct{}),
}

Expand Down Expand Up @@ -207,7 +209,13 @@ func (p *Player) stream() {
// time we were advanced to
lastDelay = adv
}
if err := p.applyDelay(time.Duration(currentDelay-lastDelay) * time.Millisecond); err != nil {

switch err := p.applyDelay(lastDelay, currentDelay); {
case errors.Is(err, errSeekWhilePaused):
p.log.Debug("seeked during pause, will restart stream")
go p.stream()
return
case err != nil:
close(p.emit)
return
}
Expand Down Expand Up @@ -269,32 +277,113 @@ func (p *Player) Play() error {
// from the beginning. A duration greater than the length of the session
// will cause playback to rapidly advance to the end of the recording.
func (p *Player) SetPos(d time.Duration) error {
// we use a negative value to indicate rewinding, which means we can't
// rewind to position 0 (there is no negative 0)
if d == 0 {
d = 1 * time.Millisecond
}
if d.Milliseconds() < p.lastPlayed.Load() {
// if we're rewinding we store a negative value
d = -1 * d
}
p.advanceTo.Store(d.Milliseconds())

// try to wake up the player if it's waiting to emit an event
select {
case p.wake <- d.Milliseconds():
default:
}

return nil
}

// applyDelay "sleeps" for d in a manner that
// can be canceled
func (p *Player) applyDelay(d time.Duration) error {
scaled := float64(d) / p.speed.Load().(float64)
select {
case <-p.done:
return errClosed
case <-p.clock.After(time.Duration(scaled)):
return nil
// applyDelay applies the timing delay between the last emitted event
// (lastDelay) and the event that will be emitted next (currentDelay).
//
// The delay can be interrupted by:
// 1. The player being closed.
// 2. The user pausing playback.
// 3. The user seeking to a new position in the playback (SetPos)
//
// A nil return value indicates that the delay has elapsed and that
// the next even can be emitted.
func (p *Player) applyDelay(lastDelay, currentDelay int64) error {
loop:
for {
// TODO(zmb3): changing play speed during a long sleep
// will not apply until after the sleep completes
speed := p.speed.Load().(float64)
scaled := float64(currentDelay-lastDelay) / speed

timer := p.clock.NewTimer(time.Duration(scaled) * time.Millisecond)
defer timer.Stop()

start := time.Now()

select {
case <-p.done:
return errClosed
case newPos := <-p.wake:
// the sleep was interrupted due to the user changing playback controls
switch {
case newPos == interruptForPause:
// the user paused playback while we were waiting to emit the next event:
// 1) figure out much of the sleep we completed
dur := float64(time.Since(start).Milliseconds()) * speed

// 2) wait here until the user resumes playback
if err := p.waitWhilePaused(); errors.Is(err, errSeekWhilePaused) {
// the user changed the playback position, so consider the delay
// applied and let the player pick up from the new position
return errSeekWhilePaused
}

// now that we're playing again, update our delay to account
// for the portion that was already satisfied and apply the
// remaining delay
lastDelay += int64(dur)
timer.Stop()
continue loop
case newPos > currentDelay:
// the user scrubbed forward in time past the current event,
// so we can return as if the delay has elapsed naturally
return nil
case newPos < 0:
// the user has rewinded playback, which means we need to restart
// the stream and can consider this delay as having elapsed naturally
return nil
case newPos < currentDelay:
// the user has scrubbed forward in time, but not enough to
// emit the next event - we need to delay more
lastDelay = newPos
timer.Stop()
continue loop
default:
return nil
}

case <-timer.Chan():
return nil
}
}
}

// interruptForPause is a special value used to interrupt the player's
// sleep due to the user pausing playback.
const interruptForPause = math.MaxInt64

func (p *Player) setPlaying(play bool) {
ch := <-p.playPause
alreadyPlaying := ch == nil

if alreadyPlaying && !play {
ch = make(chan struct{})

// try to wake up the player if it's waiting to emit an event
select {
case p.wake <- interruptForPause:
default:
}

} else if !alreadyPlaying && play {
// signal waiters who are paused that it's time to resume playing
close(ch)
Expand All @@ -304,20 +393,34 @@ func (p *Player) setPlaying(play bool) {
p.playPause <- ch
}

var errSeekWhilePaused = errors.New("player seeked during pause")

// waitWhilePaused blocks while the player is in a paused state.
// It returns immediately if the player is currently playing.
func (p *Player) waitWhilePaused() error {
ch := <-p.playPause
p.playPause <- ch

if alreadyPlaying := ch == nil; !alreadyPlaying {
select {
case <-p.done:
return errClosed
case <-ch:
seeked := false
for {
ch := <-p.playPause
p.playPause <- ch

if alreadyPlaying := ch == nil; !alreadyPlaying {
select {
case <-p.done:
return errClosed
case <-p.wake:
// seek while paused, this can happen an unlimited number of times,
// we just keep waiting until we're unpaused
seeked = true
continue
case <-ch:
// we have been unpaused
}
}
if seeked {
return errSeekWhilePaused
}
return nil
}
return nil
}

// LastPlayed returns the time of the last played event,
Expand Down
52 changes: 52 additions & 0 deletions lib/player/player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func TestClose(t *testing.T) {

func TestSeekForward(t *testing.T) {
clk := clockwork.NewFakeClock()

p, err := player.New(&player.Config{
Clock: clk,
SessionID: "test-session",
Expand Down Expand Up @@ -210,6 +211,57 @@ func TestSeekForward(t *testing.T) {
}
}

func TestSeekForwardTwice(t *testing.T) {
clk := clockwork.NewRealClock()
p, err := player.New(&player.Config{
Clock: clk,
SessionID: "test-session",
Streamer: &simpleStreamer{count: 1, delay: 6000},
})
require.NoError(t, err)
t.Cleanup(func() { p.Close() })
require.NoError(t, p.Play())

time.Sleep(100 * time.Millisecond)
p.SetPos(500 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
p.SetPos(5900 * time.Millisecond)

select {
case <-p.C():
case <-time.After(5 * time.Second):
require.FailNow(t, "event not emitted on time")
}
}

// TestInterruptsDelay tests that the player responds to playback
// controls even when it is waiting to emit an event.
func TestInterruptsDelay(t *testing.T) {
clk := clockwork.NewFakeClock()
p, err := player.New(&player.Config{
Clock: clk,
SessionID: "test-session",
Streamer: &simpleStreamer{count: 3, delay: 5000},
})
require.NoError(t, err)
require.NoError(t, p.Play())

t.Cleanup(func() { p.Close() })

clk.BlockUntil(1) // player is now waiting to emit event 0

// emulate the user seeking forward while the player is waiting..
p.SetPos(10_001 * time.Millisecond)

// expect event 0 and event 1 to be emitted right away
// even without advancing the clock
evt0 := <-p.C()
evt1 := <-p.C()

require.Equal(t, int64(0), evt0.GetIndex())
require.Equal(t, int64(1), evt1.GetIndex())
}

func TestRewind(t *testing.T) {
clk := clockwork.NewFakeClock()
p, err := player.New(&player.Config{
Expand Down
2 changes: 2 additions & 0 deletions lib/web/tty_playback.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ func (h *Handler) ttyPlaybackHandle(
return
}

case *events.SessionLeave: // do nothing

default:
h.log.Debugf("unexpected event type %T", evt)
}
Expand Down
2 changes: 1 addition & 1 deletion web/packages/teleport/src/Player/DesktopPlayer.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ export const DesktopPlayer = ({
onRestart={reload}
onStartMove={() => playerClient.suspendTimeUpdates()}
move={pos => {
playerClient.seekTo(pos);
playerClient.resumeTimeUpdates();
playerClient.seekTo(pos);
}}
onPlaySpeedChange={s => playerClient.setPlaySpeed(s)}
toggle={() => playerClient.togglePlayPause()}
Expand Down
2 changes: 1 addition & 1 deletion web/packages/teleport/src/Player/SshPlayer.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ export default function Player({ sid, clusterId, durationMs }) {
onRestart={() => window.location.reload()}
onStartMove={() => tty.suspendTimeUpdates()}
move={pos => {
tty.move(pos);
tty.resumeTimeUpdates();
tty.move(pos);
}}
toggle={() => {
isPlaying ? tty.stop() : tty.play();
Expand Down
Loading