Pipeline execute in async blocks as it packs the commands #3416

Open
rad-pat opened this issue Oct 17, 2024 · 1 comment

@rad-pat

rad-pat commented Oct 17, 2024

Version:
redis-py: 5.1.1

Platform:
Python 3.12/Ubuntu 24.04

Description:
Executing a pipeline that contains many commands under asyncio blocks the event loop while the commands are packed, at this line:

all_cmds = connection.pack_commands(

I'm not sure how to build a minimal repro to demonstrate the issue, but we have a case where we send many (200k) commands to a pipeline as a transaction, and it blocks the event loop for several seconds.
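A repro against a live server is awkward, but the effect can be sketched without Redis. This is a minimal sketch under stated assumptions: pack_resp is a hypothetical stand-in for Connection.pack_commands that encodes each command as a RESP array of bulk strings, and heartbeat/measure are illustrative helpers, not redis-py APIs. A heartbeat task ticks every 10 ms and records how late each tick arrives. Packing 200k commands inline on the loop thread should produce a worst-case lag close to the packing time, while handing the same call to asyncio.to_thread (the idea behind Change 1 in the workaround below) lets the heartbeat keep running.

```python
import asyncio


def pack_resp(commands):
    """Hypothetical stand-in for Connection.pack_commands.

    Encodes each command (str/int args only) as a RESP array of bulk
    strings and joins everything into one bytes object, synchronously.
    """
    out = []
    for args in commands:
        out.append(b"*%d\r\n" % len(args))
        for arg in args:
            data = str(arg).encode()
            out.append(b"$%d\r\n%s\r\n" % (len(data), data))
    return b"".join(out)


async def heartbeat(interval, gaps, stop):
    """Tick every `interval` seconds and record how late each tick was."""
    loop = asyncio.get_running_loop()
    last = loop.time()
    while not stop.is_set():
        await asyncio.sleep(interval)
        now = loop.time()
        gaps.append(now - last - interval)
        last = now


async def measure(offload, n=200_000):
    """Pack n SET commands; return (packed size, worst heartbeat lag in s)."""
    cmds = [("SET", f"key:{i}", i) for i in range(n)]  # built before timing
    gaps, stop = [], asyncio.Event()
    hb = asyncio.create_task(heartbeat(0.01, gaps, stop))
    await asyncio.sleep(0.05)  # let the heartbeat record a few normal ticks
    if offload:
        packed = await asyncio.to_thread(pack_resp, cmds)
    else:
        packed = pack_resp(cmds)  # runs on the event loop thread and stalls it
    await asyncio.sleep(0.05)
    stop.set()
    await hb
    return len(packed), max(gaps)


if __name__ == "__main__":
    for offload in (False, True):
        size, lag = asyncio.run(measure(offload))
        print(f"offload={offload}: {size} bytes, worst loop lag {lag * 1000:.1f} ms")
```

Because the packing is pure Python, the worker thread still competes with the event loop for the GIL, so offloading shrinks the stall rather than eliminating it; the exact numbers will depend on the machine.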

@rad-pat

rad-pat commented Nov 15, 2024

For anyone interested, I created a custom Pipeline class that overrides the _execute_transaction method, running pack_commands in a thread and adding some yields back to the event loop. I would open a PR, but from commenting on other PRs previously, not much progress seems to happen with them.

import asyncio  # needed for asyncio.to_thread and asyncio.sleep below
import inspect

from redis.asyncio.client import (
    EMPTY_RESPONSE,
    CommandStackT,
    CommandT,
    Connection,
    ExecAbortError,
    Pipeline,
    ResponseError,
    WatchError,
)

class AsyncPipeline(Pipeline):
    async def _execute_transaction(  # noqa: C901
            self, connection: Connection, commands: CommandStackT, raise_on_error
    ):
        pre: CommandT = (("MULTI",), {})
        post: CommandT = (("EXEC",), {})
        cmds = (pre, *commands, post)
        # Change 1 - run pack_commands in a thread
        all_cmds = await asyncio.to_thread(
            connection.pack_commands,
            (args for args, options in cmds if EMPTY_RESPONSE not in options)
        )
        await connection.send_packed_command(all_cmds)
        errors = []

        # parse off the response for MULTI
        # NOTE: we need to handle ResponseErrors here and continue
        # so that we read all the additional command messages from
        # the socket
        try:
            await self.parse_response(connection, "_")
        except ResponseError as err:
            errors.append((0, err))

        # and all the other commands
        for i, command in enumerate(commands):
            if EMPTY_RESPONSE in command[1]:
                errors.append((i, command[1][EMPTY_RESPONSE]))
            else:
                try:
                    await self.parse_response(connection, "_")
                except ResponseError as err:
                    self.annotate_exception(err, i + 1, command[0])
                    errors.append((i, err))
            # Change 2 - Release back to event loop to prevent blocking
            if i % 100 == 0:
                await asyncio.sleep(0)

        # parse the EXEC.
        try:
            response = await self.parse_response(connection, "_")
        except ExecAbortError as err:
            if errors:
                raise errors[0][1] from err
            raise

        # EXEC clears any watched keys
        self.watching = False

        if response is None:
            raise WatchError("Watched variable changed.") from None

        # put any parse errors into the response
        for i, e in errors:
            response.insert(i, e)

        if len(response) != len(commands):
            if self.connection:
                await self.connection.disconnect()
            raise ResponseError(
                "Wrong number of response items from pipeline execution"
            ) from None

        # find any errors in the response and raise if necessary
        if raise_on_error:
            self.raise_first_error(commands, response)
        # We have to run response callbacks manually
        data = []
        # Change 3 - Enumerate and then Release back to event loop to prevent blocking
        for i, (r, cmd) in enumerate(zip(response, commands)):
            if not isinstance(r, Exception):
                args, options = cmd
                command_name = args[0]
                if command_name in self.response_callbacks:
                    r = self.response_callbacks[command_name](r, **options)
                    if inspect.isawaitable(r):
                        r = await r
            data.append(r)
            if i % 100 == 0:
                await asyncio.sleep(0)
        return data
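The yielding pattern used in Changes 2 and 3 can be isolated into a small helper to see what it buys. This is a sketch, not redis-py code: process_cooperatively and ticks_during are hypothetical names, and the batch size of 100 simply mirrors the value in the workaround. asyncio.sleep(0) suspends the current task for one pass of the event loop, so other ready tasks (here a counter task) get a turn between batches.

```python
import asyncio


async def process_cooperatively(items, func, every=100):
    """Apply func to each item, yielding to the event loop every `every` items.

    Hypothetical helper mirroring the `if i % 100 == 0: await asyncio.sleep(0)`
    pattern in the workaround; it is not part of redis-py.
    """
    results = []
    for i, item in enumerate(items):
        results.append(func(item))
        if i % every == 0:
            # sleep(0) suspends this task for one loop iteration so other
            # ready tasks can run before the next batch is processed
            await asyncio.sleep(0)
    return results


async def ticks_during(n, every):
    """Count how often a concurrent task runs while n items are processed."""
    ticks = 0
    done = False

    async def ticker():
        nonlocal ticks
        while not done:
            ticks += 1
            await asyncio.sleep(0)

    task = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # give the ticker its first turn
    before = ticks
    await process_cooperatively(range(n), lambda x: x * 2, every)
    during = ticks - before
    done = True
    await task
    return during


if __name__ == "__main__":
    # every=100 lets the ticker run between batches; every=n yields only at i == 0
    print(asyncio.run(ticks_during(1000, 100)))
    print(asyncio.run(ticks_during(1000, 1000)))
```

A smaller batch size makes the loop more responsive at the cost of one scheduling round trip per batch. The yield only helps between items, though: a single large synchronous call such as pack_commands still blocks on its own, which is why Change 1 offloads that call to a thread instead.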
