Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 48 additions & 43 deletions pytest_splunk_addon/event_ingestors/sc4s_event_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,76 +15,81 @@
#
import socket
from time import sleep
import os
import re
import concurrent.futures
from .base_event_ingestor import EventIngestor
import logging
from .base_event_ingestor import EventIngestor

LOGGER = logging.getLogger("pytest-splunk-addon")


class SC4SEventIngestor(EventIngestor):
"""
Class to Ingest Events via SC4S
The format for required_configs is::
{
sc4s_host (str): Address of the Splunk Server. Do not provide http scheme in the host.
sc4s_port (int): Port number of the above host address
}
Args:
required_configs (dict): Dictionary containing splunk host and sc4s port
Class to ingest events via SC4S (supports both IPv4 and IPv6)
"""

def __init__(self, required_configs):
self.sc4s_host = required_configs["sc4s_host"]
self.sc4s_port = required_configs["sc4s_port"]
self.server_address = (
required_configs["sc4s_host"],
required_configs["sc4s_port"],
)

def _create_socket(self):
"""Try all addresses (IPv4 and IPv6) and return a connected socket."""
last_exc = None
for res in socket.getaddrinfo(
self.sc4s_host, self.sc4s_port, socket.AF_UNSPEC, socket.SOCK_STREAM
):
af, socktype, proto, canonname, sa = res
try:
sock = socket.socket(af, socktype, proto)
if af == socket.AF_INET6:
# Attempt dual-stack if supported
try:
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
except (AttributeError, OSError):
pass
sock.connect(sa)
return sock
except Exception as e:
last_exc = e
LOGGER.debug(f"Failed to connect to {sa}: {e}")
try:
sock.close()
except Exception:
pass
continue
raise ConnectionError(
f"Could not connect to SC4S at {self.sc4s_host}:{self.sc4s_port} via IPv4 or IPv6"
) from last_exc

def ingest(self, events, thread_count):
"""
Ingests events in the splunk via sc4s (Single/Batch of Events)
Ingests events in Splunk via SC4S (single/batch of events)
Args:
events (list): Events with newline character or LineBreaker as separator
"""

# This loop just checks for a viable remote connection
# Retry loop to establish connection
tried = 0
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
while True:
try:
sock.connect(self.server_address)
sock = self._create_socket()
break
except Exception as e:
tried += 1
LOGGER.debug("Attempt {} to ingest data with SC4S".format(str(tried)))
LOGGER.debug(f"Attempt {tried} to ingest data with SC4S")
if tried > 90:
LOGGER.error(
"Failed to ingest event with SC4S {} times".format(str(tried))
)
LOGGER.error(f"Failed to ingest event with SC4S {tried} times")
raise e
sleep(1)
finally:
sock.close()

raw_events = list()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(self.server_address)
for event in events:
# raw_events.extend()
for se in event.event.splitlines():
try:
sock.sendall(str.encode(se + "\n"))
except Exception as e:
LOGGER.debug("Attempt ingest data with SC4S=".format(se))
LOGGER.exception(e)
sleep(1)
sock.close()
# Send events
try:
for event in events:
for se in event.event.splitlines():
try:
sock.sendall(str.encode(se + "\n"))
except Exception as e:
LOGGER.debug(f"Attempt ingest data with SC4S: {se}")
LOGGER.exception(e)
sleep(1)
finally:
sock.close()
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,6 @@ def socket_mock(monkeypatch):
"socket.socket",
socket_mock,
)
monkeypatch.setattr(
"socket.AF_INET",
"AF_INET",
)
monkeypatch.setattr(
"socket.SOCK_STREAM",
"SOCK_STREAM",
)
return socket_mock


Expand All @@ -46,31 +38,29 @@ def sleep_mock(monkeypatch):

def test_sc4s_data_can_be_ingested(socket_mock, sc4s_ingestor, sc4s_events):
sc4s_ingestor.ingest(sc4s_events, 20)
assert socket_mock.call_count == 2
socket_mock.assert_has_calls(
[call("AF_INET", "SOCK_STREAM"), call("AF_INET", "SOCK_STREAM")], any_order=True
)
assert socket_mock.connect.call_count == 2
socket_mock.connect.assert_has_calls(
[call(("127.0.0.1", 55730)), call(("127.0.0.1", 55730))]
)
assert socket_mock.sendall.call_count == 2
assert socket_mock.close.call_count == 2
assert socket_mock.call_count == 1 # Socket created once
assert socket_mock.connect.call_count == 1
socket_mock.connect.assert_called_with(("127.0.0.1", 55730))
assert socket_mock.sendall.call_count == len(sc4s_events)
assert socket_mock.close.call_count == 1


def test_exception_raised_when_sc4s_socket_can_not_be_opened(
socket_mock, sleep_mock, sc4s_ingestor, sc4s_events, caplog
):
socket_mock.connect.side_effect = Exception
pytest.raises(Exception, sc4s_ingestor.ingest, *(sc4s_events, 20))
assert "Failed to ingest event with SC4S 91 times" in caplog.messages
assert socket_mock.connect.call_count == socket_mock.close.call_count == 91
socket_mock.connect.side_effect = Exception("Connection failed")
with pytest.raises(ConnectionError):
sc4s_ingestor.ingest(sc4s_events, 20)
assert "Failed to ingest event with SC4S 91 times" in caplog.text
assert socket_mock.connect.call_count == 91
assert socket_mock.close.call_count == 91


def test_exception_raised_when_sc4s_event_sent(
socket_mock, sleep_mock, sc4s_ingestor, sc4s_events, caplog
):
socket_mock.sendall.side_effect = Exception("Send data fail")
sc4s_ingestor.ingest(sc4s_events, 20)
assert "Send data fail" in caplog.messages
assert socket_mock.connect.call_count == socket_mock.close.call_count == 2
assert "Send data fail" in caplog.text
assert socket_mock.connect.call_count == 1
assert socket_mock.close.call_count == 1
Loading