Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add global key prefix for keys set by Redis transporter #1349

Merged
merged 6 commits into from
Aug 25, 2021

Conversation

gabor-boros
Copy link
Contributor

@gabor-boros gabor-boros commented Jun 20, 2021

This PR continues the work of @wetneb, adding a global key prefix to queue names addressing #853. In the original PR, there were some unresolved issues that are resolved by this PR.

I believe most of the comments in the original approach were answered/resolved but #912 (review).

By adding the global key prefix to the unacked_key , unacked_index_key, unacked_mutex_key the prefix is applied for the unacked and unacked_index keys as well when the message is not ack'ed.

Also, the implementation adds the global key prefix even to the queue received by producer. Meaning that in the following example, the Redis transport will handle a_queue_name as <prefix>a_queue_name.

queue = Queue('a_queue_name', exchange=Exchange('default'))
producer.publish('message_1', exchange=queue.exchange, declare=[queue])

Discussions:

Dependencies: None

Testing instructions:

  1. Start Redis (using docker: docker run --name kombu_redis --rm -it -p 6379:6379 redis:latest)
  2. Connect to redis using redis-cli in a separate terminal window (docker exec -it kombu_redis redis-cli)
  3. List all keys (KEYS *) and validate empty array returned
  4. Run snippet "example 1" (defined below) and validate the message object is printed
  5. List all keys (KEYS *) and validate (only) _prefixed__kombu.binding.default is listed
  6. Flush all keys (FLUSHALL)
  7. Run snippet "example 2" (defined below) in an ipython shell and validate the message object is printed
  8. List all keys (KEYS *) and validate (only) _prefixed_unacked, _prefixed_unacked_index, and _prefixed__kombu.binding.default are listed
  9. Quit the shell and list the keys again (KEYS *) and validate that the _prefixed_a_queue_name is restored and _prefixed__kombu.binding.default is still listed
  10. Run SMEMBERS _prefixed__kombu.binding.default and validate that 1) "\x06\x16\x06\x16_prefixed_a_queue_name" is present

Example 1

from kombu import Exchange, Queue, Producer, Connection, Consumer
connection = Connection('redis://', transport_options={'global_keyprefix': '_prefixed_'})
channel = connection.channel()
producer = Producer(channel)
queue = Queue('a_queue_name', exchange=Exchange('default'))

# Publish a message
producer.publish('message_1', exchange=queue.exchange, declare=[queue])

def printer(message):
    print(message)
    # Remove the line below for unacknowledged behavior
    message.ack()

with Consumer(connection, [queue], on_message=printer):
    connection.drain_events()

Example 2

from kombu import Exchange, Queue, Producer, Connection, Consumer
connection = Connection('redis://', transport_options={'global_keyprefix': '_prefixed_'})
channel = connection.channel()
producer = Producer(channel)
queue = Queue('a_queue_name', exchange=Exchange('default'))

# Publish a message
producer.publish('message_1', exchange=queue.exchange, declare=[queue])

def printer(message):
    print(message)

with Consumer(connection, [queue], on_message=printer):
    connection.drain_events()

Author notes and concerns:

The original PR listed some maintainer concerns, but -- as I mentioned -- I believe those were "discussed" though not confirmed. In case any questions from the original PR are still open and not addressed by this PR, please let me know.

Copy link
Contributor

@pomegranited pomegranited left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 This is clean and functioning beautifully, thank you @gabor-boros !

  • I tested this using the PR test instructions.
  • I read through the code.
  • Includes documentation for the new argument -- open question about where/how to update the docs generated from these comments.

kombu/transport/redis.py Outdated Show resolved Hide resolved
@gabor-boros
Copy link
Contributor Author

@matusvalo @auvipy The linter issues are fixed and I had a question regarding documentation. Could you please take a look?

@matusvalo
Copy link
Member

Hi @gabor-boros thank you for your PR. The CI seems to be failing with following:

kombu/transport/redis.py:41:80: E501 line too long (93 > 79 characters)

@matusvalo
Copy link
Member

Unfortunatelly, I will also need more time for PR review so please be patient.

@gabor-boros
Copy link
Contributor Author

gabor-boros commented Jun 30, 2021

@matusvalo The linter error is fixed. Let me know if you need me to update anything else.
PS: The commits are squashed.

Copy link
Member

@matusvalo matusvalo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gabor-boros unfortunately, the code misses the case when default exchange is used:

127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379>
(kombu38) matus@dev:~/dev/kombu$ cat p.py
import kombu
with kombu.Connection('redis://') as conn:
    with conn.channel() as channel:
        producer = kombu.Producer(channel)

        producer.publish(
                {'hello': 'world'},
                retry=True,
                routing_key='rkey',
                retry_policy={
                    'interval_start': 0, # First retry immediately,
                    'interval_step': 2,  # then increase by 2s for every retry.
                    'interval_max': 30,  # but don't exceed 30s between retries.
                    'max_retries': 30,   # give up after 30 tries.
                },
                transport_options={'global_keyprefix': 'myprefix'}
            )
(kombu38) matus@dev:~/dev/kombu$ python p.py
(kombu38) matus@dev:~/dev/kombu$ redis-cli
127.0.0.1:6379> keys *
1) "rkey"
127.0.0.1:6379>

@gabor-boros
Copy link
Contributor Author

@matusvalo Oh! Thank you for pointing it out. I'll update the PR today to include that case too.

@gabor-boros
Copy link
Contributor Author

@gabor-boros unfortunately, the code misses the case when default exchange is used:

@matusvalo I was curious why did the global_keyprefix not passed. I may overlook something, but the transport_options is not a valid kwarg for the publish method. In fact, the **properties is available on publish, but nothing reads transport_options from it. Therefore none of the transport options are can be set from here. If you try unacked_key you can see that that's not changed either.

@matusvalo
Copy link
Member

@gabor-boros yes it was typo but it is still not working (this time I have ensured that transport options were filled as expected):

(kombu38) matus@dev:~/dev/kombu$ redis-cli keys '*'
(empty list or set)
(kombu38) matus@dev:~/dev/kombu$ cat p.py
import kombu
with kombu.Connection('redis://', transport_options={'global_keyprefix': 'myprefix'}) as conn:
    with conn.channel() as channel:
        producer = kombu.Producer(channel)

        producer.publish(
                {'hello': 'world'},
                retry=True,
                routing_key='rkey',
                retry_policy={
                    'interval_start': 0, # First retry immediately,
                    'interval_step': 2,  # then increase by 2s for every retry.
                    'interval_max': 30,  # but don't exceed 30s between retries.
                    'max_retries': 30,   # give up after 30 tries.
                }
            )
(kombu38) matus@dev:~/dev/kombu$ python p.py
(kombu38) matus@dev:~/dev/kombu$ redis-cli keys '*'
1) "rkey"

BTW: I you look into codebase of kombu it cannot work since you are not adjusting key value here (method _put() is used when data are published):

def _put(self, queue, message, **kwargs):
"""Deliver message."""
pri = self._get_message_priority(message, reverse=False)
with self.conn_or_acquire() as client:
client.lpush(self._q_for_pri(queue, pri), dumps(message))

Moreover, I have also problem with the PR itself. I think that it does changes on wrong layer. Key name adjustments should be done in lower layer in client itself. I propose a solution by :

  1. creating a wrapper of redis client which will adjust key name with the prefix:
class PrefixedClient:

    def __init__(self, redis_client, prefix=''):
        self.prefix = prefix
        self.client = client

    def lpush(queue, *args, **kwargs):
        return self.client.lpush(prefix + queue, *args, **kwargs)
    ... # All other used methods needs to be implemented in similar way
  1. creating a wrapper of pipeline doing the same as redis client (prefixing the queues).
  2. the codebase of redis transport in this case will not need any changes (except creating instance of PrefixedClient).

The reason behind this is following:

  • all logic around prefixing is in single place, so the readability will be better.
  • in this way we can guarantee that all key will be prefixed.
  • unittests will be much easer to create covering all cases.

@gabor-boros
Copy link
Contributor Author

@matusvalo Thank you for your feedback and for investigating the root cause of the issue. Adding queue = self._queue_with_prefix(queue) to the _put method fixes the issue and afterward the rkey in the example prefixed as expected.

Do you mean to create the PrefixedClient in Kombu or in the Redis client implementation?

@gabor-boros
Copy link
Contributor Author

@matusvalo Could you please help me to get unblocked and sort out the remaining bits of this PR?

@matusvalo
Copy link
Member

Sorry @gabor-boros for late reply. I was thinking about creating wrapper around PrefixedClientin kombu. It means you create new class in kombu/transport/redis.py with all used methods wrapped. Each method do just prefixing. Does it make sense?

@gabor-boros
Copy link
Contributor Author

@matusvalo Yes, it makes. I'll update the PR with the requested changes soon.

@lgtm-com
Copy link

lgtm-com bot commented Aug 16, 2021

This pull request introduces 2 alerts when merging 1a358f2 into 78f9b60 - view on LGTM.com

new alerts:

  • 1 for Missing call to `__del__` during object destruction
  • 1 for Missing call to `__init__` during object initialization

Copy link
Member

@auvipy auvipy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please fix the lint failures

@gabor-boros
Copy link
Contributor Author

@auvipy I wasn't satisfied with the constant names I introduced so renaming them solves two problems at once.

@lgtm-com
Copy link

lgtm-com bot commented Aug 17, 2021

This pull request introduces 2 alerts when merging 20e20e7 into 78f9b60 - view on LGTM.com

new alerts:

  • 1 for Missing call to `__del__` during object destruction
  • 1 for Missing call to `__init__` during object initialization

@lgtm-com
Copy link

lgtm-com bot commented Aug 17, 2021

This pull request introduces 2 alerts when merging 75e6113 into 78f9b60 - view on LGTM.com

new alerts:

  • 1 for Missing call to `__del__` during object destruction
  • 1 for Missing call to `__init__` during object initialization

@thedrow thedrow self-assigned this Aug 17, 2021
wetneb and others added 2 commits August 20, 2021 00:04
As per the suggestions, refactor the redis key prefixing to use a custom
redis client that prefixes the keys it uses.

The custom client implementation does not prefix every key by default as
the way of prefixing keys may differ for some redis commands, instead it
lists those keys that will be prefixed. In case of commands, where
multiple keys can be passed as an argument, the custom client defines
where the arg positions are starting and ending for the given command.
@gabor-boros
Copy link
Contributor Author

@auvipy Thanks for noticing. I just squashed the commits and will fix the failing test

@lgtm-com
Copy link

lgtm-com bot commented Aug 19, 2021

This pull request introduces 3 alerts when merging 0c12e02 into 78f9b60 - view on LGTM.com

new alerts:

  • 2 for Conflicting attributes in base classes
  • 1 for Missing call to `__del__` during object destruction

@lgtm-com
Copy link

lgtm-com bot commented Aug 19, 2021

This pull request introduces 3 alerts when merging d95ce1d into 78f9b60 - view on LGTM.com

new alerts:

  • 2 for Conflicting attributes in base classes
  • 1 for Missing call to `__del__` during object destruction

pomegranited pushed a commit to open-craft/kombu that referenced this pull request Aug 20, 2021
Co-authored-by: Matus Valo <[email protected]>

refactor: use a custom redis client

As per the suggestions, refactor the redis key prefixing to use a custom
redis client that prefixes the keys it uses.

The custom client implementation does not prefix every key by default as
the way of prefixing keys may differ for some redis commands, instead it
lists those keys that will be prefixed. In case of commands, where
multiple keys can be passed as an argument, the custom client defines
where the arg positions are starting and ending for the given command.

docs: update authors doc
(cherry picked from commit 3fe1f880846fa44b346496390e4198859662476c)

squashed commits from celery#1349
@gabor-boros
Copy link
Contributor Author

@auvipy @matusvalo @thedrow I believe this is now ready for your review.

auvipy
auvipy previously approved these changes Aug 20, 2021
Copy link
Member

@matusvalo matusvalo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that example from PR description (at least first one) is not working with the patch. When executed nothing is printed:

(kombu37) matus@matus-debian:~/zmaz$ python ex1.py
(kombu37) matus@matus-debian:~/zmaz$

But when I have adjusted the example in a way that I have removed prefixed option - hence connection looked like following:

connection = Connection('redis://')

The script worked fine:

(kombu37) matus@matus-debian:~/zmaz$ python ex1.py
<Message object at 0x7fa5df224c18 with details {'state': 'RECEIVED', 'content_type': 'text/plain', 'delivery_tag': 'c7bf6578-4fb7-4c52-a921-4a975046ebd2', 'body_length': 9, 'properties': {}, 'delivery_info': {'exchange': 'default', 'routing_key': ''}}>

I think this PR needs more love... I will try to allocate some time for investigation - please be patient...

@auvipy auvipy dismissed their stale review August 22, 2021 05:41

not working

Copy link
Member

@matusvalo matusvalo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gabor-boros I have found out the reason why the examples were failing - see my code review. The missing piece are unittests covering it. Unfortunately, I am not able to push changes directly to PR branch since you have unchecked this possibility.

kombu/transport/redis.py Show resolved Hide resolved
Copy link
Contributor

@pomegranited pomegranited left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

  • I tested this using the PR test instructions, and everything works perfectly now!
  • I read through the code
  • I checked for accessibility issues N/A
  • Includes documentation

pomegranited pushed a commit to open-craft/kombu that referenced this pull request Aug 23, 2021
Co-authored-by: Matus Valo <[email protected]>

refactor: use a custom redis client

As per the suggestions, refactor the redis key prefixing to use a custom
redis client that prefixes the keys it uses.

The custom client implementation does not prefix every key by default as
the way of prefixing keys may differ for some redis commands, instead it
lists those keys that will be prefixed. In case of commands, where
multiple keys can be passed as an argument, the custom client defines
where the arg positions are starting and ending for the given command.

docs: update authors doc
(cherry picked from commit 3fe1f880846fa44b346496390e4198859662476c)

squashed commits from celery#1349

fix: wrap redis.parse_response to remove key prefixes

Co-authored-by: Matus Valo <[email protected]>
(cherry picked from commit 537534a)

fix: typo

(cherry picked from commit 434f923)
@lgtm-com
Copy link

lgtm-com bot commented Aug 23, 2021

This pull request introduces 3 alerts when merging 5e6dcc8 into 9a91e8b - view on LGTM.com

new alerts:

  • 2 for Conflicting attributes in base classes
  • 1 for Missing call to `__del__` during object destruction

@thedrow thedrow requested a review from matusvalo August 23, 2021 08:53
@matusvalo matusvalo merged commit 39584a1 into celery:master Aug 25, 2021
@matusvalo
Copy link
Member

Thank you @gabor-boros !

gabor-boros added a commit to open-craft/kombu that referenced this pull request Aug 26, 2021
* Introduce global key prefix for redis transport

Co-authored-by: Matus Valo <[email protected]>

* refactor: use a custom redis client

As per the suggestions, refactor the redis key prefixing to use a custom
redis client that prefixes the keys it uses.

The custom client implementation does not prefix every key by default as
the way of prefixing keys may differ for some redis commands, instead it
lists those keys that will be prefixed. In case of commands, where
multiple keys can be passed as an argument, the custom client defines
where the arg positions are starting and ending for the given command.

* test: fix unit tests by moving import statement

* fix: wrap redis.parse_response to remove key prefixes

Co-authored-by: Matus Valo <[email protected]>

* fix: typo

* fix: lint

Co-authored-by: Antonin Delpeuch <[email protected]>
Co-authored-by: Matus Valo <[email protected]>
Co-authored-by: Jillian Vogel <[email protected]>
@pomegranited pomegranited deleted the gabor/add-global-keyprefix branch August 28, 2021 04:35
pomegranited pushed a commit to open-craft/kombu that referenced this pull request Aug 28, 2021
* Introduce global key prefix for redis transport

Co-authored-by: Matus Valo <[email protected]>

* refactor: use a custom redis client

As per the suggestions, refactor the redis key prefixing to use a custom
redis client that prefixes the keys it uses.

The custom client implementation does not prefix every key by default as
the way of prefixing keys may differ for some redis commands, instead it
lists those keys that will be prefixed. In case of commands, where
multiple keys can be passed as an argument, the custom client defines
where the arg positions are starting and ending for the given command.

* test: fix unit tests by moving import statement

* fix: wrap redis.parse_response to remove key prefixes

Co-authored-by: Matus Valo <[email protected]>

* fix: typo

* fix: lint

Co-authored-by: Antonin Delpeuch <[email protected]>
Co-authored-by: Matus Valo <[email protected]>
Co-authored-by: Jillian Vogel <[email protected]>
(cherry picked from commit 39584a1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

redis: add configuration parameter to prefix all keys used with a custom string
6 participants