From 54a958209e384dc173eaa779017910faccea95e0 Mon Sep 17 00:00:00 2001 From: Antonin Delpeuch Date: Thu, 30 Aug 2018 19:22:31 +0200 Subject: [PATCH 1/6] Introduce global key prefix for redis transport Co-authored-by: Matus Valo --- kombu/transport/redis.py | 30 ++++++++++++++++++++++++++++++ t/unit/transport/test_redis.py | 23 +++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 5b4e1bcaf..029294377 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -38,6 +38,7 @@ * ``unacked_restore_limit`` * ``fanout_prefix`` * ``fanout_patterns`` +* ``global_keyprefix``: (str) The global key prefix to be prepended to all keys used by Kombu * ``socket_timeout`` * ``socket_connect_timeout`` * ``socket_keepalive`` @@ -485,6 +486,11 @@ class Channel(virtual.Channel): #: Disable for backwards compatibility with Kombu 3.x. fanout_patterns = True + #: The global key prefix will be prepended to all keys used + #: by Kombu, which can be useful when a redis database is shared + #: by different users. By default, no prefix is prepended. + global_keyprefix = '' + #: Order in which we consume from queues. #: #: Can be either string alias, or a cycle strategy class @@ -526,6 +532,7 @@ class Channel(virtual.Channel): 'unacked_restore_limit', 'fanout_prefix', 'fanout_patterns', + 'global_keyprefix', 'socket_timeout', 'socket_connect_timeout', 'socket_keepalive', @@ -562,6 +569,23 @@ def __init__(self, *args, **kwargs): # by default. self.keyprefix_fanout = '' + # Prepend the global key prefix + self.unacked_key = self._queue_with_prefix(self.unacked_key) + self.unacked_index_key = self._queue_with_prefix( + self.unacked_index_key + ) + self.unacked_mutex_key = self._queue_with_prefix( + self.unacked_mutex_key + ) + + # The default `keyprefix_queue` starts with an underscore, therefore + # adding a prefix ending an undescore will result in double + # underscores. Since both `keyprefix_queue` and `global_keyprefix` + # can be set by the user, this behavior is better than manipulating + # `keyprefix_queue` here. + self.keyprefix_queue = self._queue_with_prefix(self.keyprefix_queue) + self.keyprefix_fanout = self._queue_with_prefix(self.keyprefix_fanout) + # Evaluate connection. try: self.client.ping() @@ -577,6 +601,10 @@ def __init__(self, *args, **kwargs): if register_after_fork is not None: register_after_fork(self, _after_fork_cleanup_channel) + def _queue_with_prefix(self, queue): + """Return the queue name prefixed with `global_keyprefix` if set.""" + return self.global_keyprefix + queue + def _after_fork(self): self._disconnect_pools() @@ -632,6 +660,7 @@ def _restore_at_beginning(self, message): return self._restore(message, leftmost=True) def basic_consume(self, queue, *args, **kwargs): + queue = self._queue_with_prefix(queue) if queue in self._fanout_queues: exchange, _ = self._fanout_queues[queue] self.active_fanout_queues.add(queue) @@ -846,6 +875,7 @@ def _new_queue(self, queue, auto_delete=False, **kwargs): self.auto_delete_queues.add(queue) def _queue_bind(self, exchange, routing_key, pattern, queue): + queue = self._queue_with_prefix(queue) if self.typeof(exchange).type == 'fanout': # Mark exchange as fanout. self._fanout_queues[queue] = ( diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index 52ea7f816..b821d9872 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -926,6 +926,29 @@ def test_sep_transport_option(self): ('celery', '', 'celery'), ] + def test_global_keyprefix(self): + with Connection(transport=Transport, transport_options={ + 'global_keyprefix': 'foo', + }) as conn: + channel = conn.channel() + c = channel._create_client = Mock() + + body = {'hello': 'world'} + channel._put_fanout('exchange', body, '') + c().publish.assert_called_with('foo/{db}.exchange', dumps(body)) + + def test_global_keyprefix_queue_bind(self): + with Connection(transport=Transport, transport_options={ + 'global_keyprefix': 'foo', + }) as conn: + channel = conn.channel() + c = channel._create_client = Mock() + channel._queue_bind('default', '', None, 'queue') + c().sadd.assert_called_with( + 'foo_kombu.binding.default', + '\x06\x16\x06\x16fooqueue' + ) + class test_Redis: From 0c12e02a118561e55210bca2854230813e48ea40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Boros?= Date: Mon, 16 Aug 2021 09:55:45 +0400 Subject: [PATCH 2/6] refactor: use a custom redis client As per the suggestions, refactor the redis key prefixing to use a custom redis client that prefixes the keys it uses. The custom client implementation does not prefix every key by default as the way of prefixing keys may differ for some redis commands, instead it lists those keys that will be prefixed. In case of commands, where multiple keys can be passed as an argument, the custom client defines where the arg positions are starting and ending for the given command. --- AUTHORS | 1 + kombu/transport/redis.py | 130 ++++++++++++++++++++++++++------- t/unit/transport/test_redis.py | 98 +++++++++++++++++++++---- 3 files changed, 189 insertions(+), 40 deletions(-) diff --git a/AUTHORS b/AUTHORS index 34a47ab19..90ae30bd6 100644 --- a/AUTHORS +++ b/AUTHORS @@ -53,6 +53,7 @@ Fernando Jorge Mota Flavio [FlaPer87] Percoco Premoli Florian Munz Franck Cuny +Gábor Boros Germán M. Bravo Gregory Haskins Hank John diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 029294377..c95b9327a 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -38,7 +38,8 @@ * ``unacked_restore_limit`` * ``fanout_prefix`` * ``fanout_patterns`` -* ``global_keyprefix``: (str) The global key prefix to be prepended to all keys used by Kombu +* ``global_keyprefix``: (str) The global key prefix to be prepended to all keys + used by Kombu * ``socket_timeout`` * ``socket_connect_timeout`` * ``socket_keepalive`` @@ -50,6 +51,7 @@ * ``priority_steps`` """ +import functools import numbers import socket from bisect import bisect @@ -180,6 +182,95 @@ def _after_fork_cleanup_channel(channel): channel._after_fork() +class GlobalKeyPrefixMixin: + """Mixin to provide common logic for global key prefixing. + + Overriding all the methods used by Kombu with the same key prefixing logic + would be cumbersome and inefficient. Hence, we override the command + execution logic that is called by all commands. + """ + + PREFIXED_SIMPLE_COMMANDS = [ + "HDEL", + "HGET", + "HSET", + "LLEN", + "LPUSH", + "PUBLISH", + "SADD", + "SET", + "SMEMBERS", + "ZADD", + "ZREM", + "ZREVRANGEBYSCORE", + ] + + PREFIXED_COMPLEX_COMMANDS = { + "BRPOP": {"args_start": 0, "args_end": -1}, + "EVALSHA": {"args_start": 2, "args_end": 3}, + } + + def _prefix_args(self, args): + args = list(args) + command = args.pop(0) + + if command in self.PREFIXED_SIMPLE_COMMANDS: + args[0] = self.global_keyprefix + str(args[0]) + + if command in self.PREFIXED_COMPLEX_COMMANDS.keys(): + args_start = self.PREFIXED_COMPLEX_COMMANDS[command]["args_start"] + args_end = self.PREFIXED_COMPLEX_COMMANDS[command]["args_end"] + + pre_args = args[:args_start] if args_start > 0 else [] + + if args_end is not None: + post_args = args[args_end:] + elif args_end < 0: + post_args = args[len(args):] + else: + post_args = [] + + args = pre_args + [ + self.global_keyprefix + str(arg) + for arg in args[args_start:args_end] + ] + post_args + + return [command, *args] + + def execute_command(self, *args, **kwargs): + return super().execute_command(*self._prefix_args(args), **kwargs) + + def pipeline(self, transaction=True, shard_hint=None): + return PrefixedRedisPipeline( + self.connection_pool, + self.response_callbacks, + transaction, + shard_hint, + global_keyprefix=self.global_keyprefix, + ) + + +class PrefixedStrictRedis(GlobalKeyPrefixMixin, redis.Redis): + """Returns a ``StrictRedis`` client that prefixes the keys it uses.""" + + def __init__(self, *args, **kwargs): + self.global_keyprefix = kwargs.pop('global_keyprefix', '') + redis.Redis.__init__(self, *args, **kwargs) + + +class PrefixedRedisPipeline(GlobalKeyPrefixMixin, redis.client.Pipeline): + """Custom Redis pipeline that takes global_keyprefix into consideration. + + As the ``PrefixedStrictRedis`` client uses the `global_keyprefix` to prefix + the keys it uses, the pipeline called by the client must be able to prefix + the keys as well. + """ + + def __init__(self, *args, **kwargs): + self.global_keyprefix = kwargs.pop('global_keyprefix', '') + redis.client.Pipeline.__init__(self, *args, **kwargs) + + class QoS(virtual.QoS): """Redis Ack Emulation.""" @@ -569,23 +660,6 @@ def __init__(self, *args, **kwargs): # by default. self.keyprefix_fanout = '' - # Prepend the global key prefix - self.unacked_key = self._queue_with_prefix(self.unacked_key) - self.unacked_index_key = self._queue_with_prefix( - self.unacked_index_key - ) - self.unacked_mutex_key = self._queue_with_prefix( - self.unacked_mutex_key - ) - - # The default `keyprefix_queue` starts with an underscore, therefore - # adding a prefix ending an undescore will result in double - # underscores. Since both `keyprefix_queue` and `global_keyprefix` - # can be set by the user, this behavior is better than manipulating - # `keyprefix_queue` here. - self.keyprefix_queue = self._queue_with_prefix(self.keyprefix_queue) - self.keyprefix_fanout = self._queue_with_prefix(self.keyprefix_fanout) - # Evaluate connection. try: self.client.ping() @@ -601,10 +675,6 @@ def __init__(self, *args, **kwargs): if register_after_fork is not None: register_after_fork(self, _after_fork_cleanup_channel) - def _queue_with_prefix(self, queue): - """Return the queue name prefixed with `global_keyprefix` if set.""" - return self.global_keyprefix + queue - def _after_fork(self): self._disconnect_pools() @@ -660,7 +730,6 @@ def _restore_at_beginning(self, message): return self._restore(message, leftmost=True) def basic_consume(self, queue, *args, **kwargs): - queue = self._queue_with_prefix(queue) if queue in self._fanout_queues: exchange, _ = self._fanout_queues[queue] self.active_fanout_queues.add(queue) @@ -798,7 +867,12 @@ def _brpop_start(self, timeout=1): keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps for queue in queues] + [timeout or 0] self._in_poll = self.client.connection - self.client.connection.send_command('BRPOP', *keys) + + command_args = ['BRPOP', *keys] + if self.global_keyprefix: + command_args = self.client._prefix_args(command_args) + + self.client.connection.send_command(*command_args) def _brpop_read(self, **options): try: @@ -875,7 +949,6 @@ def _new_queue(self, queue, auto_delete=False, **kwargs): self.auto_delete_queues.add(queue) def _queue_bind(self, exchange, routing_key, pattern, queue): - queue = self._queue_with_prefix(queue) if self.typeof(exchange).type == 'fanout': # Mark exchange as fanout. self._fanout_queues[queue] = ( @@ -1055,6 +1128,13 @@ def _get_client(self): raise VersionMismatch( 'Redis transport requires redis-py versions 3.2.0 or later. ' 'You have {0.__version__}'.format(redis)) + + if self.global_keyprefix: + return functools.partial( + PrefixedStrictRedis, + global_keyprefix=self.global_keyprefix, + ) + return redis.StrictRedis @contextmanager diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index b821d9872..b3542918d 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -12,6 +12,7 @@ from kombu import Connection, Consumer, Exchange, Producer, Queue from kombu.exceptions import InconsistencyError, VersionMismatch from kombu.transport import virtual +from kombu.transport.redis import GlobalKeyPrefixMixin from kombu.utils import eventio # patch poll from kombu.utils.json import dumps @@ -746,7 +747,7 @@ def test_rotate_cycle_ValueError(self): def test_get_client(self): import redis as R KombuRedis = redis.Channel._get_client(self.channel) - assert KombuRedis + assert isinstance(KombuRedis(), R.StrictRedis) Rv = getattr(R, 'VERSION', None) try: @@ -757,6 +758,12 @@ def test_get_client(self): if Rv is not None: R.VERSION = Rv + def test_get_prefixed_client(self): + from kombu.transport.redis import PrefixedStrictRedis + self.channel.global_keyprefix = "test_" + PrefixedRedis = redis.Channel._get_client(self.channel) + assert isinstance(PrefixedRedis(), PrefixedStrictRedis) + def test_get_response_error(self): from redis.exceptions import ResponseError assert redis.Channel._get_response_error(self.channel) is ResponseError @@ -926,27 +933,41 @@ def test_sep_transport_option(self): ('celery', '', 'celery'), ] - def test_global_keyprefix(self): - with Connection(transport=Transport, transport_options={ - 'global_keyprefix': 'foo', - }) as conn: + @patch("redis.StrictRedis.execute_command") + def test_global_keyprefix(self, mock_execute_command): + from kombu.transport.redis import PrefixedStrictRedis + + with Connection(transport=Transport) as conn: + client = PrefixedStrictRedis(global_keyprefix='foo_') + channel = conn.channel() - c = channel._create_client = Mock() + channel._create_client = Mock() + channel._create_client.return_value = client body = {'hello': 'world'} channel._put_fanout('exchange', body, '') - c().publish.assert_called_with('foo/{db}.exchange', dumps(body)) + mock_execute_command.assert_called_with( + 'PUBLISH', + 'foo_/{db}.exchange', + dumps(body) + ) + + @patch("redis.StrictRedis.execute_command") + def test_global_keyprefix_queue_bind(self, mock_execute_command): + from kombu.transport.redis import PrefixedStrictRedis + + with Connection(transport=Transport) as conn: + client = PrefixedStrictRedis(global_keyprefix='foo_') - def test_global_keyprefix_queue_bind(self): - with Connection(transport=Transport, transport_options={ - 'global_keyprefix': 'foo', - }) as conn: channel = conn.channel() - c = channel._create_client = Mock() + channel._create_client = Mock() + channel._create_client.return_value = client + channel._queue_bind('default', '', None, 'queue') - c().sadd.assert_called_with( - 'foo_kombu.binding.default', - '\x06\x16\x06\x16fooqueue' + mock_execute_command.assert_called_with( + 'SADD', + 'foo__kombu.binding.default', + '\x06\x16\x06\x16queue' ) @@ -1523,3 +1544,50 @@ def test_sentinel_with_ssl(self): from kombu.transport.redis import SentinelManagedSSLConnection assert (params['connection_class'] is SentinelManagedSSLConnection) + + +class test_GlobalKeyPrefixMixin: + + global_keyprefix = "prefix_" + mixin = GlobalKeyPrefixMixin() + mixin.global_keyprefix = global_keyprefix + + def test_prefix_simple_args(self): + for command in GlobalKeyPrefixMixin.PREFIXED_SIMPLE_COMMANDS: + prefixed_args = self.mixin._prefix_args([command, "fake_key"]) + assert prefixed_args == [ + command, + f"{self.global_keyprefix}fake_key" + ] + + def test_prefix_brpop_args(self): + prefixed_args = self.mixin._prefix_args([ + "BRPOP", + "fake_key", + "fake_key2", + "not_prefixed" + ]) + + assert prefixed_args == [ + "BRPOP", + f"{self.global_keyprefix}fake_key", + f"{self.global_keyprefix}fake_key2", + "not_prefixed", + ] + + def test_prefix_evalsha_args(self): + prefixed_args = self.mixin._prefix_args([ + "EVALSHA", + "not_prefixed", + "not_prefixed", + "fake_key", + "not_prefixed", + ]) + + assert prefixed_args == [ + "EVALSHA", + "not_prefixed", + "not_prefixed", + f"{self.global_keyprefix}fake_key", + "not_prefixed", + ] From d95ce1d6613de6c2ad4a501e30e36c143349b301 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Boros?= Date: Fri, 20 Aug 2021 00:27:53 +0400 Subject: [PATCH 3/6] test: fix unit tests by moving import statement --- t/unit/transport/test_redis.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index b3542918d..3e1f76f58 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -12,7 +12,6 @@ from kombu import Connection, Consumer, Exchange, Producer, Queue from kombu.exceptions import InconsistencyError, VersionMismatch from kombu.transport import virtual -from kombu.transport.redis import GlobalKeyPrefixMixin from kombu.utils import eventio # patch poll from kombu.utils.json import dumps @@ -1548,12 +1547,14 @@ def test_sentinel_with_ssl(self): class test_GlobalKeyPrefixMixin: + from kombu.transport.redis import GlobalKeyPrefixMixin + global_keyprefix = "prefix_" mixin = GlobalKeyPrefixMixin() mixin.global_keyprefix = global_keyprefix def test_prefix_simple_args(self): - for command in GlobalKeyPrefixMixin.PREFIXED_SIMPLE_COMMANDS: + for command in self.mixin.PREFIXED_SIMPLE_COMMANDS: prefixed_args = self.mixin._prefix_args([command, "fake_key"]) assert prefixed_args == [ command, From 537534a5e234e4f32deec981213600fb66eb0007 Mon Sep 17 00:00:00 2001 From: Jillian Vogel Date: Mon, 23 Aug 2021 11:32:00 +0930 Subject: [PATCH 4/6] fix: wrap redis.parse_response to remove key prefixes Co-authored-by: Matus Valo --- kombu/transport/redis.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index c95b9327a..33810cfcc 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -237,6 +237,18 @@ def _prefix_args(self, args): return [command, *args] + def parse_response(self, connection, command_name, **options): + """Parses a response from the Redis server. + + Method wraps ``redis.parse_response()`` to remove prefixes of keys + returned by redis command. + """ + ret = super().parse_response(connection, command_name, **options) + if command_name == 'BRPOP' and ret: + key, value = ret + key = key[len(self.global_keyprefix):] + return key, value + return ret def execute_command(self, *args, **kwargs): return super().execute_command(*self._prefix_args(args), **kwargs) From 434f9233981ee3f364e1359f02633e61183b0f74 Mon Sep 17 00:00:00 2001 From: Jillian Vogel Date: Mon, 23 Aug 2021 11:35:19 +0930 Subject: [PATCH 5/6] fix: typo --- kombu/transport/redis.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 33810cfcc..1bd6ba9bd 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -249,6 +249,7 @@ def parse_response(self, connection, command_name, **options): key = key[len(self.global_keyprefix):] return key, value return ret + def execute_command(self, *args, **kwargs): return super().execute_command(*self._prefix_args(args), **kwargs) From 5e6dcc89d43572d3e6acaf4a0bccc359df7d8c5e Mon Sep 17 00:00:00 2001 From: Jillian Vogel Date: Mon, 23 Aug 2021 16:19:12 +0930 Subject: [PATCH 6/6] fix: lint --- kombu/transport/redis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 1bd6ba9bd..02d06b458 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -238,7 +238,7 @@ def _prefix_args(self, args): return [command, *args] def parse_response(self, connection, command_name, **options): - """Parses a response from the Redis server. + """Parse a response from the Redis server. Method wraps ``redis.parse_response()`` to remove prefixes of keys returned by redis command.