diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 889a0ed22..57a5a63b7 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -211,6 +211,7 @@ class GlobalKeyPrefixMixin: "DEL": {"args_start": 0, "args_end": None}, "BRPOP": {"args_start": 0, "args_end": -1}, "EVALSHA": {"args_start": 2, "args_end": 3}, + "WATCH": {"args_start": 0, "args_end": None}, } def _prefix_args(self, args): diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index d06f29591..b14408a61 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -1145,6 +1145,37 @@ def test_global_keyprefix_pubsub(self, mock_execute_command): 'foo_/{db}.a', ) + @patch("redis.client.Pipeline.execute_command") + def test_global_keyprefix_transaction(self, mock_execute_command): + from kombu.transport.redis import PrefixedStrictRedis + + with Connection(transport=Transport) as conn: + def pipeline(transaction=True, shard_hint=None): + pipeline_obj = original_pipeline( + transaction=transaction, shard_hint=shard_hint + ) + mock_execute_command.side_effect = [ + None, None, pipeline_obj, pipeline_obj + ] + return pipeline_obj + + client = PrefixedStrictRedis(global_keyprefix='foo_') + original_pipeline = client.pipeline + client.pipeline = pipeline + + channel = conn.channel() + channel._create_client = Mock() + channel._create_client.return_value = client + + channel.qos.restore_by_tag('test-tag') + assert mock_execute_command is not None + assert mock_execute_command.mock_calls == [ + call('WATCH', 'foo_unacked'), + call('HGET', 'foo_unacked', 'test-tag'), + call('ZREM', 'foo_unacked_index', 'test-tag'), + call('HDEL', 'foo_unacked', 'test-tag') + ] + class test_Redis: