diff --git a/gvm/protocols/gmp/requests/_resource_names.py b/gvm/protocols/gmp/requests/_resource_names.py index cd9d20cab..54960a12a 100644 --- a/gvm/protocols/gmp/requests/_resource_names.py +++ b/gvm/protocols/gmp/requests/_resource_names.py @@ -42,8 +42,9 @@ class ResourceType(Enum): class ResourceNames: - @staticmethod + @classmethod def get_resource_names( + cls, resource_type: Union[ResourceType, str], *, filter_string: Optional[str] = None, @@ -61,6 +62,12 @@ def get_resource_names( """ cmd = XmlCommand("get_resource_names") + if not resource_type: + raise RequiredArgument( + function=cls.get_resource_names.__name__, + argument="resource_type", + ) + if not isinstance(resource_type, ResourceType): resource_type = ResourceType(resource_type) diff --git a/tests/protocols/gmp/requests/test_auth.py b/tests/protocols/gmp/requests/test_auth.py new file mode 100644 index 000000000..f28e38b5d --- /dev/null +++ b/tests/protocols/gmp/requests/test_auth.py @@ -0,0 +1,92 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import unittest + +from gvm.errors import RequiredArgument +from gvm.protocols.core import Request +from gvm.protocols.gmp.requests import Authentication + + +class AuthenticationTestCase(unittest.TestCase): + def test_authenticate(self) -> None: + request = Authentication.authenticate("admin", "admin") + + self.assertIsInstance(request, Request) + self.assertEqual( + bytes(request), + b"admin" + b"admin", + ) + + def test_authenticate_missing_username(self) -> None: + with self.assertRaises(RequiredArgument): + Authentication.authenticate(None, "foo") # type: ignore + + with self.assertRaises(RequiredArgument): + Authentication.authenticate("", "foo") + + def test_authenticate_missing_password(self) -> None: + with self.assertRaises(RequiredArgument): + Authentication.authenticate("bar", None) # type: ignore + + with self.assertRaises(RequiredArgument): + Authentication.authenticate("bar", "") + + def test_describe_auth(self) -> None: + request = Authentication.describe_auth() + + self.assertIsInstance(request, Request) + self.assertEqual(bytes(request), b"") + + def test_modify_auth(self) -> None: + request = Authentication.modify_auth( + "foo", dict([("foo", "bar"), ("lorem", "ipsum")]) + ) + + self.assertIsInstance(request, Request) + self.assertEqual( + bytes(request), + b"" + b'' + b"" + b"foo" + b"bar" + b"" + b"" + b"lorem" + b"ipsum" + b"" + b"" + b"", + ) + + def test_modify_auth_missing_group_name(self) -> None: + with self.assertRaises(RequiredArgument): + Authentication.modify_auth( + group_name=None, auth_conf_settings={"foo": "bar"} # type: ignore + ) + + with self.assertRaises(RequiredArgument): + Authentication.modify_auth( + group_name="", auth_conf_settings={"foo": "bar"} + ) + + with self.assertRaises(RequiredArgument): + Authentication.modify_auth("", auth_conf_settings={"foo": "bar"}) + + def test_modify_auth_auth_conf_settings(self) -> None: + with self.assertRaises(RequiredArgument): + Authentication.modify_auth( + group_name="foo", auth_conf_settings=None # type: ignore + ) + + with self.assertRaises(RequiredArgument): + Authentication.modify_auth(group_name="foo", auth_conf_settings="") # type: ignore + + with self.assertRaises(RequiredArgument): + Authentication.modify_auth("foo", "") # type: ignore + + with self.assertRaises(RequiredArgument): + Authentication.modify_auth("foo", {}) diff --git a/tests/protocols/gmp/requests/test_port_lists.py b/tests/protocols/gmp/requests/test_port_lists.py new file mode 100644 index 000000000..06b72a58f --- /dev/null +++ b/tests/protocols/gmp/requests/test_port_lists.py @@ -0,0 +1,338 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import unittest + +from gvm.errors import InvalidArgument, RequiredArgument +from gvm.protocols.gmp.requests import PortList, PortRangeType + + +class PortListsTestCase(unittest.TestCase): + def test_clone(self) -> None: + request = PortList.clone_port_list("a1") + + self.assertEqual( + bytes(request), + b"a1", + ) + + def test_clone_missing_id(self): + with self.assertRaises(RequiredArgument): + PortList.clone_port_list("") + + with self.assertRaises(RequiredArgument): + PortList.clone_port_list(None) + + def test_create_port_list_missing_name(self): + with self.assertRaises(RequiredArgument): + PortList.create_port_list(name=None, port_range="T:1-1234") + + with self.assertRaises(RequiredArgument): + PortList.create_port_list(name="", port_range="T:1-1234") + + def test_create_port_list_missing_port_range(self): + with self.assertRaises(RequiredArgument): + PortList.create_port_list(name="foo", port_range=None) + + with self.assertRaises(RequiredArgument): + PortList.create_port_list(name="foo", port_range="") + + def test_create_port_list(self): + request = PortList.create_port_list(name="foo", port_range="T:1-1234") + + self.assertEqual( + bytes(request), + b"" + b"foo" + b"T:1-1234" + b"", + ) + + def test_create_port_list_with_comment(self): + request = PortList.create_port_list( + name="foo", port_range="T:1-1234", comment="lorem" + ) + + self.assertEqual( + bytes(request), + b"" + b"foo" + b"T:1-1234" + b"lorem" + b"", + ) + + def test_create_port_range_missing_port_list_id(self): + with self.assertRaises(RequiredArgument): + PortList.create_port_range( + port_list_id=None, + start=1, + end=1234, + port_range_type=PortRangeType.TCP, + ) + + with self.assertRaises(RequiredArgument): + PortList.create_port_range( + port_list_id="", + start=1, + end=1234, + port_range_type=PortRangeType.TCP, + ) + + def test_create_port_range_missing_start(self): + with self.assertRaises(RequiredArgument): + PortList.create_port_range( + port_list_id="pl1", + start=None, + end=1234, + port_range_type=PortRangeType.TCP, + ) + + with self.assertRaises(RequiredArgument): + PortList.create_port_range( + port_list_id="pl1", + start="", + end=1234, + port_range_type=PortRangeType.TCP, + ) + + def test_create_port_range_missing_end(self): + with self.assertRaises(RequiredArgument): + PortList.create_port_range( + port_list_id="pl1", + start=1, + end=None, + port_range_type=PortRangeType.TCP, + ) + + with self.assertRaises(RequiredArgument): + PortList.create_port_range( + port_list_id="pl1", + start=1, + end="", + port_range_type=PortRangeType.TCP, + ) + + def test_create_port_range_missing_port_range_type(self): + with self.assertRaises(RequiredArgument): + PortList.create_port_range( + port_list_id="pl1", start=1, end=1234, port_range_type=None + ) + + with self.assertRaises(RequiredArgument): + PortList.create_port_range( + port_list_id="pl1", start=1, end=1234, port_range_type="" + ) + + def test_create_port_range_invalid_port_range_type(self): + with self.assertRaises(InvalidArgument): + PortList.create_port_range( + port_list_id="pl1", start=1, end=1234, port_range_type="blubb" + ) + + def test_create_port_range(self): + request = PortList.create_port_range( + port_list_id="pl1", + start=1, + end=1234, + port_range_type=PortRangeType.TCP, + ) + + self.assertEqual( + bytes(request), + b"" + b'' + b"1" + b"1234" + b"TCP" + b"", + ) + + request = PortList.create_port_range( + port_list_id="pl1", + start=1, + end=1234, + port_range_type=PortRangeType.UDP, + ) + + self.assertEqual( + bytes(request), + b"" + b'' + b"1" + b"1234" + b"UDP" + b"", + ) + + request = PortList.create_port_range( + port_list_id="pl1", + start="1", + end="1234", + port_range_type=PortRangeType.TCP, + ) + + self.assertEqual( + bytes(request), + b"" + b'' + b"1" + b"1234" + b"TCP" + b"", + ) + + def test_create_port_range_with_comment(self): + request = PortList.create_port_range( + port_list_id="pl1", + start=1, + end=1234, + port_range_type=PortRangeType.TCP, + comment="lorem", + ) + + self.assertEqual( + bytes(request), + b"" + b'' + b"1" + b"1234" + b"TCP" + b"lorem" + b"", + ) + + def test_delete(self): + request = PortList.delete_port_list("a1") + + self.assertEqual( + bytes(request), + b'', + ) + + def test_delete_ultimate(self): + request = PortList.delete_port_list("a1", ultimate=True) + + self.assertEqual( + bytes(request), + b'', + ) + + def test_delete_missing_id(self): + with self.assertRaises(RequiredArgument): + PortList.delete_port_list(None) + + with self.assertRaises(RequiredArgument): + PortList.delete_port_list("") + + def test_delete_port_range(self): + request = PortList.delete_port_range("a1") + + self.assertEqual( + bytes(request), b'' + ) + + def test_delete_port_range_missing_id(self): + with self.assertRaises(RequiredArgument): + PortList.delete_port_range(None) + + with self.assertRaises(RequiredArgument): + PortList.delete_port_range("") + + def test_get_port_lists(self): + request = PortList.get_port_lists() + + self.assertEqual(bytes(request), b"") + + def test_get_port_lists_with_filter_string(self): + request = PortList.get_port_lists(filter_string="foo=bar") + + self.assertEqual(bytes(request), b'') + + def test_get_port_lists_with_filter_id(self): + request = PortList.get_port_lists(filter_id="f1") + + self.assertEqual(bytes(request), b'') + + def test_get_port_lists_with_trash(self): + request = PortList.get_port_lists(trash=True) + + self.assertEqual(bytes(request), b'') + + request = PortList.get_port_lists(trash=False) + + self.assertEqual(bytes(request), b'') + + def test_get_port_lists_with_details(self): + request = PortList.get_port_lists(details=True) + + self.assertEqual(bytes(request), b'') + + request = PortList.get_port_lists(details=False) + + self.assertEqual(bytes(request), b'') + + def test_get_port_lists_with_targets(self): + request = PortList.get_port_lists(targets=True) + + self.assertEqual(bytes(request), b'') + + request = PortList.get_port_lists(targets=False) + + self.assertEqual(bytes(request), b'') + + def test_get_port_list(self): + request = PortList.get_port_list(port_list_id="port_list_id") + + self.assertEqual( + bytes(request), + b'', + ) + + def test_get_port_list_missing_port_list_id(self): + with self.assertRaises(RequiredArgument): + PortList.get_port_list(port_list_id=None) + + with self.assertRaises(RequiredArgument): + PortList.get_port_list(port_list_id="") + + with self.assertRaises(RequiredArgument): + PortList.get_port_list("") + + def test_modify_port_list(self): + request = PortList.modify_port_list(port_list_id="p1") + + self.assertEqual( + bytes(request), b'' + ) + + def test_modify_port_list_missing_port_list_id(self): + with self.assertRaises(RequiredArgument): + PortList.modify_port_list(port_list_id=None) + + with self.assertRaises(RequiredArgument): + PortList.modify_port_list(port_list_id="") + + with self.assertRaises(RequiredArgument): + PortList.modify_port_list("") + + def test_modify_port_list_with_comment(self): + request = PortList.modify_port_list(port_list_id="p1", comment="foo") + + self.assertEqual( + bytes(request), + b'' + b"foo" + b"", + ) + + def test_modify_port_list_with_name(self): + request = PortList.modify_port_list(port_list_id="p1", name="foo") + + self.assertEqual( + bytes(request), + b'' + b"foo" + b"", + ) diff --git a/tests/protocols/gmp/requests/test_resource_names.py b/tests/protocols/gmp/requests/test_resource_names.py new file mode 100644 index 000000000..346151b47 --- /dev/null +++ b/tests/protocols/gmp/requests/test_resource_names.py @@ -0,0 +1,417 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import unittest + +from gvm.errors import InvalidArgument, RequiredArgument +from gvm.protocols.core import Request +from gvm.protocols.gmp.requests import ResourceNames, ResourceType + + +class ResourceNamesTestCase(unittest.TestCase): + def test_get_resource_names(self) -> None: + request = ResourceNames.get_resource_names(ResourceType.ALERT) + + self.assertIsInstance(request, Request) + + self.assertEqual(bytes(request), b'') + + request = ResourceNames.get_resource_names(ResourceType.CERT_BUND_ADV) + + self.assertEqual( + bytes(request), b'' + ) + + request = ResourceNames.get_resource_names(ResourceType.CONFIG) + + self.assertEqual(bytes(request), b'') + + request = ResourceNames.get_resource_names( + resource_type=ResourceType.CPE + ) + + self.assertEqual(bytes(request), b'') + + request = ResourceNames.get_resource_names(ResourceType.CREDENTIAL) + + self.assertEqual( + bytes(request), b'' + ) + + request = ResourceNames.get_resource_names(ResourceType.CVE) + + self.assertEqual(bytes(request), b'') + + request = ResourceNames.get_resource_names(ResourceType.DFN_CERT_ADV) + + self.assertEqual( + bytes(request), b'' + ) + + request = ResourceNames.get_resource_names(ResourceType.FILTER) + + self.assertEqual(bytes(request), b'') + + request = ResourceNames.get_resource_names(ResourceType.GROUP) + + self.assertEqual(bytes(request), b'') + + request = ResourceNames.get_resource_names(ResourceType.HOST) + + self.assertEqual(bytes(request), b'') + + request = ResourceNames.get_resource_names(ResourceType.NOTE) + + self.assertEqual(bytes(request), b'') + + request = ResourceNames.get_resource_names(ResourceType.NVT) + + self.assertEqual(bytes(request), b'') + + request = ResourceNames.get_resource_names(ResourceType.OS) + + self.assertEqual(bytes(request), b'') + + request = ResourceNames.get_resource_names(ResourceType.OVERRIDE) + + self.assertEqual( + bytes(request), b'' + ) + + request = ResourceNames.get_resource_names(ResourceType.PERMISSION) + + self.assertEqual( + bytes(request), b'' + ) + + request = ResourceNames.get_resource_names(ResourceType.PORT_LIST) + + self.assertEqual( + bytes(request), b'' + ) + + request = ResourceNames.get_resource_names(ResourceType.REPORT_FORMAT) + + self.assertEqual( + bytes(request), b'' + ) + + request = ResourceNames.get_resource_names(ResourceType.REPORT) + + self.assertEqual(bytes(request), b'') + + request = ResourceNames.get_resource_names(ResourceType.RESULT) + + self.assertEqual(bytes(request), b'') + + request = ResourceNames.get_resource_names(ResourceType.ROLE) + + self.assertEqual(bytes(request), b'') + + request = ResourceNames.get_resource_names(ResourceType.SCANNER) + + self.assertEqual( + bytes(request), b'' + ) + + request = ResourceNames.get_resource_names(ResourceType.SCHEDULE) + + self.assertEqual( + bytes(request), b'' + ) + + request = ResourceNames.get_resource_names(ResourceType.TARGET) + + self.assertEqual(bytes(request), b'') + + request = ResourceNames.get_resource_names(ResourceType.TASK) + + self.assertEqual(bytes(request), b'') + + request = ResourceNames.get_resource_names(ResourceType.TLS_CERTIFICATE) + + self.assertEqual( + bytes(request), b'' + ) + + request = ResourceNames.get_resource_names(ResourceType.USER) + + self.assertEqual(bytes(request), b'') + + def test_get_resource_names_missing_resource_type(self) -> None: + with self.assertRaises(RequiredArgument): + ResourceNames.get_resource_names(resource_type=None) # type: ignore + + with self.assertRaises(RequiredArgument): + ResourceNames.get_resource_names(resource_type="") + + with self.assertRaises(RequiredArgument): + ResourceNames.get_resource_names("") + + def test_get_resource_names_invalid_resource_type(self) -> None: + with self.assertRaises(InvalidArgument): + ResourceNames.get_resource_names(resource_type="foo") + + def test_get_resource_names_with_filter_string(self) -> None: + request = ResourceNames.get_resource_names( + ResourceType.CPE, filter_string="foo=bar" + ) + + self.assertEqual( + bytes(request), b'' + ) + + def test_get_resource_name(self): + request = ResourceNames.get_resource_name( + resource_type=ResourceType.ALERT, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.CERT_BUND_ADV, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.CONFIG, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.CPE, resource_id="i1" + ) + + self.assertEqual( + bytes(request), b'' + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.CREDENTIAL, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.CVE, resource_id="i1" + ) + + self.assertEqual( + bytes(request), b'' + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.DFN_CERT_ADV, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.FILTER, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.GROUP, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.HOST, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.NOTE, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.NVT, resource_id="i1" + ) + + self.assertEqual( + bytes(request), b'' + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.OS, resource_id="i1" + ) + + self.assertEqual( + bytes(request), b'' + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.PERMISSION, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.PORT_LIST, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.REPORT_FORMAT, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.REPORT, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.RESULT, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.ROLE, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.SCANNER, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.SCHEDULE, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.TARGET, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.TASK, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.TLS_CERTIFICATE, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + request = ResourceNames.get_resource_name( + resource_type=ResourceType.USER, resource_id="i1" + ) + + self.assertEqual( + bytes(request), + b'', + ) + + def test_get_resource_name_missing_resource_type(self): + with self.assertRaises(RequiredArgument): + ResourceNames.get_resource_name( + resource_id="i1", resource_type=None + ) + + with self.assertRaises(RequiredArgument): + ResourceNames.get_resource_name(resource_id="i1", resource_type="") + + with self.assertRaises(RequiredArgument): + ResourceNames.get_resource_name("i1", "") + + def test_get_resource_name_invalid_resource_type(self): + with self.assertRaises(InvalidArgument): + ResourceNames.get_resource_name( + resource_id="i1", resource_type="foo" + ) + + def test_get_resource_name_missing_resource_id(self): + with self.assertRaises(RequiredArgument): + ResourceNames.get_resource_name( + resource_id="", resource_type=ResourceType.CPE + ) + + with self.assertRaises(RequiredArgument): + ResourceNames.get_resource_name("", resource_type=ResourceType.CPE) + + with self.assertRaises(RequiredArgument): + ResourceNames.get_resource_name( + resource_id=None, resource_type=ResourceType.CPE + ) diff --git a/tests/protocols/gmp/requests/test_version.py b/tests/protocols/gmp/requests/test_version.py new file mode 100644 index 000000000..751f25be9 --- /dev/null +++ b/tests/protocols/gmp/requests/test_version.py @@ -0,0 +1,16 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import unittest + +from gvm.protocols.core import Request +from gvm.protocols.gmp.requests import Version + + +class VersionTestCase(unittest.TestCase): + def test_version(self) -> None: + request = Version.get_version() + + self.assertIsInstance(request, Request) + self.assertEqual(bytes(request), b"")