Asyncio client creates RuntimeError on parallel requests #245
I think this is similar to the non-async client: if you create a non-async client and call it from multiple threads, you get similar exceptions. For both the async and the non-async client, I think users should access it through a high-level wrapper, like a connection pool. Maybe this high-level wrapper should be built into thriftpy2. In any case, we should mention this in the documentation.
A note in the documentation would be great, yes. I am currently not quite sure what the connection pool should look like, and I am also not very familiar with the thrift message format. Every packet appears to have a seqid, which I assume can be used to track which reply belongs to which request; is this correct? In that case I would extend the client.
This is not a thrift protocol issue, but a TCP issue. TCP is stream-oriented, not packet-oriented: when you send or receive a thrift packet, TCP does not guarantee that the whole packet is processed at once, so a packet may be sent or received in parts. Thus the seqid of a packet can't be used here.
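To make the stream-oriented point concrete, here is a small sketch (plain `asyncio.StreamReader`, not thriftpy2 code) of a length-prefixed frame arriving in two fragments; the reader has to keep reading until the whole frame is in, which `readexactly()` does internally:

```python
import asyncio


async def read_frame(reader: asyncio.StreamReader) -> bytes:
    # A framed protocol must loop until the whole frame has arrived;
    # readexactly() does that looping for us.
    size = int.from_bytes(await reader.readexactly(4), "big")
    return await reader.readexactly(size)


async def demo() -> bytes:
    reader = asyncio.StreamReader()
    # Simulate TCP delivering one 4-byte-length-prefixed frame in two
    # fragments: the stream gives no guarantee a frame arrives whole.
    reader.feed_data(b"\x00\x00\x00\x04ab")
    reader.feed_data(b"cd")
    reader.feed_eof()
    return await read_frame(reader)


payload = asyncio.run(demo())
print(payload)  # b'abcd'
```

A single `read()` call in the same situation could have returned just the first fragment, which is why reassembly has to happen before any seqid can be inspected.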
It's more complicated than that. Even with a single reader, the remote server may close the underlying TCP connection immediately after processing one request, after some idle time, or for any other reason; this depends on how the server is implemented. I suggest that, if performance is not a concern, you create a new client before each request and close it after receiving the response. That is the simplest way to avoid the complexity. Otherwise, look into how to implement a connection pool, as it may involve some complex policies to adapt to common usage. So I advise updating the documentation, or putting an example connection pool in the documentation, before we add one to the code base.
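The pool idea above could look roughly like this minimal sketch. It is generic and illustrative only (`AsyncClientPool` and the `factory` interface are names I made up, not thriftpy2 API): each concurrent request gets its own client, so no two coroutines ever read from the same connection.

```python
import asyncio


class AsyncClientPool:
    """Minimal connection-pool sketch. `factory` is any async callable
    that returns a fresh, connected client."""

    def __init__(self, factory, size: int):
        self._factory = factory
        self._idle = asyncio.Queue(maxsize=size)
        self._created = 0
        self._size = size

    async def acquire(self):
        # Reuse an idle client if one exists, create one if below the
        # limit, otherwise wait for a release. The check-and-increment
        # is synchronous, so single-threaded asyncio cannot race here.
        if self._idle.empty() and self._created < self._size:
            self._created += 1
            return await self._factory()
        return await self._idle.get()

    async def release(self, client) -> None:
        await self._idle.put(client)
```

A real pool would also need to detect dead connections on `release` and recreate them, which is exactly the kind of policy decision mentioned above.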
I am not saying anywhere that it is a protocol issue, but rather asking whether the seqid can be reliably used to track requests (as I found out by testing myself: yes). This is not a TCP problem, but an asyncio problem. Asyncio always operates on streams and handles TCP fragmentation internally. In fact, I built a custom protocol layer over a serial link and get the same error behaviour. Especially in asyncio environments, parallel requests are expected to just work (worst case serialised using locks), as that's the whole point of the asyncio abstraction layer.
Fair point, but in that case the current client will also behave pretty badly and require constant manual requests.
I would recommend being a bit more open to the bug reports you receive and not assuming the worst about the people informing about and trying to fix problems in your project. I have now implemented my own parallelism by extending `TAsyncClient`:

```python
import asyncio
import logging

from thriftpy2.contrib.aio.client import TAsyncClient
from thriftpy2.thrift import TApplicationException, TMessageType, args_to_kwargs

log = logging.getLogger(__name__)


class TParallelAsyncClient(TAsyncClient):
    def __init__(self, service, iprot, oprot=None):
        super().__init__(service, iprot, oprot)
        self._open_requests: dict[int, asyncio.Future] = {}
        self._message_processor: asyncio.Task | None = None

    async def _req(self, _api, *args, **kwargs):
        try:
            service_args = getattr(self._service, _api + "_args")
            kwargs = args_to_kwargs(service_args.thrift_spec, *args, **kwargs)
        except ValueError as e:
            raise TApplicationException(
                TApplicationException.UNKNOWN_METHOD,
                "missing required argument {arg} for {service}.{api}".format(
                    arg=e.args[0], service=self._service.__name__, api=_api
                ),
            )
        fut = await self._send(_api, **kwargs)
        if fut is not None:
            self._process_messages()
            return await fut

    async def _send(self, _api, **kwargs) -> asyncio.Future | None:
        oneway = getattr(getattr(self._service, _api + "_result"), "oneway")
        msg_type = TMessageType.ONEWAY if oneway else TMessageType.CALL
        seq_id = self._get_seqid()
        self._oprot.write_message_begin(_api, msg_type, seq_id)
        args = getattr(self._service, _api + "_args")()
        for k, v in kwargs.items():
            setattr(args, k, v)
        self._oprot.write_struct(args)
        self._oprot.write_message_end()
        await self._oprot.trans.flush()
        log.debug("Sent seqid %d: %s", seq_id, _api)
        if oneway:
            return None
        else:
            self._open_requests[seq_id] = asyncio.Future()
            return self._open_requests[seq_id]

    def _process_messages(self):
        if self._message_processor is None or self._message_processor.done():
            self._message_processor = asyncio.create_task(self._message_handler())

    async def _message_handler(self):
        while self._open_requests:
            fname, mtype, rseqid = await self._iprot.read_message_begin()
            log.debug("Reply for seqid %d: %s %s", rseqid, fname, mtype)
            fut = self._open_requests.pop(rseqid, None)
            if fut is None:
                log.error("Received message with unknown seqid %d", rseqid)
                continue  # nothing to deliver this reply to
            try:
                fut.set_result(await self._process_message(fname, mtype))
            except Exception as e:
                fut.set_exception(e)

    async def _process_message(self, fname, mtype):
        """Process a single message."""
        if mtype == TMessageType.EXCEPTION:
            x = TApplicationException()
            await self._iprot.read_struct(x)
            await self._iprot.read_message_end()
            raise x
        result = getattr(self._service, fname + "_result")()
        await self._iprot.read_struct(result)
        await self._iprot.read_message_end()
        if hasattr(result, "success") and result.success is not None:
            return result.success
        # void api without throws
        if len(result.thrift_spec) == 0:
            return
        # check throws
        for k, v in result.__dict__.items():
            if k != "success" and v:
                raise v
        # no throws & not void api
        if hasattr(result, "success"):
            raise TApplicationException(TApplicationException.MISSING_RESULT)

    def _get_seqid(self) -> int:
        seq_id = self._seqid
        self._seqid += 1
        return seq_id

    def close(self):
        if self._message_processor is not None and not self._message_processor.done():
            self._message_processor.cancel()
        self._message_processor = None
        super().close()
```
I'm not good at English, and all the comments I made above were translated by ChatGPT. I'm reading your comments via ChatGPT too, so there may be some contextual misunderstandings, but I make no assumptions about anyone. The suggestions above (use short-lived connections or write your own connection pool) are the same ones I have given my colleagues.
Ok, sorry for my harsh response. For me ChatGPT is a useful tool, though not for programming; I got the general impression of a "you should know that, don't ask me about the basics". But I might have read too much into that, and translations can easily lead to these kinds of problems. Regarding the actual problem: the asyncio code is in a contrib folder, so I assume it was provided by someone else? I can create a PR adding the changes above to the TAsyncClient class. This allows multiplexing multiple requests over a single connection (as long as that connection is alive, but no changes there). There is one part I don't like, and that's the spawning of an extra task to handle the messages. It should always be shut down properly, but errors within this task cannot be handled properly, as there is no caller. If you (or anyone else) have a suggestion on how to deal with this situation, I would be glad.
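One possible answer to the "errors in the detached task" question is a done-callback that fails every still-pending request future when the reader task dies, so callers see the error instead of hanging forever. This is only a sketch: the function name is mine, not thriftpy2 API, and it assumes the `_open_requests` dict shape used in the client above.

```python
import asyncio


def propagate_reader_failure(task: asyncio.Task, open_requests: dict) -> None:
    """Sketch: if the background reader task exits abnormally, deliver
    its exception to every pending request future (seqid -> Future)."""

    def _on_done(t):
        if t.cancelled():
            exc = asyncio.CancelledError("reader task was cancelled")
        else:
            exc = t.exception()  # also marks the exception as retrieved
            if exc is None:
                return  # clean exit, nothing to do
        for fut in open_requests.values():
            if not fut.done():
                fut.set_exception(exc)
        open_requests.clear()

    task.add_done_callback(_on_done)
```

Calling `t.exception()` in the callback also prevents the "exception was never retrieved" warning the detached task would otherwise produce.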
The whole project is a community contribution, and I'll check the code you provided above tomorrow or in a few days. Thanks for reporting this!
I encountered the same error. Will the error be fixed in thriftpy2? |
I am working on an app where I want to make multiple parallel requests to a server. As some of these requests require time for processing, the client waits for multiple replies in parallel. In this situation (at least) the asyncio code crashes with a `RuntimeError: read() called while another coroutine is already waiting for incoming data`.

The error can easily be replicated by changing the `asyncio_echo` example slightly: instead of the single call to `client.echo`, parallel calls can be made using `asyncio.gather`. The server already introduces a slight delay, triggering the bug.

The code at least needs a lock somewhere to block the parallel reads. This would, however, prevent parallelised calls to the server. I am also happy to look into a solution for this, but would like to get some pointers on where to start.