Skip to content

Commit

Permalink
Fix async (#2673)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvora-h authored Mar 29, 2023
1 parent 5acbde3 commit ef3f086
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 29 deletions.
10 changes: 3 additions & 7 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1227,13 +1227,9 @@ async def immediate_execute_command(self, *args, **options):
command_name, self.shard_hint
)
self.connection = conn
try:
return await asyncio.shield(
self._try_send_command_parse_response(conn, *args, **options)
)
except asyncio.CancelledError:
await conn.disconnect()
raise
return await asyncio.shield(
self._try_send_command_parse_response(conn, *args, **options)
)

def pipeline_execute_command(self, *args, **options):
"""
Expand Down
45 changes: 23 additions & 22 deletions tests/test_asyncio/test_cwe_404.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,34 +88,35 @@ async def test_standalone_pipeline(delay):
addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=delay * 2
)
await dp.start()
async with Redis(host="localhost", port=5380) as r:
await r.set("foo", "foo")
await r.set("bar", "bar")
for b in [True, False]:
async with Redis(host="localhost", port=5380, single_connection_client=b) as r:
await r.set("foo", "foo")
await r.set("bar", "bar")

pipe = r.pipeline()
pipe = r.pipeline()

pipe2 = r.pipeline()
pipe2.get("bar")
pipe2.ping()
pipe2.get("foo")
pipe2 = r.pipeline()
pipe2.get("bar")
pipe2.ping()
pipe2.get("foo")

t = asyncio.create_task(pipe.get("foo").execute())
await asyncio.sleep(delay)
t.cancel()
t = asyncio.create_task(pipe.get("foo").execute())
await asyncio.sleep(delay)
t.cancel()

pipe.get("bar")
pipe.ping()
pipe.get("foo")
pipe.reset()
pipe.get("bar")
pipe.ping()
pipe.get("foo")
pipe.reset()

assert await pipe.execute() is None
assert await pipe.execute() is None

# validating that the pipeline can be used as it could previously
pipe.get("bar")
pipe.ping()
pipe.get("foo")
assert await pipe.execute() == [b"bar", True, b"foo"]
assert await pipe2.execute() == [b"bar", True, b"foo"]
# validating that the pipeline can be used as it could previously
pipe.get("bar")
pipe.ping()
pipe.get("foo")
assert await pipe.execute() == [b"bar", True, b"foo"]
assert await pipe2.execute() == [b"bar", True, b"foo"]

await dp.stop()

Expand Down

0 comments on commit ef3f086

Please sign in to comment.