-
Notifications
You must be signed in to change notification settings - Fork 185
/
fragattack.py
executable file
·277 lines (230 loc) · 12.3 KB
/
fragattack.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
#!/usr/bin/env python3
# Copyright (c) 2020, Mathy Vanhoef <[email protected]>
#
# This code may be distributed under the terms of the BSD license.
# See README for more details.
# Note that tests_*.py files are imported automatically
import glob, importlib, argparse
from fraginternals import *
# ----------------------------------- Main Function -----------------------------------
def cleanup():
daemon.stop()
def char2trigger(c):
if c == 'S': return Action.StartAuth
elif c == 'B': return Action.BeforeAuth
elif c == 'A': return Action.AfterAuth
elif c == 'C': return Action.Connected
else: raise Exception("Unknown trigger character " + c)
def stract2action(stract):
"""Parse a single trigger and action pair"""
if len(stract) == 1:
trigger = Action.Connected
c = stract[0]
else:
trigger = char2trigger(stract[0])
c = stract[1]
if c == 'I':
return Action(trigger, action=Action.GetIp)
elif c == 'F':
return Action(trigger, action=Action.Rekey)
elif c == 'R':
return Action(trigger, action=Action.Reconnect)
elif c == 'P':
return Action(trigger, enc=False)
elif c == 'E':
return Action(trigger, enc=True)
elif c == 'D':
# Note: the trigger condition of MetaDrop is ignored
return Action(meta_action=Action.MetaDrop)
raise Exception("Unrecognized action")
def str2actions(stractions, default):
"""Parse a list of trigger and action pairs"""
if stractions != None:
return [stract2action(stract) for stract in stractions.split(",")]
else:
return default
def prepare_tests(opt):
# --------------- Main Tests ---------------
stractions = opt.actions
if opt.testname == "ping":
actions = str2actions(stractions,
[Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True)])
test = PingTest(REQ_ICMP, actions, opt=opt)
elif opt.testname == "ping-before":
test = PingBefore(REQ_ICMP, opt)
elif opt.testname == "ping-frag-sep":
# Check if we can send frames in between fragments. The seperator by default uses a different
# QoS TID. The second fragment of the ping request will NOT have an incremental PN compared
# to the first fragment. So this test fails if the receivers checks for consecutive PNs. By adding
# `--pn-per-qos` both ping fragments will use consecutive PNs, but this will only be accepted if
# the receiver checks PNs for each QoS TID separately.
# By overriding the TID to 2 you can check whether fragments are cached for multiple sequence numbers
# in one TID (since all fragments use the same TID but the ping fragments use a different sequence
# number compared to the seperator).
tid = 1 if stractions == None else int(stractions)
separator = Dot11(type="Data", subtype=8, SC=(33 << 4) | 0)/Dot11QoS(TID=tid)/LLC()/SNAP()
test = PingTest(REQ_ICMP,
[Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True),
Action(Action.Connected, enc=True)],
separate_with=separator, opt=opt)
elif opt.testname in ["eapol-inject", "eapol-inject-large"]:
large = opt.testname.endswith("-large")
test = ForwardTest(eapol=True, dst=stractions, large=large)
elif opt.testname in ["eapol-amsdu", "eapol-amsdu-bad"]:
freebsd = opt.testname.endswith("-bad")
actions = str2actions(stractions,
[Action(Action.StartAuth, enc=False),
Action(Action.StartAuth, enc=False)])
test = EapolAmsduTest(REQ_ICMP, actions, freebsd, opt)
elif opt.testname == "linux-plain":
decoy_tid = None if stractions == None else int(stractions)
test = LinuxTest(REQ_ICMP, decoy_tid)
elif opt.testname in ["amsdu-inject", "amsdu-inject-bad"]:
malformed = opt.testname.endswith("-bad")
test = AmsduInject(REQ_ICMP, malformed)
elif opt.testname == "eapfrag":
actions = str2actions(stractions,
[Action(Action.StartAuth, enc=False),
Action(Action.StartAuth, enc=False)])
test = BcastEapFragTest(REQ_ICMP, actions, opt.bcast_dst)
elif opt.testname == "wep-mixed-key":
log(WARNING, "Cannot predict WEP key reotation. Fragment may time out, use very short key rotation!", color="orange")
test = PingTest(REQ_ICMP,
[Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True),
# On a WEP key rotation we get a Connected event. So wait for that.
Action(Action.AfterAuth, enc=True),
])
# --------------- Research Tests ---------------
elif opt.testname == "forward":
test = ForwardTest(eapol=False, dst=stractions)
elif opt.testname == "qca-test":
test = QcaDriverTest()
elif opt.testname == "qca-split":
test = QcaTestSplit()
elif opt.testname == "qca-rekey":
test = QcaDriverRekey()
# No valid test ID/name was given
else: return None
# If requested, override delay and inc_pn parameters in the test.
test.set_general_options(opt.delay, opt.inc_pn, opt.pre_test_delay)
# If requested, override the ptype
if opt.ptype != None:
if not hasattr(test, "ptype"):
log(WARNING, "Cannot override request type of the selected test.")
quit(1)
test.ptype = opt.ptype
return test
def args2ptype(args):
# Only one of these should be given
if args.arp + args.dhcp + args.icmp + args.ipv6 > 1:
log(STATUS, "You cannot combine --arp, --dhcp, --ipv6, or --icmp. Please only supply one of them.")
quit(1)
if args.arp: return REQ_ARP
if args.dhcp: return REQ_DHCP
if args.icmp: return REQ_ICMP
if args.ipv6: return REQ_ICMPv6_RA
if args.udp: return REQ_UDP
return None
def args2msdu(args):
# Only one of these should be given
if args.amsdu + args.amsdu_fake > 1:
log(STATUS, "You cannot combine --amsdu and --amsdu-fake. Please only supply one of them.")
quit(1)
if args.amsdu: return 1
if args.amsdu_fake: return 2
return None
def get_expected_scapy_ver():
for line in open("requirements.txt"):
if line.startswith("scapy=="):
return line[7:].strip()
return None
if __name__ == "__main__":
log(STATUS, "This is FragAttack version {}.".format(FRAGVERSION))
parser = argparse.ArgumentParser(description="Test for fragmentation vulnerabilities (version {}).".format(FRAGVERSION))
parser.add_argument('iface', help="Interface to use for the tests.")
parser.add_argument('testname', help="Name or identifier of the test to run.")
parser.add_argument('actions', nargs='?', help="Optional textual descriptions of actions")
parser.add_argument('--inject', default=None, help="Interface to use to inject frames.")
parser.add_argument('--inject-test', default=None, help="Use given interface to test injection through monitor interface.")
parser.add_argument('--inject-test-postauth', default=None, help="Same as --inject-test but run the test after authenticating.")
parser.add_argument('--hwsim', default=None, help="Use provided interface in monitor mode, and simulate AP/client through hwsim.")
parser.add_argument('--ip', help="IP we as a sender should use.")
parser.add_argument('--peerip', help="IP of the device we will test.")
parser.add_argument('--ap', default=False, action='store_true', help="Act as an AP to test clients.")
parser.add_argument('--debug', type=int, default=0, help="Debug output level.")
parser.add_argument('--delay', type=float, default=0, help="Delay between fragments in certain tests.")
parser.add_argument('--inc-pn', type=int, help="To test non-sequential packet number in fragments.")
parser.add_argument('--amsdu', default=False, action='store_true', help="Encapsulate pings in an A-MSDU frame.")
parser.add_argument('--amsdu-fake', default=False, action='store_true', help="Set A-MSDU flag but include normal payload.")
parser.add_argument('--amsdu-spp', '--amsdu-ssp', default=False, action='store_true', help="Force authentication of QoS A-MSDU flag.")
parser.add_argument('--arp', default=False, action='store_true', help="Override default request with ARP request.")
parser.add_argument('--dhcp', default=False, action='store_true', help="Override default request with DHCP discover.")
parser.add_argument('--icmp', default=False, action='store_true', help="Override default request with ICMP ping request.")
parser.add_argument('--ipv6', default=False, action='store_true', help="Override default request with ICMPv6 router advertisement.")
# TODO: Test the --udp option more
parser.add_argument('--udp', type=int, default=None, help="Override default request with UDP packet to the given port.")
parser.add_argument('--no-dhcp', default=False, action='store_true', help="Do not reply to DHCP requests as an AP.")
parser.add_argument('--icmp-size', type=int, default=None, help="Size of the ICMP ping request to send.")
parser.add_argument('--padding', type=int, default=None, help="Add padding data to ARP/DHCP/ICMP requests.")
parser.add_argument('--rekey-request', default=False, action='store_true', help="Actively request PTK rekey as client.")
parser.add_argument('--rekey-plaintext', default=False, action='store_true', help="Do PTK rekey with plaintext EAPOL frames.")
parser.add_argument('--rekey-early-install', default=False, action='store_true', help="Install PTK after sending Msg3 during rekey.")
parser.add_argument('--full-reconnect', default=False, action='store_true', help="Reconnect by deauthenticating first.")
parser.add_argument('--bcast-ra', default=False, action='store_true', help="Send pings using broadcast *receiver* address (= addr1).")
parser.add_argument('--bcast-dst', default=False, action='store_true', help="Send pings using broadcast *destination* when to AP ().")
# TODO: Properly test the --bad-mic option
parser.add_argument('--bad-mic', default=False, action='store_true', help="Send pings using an invalid authentication tag.")
parser.add_argument('--pn-per-qos', default=False, action='store_true', help="Use separate Tx packet counter for each QoS TID.")
parser.add_argument('--no-qos', default=False, action='store_true', help="Don't send QoS data frames (experimental - may break some tests).")
parser.add_argument('--freebsd-cache', default=False, action='store_true', help="Sent EAP(OL) frames as (malformed) broadcast EAPOL/A-MSDUs.")
parser.add_argument('--connected-delay', type=float, default=1, help="Second to wait after AfterAuth before triggering Connected event")
parser.add_argument('--pre-test-delay', type=int, default=0, help="Delay before launching the test")
parser.add_argument('--to-self', default=False, action='store_true', help="Send ARP/DHCP/ICMP with same src and dst MAC address.")
parser.add_argument('--no-drivercheck', default=False, action='store_true', help="Don't check if patched drivers are being used.")
parser.add_argument('--stay-up', default=False, action='store_true', help="Don't quit when test has finished.")
options = parser.parse_args()
# Check if we're using the expected scapy version
expected_ver = get_expected_scapy_ver()
if expected_ver!= None and scapy.VERSION != expected_ver:
log(WARNING, "You are using scapy version {} instead of the expected {}".format(scapy.VERSION, expected_ver))
log(WARNING, "Are you executing the script from inside the correct python virtual environment?")
# Default value for options that should not be command line parameters
options.inject_mf_workaround = False
# Sanity check and convert some arguments to more usable form
options.ptype = args2ptype(options)
options.as_msdu = args2msdu(options)
if options.pn_per_qos and options.no_qos:
log(STATUS, "Cannot specify option --pn-per-qos and --no-qos simultaneously.")
quit(1)
# Make the --inject-test-postauth flags easier to check
if options.inject_test_postauth != None:
options.inject_test = options.inject_test_postauth
options.inject_test_postauth = True
else:
options.inject_test_postauth = False
# Dynamically import tests depending on their availability in the directory,
# but ignore private variables. See https://stackoverflow.com/a/44492879
for test in glob("tests_*.py"):
module = importlib.import_module(test[:-3])
globals().update(
{n: getattr(module, n) for n in module.__all__} if hasattr(module, '__all__')
else
{k: v for (k, v) in module.__dict__.items() if not k.startswith('_')
})
# Construct the test
options.test = prepare_tests(options)
if options.test == None:
log(STATUS, "Test name '{}' not recognized. Specify a valid test case.".format(options.testname))
quit(1)
# Parse remaining options
change_log_level(-options.debug)
# Now start the tests --- TODO: Inject Deauths before connecting with client...
if options.ap:
daemon = Authenticator(options)
else:
daemon = Supplicant(options)
atexit.register(cleanup)
daemon.run()