Skip to content

Commit

Permalink
Add code to do intiial rule processing.
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomi Richards committed Jun 12, 2015
1 parent c1730e3 commit ac33463
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 0 deletions.
112 changes: 112 additions & 0 deletions gmailfilter/_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@

"""Code for loading rules."""

import collections.abc
import os.path
import importlib
from textwrap import dedent


class RuleLoadError(Exception):
pass


def load_rules(path=None):
"""Load the users ruleset.
Returns a Ruleset object, or raises an exception.
If the rules file is not found, a default one will be written, and a
RuleLoadError will be raised.
"""
path = path or default_rules_path()
loader = importlib.machinery.SourceFileLoader('rules', path)
# We may want to catch the exception here and provide a more user-friendly
# exception.
try:
rules = loader.load_module()
except FileNotFoundError:
write_default_rules_file(path)
raise RuleLoadError(
"No rules file found. "
"A default one has been written at {}.".format(path)
)
try:
return RuleSet(rules.RULES)
except AttributeError:
raise RuleLoadError(
"Rules file {} has no attribute 'RULES'".format(path)
)


def write_default_rules_file(path=None):
path = path or default_rules_path()
with open(path, 'w') as rules_file:
rules_file.write(dedent(
'''
# Sample email rules file. Edit this file to set up your email
# filtering rules. There are a few things to remember:
#
# 1. This file is python3, so you can do whatever you want.
# 2. There are a number of pre-supplied tests and actions:
from gmailfilter import test, actions
# 3. The only requirement is there is a variable named 'RULES'
# that must be an iterable of rules. See below:
RULES = (
# each line is a new rule.
# The first item is a test to run.
# All subsequent items are actions to perform.
(test.SubjectContains('test email'), actions.Move('Junk/')),
)
'''
))

def default_rules_path():
return os.path.expanduser('~/.config/gmailfilter/rules.py')


class RuleSet(object):

def __init__(self, rules):
RuleSet.check_rules(rules)
self._rules = rules

def __iter__(self):
yield from self._rules

@staticmethod
def check_rules(rules):
"""Check rule validity. Raise RuleLoadError if any are invalid."""
# check entire ruleset first:
if not isinstance(rules, collections.abc.Iterable):
raise RuleLoadError('RULES data structure must be an iterable')
for rule in rules:
rule_repr = repr(rule)
if not isinstance(rule, collections.abc.Iterable) or len(rule) < 2:
raise RuleLoadError(
'rule {} must be an iterable with at least '
'two items'.format(rule_repr)
)
test = rule[0]
if not callable(getattr(test, 'match', None)):
raise RuleLoadError('Test for rule {} does not have a '
'callable "match" method.'.format(rule_repr)
)


class SimpleRuleProcessor(object):

def __init__(self, ruleset, connection):
self._ruleset = ruleset
self._connection = connection

def process_message(self, message):
for test, *actions in self._ruleset:
if test.match(message):
for action in actions:
action.process(self._connection._client, str(message.uid()))
break
45 changes: 45 additions & 0 deletions gmailfilter/actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@


"""Classes that manipulate mails."""


class Action(object):
"""
All actions must implement this interface (though they need not inherit
from this class).
"""

def process(self, client_conn, message_uid):
"""Run the action.
'client_conn' will be an IMAPClient.IMAPClient object, possibly with
access to dangerous methods removed (TODO: Document this interface
explicitly).
'message_uid' will be the message uid, as a string.
If this method raises any exceptions, action processing will stop, and
an error will be logged (TODO: Actually do that somewhere).
"""


class Move(Action):

def __init__(self, target_folder):
self._target_folder = target_folder

def process(self, conn, uid):
# TODO: optimise this by trying the copy, and if we get 'NO' with
# 'TRYCREATE' then, and only then try and create the folder. Removes the
# overhead of the existance check for every message,
if not conn.folder_exists(self._target_folder):
status = conn.create_folder(self._target_folder)

assert status.lower() == "success", "Unable to create folder %s" % self._target_folder

conn.copy(uid, self._target_folder)
# TODO: Maybe provide logging facilities in parent 'Action' class?
# logging.info("Deleting %s" % uid)
conn.delete_messages(uid)

0 comments on commit ac33463

Please sign in to comment.