Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making suricata run in socket mode for pcap processing improvement #617

Merged
merged 3 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions kubernetes/33-suricata-socket.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: suricata-socket-deployment
namespace: malcolm
spec:
selector:
matchLabels:
name: suricata-socket-deployment
replicas: 1
template:
metadata:
labels:
name: suricata-socket-deployment
spec:
containers:
- name: suricata-socket-container
image: ghcr.io/idaholab/malcolm/suricata:24.12.0
imagePullPolicy: Always
stdin: false
tty: true
envFrom:
- configMapRef:
name: process-env
- configMapRef:
name: ssl-env
- configMapRef:
name: upload-common-env
- configMapRef:
name: suricata-env
- configMapRef:
name: suricata-socket-env
livenessProbe:
exec:
command:
- supervisorctl
- status
- socket-suricata
initialDelaySeconds: 120
periodSeconds: 30
timeoutSeconds: 15
successThreshold: 1
failureThreshold: 10
volumeMounts:
- mountPath: /var/local/ca-trust/configmap
name: suricata-socket-var-local-catrust-volume
- mountPath: /var/log/suricata
name: suricata-socket-suricata-logs-volume
- mountPath: "/opt/suricata/rules/configmap"
name: suricata-socket-custom-rules-volume
- mountPath: "/opt/suricata/include-configs/configmap"
name: suricata-socket-custom-configs-volume
initContainers:
- name: suricata-socket-dirinit-container
image: ghcr.io/idaholab/malcolm/dirinit:24.12.0
imagePullPolicy: Always
stdin: false
tty: true
envFrom:
- configMapRef:
name: process-env
env:
- name: PUSER_MKDIR
value: "/data/suricata-logs:socket"
volumeMounts:
- name: suricata-socket-suricata-logs-volume
mountPath: "/data/suricata-logs"
volumes:
- name: suricata-socket-var-local-catrust-volume
configMap:
name: var-local-catrust
- name: suricata-socket-suricata-logs-volume
persistentVolumeClaim:
claimName: suricata-claim
- name: suricata-socket-custom-rules-volume
configMap:
name: suricata-rules
- name: suricata-socket-custom-configs-volume
configMap:
name: suricata-configs
144 changes: 60 additions & 84 deletions shared/bin/pcap_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
)
import malcolm_utils
from malcolm_utils import eprint, str2bool, AtomicInt, run_process
from suricata_socket import SuricataSocketClient

from multiprocessing.pool import ThreadPool
from collections import deque
Expand Down Expand Up @@ -82,7 +83,6 @@


###################################################################################################

pdbFlagged = False
args = None
scriptName = os.path.basename(__file__)
Expand Down Expand Up @@ -403,7 +403,6 @@ def suricataFileWorker(suricataWorkerArgs):
pcapBaseDir,
autoSuricata,
forceSuricata,
suricataBin,
extraTags,
autoTag,
uploadDir,
Expand All @@ -419,104 +418,82 @@ def suricataFileWorker(suricataWorkerArgs):
suricataWorkerArgs[6],
suricataWorkerArgs[7],
suricataWorkerArgs[8],
suricataWorkerArgs[9],
)

if not logger:
logger = logging

logger.info(f"{scriptName}[{scanWorkerId}]:\tstarted")

# loop forever, or until we're told to shut down
# create a single socket client for this worker
try:
suricata = SuricataSocketClient(logger=logger, output_dir=uploadDir)
except Exception as e:
logger.error(f"Failed to create Suricata socket client: {e}")
return

while not shuttingDown:
try:
# pull an item from the queue of files that need to be processed
fileInfo = newFileQueue.popleft()
except IndexError:
time.sleep(1)
else:
if isinstance(fileInfo, dict) and (FILE_INFO_DICT_NAME in fileInfo):
# Suricata this PCAP if it's tagged "AUTOSURICATA" or if the global autoSuricata flag is turned on.
# However, skip "live" PCAPs Malcolm is capturing and rotating through for Arkime capture,
# as Suricata now does its own network capture in Malcolm standalone mode.
if (
autoSuricata
or (
(FILE_INFO_DICT_TAGS in fileInfo) and SURICATA_AUTOSURICATA_TAG in fileInfo[FILE_INFO_DICT_TAGS]
)
) and (
forceSuricata
or (
not any(
os.path.basename(fileInfo[FILE_INFO_DICT_NAME]).startswith(prefix)
for prefix in ('mnetsniff', 'mtcpdump')
)
)
):
if pcapBaseDir and os.path.isdir(pcapBaseDir):
fileInfo[FILE_INFO_DICT_NAME] = os.path.join(pcapBaseDir, fileInfo[FILE_INFO_DICT_NAME])

if os.path.isfile(fileInfo[FILE_INFO_DICT_NAME]):
# finalize tags list
fileInfo[FILE_INFO_DICT_TAGS] = (
[
x
for x in fileInfo[FILE_INFO_DICT_TAGS]
if (x not in TAGS_NOSHOW) and (not x.startswith(ZEEK_AUTOCARVE_TAG_PREFIX))
]
if ((FILE_INFO_DICT_TAGS in fileInfo) and autoTag)
else list()
)
if extraTags and isinstance(extraTags, list):
fileInfo[FILE_INFO_DICT_TAGS].extend(extraTags)
fileInfo[FILE_INFO_DICT_TAGS] = list(set(fileInfo[FILE_INFO_DICT_TAGS]))
logger.info(f"{scriptName}[{scanWorkerId}]:\t🔎\t{fileInfo}")
continue

# create a temporary work directory where suricata will be executed to generate the log files
with tempfile.TemporaryDirectory() as tmpLogDir:
if os.path.isdir(tmpLogDir):
processTimeUsec = int(round(time.time() * 1000000))

# put together suricata execution command
cmd = [
suricataBin,
'-r',
fileInfo[FILE_INFO_DICT_NAME],
'-l',
tmpLogDir,
'-c',
suricataConfig,
]

# execute suricata-capture for pcap file
retcode, output = run_process(cmd, logger=logger)

eveJsonFile = os.path.join(tmpLogDir, "eve.json")
if os.path.isfile(eveJsonFile):
# relocate the .json to be processed (do it this way instead of with a shutil.move because of
# the way Docker volume mounts work, ie. avoid "OSError: [Errno 18] Invalid cross-device link").
# we don't have to explicitly delete it since this whole directory is about to leave context and be removed
shutil.copy(
eveJsonFile,
os.path.join(
uploadDir,
f"eve-{processTimeUsec}-{scanWorkerId}-({','.join(fileInfo[FILE_INFO_DICT_TAGS])}).json",
),
)
if isinstance(fileInfo, dict) and (FILE_INFO_DICT_NAME in fileInfo):
if (
autoSuricata
or (
(FILE_INFO_DICT_TAGS in fileInfo) and SURICATA_AUTOSURICATA_TAG in fileInfo[FILE_INFO_DICT_TAGS]
)
) and (
forceSuricata
or (
not any(
os.path.basename(fileInfo[FILE_INFO_DICT_NAME]).startswith(prefix)
for prefix in ('mnetsniff', 'mtcpdump')
)
)
):
if pcapBaseDir and os.path.isdir(pcapBaseDir):
fileInfo[FILE_INFO_DICT_NAME] = os.path.join(pcapBaseDir, fileInfo[FILE_INFO_DICT_NAME])

if retcode == 0:
logger.info(
f"{scriptName}[{scanWorkerId}]:\t✅\t{os.path.basename(fileInfo[FILE_INFO_DICT_NAME])}"
)
else:
logger.warning(
f"{scriptName}[{scanWorkerId}]:\t❗\t{suricataBin} {os.path.basename(fileInfo[FILE_INFO_DICT_NAME])} returned {retcode} {output}"
)
if os.path.isfile(fileInfo[FILE_INFO_DICT_NAME]):
# finalize tags list
fileInfo[FILE_INFO_DICT_TAGS] = (
[x for x in fileInfo[FILE_INFO_DICT_TAGS] if x not in TAGS_NOSHOW]
if ((FILE_INFO_DICT_TAGS in fileInfo) and autoTag)
else list()
)
if extraTags and isinstance(extraTags, list):
fileInfo[FILE_INFO_DICT_TAGS].extend(extraTags)
fileInfo[FILE_INFO_DICT_TAGS] = list(set(fileInfo[FILE_INFO_DICT_TAGS]))
logger.info(f"{scriptName}[{scanWorkerId}]:\t🔎\t{fileInfo}")

# Create unique output directory for this PCAP
processTimeUsec = int(round(time.time() * 1000000))
output_dir = os.path.join(
uploadDir,
f"suricata-{processTimeUsec}-{scanWorkerId}"
)

try:
logger.info(f"{scriptName}[{scanWorkerId}]:\t📥\tSubmitting {os.path.basename(fileInfo[FILE_INFO_DICT_NAME])} to Suricata")
if suricata.process_pcap(fileInfo[FILE_INFO_DICT_NAME], output_dir):
logger.info(f"{scriptName}[{scanWorkerId}]:\t✅\t{os.path.basename(fileInfo[FILE_INFO_DICT_NAME])}")

# Handle the eve.json output
eveJsonFile = os.path.join(output_dir, "eve.json")
if os.path.isfile(eveJsonFile):
output_name = f"eve-{processTimeUsec}-{scanWorkerId}-({','.join(fileInfo[FILE_INFO_DICT_TAGS])}).json"
shutil.copy(eveJsonFile, os.path.join(uploadDir, output_name))
logger.info(f"{scriptName}[{scanWorkerId}]:\t📄\tGenerated {output_name}")
else:
logger.warning(
f"{scriptName}[{scanWorkerId}]:\t❗\terror creating temporary directory {tmpLogDir}"
)
logger.warning(f"{scriptName}[{scanWorkerId}]:\t⚠️\tNo eve.json generated for {os.path.basename(fileInfo[FILE_INFO_DICT_NAME])}")
else:
logger.error(f"{scriptName}[{scanWorkerId}]:\t❌\tFailed to process {os.path.basename(fileInfo[FILE_INFO_DICT_NAME])}")
except Exception as e:
logger.error(f"{scriptName}[{scanWorkerId}]:\t💥\tError processing {os.path.basename(fileInfo[FILE_INFO_DICT_NAME])}: {e}")

logger.info(f"{scriptName}[{scanWorkerId}]:\tfinished")

Expand Down Expand Up @@ -859,7 +836,6 @@ def main():
args.pcapBaseDir,
args.autoSuricata,
args.forceSuricata,
args.executable,
args.extraTags,
args.autoTag,
args.suricataUploadDir,
Expand Down
Loading