-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpy-learn-spam.py
186 lines (148 loc) · 5.87 KB
/
py-learn-spam.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#!/usr/bin/env python3
"""
Python 3 script to read imap folders and pipe mails into rspamd
- depending on imap folder they should be learned as ham or spam
- after rspamd training, they should be moved to an appropriate "done" folder
This software is licensed under the GNU Public License GPLv3. See LICENSE
file.
"""
import configparser
import logging
import re
import subprocess
import time
from imaplib import IMAP4_SSL, IMAP4
# Location of the ini file read by main().
CONFIGFILE = '/etc/py-learn-spam.ini'

# One alternative per rspamc reply line that is accepted as "learned":
# a plain success, or one of the "already learned / skipped / denied"
# errors. The alternatives are joined with "|" into a single expression.
_RSPAMD_SUCCESS_ALTERNATIVES = (
    r'^success = true;$',
    r'^error.*has been already learned as.*$',
    r'^error = "<.*> is skipped for bayes classifier: '
    r'already in class (h|sp)am.*";$',
    r'^error = "all learn conditions denied learning (h|sp)am '
    r'in default classifier*";$',
)
rspamd_success_string = "|".join(_RSPAMD_SUCCESS_ALTERNATIVES)
rspamd_success_pattern = re.compile(rspamd_success_string)
def _fetch_raw_message(con, uid):
    """Return the complete raw message (header and body) for uid as bytes.

    Returns b"" when the server does not answer the fetch with OK.
    """
    typ, parts = con.uid('FETCH', uid, '(BODY.PEEK[])')
    if typ != 'OK':
        return b""
    # literal payloads arrive as (envelope, data) tuples; other items in
    # the response list are plain bytes and carry no message content
    return b"".join(part[1] for part in parts if isinstance(part, tuple))


def _run_rspamc(command, rhost, task, message):
    """Pipe the raw message bytes into rspamc and return its stdout as text."""
    proc = subprocess.run(
        [command, '--connect', rhost, 'learn_%s' % task],
        input=message, stdout=subprocess.PIPE)
    return proc.stdout.decode("utf-8", errors="replace")


def _learn_succeeded(result):
    """Tell whether the second line of rspamc output matches a success line.

    Output with fewer than two lines counts as failure.
    """
    result_lines = result.split("\n")
    if len(result_lines) < 2:
        return False
    return rspamd_success_pattern.match(result_lines[1]) is not None


def _learn_folder(con, wait, learn, done, task, command, rhost):
    """Learn all mails of folder learn on the logged-in connection con."""
    # my IMAP server does not accept 'enable' if not logged in
    try:
        con.enable("UTF8=ACCEPT")
    except IMAP4.error as e:
        logging.warning('IMAP4 error: {}'.format(e))

    # get number of messages to be learned
    try:
        typ, data = con.select(learn, readonly=False)
    except IMAP4.error as e:
        logging.warning('IMAP4 error: {}'.format(e))
        return
    if typ != 'OK':
        # imaplib returns (does not raise) a NO answer, e.g. unknown folder
        logging.warning('IMAP4 error: {}'.format(data))
        return
    num_msgs = int(data[0])
    logging.info("%d Messages in '%s'", num_msgs, learn)

    # get message UIDs as list; UIDs stay valid after an expunge whereas
    # message sequence numbers are renumbered
    try:
        typ, message_ids = con.uid('SEARCH', None, 'ALL')
    except IMAP4.error as e:
        logging.warning('IMAP4 error: {}'.format(e))
        return
    if typ != 'OK' or not message_ids or message_ids[0] is None:
        logging.warning('IMAP4 error: {}'.format(message_ids))
        return

    # iterate over all messages in mailbox
    for uid in message_ids[0].split():
        message = _fetch_raw_message(con, uid)
        if not message:
            logging.warning("mail not moved: %s", "fetch returned no data")
            continue

        # pipe the unmodified message bytes through rspamc cli
        result = _run_rspamc(command, rhost, task, message)

        # test if learning successful or "already learned". If either one,
        # move to "done" imap folder
        if _learn_succeeded(result):
            logging.debug(result)
            copy_result = con.uid('COPY', uid, done)
            if copy_result[0] == 'OK':
                logging.info("copied mail %d to %s" % (int(uid), done))
                con.uid('STORE', uid, '+FLAGS', '(\\Deleted)')
                logging.debug("removed learned mail vom %s" % learn)
                con.expunge()
                logging.debug("expunged learned mail vom %s" % learn)
            else:
                logging.warning("mail not moved: %s", copy_result)
        else:
            logging.warning("mail not moved: %s", result)
        time.sleep(wait)


def query_folder(
    host, use_ssl, wait, user, passwd, learn, done, task, command, rhost
):
    """
    queries all mails in folder named learn, passes them to rspamd
    and moves each learned mail into the done folder

    Every mail is piped as raw bytes into
    ``command --connect rhost learn_<task>``. When the second line of the
    rspamc output matches rspamd_success_pattern, the mail is copied to
    ``done``, flagged as deleted in ``learn`` and expunged. Otherwise it
    stays in ``learn`` and a warning is logged. The IMAP session is always
    logged out, also after an error.

    host: ip address or name of imap host
    use_ssl: 'NO' for a plain connection, 'TLS' for STARTTLS on a plain
             connection; any other value connects with IMAP4_SSL
    wait: time in seconds to wait between two messages
    user, passwd: imap credentials
    learn: imap folder to read from
    done: imap folder to move to after learn success
    task: one of ['ham'|'spam']
    command: executable to run for learning (rspamc)
    rhost: value handed to the ``--connect`` option of command
    """
    if use_ssl in ('NO', 'TLS'):
        con = IMAP4(host)
    else:
        con = IMAP4_SSL(host)
    try:
        if use_ssl == 'TLS':
            con.starttls()
        con.login(user, passwd)
        _learn_folder(con, wait, learn, done, task, command, rhost)
    finally:
        # close the session on every path; a failing logout must not hide
        # the error that brought us here
        try:
            con.logout()
        except (IMAP4.error, OSError) as e:
            logging.warning('IMAP4 error: {}'.format(e))
    return
def main():
    """
    main run for learning ham/spam from IMAP

    Reads CONFIGFILE, configures logging from its [logging] section
    (defaults: level 30, file /var/log/py-learn-spam.log) and then runs
    query_folder once for the spam folder and once for the ham folder.
    Configuration problems are logged as errors and end the run without
    raising.
    """
    # open and read config file
    config = configparser.ConfigParser()
    files_read = config.read(CONFIGFILE)

    # configure basic logging; fallback= also covers a missing [logging]
    # section, so logging is available to report later config errors
    loglevel = config.getint('logging', 'level', fallback=30)
    logfile = config.get(
        'logging', 'file', fallback='/var/log/py-learn-spam.log')
    logging.basicConfig(
        filename=logfile, level=loglevel,
        format='%(asctime)s %(levelname)s - %(message)s')

    if not files_read:
        logging.error('reading config error: {}'.format(CONFIGFILE))
        return

    try:
        imap_cfg = config['imap']
        spam_cfg = config['spam']
        spamfolder = imap_cfg['SPAMFOLDER']
        spamdonefolder = imap_cfg['SPAMDONEFOLDER']
        hamfolder = imap_cfg['HAMFOLDER']
        hamdonefolder = imap_cfg['HAMDONEFOLDER']
        host = imap_cfg['host']
        user = imap_cfg['user']
        passwd = imap_cfg['password']
        command = spam_cfg['rspamc']
    except KeyError as e:
        logging.error('reading config error: {}'.format(e))
        return

    use_ssl = imap_cfg.get('use_ssl', 'NO').upper()
    if use_ssl not in ('SSL', 'TLS', 'NO'):
        logging.error("reading config error: use_ssl not in 'SSL', 'TLS', 'NO'")
        return

    rhost = spam_cfg.get('host', '127.0.0.1')
    # 11334 is the default port of the rspamd controller worker, which
    # handles the learn commands
    rport = spam_cfg.get('port', '11334')
    rhostport = "%s:%s" % (rhost, rport)
    wait = imap_cfg.getint('wait', fallback=5)

    logging.info("starting with spam run")
    query_folder(
        host, use_ssl, wait, user, passwd, spamfolder,
        spamdonefolder, "spam", command, rhostport)
    logging.info("starting with ham run")
    query_folder(
        host, use_ssl, wait, user, passwd, hamfolder,
        hamdonefolder, "ham", command, rhostport)
# Start a learning run only when executed as a script, not on import.
if __name__ == '__main__':
    main()