Skip to content

Commit

Permalink
Add test for parse resumability
Browse files Browse the repository at this point in the history
  • Loading branch information
kristjanvalur committed Dec 13, 2022
1 parent 9c6261a commit a38597f
Showing 1 changed file with 73 additions and 0 deletions.
73 changes: 73 additions & 0 deletions tests/test_asyncio/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pytest

import redis
from redis.asyncio.connection import (
Connection,
PythonParser,
Expand Down Expand Up @@ -112,3 +113,75 @@ async def test_connect_timeout_error_without_retry():
await conn.connect()
assert conn._connect.call_count == 1
assert str(e.value) == "Timeout connecting to server"


class TestError(BaseException):
pass


class InterruptingReader:
"""
A class simulating an asyncio input buffer, but raising a
special exception every other read.
"""

def __init__(self, data):
self.data = data
self.counter = 0
self.pos = 0

def tick(self):
self.counter += 1
# return
if (self.counter % 2) == 0:
raise TestError()

async def read(self, want):
self.tick()
want = 5
result = self.data[self.pos:self.pos + want]
self.pos += len(result)
return result

async def readline(self):
self.tick()
find = self.data.find(b"\n", self.pos)
if find >= 0:
result = self.data[self.pos:find + 1]
else:
result = self.data[self.pos:]
self.pos += len(result)
return result

async def readexactly(self, length):
self.tick()
result = self.data[self.pos:self.pos + length]
if len(result) < length:
raise asyncio.IncompleteReadError(result, None)
self.pos += len(result)
return result


async def test_connection_parse_response_resume(r: redis.Redis):
"""
This test verifies that the Connection parser,
be that PythonParser or HiredisParser,
can be interrupted at IO time and then resume parsing.
"""
conn = Connection(**r.connection_pool.connection_kwargs)
await conn.connect()
message = (b"*3\r\n$7\r\nmessage\r\n$8\r\nchannel1\r\n"
b"$25\r\nhi\r\nthere\r\n+how\r\nare\r\nyou\r\n")

conn._parser._stream = InterruptingReader(message)
for i in range(100):
try:
response = await conn.read_response()
break
except TestError:
pass

else:
pytest.fail("didn't receive a response")
assert response
assert i > 0

0 comments on commit a38597f

Please sign in to comment.