diff --git a/gvm/protocols/gmp/__init__.py b/gvm/protocols/gmp/__init__.py new file mode 100644 index 000000000..b38ab7582 --- /dev/null +++ b/gvm/protocols/gmp/__init__.py @@ -0,0 +1,12 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from ._gmp import GMP + +Gmp = GMP # for backwards compatibility + +__all__ = ( + "GMP", + "Gmp", +) diff --git a/gvm/protocols/gmp.py b/gvm/protocols/gmp/_gmp.py similarity index 68% rename from gvm/protocols/gmp.py rename to gvm/protocols/gmp/_gmp.py index d5b04483d..869621586 100644 --- a/gvm/protocols/gmp.py +++ b/gvm/protocols/gmp/_gmp.py @@ -1,29 +1,23 @@ # SPDX-FileCopyrightText: 2019-2024 Greenbone AG # # SPDX-License-Identifier: GPL-3.0-or-later -# -""" -Module for communication with gvmd -""" from types import TracebackType -from typing import Any, Callable, Optional, Type, Union +from typing import Callable, Optional, Type, Union +from gvm.connections import GvmConnection from gvm.errors import GvmError -from gvm.protocols.base import GvmConnection, GvmProtocol -from gvm.protocols.gmpv208 import Gmp as Gmpv208 -from gvm.protocols.gmpv214 import Gmp as Gmpv214 -from gvm.protocols.gmpv224 import Gmp as Gmpv224 -from gvm.protocols.gmpv225 import Gmp as Gmpv225 -from gvm.transforms import EtreeCheckCommandTransform -from gvm.xml import XmlCommand -SUPPORTED_GMP_VERSIONS = Union[ # pylint: disable=invalid-name - Gmpv208, Gmpv214, Gmpv224, Gmpv225 -] +from .._protocol import GvmProtocol, T, str_transform +from ._gmp224 import GMPv224 +from ._gmp225 import GMPv225 +from .core import Response +from .core.requests import Version + +SUPPORTED_GMP_VERSIONS = Union[GMPv224, GMPv225] -class Gmp(GvmProtocol): +class GMP(GvmProtocol[T]): """Dynamically select supported GMP protocol of the remote manager daemon. Must be used as a `Context Manager @@ -59,19 +53,18 @@ def __init__( self, connection: GvmConnection, *, - transform: Optional[Callable[[str], Any]] = None, + transform: Callable[[Response], T] = str_transform, # type: ignore[assignment] # this should work with mypy 1.9.0 without an ignore ): - super().__init__(connection, transform=EtreeCheckCommandTransform()) - self._gmp_transform = transform + super().__init__(connection, transform=transform) def determine_remote_gmp_version(self) -> str: """Determine the supported GMP version of the remote daemon""" self.connect() - resp = self._send_xml_command(XmlCommand("get_version")) + resp = self._send_command(Version.get_version()) self.disconnect() - version_el = resp.find("version") - if version_el is None: + version_el = resp.xml().find("version") + if version_el is None or not version_el.text: raise GvmError( "Invalid response from manager daemon while requesting the " "version information." @@ -86,21 +79,19 @@ def determine_supported_gmp(self) -> SUPPORTED_GMP_VERSIONS: version_str = self.determine_remote_gmp_version().split(".", 1) major_version = int(version_str[0]) minor_version = int(version_str[1]) - if major_version == 20: - gmp_class = Gmpv208 - elif major_version == 21 and minor_version == 4: - gmp_class = Gmpv214 - elif major_version == 22 and minor_version == 4: - gmp_class = Gmpv224 + # if major_version == 22 and minor_version == 4: + # gmp_class = Gmpv224 + if major_version == 22 and minor_version == 4: + gmp_class = GMPv224 elif major_version == 22 and minor_version == 5: - gmp_class = Gmpv225 + gmp_class = GMPv225 else: raise GvmError( "Remote manager daemon uses an unsupported version of GMP. " f"The GMP version was {major_version}.{minor_version}" ) - return gmp_class(self._connection, transform=self._gmp_transform) + return gmp_class(self._connection, transform=self._transform_callable) def __enter__(self): self._gmp = self.determine_supported_gmp() @@ -114,6 +105,6 @@ def __exit__( exc_type: Optional[Type[BaseException]], exc_value: Optional[BaseException], traceback: Optional[TracebackType], - ) -> Any: + ) -> None: self._gmp.disconnect() self._gmp = None diff --git a/gvm/protocols/gmp/_gmp224.py b/gvm/protocols/gmp/_gmp224.py new file mode 100644 index 000000000..9f15a56ae --- /dev/null +++ b/gvm/protocols/gmp/_gmp224.py @@ -0,0 +1,149 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from typing import Optional, Union + +from .._protocol import GvmProtocol, T +from .core.requests import ( + Authentication, + PortList, + PortRangeType, + Version, +) + + +class GMPv224(GvmProtocol[T]): + _authenticated = False + + def is_authenticated(self) -> bool: + """Checks if the user is authenticated + + If the user is authenticated privileged GMP commands like get_tasks + may be send to gvmd. + + Returns: + bool: True if an authenticated connection to gvmd has been + established. + """ + return self._authenticated + + def authenticate(self, username: str, password: str) -> T: + """Authenticate to gvmd. + + The generated authenticate command will be send to server. + Afterwards the response is read, transformed and returned. + + Arguments: + username: Username + password: Password + """ + response = self._send_command( + Authentication.authenticate(username=username, password=password) + ) + + if response.is_success: + self._authenticated = True + + return self._transform(response) + + def describe_auth(self) -> T: + """Describe authentication methods + + Returns a list of all used authentication methods if such a list is + available. + + Returns: + The response. See :py:meth:`send_command` for details. + """ + return self._send_and_transform_command(Authentication.describe_auth()) + + def modify_auth( + self, group_name: str, auth_conf_settings: dict[str, str] + ) -> T: + """Modifies an existing auth. + + Arguments: + group_name: Name of the group to be modified. + auth_conf_settings: The new auth config. + """ + return self._send_and_transform_command( + Authentication.modify_auth(group_name, auth_conf_settings) + ) + + def get_version(self) -> T: + return self._send_and_transform_command(Version.get_version()) + + def clone_port_list(self, port_list_id: str) -> T: + return self._send_and_transform_command( + PortList.clone_port_list(port_list_id) + ) + + def create_port_list( + self, name: str, port_range: str, *, comment: Optional[str] = None + ) -> T: + return self._send_and_transform_command( + PortList.create_port_list(name, port_range, comment=comment) + ) + + def create_port_range( + self, + port_list_id: str, + start: int, + end: int, + port_range_type: Union[str, PortRangeType], + *, + comment: Optional[str] = None, + ) -> T: + return self._send_and_transform_command( + PortList.create_port_range( + port_list_id, start, end, port_range_type, comment=comment + ) + ) + + def delete_port_list( + self, port_list_id: str, *, ultimate: bool = False + ) -> T: + return self._send_and_transform_command( + PortList.delete_port_list(port_list_id, ultimate=ultimate) + ) + + def delete_port_range(self, port_range_id: str) -> T: + return self._send_and_transform_command( + PortList.delete_port_range(port_range_id) + ) + + def get_port_lists( + self, + *, + filter_string: Optional[str] = None, + filter_id: Optional[str] = None, + details: Optional[bool] = None, + targets: Optional[bool] = None, + trash: Optional[bool] = None, + ) -> T: + return self._send_and_transform_command( + PortList.get_port_lists( + filter_string=filter_string, + filter_id=filter_id, + details=details, + targets=targets, + trash=trash, + ) + ) + + def get_port_list(self, port_list_id: str) -> T: + return self._send_and_transform_command( + PortList.get_port_list(port_list_id) + ) + + def modify_port_list( + self, + port_list_id: str, + *, + comment: Optional[str] = None, + name: Optional[str] = None, + ) -> T: + return self._send_and_transform_command( + PortList.modify_port_list(port_list_id, comment=comment, name=name) + ) diff --git a/gvm/protocols/gmp/_gmp225.py b/gvm/protocols/gmp/_gmp225.py new file mode 100644 index 000000000..982cfe7f0 --- /dev/null +++ b/gvm/protocols/gmp/_gmp225.py @@ -0,0 +1,55 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from typing import Optional + +from .._protocol import T +from ._gmp224 import GMPv224 +from .core.requests import ( + ResourceNames, + ResourceType, +) + + +class GMPv225(GMPv224[T]): + def get_resource_names( + self, + resource_type: ResourceType, + *, + filter_string: Optional[str] = None, + ) -> T: + """Request a list of resource names and IDs + + Arguments: + resource_type: Type must be either ALERT, CERT_BUND_ADV, + CONFIG, CPE, CREDENTIAL, CVE, DFN_CERT_ADV, FILTER, + GROUP, HOST, NOTE, NVT, OS, OVERRIDE, PERMISSION, + PORT_LIST, REPORT_FORMAT, REPORT, RESULT, ROLE, + SCANNER, SCHEDULE, TARGET, TASK, TLS_CERTIFICATE + or USER + filter_string: Filter term to use for the query + """ + return self._send_and_transform_command( + ResourceNames.get_resource_names( + resource_type, filter_string=filter_string + ) + ) + + def get_resource_name( + self, resource_id: str, resource_type: ResourceType + ) -> T: + """Request a single resource name + + Arguments: + resource_id: ID of an existing resource + resource_type: Type must be either ALERT, CERT_BUND_ADV, + CONFIG, CPE, CREDENTIAL, CVE, DFN_CERT_ADV, FILTER, + GROUP, HOST, NOTE, NVT, OS, OVERRIDE, PERMISSION, + PORT_LIST, REPORT_FORMAT, REPORT, RESULT, ROLE, + SCANNER, SCHEDULE, TARGET, TASK, TLS_CERTIFICATE + or USER + """ + return self._send_and_transform_command( + ResourceNames.get_resource_name(resource_id, resource_type) + )