-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cron #60
Merged
Merged
Cron #60
Changes from all commits
Commits
Show all changes
39 commits
Select commit
Hold shift + click to select a range
6eb6eaf
Initial implementation of a cron plugin: supports scheduling of one-o…
barrucadu 65e5312
Add scheduleAt method to cron, to allow scheduling by datetimes
barrucadu 6b98700
Initial implementation of regular events
barrucadu a7248c6
Everything is running in Twisted's thread now, so no need for locks
barrucadu 9431443
Better handling of exceptions in cron callbacks
barrucadu 1447810
Improve documentation
barrucadu b8388f9
Fix starting of regular events (albeit in an ugly way)
barrucadu c78b70d
Simplify plugin interface to cron: restrict it to just that plugin's …
barrucadu 4c670d2
Remove Cron::scheduleAt
barrucadu 663780c
Unify Cron::schedule and Cron::scheduleEvery
barrucadu e37fc1a
Fix Cron::setup to use schedule instead of scheduleAt
barrucadu ed345a1
Rename PluginCron::schedule to PluginCron::after, to fit in with Plug…
barrucadu ba1fae6
Raise an exception in Cron::schedule if two events with the same name…
barrucadu 3eac061
Make Cron plugin-agnostic, and handle the name-mangling in PluginCron
barrucadu 653ab4f
Prettify Cron::setup_regular a bit
barrucadu cbc0a9f
The argument to the DuplicateNameException is now just the name
barrucadu 3db9c7d
Callbacks passed to Cron::schedule now get passed the time at which t…
barrucadu f12cfb5
Don't print out the callback in a Cron::_runcb error log
barrucadu 74bb2c7
Fix computation of when in Cron::setup
barrucadu 1fe0df4
Completely change representation of tasks in cron: now pass a method …
barrucadu 9a446fb
Integrate cron with mongodb so tasks get saved and restored when Math…
barrucadu 4780134
Gracefully handle tasks referring to bad plugin or method names
barrucadu b4bd822
Don't save cron.hourly/daily/weekly to database, as per the comment
barrucadu a55c503
Store all tasks in the database, and only remove them when a task has…
barrucadu 67eddd0
Update documentation
barrucadu 43a2efd
More comments, and shuffle around the scheduler slightly
barrucadu 2cd526c
More comments explaining the scheduler
barrucadu 94d8aca
Fix a documentation typo
barrucadu d376f07
Make regular events actually work (bug resulting from the callback to…
barrucadu 9e97ecc
Replace the six self.tasks.remove calls with one regex removal
barrucadu 08dcbf6
Add in a missing repeating=True (NOW regular events should work)
barrucadu 78816c9
Actually store task names (this is why I should test things)
barrucadu e56d8bc
Move removing/rescheduling tasks to the top of the scheduler loop, to…
barrucadu 04a155d
Use a raw string literal for the tasks.remove regex
barrucadu 5fcd5c9
That shouldn't have been in quotes…
barrucadu 430687d
In event_runner, only catch Exception, and don't use sys.exc_info()
barrucadu 23ea83f
Catch an AttributeError in event_runner on missing methods
barrucadu ad7793c
Remove self.plugins, use self.bot.plugins
barrucadu 2aefb06
Simplify task definition
barrucadu File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,336 @@ | ||
from csbot.plugin import Plugin | ||
from csbot.events import Event | ||
from twisted.internet import task | ||
from datetime import datetime, timedelta | ||
import pymongo | ||
|
||
|
||
class Cron(Plugin): | ||
""" | ||
Time, that most mysterious of things. What is it? Is it discrete or | ||
continuous? What was before time? Does that even make sense to ask? This | ||
plugin will attempt to address some, perhaps all, of these questions. | ||
|
||
More seriously, this plugin allows the scheduling of events. Due to | ||
computers being the constructs of fallible humans, it's not guaranteed | ||
that a callback will be run precisely when you want it to be. Furthermore, | ||
if you schedule multiple events at the same time, don't make any | ||
assumptions about the order in which they'll be called. | ||
|
||
Example of usage: | ||
|
||
class MyPlugin(Plugin): | ||
PLUGIN_DEPENDS = ['cron'] | ||
|
||
@Plugin.integrate_with('cron') | ||
def _get_cron(self, cron): | ||
self.cron = cron.get_cron(self) | ||
|
||
def setup(self): | ||
... | ||
self.cron.after( | ||
"hello world", | ||
datetime.timedelta(days=1), | ||
"callback") | ||
|
||
def callback(self, when): | ||
self.log.info(u'I got called at {}'.format(when)) | ||
|
||
@Plugin.hook('cron.hourly') | ||
def hourlyevent(self, e): | ||
self.log.info(u'An hour has passed') | ||
""" | ||
|
||
@Plugin.integrate_with('mongodb') | ||
def _get_db(self, mongodb): | ||
self.db = mongodb.get_db(self.plugin_name()) | ||
self.tasks = self.db.tasks | ||
|
||
def setup(self): | ||
super(Cron, self).setup() | ||
|
||
# self.scheduler is a handle to the scheduler repeating task, and | ||
# self.scheduler_freq is how frequently it gets called. These need to | ||
# be set before anything is scheduled (like the repeated events). | ||
self.scheduler = None | ||
self.scheduler_freq = -1 | ||
|
||
# Now we need to remove the hourly, daily, and weekly events | ||
# (if there are any), because the scheduler just runs things | ||
# when their time has passed, but for these we want to run | ||
# them as close to the correct time as possible, so running a | ||
# past event is useless for our purposes. | ||
# | ||
# Sadly this can't happen in the teardown, as we want to do | ||
# this even if the bot crashes unexpectedly. | ||
self.tasks.remove({'owner': 'cron'}) | ||
|
||
# Add regular cron.hourly/daily/weekly events which plugins | ||
# can listen to. | ||
now = datetime.now() | ||
when = -timedelta(minutes=now.minute, | ||
seconds=now.second, | ||
microseconds=now.microsecond) | ||
|
||
self.schedule(owner='cron', | ||
name='hourly', | ||
when=now + when + timedelta(hours=1), | ||
interval=timedelta(hours=1), | ||
callback='fire_event', | ||
args=['cron.hourly']) | ||
|
||
when -= timedelta(hours=now.hour) | ||
self.schedule(owner='cron', | ||
name='daily', | ||
when=now + when + timedelta(days=1), | ||
interval=timedelta(days=1), | ||
callback='fire_event', | ||
args=['cron.daily']) | ||
|
||
when -= timedelta(days=now.weekday()) | ||
self.schedule(owner='cron', | ||
name='weekly', | ||
when=now + when + timedelta(weeks=1), | ||
interval=timedelta(weeks=1), | ||
callback='fire_event', | ||
args=['cron.weekly']) | ||
|
||
def fire_event(self, now, name): | ||
""" | ||
Fire off a regular event. This gets called by the scheduler at the | ||
appropriate time. | ||
""" | ||
|
||
self.bot.post_event(Event(None, name)) | ||
|
||
def get_cron(self, plugin): | ||
""" | ||
Return the crond for the given plugin, and save a reference to the | ||
plugin so it can be used by scheduled tasks. | ||
""" | ||
|
||
return PluginCron(self, plugin) | ||
|
||
def schedule(self, owner, name, when, | ||
interval=None, callback=None, | ||
args=[], kwargs={}): | ||
""" | ||
Schedule a new task. | ||
|
||
:param owner: The plugin which created the task | ||
:param name: The name of the task | ||
:param when: The datetime to trigger the task at | ||
:param interval: Optionally, reschedule at when + interval | ||
when triggered. Gives rise to repeating | ||
tasks. | ||
:param callback: Call owner.callback when triggered; if None, | ||
call owner.name. | ||
:param args: Callback positional arguments. | ||
:param kwargs: Callback keyword arguments. | ||
|
||
The name can be used to remove a callback. Names must be unique, | ||
otherwise a DuplicateNameException will be raised. | ||
""" | ||
|
||
if self.tasks.find_one({'owner': owner, | ||
'name': name}): | ||
raise DuplicateNameException(name) | ||
|
||
# Create the new task | ||
secs = interval.total_seconds() if interval is not None else None | ||
cb = name if callback is None else callback | ||
self.tasks.insert({'owner': owner, | ||
'name': name, | ||
'when': when, | ||
'interval': secs, | ||
'callback': cb, | ||
'args': args, | ||
'kwargs': kwargs}) | ||
|
||
# Call the scheduler immediately, as it may now need to be called | ||
# sooner than it had planned. | ||
self.event_runner() | ||
|
||
def unschedule(self, owner, name): | ||
""" | ||
Unschedule a named callback. | ||
|
||
This could result in the scheduler having nothing to do in its next | ||
call, but this isn't a problem as it's not a very intensive function, | ||
so there's no point in rescheduling it here. | ||
""" | ||
|
||
self.tasks.remove({'owner': owner, | ||
'name': name}) | ||
|
||
def event_runner(self): | ||
""" | ||
Run all tasks which have a trigger time in the past, and then | ||
reschedule self to run in time for the next task. | ||
""" | ||
|
||
now = datetime.now() | ||
|
||
# Find and run every task from before now | ||
for taskdef in self.tasks.find({'when': {'$lt': now}}): | ||
# Going to be using this a lot | ||
task_name = u'{}/{}'.format( | ||
taskdef['owner'], | ||
taskdef['name']) | ||
|
||
self.log.info(u'Running task ' + task_name) | ||
|
||
# Now that we have the task, we need to remove it from the | ||
# database (or reschedule it for the future) straight | ||
# away, as if it schedules things itself, the scheduler | ||
# will be called again, but the task will still be there | ||
# (and so be run again), resulting in an error when it | ||
# tries to schedule the second time. | ||
if taskdef['interval'] is not None: | ||
taskdef['when'] += timedelta(seconds=taskdef['interval']) | ||
self.tasks.save(taskdef) | ||
else: | ||
self.unschedule(taskdef['owner'], taskdef['name']) | ||
|
||
# There are two things that could go wrong in running a | ||
# task. The method might not exist, this can arise in two | ||
# ways: a plugin scheduled it in a prior incarnation of | ||
# the bot, and then didn't register start up on this run, | ||
# resulting in there being no entry in self.bot.plugins, | ||
# or it could have just provided a bad method name. | ||
# | ||
# There is clearly no way to recover from this with any | ||
# degree of certainty, so we just drop it from the | ||
# database to prevent an error cropping up every time it | ||
# gets run. | ||
try: | ||
func = getattr(self.bot.plugins[taskdef['owner']], | ||
taskdef['callback']) | ||
except AttributeError: | ||
self.log.error( | ||
u'Couldn\'t find method {}.{} for task {}'.format( | ||
taskdef['owner'], | ||
taskdef['callback'], | ||
task_name)) | ||
self.unschedule(taskdef['owner'], taskdef['name']) | ||
continue | ||
|
||
# The second way is if the method does exist, but raises | ||
# an exception during its execution. There are two ways to | ||
# handle this. We could let the exception propagate | ||
# upwards and outwards, killing the bot, or we could log | ||
# it as an error and carry on. I went for the latter here, | ||
# on the assumption that, whilst exceptions are bad and | ||
# shouldn't get this far anyway, killing the bot is worse. | ||
try: | ||
func(taskdef['when'], *taskdef['args'], **taskdef['kwargs']) | ||
except Exception as e: | ||
# Don't really want exceptions to kill cron, so let's just log | ||
# them as an error. | ||
|
||
self.log.error( | ||
u'Exception raised when running task {}: {} {}'.format( | ||
task_name, | ||
type(e), e.args)) | ||
|
||
# Schedule the event runner to happen no sooner than is required by the | ||
# next scheduled task. | ||
# | ||
# There will always be at least one event remaining because we | ||
# have three repeating ones, so this is safe. | ||
remaining_tasks = self.tasks.find().sort('when', pymongo.ASCENDING) | ||
next_run = remaining_tasks[0]['when'] | ||
|
||
# We use a looping call for the scheduler, rather than a | ||
# deferred task, because the expected behaviour is that cron | ||
# won't actually have that much to do. In fact, it wouldn't | ||
# surprise me if the most frequent events were the cron.hourly | ||
# ones. As it's likely that cron will end up running at a | ||
# mostly constant frequency anyway, using a looping call is | ||
# less work compared to rescheduling it every single time. | ||
freq = (next_run - now).total_seconds() | ||
|
||
if freq != self.scheduler_freq: | ||
# The first time this runs we won't actually have a | ||
# scheduler handle, so we have to check. | ||
if self.scheduler is not None: | ||
self.scheduler.stop() | ||
|
||
self.scheduler = task.LoopingCall(self.event_runner) | ||
self.scheduler.start(freq, now=False) | ||
self.scheduler_freq = freq | ||
|
||
|
||
class DuplicateNameException(Exception): | ||
""" | ||
This can be raised by Cron::schedule if a plugin tries to register two | ||
events with the same name. | ||
""" | ||
|
||
pass | ||
|
||
|
||
class PluginCron(object): | ||
""" | ||
An interface to the cron methods restricted to the view of one named | ||
plugin. | ||
|
||
How scheduling works: | ||
|
||
All of the scheduling functions have a signature of the form | ||
(name, time, method_name, *args, **kwargs). | ||
|
||
This means that at the appropriate time, the method plugin.method_name | ||
will be called with the arguments (time, *args, **kwargs), where the | ||
time argument is the time it was supposed to be run by the scheduler | ||
(which may not be identical to teh actual time it is run). | ||
|
||
These functions will raise a DuplicateNameException if you try to | ||
schedule two events with the same name. | ||
""" | ||
|
||
def __init__(self, cron, plugin): | ||
self.cron = cron | ||
self.plugin = plugin.plugin_name() | ||
|
||
def after(self, name, delay, method_name, *args, **kwargs): | ||
""" | ||
Schedule an event to occur after the timedelta delay has passed. | ||
""" | ||
|
||
self.cron.schedule(self.plugin, name, | ||
datetime.now() + delay, | ||
callback=method_name, | ||
args=args, | ||
kwargs=kwargs) | ||
|
||
def at(self, name, when, method_name, *args, **kwargs): | ||
""" | ||
Schedule an event to occur at a given time. | ||
""" | ||
|
||
self.cron.schedule(self.plugin, name, | ||
when, | ||
callback=methhod_name, | ||
args=args, | ||
kwargs=kwargs) | ||
|
||
def every(self, name, freq, method_name, *args, **kwargs): | ||
""" | ||
Schedule an event to occur every time the delay passes. | ||
""" | ||
|
||
self.cron.schedule(self.plugin, name, | ||
datetime.now() + freq, | ||
interval=freq, | ||
callback=method_name, | ||
args=args, | ||
kwargs=kwargs) | ||
|
||
def unschedule(self, name): | ||
""" | ||
Unschedule a named event which hasn't yet happened. | ||
If the name doesn't exist, nothing happens. | ||
""" | ||
|
||
self.cron.unschedule(self.plugin, name) |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about tasks that might legitimately have more than one instance, e.g. deleting a title element, reversing a ban, etc.?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How should cancelling a task behave in that case then? Should all tasks of the same name be cancelled? Just one, if so, which? Alternatively, there could be anonymous tasks, by making
name
optional, which can't be cancelled.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think cancelling tasks should be as specific as the cancel request, as in
def unschedule(self, name, args=None, kwargs=None)
-name
is compulsory, but match is restricted byargs
andkwargs
if they're present. This is on the assumption that if you want to cancel a task, you know enough about the task to cancel that specific one.