Redis PubSub Not Close Subscribing #55

Closed
vessaldaneshvar opened this issue Oct 8, 2021 · 3 comments

@vessaldaneshvar

Hi,
I am using broadcaster for websocket connections. When a new websocket connection opens, I assign the client a new unique ID and subscribe to a channel named after that ID.
Another application publishes messages to the websocket client through this unique ID.
My application works properly, apart from one issue:
when the connection closes, the Redis subscription stays open until the application shuts down.

async def ws_sender(websocket):
    async with broadcast.subscribe(channel=websocket.channel_name) as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)

The context manager exits, but the subscription is not closed (no unsubscribe is sent).

@EdgyEdgemond

EdgyEdgemond commented Mar 2, 2022

I have just tried switching to the Redis backend (the Postgres backend seems to have memory leak issues, apparently somewhere in asyncpg).

The issue with the subscribe not closing is due to the handling of the subscribers in _base.py


    @asynccontextmanager
    async def subscribe(self, channel: str) -> 'Subscriber':
        queue: asyncio.Queue = asyncio.Queue()

        try:
            if not self._subscribers.get(channel):
                await self._backend.subscribe(channel)
                self._subscribers[channel] = set([queue])
            else:
                self._subscribers[channel].add(queue)

            yield Subscriber(queue)

            self._subscribers[channel].remove(queue)
            if not self._subscribers.get(channel):
                del self._subscribers[channel]
                await self._backend.unsubscribe(channel)

        finally:

            await queue.put(None)

This section

            self._subscribers[channel].remove(queue)
            if not self._subscribers.get(channel):
                del self._subscribers[channel]
                await self._backend.unsubscribe(channel)

needs to be moved into the finally block. When the websocket ends, an exception is raised inside the context manager at the yield, so the code after the yield is skipped and the channel is never unsubscribed.
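A minimal sketch of that change, not the library's actual patch: RecordingBackend, SketchBroadcast and demo are hypothetical names introduced here, the backend only records calls, and the context manager yields the raw queue instead of a Subscriber. The point is only that cleanup placed in the finally block still runs when the body raises.

```python
import asyncio
from contextlib import asynccontextmanager


class RecordingBackend:
    """Hypothetical stand-in for a real backend; it only records calls."""

    def __init__(self):
        self.calls = []

    async def subscribe(self, channel):
        self.calls.append(("subscribe", channel))

    async def unsubscribe(self, channel):
        self.calls.append(("unsubscribe", channel))


class SketchBroadcast:
    """Hypothetical, cut-down version of the class quoted above."""

    def __init__(self, backend):
        self._backend = backend
        self._subscribers = {}

    @asynccontextmanager
    async def subscribe(self, channel):
        queue = asyncio.Queue()
        try:
            if not self._subscribers.get(channel):
                await self._backend.subscribe(channel)
                self._subscribers[channel] = {queue}
            else:
                self._subscribers[channel].add(queue)
            yield queue  # the quoted code yields Subscriber(queue) here
        finally:
            # Runs on a normal exit and when the body raises.
            listeners = self._subscribers.get(channel, set())
            listeners.discard(queue)
            if channel in self._subscribers and not listeners:
                del self._subscribers[channel]
                await self._backend.unsubscribe(channel)
            await queue.put(None)


async def demo():
    backend = RecordingBackend()
    broadcast = SketchBroadcast(backend)
    try:
        async with broadcast.subscribe("client-1"):
            # Simulated disconnect while the subscription is active.
            raise ConnectionError("websocket closed")
    except ConnectionError:
        pass
    return backend.calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Under these assumptions, demo() returns both a subscribe and an unsubscribe entry for "client-1", even though the body of the async with raised.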

@EdgyEdgemond

It's not nice, but you can work around the problem in broadcast and keep the existing websocket behaviour like so:

async def ws_sender(websocket):
    exc = None
    async with broadcast.subscribe(channel=websocket.channel_name) as subscriber:
        try:
            async for event in subscriber:
                await websocket.send_text(event.message)
        except Exception as e:
            exc = e
    if exc:
        raise exc
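Why this works can be shown with a small, self-contained sketch. leaky_subscribe is a hypothetical context manager that only mimics the cleanup placement quoted earlier (the unsubscribe step sits after the yield, outside finally); it is not the library's code. Catching the exception inside the async with lets the generator resume normally after the yield, so the unsubscribe step runs before the exception is re-raised.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def leaky_subscribe(log, channel):
    """Hypothetical: unsubscribe is placed after the yield, not in finally."""
    log.append(f"subscribe {channel}")
    try:
        yield
        log.append(f"unsubscribe {channel}")  # skipped if the body raises
    finally:
        pass  # the quoted code only does queue.put(None) here


async def propagate(log):
    # The exception escapes through the context manager: no unsubscribe.
    async with leaky_subscribe(log, "client-1"):
        raise ConnectionError("websocket closed")


async def catch_inside(log):
    # The workaround: catch inside, leave the context normally, re-raise after.
    exc = None
    async with leaky_subscribe(log, "client-1"):
        try:
            raise ConnectionError("websocket closed")
        except Exception as e:
            exc = e
    if exc:
        raise exc


async def run(consumer):
    log = []
    try:
        await consumer(log)
    except ConnectionError:
        pass
    return log


if __name__ == "__main__":
    print(asyncio.run(run(propagate)))
    print(asyncio.run(run(catch_inside)))
```

One caveat: since Python 3.8, asyncio.CancelledError derives from BaseException, so an except Exception clause like the one in the workaround would not catch task cancellation. Whether that matters depends on how the websocket handler is torn down.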

tsotnesharvadze added a commit to tsotnesharvadze/broadcaster that referenced this issue Oct 24, 2023
alex-oleshkevich mentioned this issue Apr 3, 2024
@alex-oleshkevich
Member

Fixed in #112

alex-oleshkevich pushed a commit to alex-oleshkevich/broadcaster that referenced this issue Apr 22, 2024
alex-oleshkevich added a commit that referenced this issue Apr 22, 2024
* Fixed #55 - Not Close Subscribing

* Added Redis Stream backend

* Fixed Linting Test

* Added Test For Redis Stream Backend

* align with master

* update docs

---------

Co-authored-by: tsotne <[email protected]>
Co-authored-by: Tom Christie <[email protected]>