7 changes: 7 additions & 0 deletions Dockerfile.uf
@@ -0,0 +1,7 @@
ARG SPLUNK_VERSION=latest
FROM splunk/universalforwarder:$SPLUNK_VERSION
ARG SPLUNK_VERSION=latest
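# ARGs declared before FROM are scoped to the FROM line only, so SPLUNK_VERSION is
# re-declared here to keep it available to the rest of the build stage.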
ARG SPLUNK_APP_ID=TA_UNKNOWN
ARG SPLUNK_APP_PACKAGE=package
RUN echo ${SPLUNK_VERSION} $SPLUNK_APP_PACKAGE
COPY ${SPLUNK_APP_PACKAGE} /opt/splunkforwarder/etc/apps/${SPLUNK_APP_ID}
22 changes: 21 additions & 1 deletion docker-compose.yml
@@ -63,7 +63,27 @@ services:
- SPLUNK_PASSWORD=${SPLUNK_PASSWORD}
- SPLUNK_START_ARGS=--accept-license
- SPLUNK_HEC_TOKEN=${SPLUNK_HEC_TOKEN}


uf:
build:
context: .
dockerfile: Dockerfile.uf
args:
SPLUNK_APP_ID: ${SPLUNK_APP_ID}
SPLUNK_APP_PACKAGE: ${SPLUNK_APP_PACKAGE}
SPLUNK_VERSION: ${SPLUNK_VERSION}
hostname: uf
ports:
- "9997"
- "8089"
links:
- splunk
environment:
- SPLUNK_PASSWORD=Chang3d!
- SPLUNK_START_ARGS=--accept-license
volumes:
- ${CURRENT_DIR}/uf_files:${CURRENT_DIR}/uf_files
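      # The host working directory path is mirrored inside the container: the
      # file_system_prerequisite fixture creates <cwd>/uf_files on the host and
      # FileMonitorEventIngestor points inputs.conf at the same absolute paths,
      # so the universal forwarder can monitor the tokenized event files directly.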

volumes:
splunk-sc4s-var:
external: false
6 changes: 6 additions & 0 deletions docs/api_reference/event_ingestion.rst
@@ -16,5 +16,11 @@ HEC Raw Ingestor
SC4S Event Ingestor
~~~~~~~~~~~~~~~~~~~~
.. automodule:: standard_lib.event_ingestors.sc4s_event_ingestor
:members:
:show-inheritance:

File Monitor Ingestor
~~~~~~~~~~~~~~~~~~~~~
.. automodule:: standard_lib.event_ingestors.file_monitor_ingestor
:members:
:show-inheritance:
6 changes: 5 additions & 1 deletion docs/sample_generator.rst
@@ -52,10 +52,14 @@ host_type = plugin | event
* If the value is plugin, the plugin will generate a host in the format "stanza_{count}" to uniquely identify the events.
* If the value is event, the host field should be provided for a token using "token.<n>.field = host".

input_type = modinput | scripted_input | syslog_tcp | file_monitor | windows_input | default
input_type = modinput | scripted_input | syslog_tcp | file_monitor | windows_input | uf_file_monitor | default
* The input_type used in the add-on to ingest data of the sourcetype used in the stanza.
* The way the sample data is ingested into Splunk depends on the input_type. The most similar ingestion approach is used for each input_type to get accurate index-time testing.
* With input_type=uf_file_monitor, the universal forwarder uses a file monitor input to read the events and then forwards the data to the indexer (see the example after the warning below).
* For example, if a sourcetype "alert" of an add-on is ingested through syslog in the live environment, provide input_type=syslog_tcp.

.. warning::
uf_file_monitor input_type will only work with splunk-type=docker.
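
For illustration, a sample stanza that uses this input type might look like the following (the stanza name, sourcetype, and index shown here are hypothetical)::

    [sample_file.samples]
    input_type = uf_file_monitor
    sourcetype = sample:sourcetype
    host_type = plugin
    index = main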

index = <index>
* The index used to ingest the data.
104 changes: 102 additions & 2 deletions pytest_splunk_addon/splunk.py
@@ -8,6 +8,7 @@

import logging
import os
import shutil
from time import sleep
import json
import pytest
@@ -244,6 +245,34 @@ def pytest_addoption(parser):
dest="ignore_addon_errors",
help=("Path to file where list of addon related errors are suppressed."),
)
group.addoption(
"--splunk-uf-host",
action="store",
dest="splunk_uf_host",
default="uf",
help="Address of Universal Forwarder Server.",
)
group.addoption(
"--splunk-uf-port",
action="store",
dest="splunk_uf_port",
default="8089",
help="Universal Forwarder Management port. default is 8089.",
)
group.addoption(
"--splunk-uf-user",
action="store",
dest="splunk_uf_user",
default="admin",
help="Universal Forwarder login user.",
)
group.addoption(
"--splunk-uf-password",
action="store",
dest="splunk_uf_password",
default="Chang3d!",
help="Password of the Universal Forwarder user",
)
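    # For reference, these options are passed on the pytest command line together with
    # the existing Splunk options, e.g. (a hypothetical invocation; other required
    # options such as the add-on path are omitted):
    #   pytest --splunk-type=docker --splunk-uf-host=uf --splunk-uf-port=8089 \
    #          --splunk-uf-user=admin --splunk-uf-password='Chang3d!'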


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -321,7 +350,7 @@ def ignore_internal_errors(request):


@pytest.fixture(scope="session")
def splunk(request):
def splunk(request, file_system_prerequisite):
"""
This fixture based on the passed option will provide a real fixture
for external or docker Splunk
@@ -380,6 +409,61 @@ def sc4s(request):

yield sc4s

@pytest.fixture(scope="session")
def uf(request):
"""
This fixture based on the passed option will provide a real fixture
for external or docker uf configuration

Returns:
dict: Details of uf which includes host, port, username and password
"""
if request.config.getoption("splunk_type") == "external":
request.fixturenames.append("uf_external")
uf = request.getfixturevalue("uf_external")
elif request.config.getoption("splunk_type") == "docker":
request.fixturenames.append("uf_docker")
uf = request.getfixturevalue("uf_docker")
else:
raise Exception
uf["uf_username"] = request.config.getoption("splunk_uf_user")
uf["uf_password"] = request.config.getoption("splunk_uf_password")
for _ in range(RESPONSIVE_SPLUNK_TIMEOUT):
if is_responsive_splunk(uf):
break
sleep(1)
yield uf

@pytest.fixture(scope="session")
def uf_docker(docker_services, tmp_path_factory, worker_id):
"""
Provides IP of the uf server and management port based on pytest-args(splunk_type)
"""
LOGGER.info("Starting docker_service=uf")
os.environ["CURRENT_DIR"] = os.getcwd()
if worker_id:
# get the temp directory shared by all workers
root_tmp_dir = tmp_path_factory.getbasetemp().parent
fn = root_tmp_dir / "pytest_docker"
with FileLock(str(fn) + ".lock"):
docker_services.start("uf")
uf_info = {
"uf_host": docker_services.docker_ip,
"uf_port": docker_services.port_for("uf", 8089),
}
return uf_info

@pytest.fixture(scope="session")
def uf_external(request):
"""
Provides IP of the uf server and management port based on pytest-args(splunk_type)
"""
uf_info = {
"uf_host": request.config.getoption("splunk_uf_host"),
"uf_port": request.config.getoption("splunk_uf_port"),
}
return uf_info
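# Note: the uf fixture above yields a dict of the following shape, combining
# uf_docker/uf_external with the --splunk-uf-user and --splunk-uf-password options
# (values shown are the option defaults; with splunk_type=docker the host and port
# come from docker_services):
#   {"uf_host": "uf", "uf_port": "8089", "uf_username": "admin", "uf_password": "Chang3d!"}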


@pytest.fixture(scope="session")
def splunk_docker(
Expand Down Expand Up @@ -544,7 +628,7 @@ def splunk_web_uri(request, splunk):


@pytest.fixture(scope="session")
def splunk_ingest_data(request, splunk_hec_uri, sc4s, splunk_events_cleanup):
def splunk_ingest_data(request, splunk_hec_uri, sc4s, uf, splunk_events_cleanup):
"""
Generates events for the add-on and ingests into Splunk.
The ingestion can be done using the following methods:
@@ -570,6 +654,10 @@ def splunk_ingest_data(request, splunk_hec_uri, sc4s, splunk_events_cleanup):
config_path = request.config.getoption("splunk_data_generator")

ingest_meta_data = {
"uf_host": uf.get("uf_host"),
"uf_port": uf.get("uf_port"),
"uf_username": uf.get("uf_username"),
"uf_password": uf.get("uf_password"),
"session_headers": splunk_hec_uri[0].headers,
"splunk_hec_uri": splunk_hec_uri[1],
"sc4s_host": sc4s[0], # for sc4s
Expand Down Expand Up @@ -608,6 +696,18 @@ def splunk_events_cleanup(request, splunk_search_util):
else:
LOGGER.info("Events cleanup was disabled.")

@pytest.fixture(scope="session")
def file_system_prerequisite():
"""
File system prerequisite before running tests.
Creating uf_files directory to write tokenized events for uf_file_monitor input.
"""
UF_FILE_MONTOR_DIR = "uf_files"
monitor_dir = os.path.join(os.getcwd(), UF_FILE_MONTOR_DIR)
if os.path.exists(monitor_dir):
shutil.rmtree(UF_FILE_MONTOR_DIR, ignore_errors=True)
os.mkdir(monitor_dir)

def is_responsive_splunk(splunk):
"""
Verify if the management port of Splunk is responsive or not
@@ -2,4 +2,5 @@
from .hec_metric_ingestor import HECMetricEventIngestor
from .hec_raw_ingestor import HECRawEventIngestor
from .sc4s_event_ingestor import SC4SEventIngestor
from .file_monitor_ingestor import FileMonitorEventIngestor
from .ingestor_helper import IngestorHelper
@@ -0,0 +1,122 @@
from .base_event_ingestor import EventIngestor
import requests
import logging
import os
from time import sleep
from requests.exceptions import ConnectionError

LOGGER = logging.getLogger("pytest-splunk-addon")
MONITOR_DIR = "uf_files"

class FileMonitorEventIngestor(EventIngestor):
"""
Class to ingest event via File monitor
    This ingestor works only when splunk_type is docker and the universal forwarder
    container is linked with the Splunk instance's container as the 'splunk' service.

The format for required_configs is::

{
uf_host: Host of universal forwarder
uf_port: Management port of universal forwarder
uf_username: Name of user for universal forwarder
uf_password: Password of universal forwarder
}

Args:
required_configs (dict): Dictionary containing information about universal forwarder

"""
def __init__(self, required_configs):
self.uf_host = required_configs.get("uf_host")
self.uf_port = required_configs.get("uf_port")
self.uf_username = required_configs.get("uf_username")
self.uf_password = required_configs.get("uf_password")
# Container of universal forwarder is linked with splunk instance.
# So using splunk_host as splunk and port 9997 directly.
self.splunk_host = "splunk"
self.splunk_s2s_port = "9997"
self.uf_rest_uri = "https://{}:{}".format(self.uf_host, self.uf_port)
self.outputs_endpoint = "{}/services/data/outputs/tcp/group".format(self.uf_rest_uri)
self.inputs_endpoint = "{}/servicesNS/nobody/search/data/inputs/monitor".format(self.uf_rest_uri)

def ingest(self, events, thread_count):
"""
Ingests data into splunk via file monitor.
Args:
events (list): List of events (SampleEvent) to be ingested
"""
self.create_output_conf()
for each_event in events:
self.create_event_file(each_event)
sleep(10)
self.create_inputs_stanza(each_event)

def create_output_conf(self):
"""
        Create a stanza in outputs.conf of the universal forwarder to forward data to the Splunk indexer.
"""
tcp_out_dict = {"name":"uf_monitor", "servers":"{}:{}".format(self.splunk_host, self.splunk_s2s_port)}
LOGGER.info("Making rest call to create stanza in outputs.conf file with following endpoint : {}".format(self.outputs_endpoint))
LOGGER.debug("Creating following stanza in output.conf : {}".format(tcp_out_dict))
try:
response = requests.post(self.outputs_endpoint, tcp_out_dict, auth=(self.uf_username, self.uf_password), verify=False)
if response.status_code not in (200, 201):
LOGGER.warning("Unable to create stanza in outputs.conf\nStatus code: {} \nReason: {} \ntext:{}".format(response.status_code, response.reason, response.text))
except ConnectionError as e:
LOGGER.error("Unable to connect to Universal forwarder, {}".format(e))

def create_event_file(self, event):
"""
        Write each tokenized event to a file named after the event's host. The host of each event is unique.

Args:
event (SampleEvent): Instance containing event info
"""
try:
with open(self.get_file_path(event), "w+") as fp:
LOGGER.info("Writing events file for host={}".format(event.metadata.get("host")))
fp.write(event.event)
LOGGER.debug("Wrote tokenized events file on path : {}".format(self.get_file_path(event)))
except Exception as e:
LOGGER.warning("Unable to create event file for host : {}, Reason : {}".format(event.metadata.get("host"), e))

def create_inputs_stanza(self, event):
"""
Create stanza in inputs.conf on universal forwarder for each tokenized event.

Args:
event (SampleEvent): Instance containing event info
"""
file_path = self.get_file_path(event)
sourcetype = event.metadata.get("sourcetype")
if not sourcetype:
sourcetype = event.metadata.get("sourcetype_to_search", "pytest_splunk_addon")
stanza = {
"name": file_path,
"sourcetype": sourcetype,
"index": event.metadata.get("index", "main"),
"disabled": False,
"crc-salt": "<SOURCE>"
}
if event.metadata.get("host_type") in ("plugin"):
stanza["host"] = event.metadata.get("host")
if event.metadata.get("source"):
stanza["rename-source"] = event.metadata.get("source")
LOGGER.info("Making rest call to create stanza in inputs.conf file with following endpoint : {}".format(self.inputs_endpoint))
LOGGER.debug("Creating following stanza in inputs.conf : {}".format(stanza))
try:
response = requests.post(self.inputs_endpoint, stanza, auth=(self.uf_username, self.uf_password), verify=False)
if response.status_code not in (200, 201):
LOGGER.warning("Unable to add stanza in inputs.conf\nStatus code: {} \nReason: {} \ntext:{}".format(response.status_code, response.reason, response.text))
except ConnectionError as e:
LOGGER.error("Unable to connect to Universal forwarder, {}".format(e))

def get_file_path(self, event):
"""
Returns absolute path for tokenized events.

Args:
event (SampleEvent): Instance containing event info
"""
return "{}/{}/{}".format(os.getcwd(), MONITOR_DIR, event.metadata.get("host"))
@@ -86,7 +86,7 @@ def __ingest(self, event, params):
try:
LOGGER.info("Making a HEC raw endpoint request with the following params:\nhec_uri:{}\nheaders:{}".format(
str(self.hec_uri), str(self.session_headers)))
LOGGER.debug("Creating the following sample event to be ingested via HEC RAW endpoint:\nEvents: {}\nParams:".format(
LOGGER.debug("Creating the following sample event to be ingested via HEC RAW endpoint:\nEvents: {}\nParams:{}".format(
str(event),str(params)))
response = requests.post(
"{}/{}".format(self.hec_uri, "raw"),
@@ -3,6 +3,7 @@
HECRawEventIngestor,
HECMetricEventIngestor,
SC4SEventIngestor,
FileMonitorEventIngestor,
)
import logging
from ..sample_generation import SampleXdistGenerator
@@ -21,6 +22,7 @@ def get_event_ingestor(cls, input_type, ingest_meta_data):
"modinput": HECEventIngestor,
"windows_input": HECEventIngestor,
"file_monitor": HECRawEventIngestor,
"uf_file_monitor": FileMonitorEventIngestor,
"scripted_input": HECRawEventIngestor,
"hec_metric": HECMetricEventIngestor,
"syslog_tcp": SC4SEventIngestor,
@@ -49,7 +51,7 @@ def ingest_events(cls, ingest_meta_data, addon_path, config_path, thread_count,
ingestor_dict = dict()
for event in tokenized_events:
input_type = event.metadata.get("input_type")
if input_type in ["modinput", "windows_input", "syslog_tcp", "syslog_udp"]:
if input_type in ["modinput", "windows_input", "syslog_tcp", "syslog_udp", "uf_file_monitor"]:
event.event = event.event.encode("utf-8").decode()
else:
event.event = event.event.encode("utf-8")
@@ -128,6 +128,7 @@ def _parse_meta(self, eventgen_params):
"modinput",
"windows_input",
"file_monitor",
"uf_file_monitor",
"scripted_input",
"syslog_tcp",
"syslog_udp",
@@ -206,6 +207,7 @@ def _get_raw_sample(self):
)
elif self.input_type in [
"file_monitor",
"uf_file_monitor",
"scripted_input",
"syslog_tcp",
"syslog_udp",