Skip to content

Commit

Permalink
Add: Finalize GMP Connection class and add a test case
Browse files Browse the repository at this point in the history
  • Loading branch information
bjoernricks authored and greenbonebot committed Jun 14, 2024
1 parent 9f8a6df commit 66362ed
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 3 deletions.
3 changes: 2 additions & 1 deletion gvm/protocols/gmp/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
#
# SPDX-License-Identifier: GPL-3.0-or-later

from ._connection import Connection
from ._connection import Connection, InvalidStateError
from ._request import Request
from ._response import Response, StatusError

__all__ = (
"Connection",
"InvalidStateError",
"Request",
"Response",
"StatusError",
Expand Down
35 changes: 33 additions & 2 deletions gvm/protocols/gmp/core/_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ def feed_xml(self, data: AnyStr) -> None:


class InvalidStateError(GvmError):
"""
Error raised if the Connection would be moved into an invalid state
"""

def __init__(self, message: str = "Invalid State", *args):
super().__init__(message, *args)

Expand Down Expand Up @@ -148,21 +152,48 @@ def receive_data(self, data: bytes) -> Optional[Response]:

class Connection:
"""
This is a [SansIO]() connection and not a socket connection
This is a [SansIO](https://sans-io.readthedocs.io) connection for GMP
It is responsible for
It is responsible for creating bytes from GMP XML requests and transforming
XML response data into GMP responses.
"""

def __init__(self) -> None:
self.__set_state__(InitialState())

def send(self, request: Request) -> bytes:
"""
Create data from a request to be send
Returns:
The data for a request that can be send for example over a socket
Raises:
An InvalidStateError if no request can be send currently. For
example when waiting for a response to a previous request.
"""
return self._state.send(request)

def receive_data(self, data: bytes) -> Optional[Response]:
"""
Feed received data a response is complete
Returns:
A Response if the response data is complete and None if data is
still to be received.
Raises:
An InvalidStateError if no data can be received currently. For
example if not request is send yet.
"""
return self._state.receive_data(data)

def close(self) -> None:
"""
Close the connection and reset the state of the protocol
Afterwards the connection can be reused for sending a new request.
"""
return self._state.close()

def __set_state__(self, state: State) -> None:
Expand Down
146 changes: 146 additions & 0 deletions tests/protocols/gmp/core/test_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

import unittest

from gvm.errors import GvmError
from gvm.protocols.gmp.core import Connection, InvalidStateError


class RequestMock:
def __init__(self, data: bytes) -> None:
self._data = data

def __bytes__(self) -> bytes:
return self._data


class ConnectionTestCase(unittest.TestCase):
def test_send_request(self) -> None:
request = RequestMock(b"<request/>")
connection = Connection()

data = connection.send(request)
self.assertEqual(data, b"<request/>")

def test_receive_data(self) -> None:
request = RequestMock(b"<request/>")
connection = Connection()

connection.send(request)
response = connection.receive_data(b"<response")

self.assertIsNone(response)
response = connection.receive_data(b' status="200"/>')

self.assertIsNotNone(response)
self.assertTrue(response.is_success) # type: ignore

another_request = RequestMock(b"<another_request/>")
connection.send(another_request)

response = connection.receive_data(b"<another_response")

self.assertIsNone(response)
response = connection.receive_data(b' status="200"/>')

self.assertIsNotNone(response)
self.assertTrue(response.is_success) # type: ignore

def test_receive_invalid_data(self) -> None:
request = RequestMock(b"<request/>")
connection = Connection()

connection.send(request)
with self.assertRaisesRegex(
GvmError,
"^Cannot parse XML response. Response data read b'<response<>'$",
):
connection.receive_data(b"<response<>")

def test_error_state_close(self) -> None:
request = RequestMock(b"<request/>")
connection = Connection()

connection.send(request)
with self.assertRaisesRegex(
GvmError,
"^Cannot parse XML response. Response data read b'<response<>'$",
):
connection.receive_data(b"<response<>")

with self.assertRaisesRegex(
InvalidStateError,
"^The connection is in an error state. Please close the connection.$",
):
connection.receive_data(b"<response/>")

with self.assertRaisesRegex(
InvalidStateError,
"^The connection is in an error state. Please close the connection.$",
):
connection.send(request)

connection.close()
connection.send(request)

def test_send_when_receiving_data(self) -> None:
request = RequestMock(b"<request/>")
connection = Connection()

connection.send(request)
connection.receive_data(b"<response")

with self.assertRaisesRegex(
InvalidStateError,
"^Invalid State$",
):
connection.send(request)

def test_close_before_send(self) -> None:
request = RequestMock(b"<request/>")
connection = Connection()

connection.close()

data = connection.send(request)
self.assertEqual(data, b"<request/>")

def test_close_after_send(self) -> None:
request = RequestMock(b"<request/>")
connection = Connection()

data = connection.send(request)
self.assertEqual(data, b"<request/>")

connection.close()

with self.assertRaisesRegex(
InvalidStateError,
"^Invalid State$",
):
connection.receive_data(b"<response")

data = connection.send(request)
self.assertEqual(data, b"<request/>")

def test_close_after_receive_data(self) -> None:
request = RequestMock(b"<request/>")
connection = Connection()

data = connection.send(request)
self.assertEqual(data, b"<request/>")

connection.receive_data(b"<response")

connection.close()

with self.assertRaisesRegex(
InvalidStateError,
"^Invalid State$",
):
connection.receive_data(b' status="200"/>')

data = connection.send(request)
self.assertEqual(data, b"<request/>")

0 comments on commit 66362ed

Please sign in to comment.