Skip to content

Commit

Permalink
Merge pull request #80 from wegift/retrying
Browse files Browse the repository at this point in the history
Reconnect to Redis on connection or time-out error
  • Loading branch information
sibson authored Oct 1, 2018
2 parents 8635a28 + 90cfbe9 commit 97223a6
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 21 deletions.
6 changes: 6 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,14 @@ avoid conflicting options. Here follows the example:::
'password': '123',
'service_name': 'master',
'socket_timeout': 0.1,
'retry_period': 60,
}

If ``retry_period`` is given, retry connection for ``retry_period``
seconds. If not set, retrying mechanism is not triggered. If set
to ``-1`` retry infinitely.



Development
--------------
Expand Down
83 changes: 67 additions & 16 deletions redbeat/schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from __future__ import absolute_import

import calendar
import logging
import warnings
from datetime import datetime, MINYEAR
from distutils.version import StrictVersion
Expand All @@ -28,7 +29,13 @@
from celery.app import app_or_default
from celery.five import values
from kombu.utils.url import maybe_sanitize_url
from tenacity import (before_sleep_log,
retry,
retry_if_exception_type,
stop_after_delay,
wait_exponential)

import redis.exceptions
from redis.client import StrictRedis

from .decoder import (
Expand All @@ -39,6 +46,45 @@
CELERY_4_OR_GREATER = CELERY_VERSION[0] >= 4


class RetryingConnection(object):
"""A proxy for the Redis connection that delegates all the calls to
underlying Redis connection while retrying on connection or time-out error.
"""
RETRY_MAX_WAIT = 30

def __init__(self, retry_period, wrapped_connection):
self.wrapped_connection = wrapped_connection
self.retry_kwargs = dict(
retry=(retry_if_exception_type(redis.exceptions.ConnectionError)
| retry_if_exception_type(redis.exceptions.TimeoutError)),
reraise=True,
wait=wait_exponential(multiplier=1, max=self.RETRY_MAX_WAIT),
before_sleep=self._log_retry_attempt
)
if retry_period >= 0:
self.retry_kwargs.update(dict(stop=stop_after_delay(retry_period)))

def __getattr__(self, item):
method = getattr(self.wrapped_connection, item)

# we don't want to deal attributes or properties
if not callable(method):
return method

@retry(**self.retry_kwargs)
def retrier(*args, **kwargs):
return method(*args, **kwargs)

return retrier

@staticmethod
def _log_retry_attempt(retry_state):
"""Log when next reconnection attempt is about to be made."""
logger.log(logging.WARNING,
"Retrying connection in %s seconds...",
retry_state.next_action.sleep)


def ensure_conf(app):
"""
Ensure for the given app the the redbeat_conf
Expand All @@ -55,23 +101,28 @@ def ensure_conf(app):
return config


def redis(app=None):
def get_redis(app=None):
app = app_or_default(app)
conf = ensure_conf(app)
if not hasattr(app, 'redbeat_redis') or app.redbeat_redis is None:
redis_options = conf.app.conf.get(
'REDBEAT_REDIS_OPTIONS',
conf.app.conf.get('BROKER_TRANSPORT_OPTIONS', {}))
retry_period = redis_options.get('retry_period')
if conf.redis_url.startswith('redis-sentinel') and 'sentinels' in redis_options:
from redis.sentinel import Sentinel
sentinel = Sentinel(redis_options['sentinels'],
socket_timeout=redis_options.get('socket_timeout'),
password=redis_options.get('password'),
decode_responses=True)
app.redbeat_redis = sentinel.master_for(redis_options.get('service_name', 'master'))
connection = sentinel.master_for(redis_options.get('service_name', 'master'))
else:
connection = StrictRedis.from_url(conf.redis_url, decode_responses=True)

if retry_period is None:
app.redbeat_redis = connection
else:
app.redbeat_redis = StrictRedis.from_url(conf.redis_url,
decode_responses=True)
app.redbeat_redis = RetryingConnection(retry_period, connection)

return app.redbeat_redis

Expand Down Expand Up @@ -131,7 +182,7 @@ def __init__(self, name=None, task=None, schedule=None,
@staticmethod
def load_definition(key, app=None, definition=None):
if definition is None:
definition = redis(app).hget(key, 'definition')
definition = get_redis(app).hget(key, 'definition')

if not definition:
raise KeyError(key)
Expand All @@ -146,7 +197,7 @@ def decode_definition(definition):

@staticmethod
def load_meta(key, app=None):
return RedBeatSchedulerEntry.decode_meta(redis(app).hget(key, 'meta'))
return RedBeatSchedulerEntry.decode_meta(get_redis(app).hget(key, 'meta'))

@staticmethod
def decode_meta(meta, app=None):
Expand All @@ -158,7 +209,7 @@ def decode_meta(meta, app=None):
@classmethod
def from_key(cls, key, app=None):
ensure_conf(app)
with redis(app).pipeline() as pipe:
with get_redis(app).pipeline() as pipe:
pipe.hget(key, 'definition')
pipe.hget(key, 'meta')
definition, meta = pipe.execute()
Expand Down Expand Up @@ -206,7 +257,7 @@ def score(self):

@property
def rank(self):
return redis(self.app).zrank(self.app.redbeat_conf.schedule_key, self.key)
return get_redis(self.app).zrank(self.app.redbeat_conf.schedule_key, self.key)

def save(self):
definition = {
Expand All @@ -218,15 +269,15 @@ def save(self):
'schedule': self.schedule,
'enabled': self.enabled,
}
with redis(self.app).pipeline() as pipe:
with get_redis(self.app).pipeline() as pipe:
pipe.hset(self.key, 'definition', json.dumps(definition, cls=RedBeatJSONEncoder))
pipe.zadd(self.app.redbeat_conf.schedule_key, self.score, self.key)
pipe.execute()

return self

def delete(self):
with redis(self.app).pipeline() as pipe:
with get_redis(self.app).pipeline() as pipe:
pipe.zrem(self.app.redbeat_conf.schedule_key, self.key)
pipe.delete(self.key)
pipe.execute()
Expand All @@ -243,7 +294,7 @@ def _next_instance(self, last_run_at=None, only_update_last_run_at=False):
'total_run_count': entry.total_run_count,
}

with redis(self.app).pipeline() as pipe:
with get_redis(self.app).pipeline() as pipe:
pipe.hset(self.key, 'meta', json.dumps(meta, cls=RedBeatJSONEncoder))
pipe.zadd(self.app.redbeat_conf.schedule_key, entry.score, entry.key)
pipe.execute()
Expand All @@ -256,7 +307,7 @@ def reschedule(self, last_run_at=None):
meta = {
'last_run_at': self.last_run_at,
}
with redis(self.app).pipeline() as pipe:
with get_redis(self.app).pipeline() as pipe:
pipe.hset(self.key, 'meta', json.dumps(meta, cls=RedBeatJSONEncoder))
pipe.zadd(self.app.redbeat_conf.schedule_key, self.score, self.key)
pipe.execute()
Expand Down Expand Up @@ -290,7 +341,7 @@ def __init__(self, app, lock_key=None, lock_timeout=None, **kwargs):

def setup_schedule(self):
# cleanup old static schedule entries
client = redis(self.app)
client = get_redis(self.app)
previous = set(key for key in client.smembers(self.app.redbeat_conf.statics_key))
removed = previous.difference(self.app.redbeat_conf.schedule.keys())

Expand Down Expand Up @@ -331,7 +382,7 @@ def schedule(self):
logger.debug('Selecting tasks')

max_due_at = to_timestamp(self.app.now())
client = redis(self.app)
client = get_redis(self.app)

with client.pipeline() as pipe:
pipe.zrangebyscore(self.app.redbeat_conf.schedule_key, 0, max_due_at)
Expand Down Expand Up @@ -373,7 +424,7 @@ def maybe_due(self, entry, **kwargs):
def tick(self, min=min, **kwargs):
if self.lock:
logger.debug('beat: Extending lock...')
redis(self.app).pexpire(self.lock_key, int(self.lock_timeout * 1000))
get_redis(self.app).pexpire(self.lock_key, int(self.lock_timeout * 1000))

remaining_times = []
try:
Expand Down Expand Up @@ -418,7 +469,7 @@ def acquire_distributed_beat_lock(sender=None, **kwargs):

logger.debug('beat: Acquiring lock...')

lock = redis(scheduler.app).lock(
lock = get_redis(scheduler.app).lock(
scheduler.lock_key,
timeout=scheduler.lock_timeout,
sleep=scheduler.max_interval,
Expand Down
1 change: 1 addition & 0 deletions requirements.dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ pytest-catchlog
pytest-cov
python-dateutil
redis
tenacity
tox
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
install_requires=[
'redis',
'celery',
'python-dateutil'
'python-dateutil',
'tenacity'
],
tests_require=[
'pytest',
Expand Down
8 changes: 4 additions & 4 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
)
from basecase import RedBeatCase, AppCase
from redbeat import RedBeatScheduler
from redbeat.schedulers import redis
from redbeat.schedulers import get_redis


class mocked_schedule(schedule):
Expand Down Expand Up @@ -168,7 +168,7 @@ def setup(self):
pass

def test_sentinel_scheduler(self):
redis_client = redis(app=self.app)
redis_client = get_redis(app=self.app)
assert 'Sentinel' not in str(redis_client.connection_pool)

class SentinelRedBeatCase(AppCase):
Expand All @@ -194,7 +194,7 @@ def setup(self): # celery3
self.app.conf.add_defaults(deepcopy(self.config_dict))

def test_sentinel_scheduler(self):
redis_client = redis(app=self.app)
redis_client = get_redis(app=self.app)
assert 'Sentinel' in str(redis_client.connection_pool)


Expand All @@ -221,5 +221,5 @@ def setup(self): # celery3
self.app.conf.add_defaults(deepcopy(self.config_dict))

def test_sentinel_scheduler(self):
redis_client = redis(app=self.app)
redis_client = get_redis(app=self.app)
assert 'Sentinel' in str(redis_client.connection_pool)
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ deps=
pytest-cov
python-dateutil
redis
tenacity

commands=
py.test [] tests {posargs}

0 comments on commit 97223a6

Please sign in to comment.