From 31988a9bd8386c277871d134f6c693d0b8786868 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Marczykowski-G=C3=B3recki?= Date: Mon, 19 Jun 2017 01:55:49 +0200 Subject: [PATCH] tags support QubesOS/qubes-issues#2622 --- qubesadmin/exc.py | 4 ++ qubesadmin/tags.py | 66 +++++++++++++++++++++++ qubesadmin/tests/tags.py | 110 ++++++++++++++++++++++++++++++++++++++ qubesadmin/vm/__init__.py | 4 ++ 4 files changed, 184 insertions(+) create mode 100644 qubesadmin/tags.py create mode 100644 qubesadmin/tests/tags.py diff --git a/qubesadmin/exc.py b/qubesadmin/exc.py index 9e60cc60..54fb2247 100644 --- a/qubesadmin/exc.py +++ b/qubesadmin/exc.py @@ -116,6 +116,10 @@ class QubesFeatureNotFoundError(QubesException, KeyError): '''Feature not set for a given domain''' +class QubesTagNotFoundError(QubesException, KeyError): + '''Tag not set for a given domain''' + + class StoragePoolException(QubesException): ''' A general storage exception ''' diff --git a/qubesadmin/tags.py b/qubesadmin/tags.py new file mode 100644 index 00000000..37c09e98 --- /dev/null +++ b/qubesadmin/tags.py @@ -0,0 +1,66 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, see . + +'''VM tags interface''' + + +class Tags(object): + '''Manager of the tags. + + Tags are simple: tag either can be present on qube or not. Tag is a + simple string consisting of ASCII alphanumeric characters, plus `_` and + `-`. + ''' + # pylint: disable=too-few-public-methods + + def __init__(self, vm): + super(Tags, self).__init__() + self.vm = vm + + def remove(self, elem): + '''Remove a tag''' + self.vm.qubesd_call(self.vm.name, 'admin.vm.tag.Remove', elem) + + def add(self, elem): + '''Add a tag''' + self.vm.qubesd_call(self.vm.name, 'admin.vm.tag.Set', elem) + + def update(self, *others): + '''Add tags from iterable(s)''' + for other in others: + for elem in other: + self.add(elem) + + def discard(self, elem): + '''Remove a tag if present''' + try: + self.remove(elem) + except KeyError: + pass + + def __iter__(self): + qubesd_response = self.vm.qubesd_call(self.vm.name, + 'admin.vm.tag.List') + return iter(qubesd_response.decode('utf-8').splitlines()) + + def __contains__(self, elem): + '''Does the VM have a tag''' + response = self.vm.qubesd_call(self.vm.name, 'admin.vm.tag.Get', elem) + return response == b'1' diff --git a/qubesadmin/tests/tags.py b/qubesadmin/tests/tags.py new file mode 100644 index 00000000..00893f99 --- /dev/null +++ b/qubesadmin/tests/tags.py @@ -0,0 +1,110 @@ +# -*- encoding: utf8 -*- +# +# The Qubes OS Project, http://www.qubes-os.org +# +# Copyright (C) 2017 Marek Marczykowski-Górecki +# +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, see . + +import qubesadmin.tests +import qubesadmin.tags + +class TC_00_Tags(qubesadmin.tests.QubesTestCase): + def setUp(self): + super(TC_00_Tags, self).setUp() + self.app.expected_calls[('dom0', 'admin.vm.List', None, None)] = \ + b'0\0test-vm class=AppVM state=Running\n' \ + b'test-vm2 class=AppVM state=Running\n' \ + b'test-vm3 class=AppVM state=Running\n' + self.vm = self.app.domains['test-vm'] + self.tags = qubesadmin.tags.Tags(self.vm) + + def test_000_list(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.List', None, None)] = \ + b'0\0tag1\ntag2\n' + self.assertEqual(sorted(self.tags), + ['tag1', 'tag2']) + self.assertAllCalled() + + def test_010_get(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Get', 'tag1', None)] = \ + b'0\x001' + self.assertIn('tag1', self.tags) + self.assertAllCalled() + + def test_011_get_missing(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Get', 'tag1', None)] = \ + b'0\x000' + self.assertNotIn('tag1', self.tags) + self.assertAllCalled() + + def test_020_set(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Set', 'tag1', None)] = b'0\0' + self.tags.add('tag1') + self.assertAllCalled() + + def test_030_update(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Set', 'tag1', None)] = b'0\0' + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Set', 'tag2', None)] = b'0\0' + self.tags.update(['tag1', 'tag2']) + self.assertAllCalled() + + def test_031_update_from_other(self): + self.app.expected_calls[ + ('test-vm2', 'admin.vm.tag.List', None, None)] = \ + b'0\0tag3\ntag4\n' + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Set', 'tag3', None)] = b'0\0' + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Set', 'tag4', None)] = b'0\0' + self.tags.update(self.app.domains['test-vm2'].tags) + self.assertAllCalled() + + def test_040_remove(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Remove', 'tag1', None)] = \ + b'0\0' + self.tags.remove('tag1') + self.assertAllCalled() + + def test_040_remove_missing(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Remove', 'tag1', None)] = \ + b'2\0QubesTagNotFoundError\0\0Tag not set for domain test-vm: ' \ + b'tag1\0' + with self.assertRaises(KeyError): + self.tags.remove('tag1') + self.assertAllCalled() + + def test_050_discard(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Remove', 'tag1', None)] = \ + b'0\0' + self.tags.discard('tag1') + self.assertAllCalled() + + def test_051_discard_missing(self): + self.app.expected_calls[ + ('test-vm', 'admin.vm.tag.Remove', 'tag1', None)] = \ + b'2\0QubesTagNotFoundError\0\0Tag not set for domain test-vm: ' \ + b'tag1\0' + self.tags.discard('tag1') + self.assertAllCalled() diff --git a/qubesadmin/vm/__init__.py b/qubesadmin/vm/__init__.py index 26f95546..605bdae2 100644 --- a/qubesadmin/vm/__init__.py +++ b/qubesadmin/vm/__init__.py @@ -27,6 +27,7 @@ import qubesadmin.features import qubesadmin.devices import qubesadmin.firewall +import qubesadmin.tags class QubesVM(qubesadmin.base.PropertyHolder): @@ -34,6 +35,8 @@ class QubesVM(qubesadmin.base.PropertyHolder): log = None + tags = None + features = None devices = None @@ -44,6 +47,7 @@ def __init__(self, app, name): super(QubesVM, self).__init__(app, 'admin.vm.property.', name) self._volumes = None self.log = logging.getLogger(name) + self.tags = qubesadmin.tags.Tags(self) self.features = qubesadmin.features.Features(self) self.devices = qubesadmin.devices.DeviceManager(self) self.firewall = qubesadmin.firewall.Firewall(self)