diff --git a/pytest_splunk_addon/event_ingestors/sc4s_event_ingestor.py b/pytest_splunk_addon/event_ingestors/sc4s_event_ingestor.py index 284dae8b..3ff5e081 100644 --- a/pytest_splunk_addon/event_ingestors/sc4s_event_ingestor.py +++ b/pytest_splunk_addon/event_ingestors/sc4s_event_ingestor.py @@ -15,76 +15,87 @@ # import socket from time import sleep -import os -import re -import concurrent.futures -from .base_event_ingestor import EventIngestor import logging +from typing import Dict + +from .base_event_ingestor import EventIngestor + LOGGER = logging.getLogger("pytest-splunk-addon") class SC4SEventIngestor(EventIngestor): """ - Class to Ingest Events via SC4S - - The format for required_configs is:: - - { - sc4s_host (str): Address of the Splunk Server. Do not provide http scheme in the host. - sc4s_port (int): Port number of the above host address - } + Class to ingest events via SC4S (supports both IPv4 and IPv6) Args: required_configs (dict): Dictionary containing splunk host and sc4s port """ - def __init__(self, required_configs): + def __init__(self, required_configs: Dict[str, str]) -> None: self.sc4s_host = required_configs["sc4s_host"] self.sc4s_port = required_configs["sc4s_port"] - self.server_address = ( - required_configs["sc4s_host"], - required_configs["sc4s_port"], - ) + + def _create_socket(self): + """Try all addresses (IPv4 and IPv6) and return a connected socket.""" + last_exc = None + for res in socket.getaddrinfo( + self.sc4s_host, self.sc4s_port, socket.AF_UNSPEC, socket.SOCK_STREAM + ): + af, socktype, proto, _, sa = res + try: + sock = socket.socket(af, socktype, proto) + if af == socket.AF_INET6: + # Attempt dual-stack if supported + try: + sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) + except (AttributeError, OSError): + pass + sock.connect(sa) + return sock + except Exception as e: + last_exc = e + LOGGER.debug(f"Failed to connect to {sa}: {e}") + try: + sock.close() + except Exception: + pass + continue + raise ConnectionError( + f"Could not connect to SC4S at {self.sc4s_host}:{self.sc4s_port} via IPv4 or IPv6" + ) from last_exc def ingest(self, events, thread_count): """ - Ingests events in the splunk via sc4s (Single/Batch of Events) + Ingests events in Splunk via SC4S (single/batch of events) Args: events (list): Events with newline character or LineBreaker as separator - """ - # This loop just checks for a viable remote connection + # Retry loop to establish connection tried = 0 - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) while True: try: - sock.connect(self.server_address) + sock = self._create_socket() break except Exception as e: tried += 1 - LOGGER.debug("Attempt {} to ingest data with SC4S".format(str(tried))) + LOGGER.debug(f"Attempt {tried} to ingest data with SC4S") if tried > 90: - LOGGER.error( - "Failed to ingest event with SC4S {} times".format(str(tried)) - ) + LOGGER.error(f"Failed to ingest event with SC4S {tried} times") raise e sleep(1) - finally: - sock.close() - raw_events = list() - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(self.server_address) - for event in events: - # raw_events.extend() - for se in event.event.splitlines(): - try: - sock.sendall(str.encode(se + "\n")) - except Exception as e: - LOGGER.debug("Attempt ingest data with SC4S=".format(se)) - LOGGER.exception(e) - sleep(1) - sock.close() + # Send events + try: + for event in events: + for se in event.event.splitlines(): + try: + sock.sendall(str.encode(se + "\n")) + except Exception as e: + LOGGER.debug(f"Attempt ingest data with SC4S: {se}") + LOGGER.exception(e) + sleep(1) + finally: + sock.close() diff --git a/tests/unit/tests_standard_lib/test_event_ingestors/test_sc4s_event_ingestor.py b/tests/unit/tests_standard_lib/test_event_ingestors/test_sc4s_event_ingestor.py index 0e4246e1..c69a961b 100644 --- a/tests/unit/tests_standard_lib/test_event_ingestors/test_sc4s_event_ingestor.py +++ b/tests/unit/tests_standard_lib/test_event_ingestors/test_sc4s_event_ingestor.py @@ -25,14 +25,6 @@ def socket_mock(monkeypatch): "socket.socket", socket_mock, ) - monkeypatch.setattr( - "socket.AF_INET", - "AF_INET", - ) - monkeypatch.setattr( - "socket.SOCK_STREAM", - "SOCK_STREAM", - ) return socket_mock @@ -46,25 +38,22 @@ def sleep_mock(monkeypatch): def test_sc4s_data_can_be_ingested(socket_mock, sc4s_ingestor, sc4s_events): sc4s_ingestor.ingest(sc4s_events, 20) - assert socket_mock.call_count == 2 - socket_mock.assert_has_calls( - [call("AF_INET", "SOCK_STREAM"), call("AF_INET", "SOCK_STREAM")], any_order=True - ) - assert socket_mock.connect.call_count == 2 - socket_mock.connect.assert_has_calls( - [call(("127.0.0.1", 55730)), call(("127.0.0.1", 55730))] - ) - assert socket_mock.sendall.call_count == 2 - assert socket_mock.close.call_count == 2 + assert socket_mock.call_count == 1 # Socket created once + assert socket_mock.connect.call_count == 1 + socket_mock.connect.assert_called_with(("127.0.0.1", 55730)) + assert socket_mock.sendall.call_count == len(sc4s_events) + assert socket_mock.close.call_count == 1 def test_exception_raised_when_sc4s_socket_can_not_be_opened( socket_mock, sleep_mock, sc4s_ingestor, sc4s_events, caplog ): - socket_mock.connect.side_effect = Exception - pytest.raises(Exception, sc4s_ingestor.ingest, *(sc4s_events, 20)) - assert "Failed to ingest event with SC4S 91 times" in caplog.messages - assert socket_mock.connect.call_count == socket_mock.close.call_count == 91 + socket_mock.connect.side_effect = Exception("Connection failed") + with pytest.raises(ConnectionError): + sc4s_ingestor.ingest(sc4s_events, 20) + assert "Failed to ingest event with SC4S 91 times" in caplog.text + assert socket_mock.connect.call_count == 91 + assert socket_mock.close.call_count == 91 def test_exception_raised_when_sc4s_event_sent( @@ -72,5 +61,6 @@ def test_exception_raised_when_sc4s_event_sent( ): socket_mock.sendall.side_effect = Exception("Send data fail") sc4s_ingestor.ingest(sc4s_events, 20) - assert "Send data fail" in caplog.messages - assert socket_mock.connect.call_count == socket_mock.close.call_count == 2 + assert "Send data fail" in caplog.text + assert socket_mock.connect.call_count == 1 + assert socket_mock.close.call_count == 1