Skip to content

Commit

Permalink
tests/integ/audio.py: add test for audio-intput in QubesDB
Browse files Browse the repository at this point in the history
  • Loading branch information
fepitre committed Jul 20, 2024
1 parent 77aa7ce commit 226805d
Showing 1 changed file with 143 additions and 37 deletions.
180 changes: 143 additions & 37 deletions qubes/tests/integ/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import numpy as np

import qubes.vm
import qubes.devices
from qubes.tests.integ.vm_qrexec_gui import TC_00_AppVMMixin, in_qemu


Expand Down Expand Up @@ -142,7 +143,7 @@ def assert_pacat_running(self, audiovm, testvm, expected=True):
def check_audio_sample(self, sample, sfreq):
rec = np.fromstring(sample, dtype=np.float32)
# determine sample size using silence threshold
threshold = 10**-3
threshold = 10 ** -3
rec_size = np.count_nonzero((rec > threshold) | (rec < -threshold))
if not rec_size:
self.fail('only silence detected, no useful audio data')
Expand All @@ -151,34 +152,34 @@ def check_audio_sample(self, sample, sfreq):
# be less strict on HVM tests in nested virt, the test environment
# has huge overhead already
margin = 0.80
if rec_size < margin*441000:
if rec_size < margin * 441000:
fname = f"/tmp/audio-sample-{self.id()}.raw"
with open(fname, "wb") as f:
f.write(sample)
self.fail(f'too short audio, expected 10s, got {rec_size/44100}, saved to {fname}')
self.fail(f'too short audio, expected 10s, got {rec_size / 44100}, saved to {fname}')
# find zero crossings
crossings = np.nonzero((rec[1:] > threshold) &
(rec[:-1] < -threshold))[0]
(rec[:-1] < -threshold))[0]
np.seterr('raise')
# compare against sine wave frequency
rec_freq = 44100/np.mean(np.diff(crossings))
if not sfreq*0.8 < rec_freq < sfreq*1.2:
rec_freq = 44100 / np.mean(np.diff(crossings))
if not sfreq * 0.8 < rec_freq < sfreq * 1.2:
fname = f"/tmp/audio-sample-{self.id()}.raw"
with open(fname, "wb") as f:
f.write(sample)
self.fail('frequency {} not in specified range, saved to {}'
.format(rec_freq, fname))
.format(rec_freq, fname))

def common_audio_playback(self):
# sine frequency
sfreq = 4400
# generate signal
audio_in = np.sin(2*np.pi*np.arange(441000)*sfreq/44100)
audio_in = np.sin(2 * np.pi * np.arange(441000) * sfreq / 44100)
# Need to use .snd extension so that pw-play (really libsndfile)
# recognizes the file as raw audio.
self.loop.run_until_complete(
self.testvm1.run_for_stdio('cat > audio_in.snd',
input=audio_in.astype(np.float32).tobytes()))
input=audio_in.astype(np.float32).tobytes()))
local_user = grp.getgrnam('qubes').gr_mem[0]
if self.testvm1.features['service.pipewire']:
cmd = 'timeout 20s pw-play --format=f32 --rate=44100 --channels=1 - < audio_in.snd'
Expand All @@ -188,9 +189,10 @@ def common_audio_playback(self):
with tempfile.NamedTemporaryFile() as recorded_audio:
os.chmod(recorded_audio.name, 0o666)
p = subprocess.Popen(['sudo', '-E', '-u', local_user,
'parecord', '-d', '@DEFAULT_MONITOR@', '--raw',
'--format=float32le', '--rate=44100', '--channels=1',
recorded_audio.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
'parecord', '-d', '@DEFAULT_MONITOR@', '--raw',
'--format=float32le', '--rate=44100', '--channels=1',
recorded_audio.name], stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
try:
self.loop.run_until_complete(self.testvm1.run_for_stdio(cmd))
except subprocess.CalledProcessError as err:
Expand All @@ -199,17 +201,29 @@ def common_audio_playback(self):
self.loop.run_until_complete(asyncio.sleep(2))
if p.returncode is not None:
self.fail("Recording process ended prematurely: exit code {}, stderr: {}".format(
p.returncode, p.stderr.read()))
p.returncode, p.stderr.read()))
p.send_signal(signal.SIGINT)
p.wait()
self.check_audio_sample(recorded_audio.file.read(), sfreq)

def _configure_audio_recording(self, vm):
"""Connect VM's output-source to sink monitor instead of mic"""
"""Connect VM's source-output to sink monitor instead of mic"""
local_user = grp.getgrnam("qubes").gr_mem[0]
audiovm = vm.audiovm

sudo = ["sudo", "-E", "-u", local_user]
source_outputs = json.loads(subprocess.check_output(
sudo + ["pactl", "-f", "json", "list", "source-outputs"]))

source_outputs_cmd = ["pactl", "-f", "json", "list", "source-outputs"]
if audiovm.name != "dom0":
stdout, _ = self.loop.run_until_complete(
audiovm.run_for_stdio(" ".join(source_outputs_cmd)))
source_outputs = json.loads(stdout)
else:
source_outputs = json.loads(subprocess.check_output(sudo + source_outputs_cmd))

if not source_outputs:
self.fail("no source-output found in {}".format(audiovm.name))
assert False

try:
output_index = [s["index"] for s in source_outputs
Expand All @@ -220,8 +234,17 @@ def _configure_audio_recording(self, vm):
# self.fail never returns
assert False

sources = json.loads(subprocess.check_output(
sudo + ["pactl", "-f", "json", "list", "sources"]))
sources_cmd = ["pactl", "-f", "json", "list", "sources"]
if audiovm.name != "dom0":
res, _ = self.loop.run_until_complete(audiovm.run_for_stdio(" ".join(sources_cmd)))
sources = json.loads(res)
else:
sources = json.loads(subprocess.check_output(sudo + sources_cmd))

if not sources:
self.fail("no sources found in {}".format(audiovm.name))
assert False

try:
source_index = [s["index"] for s in sources
if s["name"].endswith(".monitor")][0]
Expand All @@ -230,8 +253,36 @@ def _configure_audio_recording(self, vm):
# self.fail never returns
assert False

subprocess.check_call(sudo +
["pactl", "move-source-output", str(output_index), str(source_index)])
cmd = ["pactl", "move-source-output", str(output_index), str(source_index)]
if audiovm.name != "dom0":
self.loop.run_until_complete(audiovm.run(" ".join(cmd)))
else:
subprocess.check_call(sudo + cmd)

async def retrieve_audio_input(self, vm, status):
try:
await asyncio.wait_for(self._check_audio_input_status(vm, status), timeout=2)
except asyncio.TimeoutError:
self.fail("Failed to get mic attach/detach status!")

@staticmethod
async def _check_audio_input_status(vm, status):
while vm.audiovm.untrusted_qdb.read("/audio-input/{}".format(vm.name)) != status:
await asyncio.sleep(0.5)

def attach_mic(self):
deva = qubes.device_protocol.DeviceAssignment(self.app.domains[0], 'mic')
self.loop.run_until_complete(
self.testvm1.devices['mic'].attach(deva)
)
self.loop.run_until_complete(self.retrieve_audio_input(self.testvm1, b"1"))

def detach_mic(self):
deva = qubes.device_protocol.DeviceAssignment(self.app.domains[0], 'mic')
self.loop.run_until_complete(
self.testvm1.devices['mic'].detach(deva)
)
self.loop.run_until_complete(self.retrieve_audio_input(self.testvm1, b"0"))

def common_audio_record_muted(self):
# connect VM's recording source output monitor (instead of mic)
Expand All @@ -240,6 +291,7 @@ def common_audio_record_muted(self):
# generate some "audio" data
audio_in = b'\x20' * 4 * 44100
local_user = grp.getgrnam('qubes').gr_mem[0]
sudo = ["sudo", "-E", "-u", local_user]
# Need to use .snd extension so that pw-play (really libsndfile)
# recognizes the file as raw audio.
if self.testvm1.features['service.pipewire']:
Expand All @@ -249,18 +301,28 @@ def common_audio_record_muted(self):
cmd = 'parecord --raw audio_rec.snd'
kill_cmd = 'pkill --signal SIGINT parecord'
record = self.loop.run_until_complete(self.testvm1.run(cmd,
stdout=subprocess.PIPE, stderr=subprocess.PIPE))
stdout=subprocess.PIPE,
stderr=subprocess.PIPE))
# give it time to start recording
self.loop.run_until_complete(asyncio.sleep(0.5))
p = subprocess.Popen(['sudo', '-E', '-u', local_user,
'paplay', '--raw'],
stdin=subprocess.PIPE)
p.communicate(audio_in)

play_cmd = ['paplay', '--raw']
if self.testvm1.audiovm.name != "dom0":
self.loop.run_until_complete(
self.testvm1.audiovm.run_for_stdio(
" ".join(play_cmd),
input=audio_in
)
)
else:
p = subprocess.Popen(sudo + play_cmd, stdin=subprocess.PIPE)
p.communicate(audio_in)

# wait for possible parecord buffering
self.loop.run_until_complete(asyncio.sleep(2))
if record.returncode is not None:
self.fail("Recording process ended prematurely: exit code {}, stderr: {}".format(
record.returncode, self.loop.run_until_complete(record.stderr.read())))
record.returncode, self.loop.run_until_complete(record.stderr.read())))
try:
self.loop.run_until_complete(
self.testvm1.run_for_stdio(kill_cmd))
Expand All @@ -273,15 +335,20 @@ def common_audio_record_muted(self):
if audio_in[:32] in recorded_audio:
self.fail('VM recorded something, even though mic disabled')

def common_audio_record_unmuted(self):
deva = qubes.device_protocol.DeviceAssignment(self.app.domains[0], 'mic')
self.loop.run_until_complete(
self.testvm1.devices['mic'].attach(deva))
def common_audio_record_unmuted(self, attach_mic=True, detach_mic=True):
if attach_mic:
try:
self.detach_mic()
except qubes.devices.DeviceNotAssigned:
pass
self.attach_mic()
# connect VM's recording source output monitor (instead of mic)
self._configure_audio_recording(self.testvm1)
sfreq = 4400
audio_in = np.sin(2*np.pi*np.arange(441000)*sfreq/44100)
audio_in = np.sin(2 * np.pi * np.arange(441000) * sfreq / 44100)
local_user = grp.getgrnam('qubes').gr_mem[0]
sudo = ["sudo", "-E", "-u", local_user]

# Need to use .snd extension so that pw-play (really libsndfile)
# recognizes the file as raw audio.
if self.testvm1.features['service.pipewire']:
Expand All @@ -295,16 +362,28 @@ def common_audio_record_unmuted(self):
record = self.loop.run_until_complete(self.testvm1.run(record_cmd))
# give it time to start recording
self.loop.run_until_complete(asyncio.sleep(0.5))
p = subprocess.Popen(['sudo', '-E', '-u', local_user,
'paplay', '--raw', '--format=float32le',
'--rate=44100', '--channels=1'],
stdin=subprocess.PIPE)
p.communicate(audio_in.astype(np.float32).tobytes())

# play sound that will be used as source-output
play_cmd = ['paplay', '--raw', '--format=float32le', '--rate=44100', '--channels=1']
if self.testvm1.audiovm.name != "dom0":
self.loop.run_until_complete(
self.testvm1.audiovm.run_for_stdio(
" ".join(play_cmd),
input=audio_in.astype(np.float32).tobytes()
)
)
else:
p = subprocess.Popen(
sudo + play_cmd,
stdin=subprocess.PIPE
)
p.communicate(audio_in.astype(np.float32).tobytes())

# wait for possible parecord buffering
self.loop.run_until_complete(asyncio.sleep(2))
if record.returncode is not None:
self.fail("Recording process ended prematurely: exit code {}, stderr: {}".format(
record.returncode, self.loop.run_until_complete(record.stderr.read())))
record.returncode, self.loop.run_until_complete(record.stderr.read())))
try:
self.loop.run_until_complete(self.testvm1.run_for_stdio(kill_cmd))
except subprocess.CalledProcessError:
Expand All @@ -317,6 +396,8 @@ def common_audio_record_unmuted(self):
recorded_audio, _ = self.loop.run_until_complete(
self.testvm1.run_for_stdio('cat audio_rec.snd'))
self.check_audio_sample(recorded_audio, sfreq)
if detach_mic:
self.detach_mic()


class TC_20_AudioVM_Pulse(TC_00_AudioMixin):
Expand Down Expand Up @@ -459,6 +540,31 @@ def test_251_audio_playback_audiovm_pipewire_late_start(self):
self.assert_pacat_running(self.app.domains[0], self.testvm1, False)
self.common_audio_playback()

@unittest.skipUnless(spawn.find_executable('parecord'),
"pulseaudio-utils not installed in dom0")
def test_260_audio_mic_enabled_switch_audiovm(self):
self.create_audio_vm('pipewire', start=False)
self.testvm1.audiovm = self.audiovm
self.prepare_audio_test('pipewire')
self.loop.run_until_complete(self.audiovm.start())

# check mic is enabled in first audiovm
self.assert_pacat_running(self.audiovm, self.testvm1, True)
self.common_audio_record_unmuted(detach_mic=False)

# check mic is enabled in second audiovm, admin ext will
# allow mic during switch as it was previously enabled
self.testvm1.audiovm = self.app.domains[0]
self.assert_pacat_running(self.testvm1.audiovm, self.testvm1, True)
self.common_audio_record_unmuted(attach_mic=False, detach_mic=False)

# detach mic, switch to original audiovm and check there
# is no sound as we disabled mic
self.detach_mic()
self.testvm1.audiovm = self.audiovm
self.assert_pacat_running(self.audiovm, self.testvm1, True)
self.common_audio_record_muted()


def create_testcases_for_templates():
yield from qubes.tests.create_testcases_for_templates(
Expand Down

0 comments on commit 226805d

Please sign in to comment.