diff --git a/qubes/api/__init__.py b/qubes/api/__init__.py index 700eca39b..138442551 100644 --- a/qubes/api/__init__.py +++ b/qubes/api/__init__.py @@ -24,10 +24,13 @@ import functools import io import os +import re import shutil import socket import struct import traceback +from typing import Union +import uuid import qubes.exc @@ -94,6 +97,33 @@ def apply_filters(iterable, filters): iterable = filter(selector, iterable) return iterable +# pylint: disable=line-too-long +_uuid_regex = re.compile(rb"\A@uuid:[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}\Z") +_qube_name_regex = re.compile(rb"\A[A-Za-z][A-Za-z0-9_.-]*\Z") +def decode_vm( + untrusted_input: bytes, domains: qubes.app.VMCollection +) -> qubes.vm.qubesvm.QubesVM: + lookup: Union[uuid.UUID, str] + vm = untrusted_input.decode("ascii", "strict") + if untrusted_input.startswith(b"@uuid:"): + if (len(untrusted_input) != 42 or + not _uuid_regex.match(untrusted_input)): + raise qubes.exc.QubesVMInvalidUUIDError(vm[6:]) + lookup = uuid.UUID(vm[6:]) + else: + if (len(untrusted_input) > 31 or + not _qube_name_regex.match(untrusted_input)): + raise qubes.exc.QubesVMInvalidNameError(vm) + lookup = vm + try: + return domains[lookup] + except KeyError: + # normally this should filtered out by qrexec policy, but there are + # two cases it might not be: + # 1. The call comes from dom0, which bypasses qrexec policy + # 2. Domain was removed between checking the policy and here + # we inform the client accordingly + raise qubes.exc.QubesVMNotFoundError(vm) class AbstractQubesAPI: '''Common code for Qubes Management Protocol handling @@ -114,25 +144,17 @@ class AbstractQubesAPI: #: the preferred socket location (to be overridden in child's class) SOCKNAME = None - def __init__(self, app, src, method_name, dest, arg, send_event=None): + app: qubes.Qubes + src: qubes.vm.qubesvm.QubesVM + def __init__(self, app: qubes.Qubes, src: bytes, method_name, dest: bytes, arg, send_event=None): #: :py:class:`qubes.Qubes` object self.app = app - try: - vm = src.decode('ascii') - #: source qube - self.src = self.app.domains[vm] - - vm = dest.decode('ascii') - #: destination qube - self.dest = self.app.domains[vm] - except KeyError: - # normally this should filtered out by qrexec policy, but there are - # two cases it might not be: - # 1. The call comes from dom0, which bypasses qrexec policy - # 2. Domain was removed between checking the policy and here - # we inform the client accordingly - raise qubes.exc.QubesVMNotFoundError(vm) + #: source qube + self.src = decode_vm(src, app.domains) + + #: destination qube + self.dest = decode_vm(dest, app.domains) #: argument self.arg = arg.decode('ascii') diff --git a/qubes/api/admin.py b/qubes/api/admin.py index 6c612c308..77c6bc4a4 100644 --- a/qubes/api/admin.py +++ b/qubes/api/admin.py @@ -119,10 +119,11 @@ async def vm_list(self): else: domains = self.fire_event_for_filter([self.dest]) - return ''.join('{} class={} state={}\n'.format( + return ''.join('{} class={} state={} uuid={}\n'.format( vm.name, vm.__class__.__name__, - vm.get_power_state()) + vm.get_power_state(), + vm.uuid) for vm in sorted(domains)) @qubes.api.method('admin.vm.property.List', no_payload=True, @@ -1134,10 +1135,14 @@ async def _vm_create(self, vm_type, allow_pool=False, raise self.app.save() - @qubes.api.method('admin.vm.CreateDisposable', no_payload=True, + @qubes.api.method('admin.vm.CreateDisposable', scope='global', write=True) - async def create_disposable(self): + async def create_disposable(self, untrusted_payload): self.enforce(not self.arg) + if untrusted_payload not in (b"", b"uuid"): + raise qubes.exc.QubesValueError( + "Invalid payload for admin.vm.CreateDisposable: " + "expected the empty string or 'uuid'") if self.dest.name == 'dom0': dispvm_template = self.src.default_dispvm @@ -1150,7 +1155,7 @@ async def create_disposable(self): # TODO: move this to extension (in race-free fashion, better than here) dispvm.tags.add('disp-created-by-' + str(self.src)) - return dispvm.name + return str(dispvm.uuid) if untrusted_payload else dispvm.name @qubes.api.method('admin.vm.Remove', no_payload=True, scope='global', write=True) diff --git a/qubes/api/internal.py b/qubes/api/internal.py index 3812764d1..587a61b0c 100644 --- a/qubes/api/internal.py +++ b/qubes/api/internal.py @@ -43,6 +43,7 @@ def get_system_info(app): 'guivm': (domain.guivm.name if getattr(domain, 'guivm', None) else None), 'power_state': domain.get_power_state(), + 'uuid': str(domain.uuid), } for domain in app.domains }} return system_info diff --git a/qubes/exc.py b/qubes/exc.py index 5139a5724..0cb3afef7 100644 --- a/qubes/exc.py +++ b/qubes/exc.py @@ -37,6 +37,23 @@ def __str__(self): # KeyError overrides __str__ method return QubesException.__str__(self) +class QubesVMInvalidNameError(QubesVMNotFoundError): + """Domain name is invalid""" + # pylint: disable = super-init-not-called + def __init__(self, vmname: str) -> None: + # QubesVMNotFoundError overrides __init__ method + # pylint: disable = non-parent-init-called + QubesException.__init__(self, f"VM name is not valid: {vmname!r}") + self.vmname = vmname + +class QubesVMInvalidUUIDError(QubesException): + """Domain UUID is invalid""" + # pylint: disable = super-init-not-called + def __init__(self, uuid: str) -> None: + # QubesVMNotFoundError overrides __init__ method + # pylint: disable = non-parent-init-called + QubesException.__init__(self, f"VM UUID is not valid: {uuid!r}") + self.vmname = uuid class QubesVMError(QubesException): '''Some problem with domain state.''' diff --git a/qubes/tests/api_admin.py b/qubes/tests/api_admin.py index e6f6fd8fb..dab5cc0f2 100644 --- a/qubes/tests/api_admin.py +++ b/qubes/tests/api_admin.py @@ -77,6 +77,7 @@ def setUp(self): app.save = unittest.mock.Mock() self.vm = app.add_new_vm('AppVM', label='red', name='test-vm1', template='test-template') + self.uuid = b"@uuid:" + str(self.vm.uuid).encode("ascii", "strict") self.app = app libvirt_attrs = { 'libvirt_conn.lookupByUUID.return_value.isActive.return_value': @@ -123,19 +124,35 @@ def call_internal_mgmt_func(self, method, dest, arg=b'', payload=b''): mgmt_obj.execute(untrusted_payload=payload)) return response - class TC_00_VMs(AdminAPITestCase): def test_000_vm_list(self): value = self.call_mgmt_func(b'admin.vm.List', b'dom0') self.assertEqual(value, - 'dom0 class=AdminVM state=Running\n' - 'test-template class=TemplateVM state=Halted\n' - 'test-vm1 class=AppVM state=Halted\n') + 'dom0 class=AdminVM state=Running uuid=00000000-0000-0000-0000-000000000000\n' + f'test-template class=TemplateVM state=Halted uuid={self.template.uuid}\n' + f'test-vm1 class=AppVM state=Halted uuid={self.vm.uuid}\n') def test_001_vm_list_single(self): value = self.call_mgmt_func(b'admin.vm.List', b'test-vm1') self.assertEqual(value, - 'test-vm1 class=AppVM state=Halted\n') + f"test-vm1 class=AppVM state=Halted uuid={self.vm.uuid}\n") + + def test_001_vm_list_single_uuid(self): + value = self.call_mgmt_func(b'admin.vm.List', self.uuid) + self.assertEqual(value, + f"test-vm1 class=AppVM state=Halted uuid={self.vm.uuid}\n") + + def test_001_vm_list_single_invalid_name(self): + with self.assertRaisesRegex(qubes.exc.QubesVMInvalidNameError, + r"\AVM name is not valid: '.test-vm1'\Z"): + self.call_mgmt_func(b'admin.vm.CreateDisposable', b'.test-vm1') + self.assertFalse(self.app.save.called) + + def test_001_vm_list_single_invalid_uuid(self): + with self.assertRaisesRegex(qubes.exc.QubesVMInvalidUUIDError, + r"\AVM UUID is not valid: ''\Z"): + self.call_mgmt_func(b'admin.vm.CreateDisposable', b"@uuid:") + self.assertFalse(self.app.save.called) def test_002_vm_list_filter(self): with tempfile.TemporaryDirectory() as tmpdir: @@ -152,8 +169,8 @@ def test_002_vm_list_filter(self): value = loop.run_until_complete( mgmt_obj.execute(untrusted_payload=b'')) self.assertEqual(value, - 'dom0 class=AdminVM state=Running\n' - 'test-vm1 class=AppVM state=Halted\n') + 'dom0 class=AdminVM state=Running uuid=00000000-0000-0000-0000-000000000000\n' + f"test-vm1 class=AppVM state=Halted uuid={self.vm.uuid}\n") def test_010_vm_property_list(self): # this test is kind of stupid, but at least check if appropriate diff --git a/qubes/tests/api_internal.py b/qubes/tests/api_internal.py index 653f20fdb..cf06f4e29 100644 --- a/qubes/tests/api_internal.py +++ b/qubes/tests/api_internal.py @@ -23,6 +23,7 @@ import qubes.vm.adminvm from unittest import mock import json +import uuid def mock_coro(f): async def coro_f(*args, **kwargs): @@ -150,6 +151,7 @@ def test_010_get_system_info(self): self.dom0.template_for_dispvms = False self.dom0.label.icon = 'icon-dom0' self.dom0.get_power_state.return_value = 'Running' + self.dom0.uuid = uuid.UUID("00000000-0000-0000-0000-000000000000") del self.dom0.guivm vm = mock.NonCallableMock(spec=qubes.vm.qubesvm.QubesVM) @@ -160,6 +162,7 @@ def test_010_get_system_info(self): vm.label.icon = 'icon-vm' vm.guivm = vm vm.get_power_state.return_value = 'Halted' + vm.uuid = uuid.uuid4() self.domains['vm'] = vm ret = json.loads(self.call_mgmt_func(b'internal.GetSystemInfo')) @@ -173,6 +176,7 @@ def test_010_get_system_info(self): 'icon': 'icon-dom0', 'guivm': None, 'power_state': 'Running', + 'uuid': "00000000-0000-0000-0000-000000000000", }, 'vm': { 'tags': ['tag3', 'tag4'], @@ -182,6 +186,7 @@ def test_010_get_system_info(self): 'icon': 'icon-vm', 'guivm': 'vm', 'power_state': 'Halted', + "uuid": str(vm.uuid) } } })