-
Notifications
You must be signed in to change notification settings - Fork 62
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add: Add a new GvmProtocol class that will replace the one in gvm.base
Changes compared to the gvm.base class: * Changed send_command into a private method that expects Request instances * Changed the signatures of the transform callable to expect a Response instance as argument instead of bytes * Changed the new GvmProtocol class into a Generic
- Loading branch information
1 parent
94e3998
commit a65c046
Showing
1 changed file
with
126 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
# SPDX-FileCopyrightText: 2019-2024 Greenbone AG | ||
# | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
from types import TracebackType | ||
from typing import Callable, Generic, Optional, Type | ||
|
||
from typing_extensions import Self, TypeVar | ||
|
||
from gvm.connections import GvmConnection | ||
|
||
from .gmp.core import Connection, Request, Response | ||
|
||
T = TypeVar("T", default=str) | ||
|
||
|
||
def str_transform(response: Response) -> str: | ||
return str(response) | ||
|
||
|
||
class GvmProtocol(Generic[T]): | ||
"""Base class for different GVM protocols | ||
Attributes: | ||
connection: Connection to use to talk with the remote daemon. See | ||
:mod:`gvm.connections` for possible connection types. | ||
transform: Optional transform `callable`_ to convert response data. | ||
After each request the callable gets passed the plain response data | ||
which can be used to check the data and/or conversion into different | ||
representations like a xml dom. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
connection: GvmConnection, | ||
*, | ||
transform: Callable[[Response], T] = str_transform, # type: ignore[assignment] # this should work with mypy 1.9.0 without an ignore | ||
): | ||
self._connection = connection | ||
self._protocol = Connection() | ||
|
||
self._connected = False | ||
|
||
self._transform_callable = transform | ||
|
||
def __enter__(self) -> Self: | ||
self.connect() | ||
return self | ||
|
||
def __exit__( | ||
self, | ||
exc_type: Optional[Type[BaseException]], | ||
exc_value: Optional[BaseException], | ||
traceback: Optional[TracebackType], | ||
) -> None: | ||
self.disconnect() | ||
|
||
def _read(self) -> bytes: | ||
"""Read a command response from gvmd | ||
Returns: | ||
str: Response from server. | ||
""" | ||
return self._connection.read() | ||
|
||
def _send(self, data: bytes) -> None: | ||
"""Send a command to the server | ||
Arguments: | ||
data (str): Data to be send over the connection to the server | ||
""" | ||
self.connect() | ||
self._connection.send(data) | ||
|
||
def is_connected(self) -> bool: | ||
"""Status of the current connection | ||
Returns: | ||
True if a connection to the remote server has been established. | ||
""" | ||
return self._connected | ||
|
||
def connect(self) -> None: | ||
"""Initiates a protocol connection | ||
Normally connect is not called directly. Either it is called | ||
automatically when sending a protocol command or when using a | ||
`with statement`_. | ||
.. _with statement: | ||
https://docs.python.org/3/reference/datamodel.html#with-statement-context-managers | ||
""" | ||
if not self.is_connected(): | ||
self._connection.connect() | ||
self._connected = True | ||
|
||
def disconnect(self) -> None: | ||
"""Disconnect the connection | ||
Ends and closes the connection. | ||
""" | ||
if self.is_connected(): | ||
self._connection.disconnect() | ||
self._connected = False | ||
|
||
self._protocol.close() | ||
|
||
def _transform(self, response: Response) -> T: | ||
transform = self._transform_callable | ||
return transform(response) | ||
|
||
def _send_command(self, cmd: Request) -> Response: | ||
try: | ||
send_data = self._protocol.send(cmd) | ||
self._send(send_data) | ||
response: Optional[Response] = None | ||
while not response: | ||
received_data = self._read() | ||
response = self._protocol.receive_data(received_data) | ||
return response | ||
except Exception as e: | ||
self.disconnect() | ||
raise e | ||
|
||
def _send_and_transform_command(self, cmd: Request) -> T: | ||
return self._transform(self._send_command(cmd)) |