-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathblocklist.py
90 lines (77 loc) · 2.69 KB
/
blocklist.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env python
import sys
import logging
import urllib2
import socket
import ssl
# Inserts our own modules path first in the list
# fix for bug #343821
sys.path.insert(1, "/usr/share/fail2ban")
# Now we can import our modules
from client.csocket import CSocket
logSys = logging.getLogger("fail2ban.client")
class Blocklistimporter:
def __init__(self):
self.__conf = dict()
self.__conf["socket"] = "/var/run/fail2ban/fail2ban.sock"
self.__conf["url"] = "https://api.blocklist.de/getlast.php?time=300"
self.__conf["logfile"] = "/etc/fail2ban/empty.log"
self.__conf["loglevel"] = logging.ERROR
def die(self, message="", code=0):
if message:
logSys.error(message)
sys.exit(code)
def fetch_list(self):
logSys.debug("Fetching IPs")
try:
listcontent = urllib2.urlopen(self.__conf["url"], timeout=10).readlines()
except urllib2.HTTPError as e:
self.die("Cannot fetch URL: %s" % e, 1)
except urllib2.URLError as e:
logSys.debug("Cannot fetch URL: %s", e)
self.die("", 0)
except ssl.SSLError as e:
logSys.debug("Cannot fetch URL: %s", e)
self.die("", 0)
logSys.debug("Got IPs")
return listcontent
def block_ip(self, ip):
try:
client = CSocket(self.__conf["socket"])
command = ["set", "blocklist", "banip"]
logSys.debug("Blocking %s", ip)
ret = client.send(command + [ip])
if ret[0] == 0:
logSys.debug("OK : " + str(ret[1]))
else:
print ret
logSys.debug("NOK: %s = %s", ret[1].args, ret[1])
raise Exception
except socket.error:
logSys.error("Unable to contact server. Is it running?")
raise
except Exception, e:
logSys.error(e)
raise
def start(self):
logSys.setLevel(self.__conf["loglevel"])
stdout = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(levelname)-6s %(message)s')
stdout.setFormatter(formatter)
logSys.addHandler(stdout)
listcontent = self.fetch_list()
for ip in listcontent:
try:
self.block_ip(ip.rstrip())
except Exception, e:
logSys.debug("Got exception: %s" % str(e))
return False
logSys.debug("Touching log to ban new IPs")
open(self.__conf["logfile"], 'w').close()
return True
if __name__ == "__main__":
importer = Blocklistimporter()
if importer.start():
sys.exit(0)
else:
sys.exit(-1)