Skip to content

Commit 1400d1b

Browse files
committed
Created duckdb branch for duckdb dissector PoC
1 parent 5aae843 commit 1400d1b

File tree

7 files changed

+1005
-486
lines changed

7 files changed

+1005
-486
lines changed

.gitignore

+3 -1
Original file line number | Diff line number | Diff line change
@@ -17,4 +17,6 @@ fingerprints/
1717
*.log
1818
*.conf
1919
*.ini
20-
*.pcap
20+
*.pcap
21+
parquet/
22+
duckdb/

requirements.txt

+4 -17
Original file line number | Diff line number | Diff line change
@@ -1,17 +1,4 @@
1-
attrs==21.4.0
2-
certifi==2023.7.22
3-
charset-normalizer==2.0.12
4-
Deprecated==1.2.13
5-
idna==3.3
6-
jsonschema==4.4.0
7-
netaddr==0.8.0
8-
numpy==1.22.3
9-
pandas==1.4.1
10-
pymisp==2.4.155.1
11-
pyrsistent==0.18.1
12-
python-dateutil==2.8.2
13-
pytz==2021.3
14-
requests==2.31.0
15-
six==1.16.0
16-
urllib3==1.26.17
17-
wrapt==1.14.0
1+
duckdb
2+
pandas
3+
pyarrow
4+
pymisp

src/analysis.py

+164 -145
Large diffs are not rendered by default.

src/attack.py

+166 -80
Large diffs are not rendered by default.

src/main.py

+75 -17
Original file line number | Diff line number | Diff line change
@@ -1,18 +1,19 @@
1+
import ipaddress
12
import os
2-
import sys
3-
from typing import List
3+
import time
4+
import duckdb
5+
import pprint
46

5-
import pandas as pd
67
from pathlib import Path
78
from argparse import ArgumentParser, Namespace
8-
from netaddr import IPNetwork
99

1010
from logger import LOGGER
11-
from util import parse_config, print_logo, determine_filetype
1211
from misp import MispInstance
13-
from reader import read_file
12+
from reader import read_files
1413
from attack import Attack, Fingerprint
1514
from analysis import infer_target, extract_attack_vectors, compute_summary
15+
from util import parquet_files_to_view, FileType, determine_filetype, determine_source_filetype, \
16+
print_logo, parse_config
1617

1718
DOCKERIZED: bool = 'DISSECTOR_DOCKER' in os.environ
1819

@@ -30,8 +31,8 @@ def parse_arguments() -> Namespace:
3031
parser.add_argument('--nprocesses', dest='n', type=int, help='Number of processes used to concurrently read PCAPs '
3132
'(default is the number of CPU cores)',
3233
default=os.cpu_count())
33-
parser.add_argument('--target', type=IPNetwork, nargs='+', dest='targets',
34-
help='Optional: target IP address or subnet of this attack')
34+
parser.add_argument('--target', type=str, dest='target',
35+
help='Optional: target IP address of this attack (subnet currently unsupported)')
3536
parser.add_argument('--ddosdb', action='store_true', help='Optional: directly upload fingerprint to DDoS-DB')
3637
parser.add_argument('--misp', action='store_true', help='Optional: directly upload fingerprint to MISP')
3738
parser.add_argument('--noverify', action='store_true', help="Optional: Don't verify TLS certificates")
@@ -42,27 +43,82 @@ def parse_arguments() -> Namespace:
4243

4344

4445
if __name__ == '__main__':
46+
pp = pprint.PrettyPrinter(indent=4)
47+
4548
print_logo()
49+
4650
args = parse_arguments()
4751
if args.debug:
4852
LOGGER.setLevel('DEBUG')
4953

54+
if args.target:
55+
try:
56+
test = ipaddress.ip_address(args.target)
57+
except Exception as e:
58+
LOGGER.info("Malformed target specified")
59+
exit(2)
60+
5061
filetype = determine_filetype(args.files)
51-
# Read the file(s) into a dataframe
52-
data: pd.DataFrame = pd.concat([read_file(f, filetype=filetype, nr_processes=args.n) for f in args.files])
53-
attack = Attack(data, filetype) # Construct an Attack object with the DDoS data
54-
target: List[IPNetwork] = args.targets or [infer_target(attack)] # Infer attack target if not passed as argument
55-
attack.filter_data_on_target(target=target) # Keep only the traffic sent to the target
56-
attack_vectors = extract_attack_vectors(attack) # Extract the attack vectors from the attack
62+
63+
start = time.time()
64+
if filetype == FileType.PQT:
65+
# If parquet files: check all contain data from either pcap or flow, but not both
66+
LOGGER.debug("Determine source file type in parquet files")
67+
fts = [determine_source_filetype(f) for f in args.files]
68+
ft = set(fts)
69+
if len(ft) > 1:
70+
LOGGER.error("More than one source file type in these parquet files")
71+
exit(1)
72+
filetype = list(ft)[0]
73+
LOGGER.debug(f"Original file type is {filetype.value}")
74+
pqt_files = [str(f) for f in args.files]
75+
else:
76+
# Convert the file(s) to parquet
77+
dst_dir = "/tmp" if DOCKERIZED else f"{os.getcwd()}/parquet"
78+
pqt_files = read_files(args.files, dst_dir=dst_dir, filetype=filetype, nr_processes=args.n)
79+
duration = time.time()-start
80+
LOGGER.info(f"Conversion took {duration:.2f}s")
81+
LOGGER.debug(pqt_files)
82+
83+
if args.debug and not DOCKERIZED:
84+
# Store duckdb on disk in debug mode if not dockerized
85+
os.makedirs('duckdb', exist_ok=True)
86+
db_name = "duckdb/"+os.path.basename(args.files[0])+".duckdb"
87+
LOGGER.debug(f"Basename: {db_name}")
88+
if os.path.exists(db_name):
89+
os.remove(db_name)
90+
db = duckdb.connect(db_name)
91+
else:
92+
# Otherwise just an in-memory database
93+
db = duckdb.connect()
94+
95+
# Explicitly set number of threads
96+
db.execute(f"SET threads={args.n}")
97+
98+
start = time.time()
99+
100+
view = parquet_files_to_view(db, pqt_files, filetype)
101+
attack = Attack(db, view, filetype)
102+
103+
target = args.target or infer_target(attack) # Infer attack target if not passed as argument
104+
LOGGER.debug(target)
105+
if not target:
106+
LOGGER.info("No attack targets found")
107+
exit(0)
108+
109+
attack.filter_data_on_target(target)
110+
attack_vectors = extract_attack_vectors(attack)
57111
if len(attack_vectors) == 0:
58112
LOGGER.critical(f'No attack vectors found in traffic capture.')
59-
sys.exit(1)
113+
exit(1)
60114
summary = compute_summary(attack_vectors) # Compute summary statistics of the attack (e.g. average bps / Bpp / pps)
61-
# Generate fingeperint
115+
# Generate fingerprint
62116
fingerprint = Fingerprint(target=target, summary=summary, attack_vectors=attack_vectors,
63117
show_target=args.show_target)
64118

65-
if args.summary: # If the user wants a preview, show the finerprint in the terminal
119+
duration = time.time() - start
120+
LOGGER.info(f"Analysis took {duration:.2f}s")
121+
if args.summary: # If the user wants a preview, show the fingerprint in the terminal
66122
LOGGER.info(str(fingerprint))
67123

68124
args.output.mkdir(parents=True, exist_ok=True)
@@ -77,3 +133,5 @@ def parse_arguments() -> Namespace:
77133
publish=conf['publish'])
78134
if misp_instance.misp is not None:
79135
fingerprint.upload_to_misp(misp_instance)
136+
137+
db.close()

0 commit comments

Comments
 (0)