Skip to content

Commit

Permalink
Dynamic audio buf
Browse files Browse the repository at this point in the history
* Ugly audio buf

* Use dynamic Opus frames with config
  • Loading branch information
sergystepanov authored Dec 12, 2024
1 parent f54089e commit ed3b195
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 154 deletions.
5 changes: 5 additions & 0 deletions pkg/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,12 @@ encoder:
# audio frame duration needed for WebRTC (Opus)
# most emulators produce ~1400 samples per video frame,
# so we keep the frame buffer at roughly half of that size, or 2 RTC packets per frame
# (deprecated) superseded by frames
frame: 10
# dynamic frames for Opus encoder
frames:
- 10
- 5
video:
# h264, vpx (vp8) or vp9
codec: h264
Expand Down
13 changes: 9 additions & 4 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
func TestConfigEnv(t *testing.T) {
var out WorkerConfig

_ = os.Setenv("CLOUD_GAME_ENCODER_AUDIO_FRAME", "33")
defer func() { _ = os.Unsetenv("CLOUD_GAME_ENCODER_AUDIO_FRAME") }()
_ = os.Setenv("CLOUD_GAME_ENCODER_AUDIO_FRAMES[0]", "10")
_ = os.Setenv("CLOUD_GAME_ENCODER_AUDIO_FRAMES[1]", "5")
defer func() { _ = os.Unsetenv("CLOUD_GAME_ENCODER_AUDIO_FRAMES[0]") }()
defer func() { _ = os.Unsetenv("CLOUD_GAME_ENCODER_AUDIO_FRAMES[1]") }()

_ = os.Setenv("CLOUD_GAME_EMULATOR_LIBRETRO_CORES_LIST_PCSX_OPTIONS__PCSX_REARMED_DRC", "x")
defer func() {
Expand All @@ -22,8 +24,11 @@ func TestConfigEnv(t *testing.T) {
t.Fatal(err)
}

if out.Encoder.Audio.Frame != 33 {
t.Errorf("%v is not 33", out.Encoder.Audio.Frame)
for i, x := range []float32{10, 5} {
if out.Encoder.Audio.Frames[i] != x {
t.Errorf("%v is not [10, 5]", out.Encoder.Audio.Frames)
t.Failed()
}
}

v := out.Emulator.Libretro.Cores.List["pcsx"].Options["pcsx_rearmed_drc"]
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Encoder struct {
}

type Audio struct {
Frame float32
Frames []float32
}

type Video struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/worker/coordinatorhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (c *coordinator) HandleGameStart(rq api.StartGameRequest[com.Uid], w *Worke
}

m.AudioSrcHz = app.AudioSampleRate()
m.AudioFrame = w.conf.Encoder.Audio.Frame
m.AudioFrames = w.conf.Encoder.Audio.Frames
m.VideoW, m.VideoH = app.ViewportSize()
m.VideoScale = app.Scale()

Expand Down
119 changes: 119 additions & 0 deletions pkg/worker/media/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package media

import (
"errors"
"math"
"unsafe"
)

// buffer is a simple non-concurrent safe buffer for audio samples.
// It gathers written samples into fixed-size buckets, one bucket per configured
// frame duration, and hands every filled bucket to a callback (see write).
type buffer struct {
// stretch is set by resample; when true, write passes stretched bucket data to the callback.
stretch bool
// frameHz is not referenced by the code shown alongside this type.
frameHz []int

// raw is the single contiguous allocation that all bucket memory is sliced from.
raw samples
// buckets holds one bucket per frame duration, in the order given to newBuffer.
buckets []Bucket
// cur is the bucket that write is currently filling.
cur *Bucket
}

// Bucket is a fixed-size accumulation area for one frame duration.
type Bucket struct {
// mem is the bucket storage; newBuffer slices it out of buffer.raw.
mem samples
// ms is the frame duration this bucket was sized for; it is passed to the write callback.
ms float32
// lv is the fill level, i.e. how many elements of mem have been written so far.
lv int
// dst is the frame size used when stretching; it is filled in by resample.
dst int
}

// newBuffer creates a buffer with one bucket per entry of frames.
//
// frames lists the frame durations the buckets are sized for and hz is the
// sample rate used to convert a duration into a number of samples (see frame).
// All buckets share one contiguous allocation and the last bucket is selected
// as the initial one.
//
// An error is returned when hz is below 2000 (2000 itself is accepted), when
// frames is empty, or when any frame duration results in a non-positive
// bucket size: an empty bucket can never be filled and would stall write.
func newBuffer(frames []float32, hz int) (*buffer, error) {
	if hz < 2000 {
		return nil, errors.New("hz should be > than 2000")
	}
	if len(frames) == 0 {
		return nil, errors.New("frames should not be empty")
	}

	// compute every bucket size once and validate it before allocating
	sizes := make([]int, len(frames))
	total := 0
	for i, f := range frames {
		n := frame(hz, f)
		if n <= 0 {
			return nil, errors.New("frame duration is too small for the given hz")
		}
		sizes[i] = n
		total += n
	}

	// preallocate continuous array and carve the buckets out of it
	buf := buffer{
		raw:     make(samples, total),
		buckets: make([]Bucket, 0, len(frames)),
	}
	next := 0
	for i, f := range frames {
		buf.buckets = append(buf.buckets, Bucket{
			mem: buf.raw[next : next+sizes[i]],
			ms:  f,
		})
		next += sizes[i]
	}
	buf.cur = &buf.buckets[len(buf.buckets)-1]
	return &buf, nil
}

// choose selects the bucket to fill next, given that l samples are still
// waiting to be written: the first bucket, in configured order, whose size
// does not exceed l. When no bucket is small enough the current one is kept.
//
// b.cur is pointed at the element inside b.buckets rather than at a copy of it,
// so updates made through b.buckets (e.g. by resample) stay visible via b.cur
// and no Bucket copy is allocated per call.
func (b *buffer) choose(l int) {
	for i := range b.buckets {
		if l >= len(b.buckets[i].mem) {
			b.cur = &b.buckets[i]
			return
		}
	}
}

// resample turns stretching on and precomputes, for every bucket,
// the frame size its duration corresponds to at the given hz.
func (b *buffer) resample(hz int) {
	for i := 0; i < len(b.buckets); i++ {
		bucket := &b.buckets[i]
		bucket.dst = frame(hz, bucket.ms)
	}
	b.stretch = true
}

// write copies s into the current bucket and, each time the bucket becomes full,
// hands the gathered data to onFull together with the bucket's frame duration.
//
// Two situations are possible:
//  1. Underflow: s fits into the free space of the current bucket, so the
//     samples are only stored and onFull is not invoked.
//  2. Overflow: s is larger than the free space, so onFull is invoked once per
//     filled bucket until all of s has been consumed.
//
// Old bucket content is overwritten and the bucket's write position advances by
// the number of copied samples. After each full bucket the next bucket is picked
// based on how many samples of s remain, and its write position is reset.
// When stretching is enabled, onFull receives the stretched data instead.
//
// The returned value is the number of samples consumed from s.
func (b *buffer) write(s samples, onFull func(samples, float32)) (r int) {
	total := len(s)
	for r < total {
		bucket := b.cur
		n := copy(bucket.mem[bucket.lv:], s[r:])
		r += n
		bucket.lv += n
		if bucket.lv != len(bucket.mem) {
			// not full yet
			continue
		}
		if !b.stretch {
			onFull(bucket.mem, bucket.ms)
		} else {
			onFull(bucket.mem.stretch(bucket.dst), bucket.ms)
		}
		// pick the bucket for what is left and start it from scratch
		b.choose(total - r)
		b.cur.lv = 0
	}
	return r
}

// frame returns the size of a stereo audio frame for the given sample rate and
// frame duration, i.e. 48k*frame/1000*2, where the per-channel sample count is
// first snapped to the closest even number with round(x / 2) * 2.
func frame(hz int, frame float32) int {
	halfSamples := math.Round(float64(hz) * float64(frame) / 1000 / 2)
	evenSamples := halfSamples * 2
	return int(evenSamples * 2)
}

// stretch does a simple stretching of audio samples to size elements by repeating
// each consecutive pair of samples over its share of the output.
// something like: [1,2,3,4,5,6] -> [1,2,x,x,3,4,x,x,5,6,x,x] -> [1,2,1,2,3,4,3,4,5,6,5,6]
//
// The result is a slice of buf, which is not declared in this function
// (presumably a package-level scratch buffer — confirm), so size has to fit into
// buf and the returned data is only valid until buf is written again.
//
// The receiver must not be empty because &s[0] is taken unconditionally.
// NOTE(review): samples are read two at a time through an int32 pointer, so an
// odd len(s) would read past the end of s — callers presumably always pass an
// even number of samples; confirm.
func (s samples) stretch(size int) []int16 {
out := buf[:size]
n := len(s)
// how many output elements correspond to one input element
ratio := float32(size) / float32(n)
// sPtr walks over s one pair (two int16, 4 bytes) at a time
sPtr := unsafe.Pointer(&s[0])
for i, l, r := 0, 0, 0; i < n; i += 2 {
// [l, r) is the output range covered by the pair at s[i], s[i+1];
// it starts where the previous pair's range ended
l, r = r, int(float32((i+2)>>1)*ratio)<<1 // index in src * ratio -> approximated index in dst *2 due to int16
for j := l; j < r; j += 2 {
*(*int32)(unsafe.Pointer(&out[j])) = *(*int32)(sPtr) // out[j] = s[i]; out[j+1] = s[i+1]
}
// advance to the next pair
sPtr = unsafe.Add(sPtr, uintptr(4))
}
return out
}
77 changes: 77 additions & 0 deletions pkg/worker/media/buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package media

import (
"reflect"
"testing"
)

// bufWrite describes a single write of a test case: len samples that all have the value sample.
type bufWrite struct {
// sample is the value every written sample is set to.
sample int16
// len is the number of samples in the write.
len int
}

// TestBufferWrite feeds a buffer with a series of writes and checks the data
// seen by the last callback invocation.
//
// bufLen is passed to newBuffer as the sample rate; with the {10, 5} frames and
// a rate of 2000 the smaller bucket holds 20 samples, hence the expected length.
//
// The callback keeps the slice it receives without copying it, so lastResult
// aliases the bucket memory and also reflects samples written after the last
// callback invocation. The second case relies on that: its leading 2, 3, 3 are
// written after the bucket was reported as full.
func TestBufferWrite(t *testing.T) {
	tests := []struct {
		bufLen int
		writes []bufWrite
		expect samples
	}{
		{
			bufLen: 2000,
			writes: []bufWrite{
				{sample: 1, len: 10},
				{sample: 2, len: 20},
				{sample: 3, len: 30},
			},
			expect: samples{3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3},
		},
		{
			bufLen: 2000,
			writes: []bufWrite{
				{sample: 1, len: 3},
				{sample: 2, len: 18},
				{sample: 3, len: 2},
			},
			expect: samples{2, 3, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2},
		},
	}

	for _, test := range tests {
		var lastResult samples
		buf, err := newBuffer([]float32{10, 5}, test.bufLen)
		if err != nil {
			t.Fatalf("oof, %v", err)
		}
		for _, w := range test.writes {
			buf.write(samplesOf(w.sample, w.len),
				func(s samples, ms float32) { lastResult = s },
			)
		}
		if !reflect.DeepEqual(test.expect, lastResult) {
			t.Errorf("unexpected buffer, %v != %v, %v", lastResult, test.expect, len(buf.cur.mem))
		}
	}
}

// BenchmarkBufferWrite measures write on a single-bucket buffer by alternating
// a write of hz/2 samples with a write of hz*2 samples and discarding the
// callback data.
func BenchmarkBufferWrite(b *testing.B) {
	const hz = 2000

	discard := func(samples, float32) {}
	buf, err := newBuffer([]float32{10}, hz)
	if err != nil {
		b.Fatalf("oof: %v", err)
	}

	short, long := samplesOf(1, hz/2), samplesOf(2, hz*2)
	for n := b.N; n > 0; n-- {
		buf.write(short, discard)
		buf.write(long, discard)
	}
}

// samplesOf builds a slice of len samples with every element set to v.
func samplesOf(v int16, len int) (s samples) {
	s = make(samples, len)
	for i := 0; i < len; i++ {
		s[i] = v
	}
	return s
}
Loading

0 comments on commit ed3b195

Please sign in to comment.