-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Switch the JSON byte producer from a pull to a push producer #8116
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Iteratively encode JSON to avoid blocking the reactor. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -500,7 +500,7 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect): | |
pass | ||
|
||
|
||
@implementer(interfaces.IPullProducer) | ||
@implementer(interfaces.IPushProducer) | ||
class _ByteProducer: | ||
""" | ||
Iteratively write bytes to the request. | ||
|
@@ -515,52 +515,62 @@ def __init__( | |
): | ||
self._request = request | ||
self._iterator = iterator | ||
self._paused = False | ||
|
||
def start(self) -> None: | ||
self._request.registerProducer(self, False) | ||
# Register the producer and start producing data. | ||
self._request.registerProducer(self, True) | ||
self.resumeProducing() | ||
|
||
def _send_data(self, data: List[bytes]) -> None: | ||
""" | ||
Send a list of strings as a response to the request. | ||
Send a list of bytes as a chunk of a response. | ||
""" | ||
if not data: | ||
return | ||
self._request.write(b"".join(data)) | ||
|
||
def pauseProducing(self) -> None: | ||
self._paused = True | ||
|
||
def resumeProducing(self) -> None: | ||
# We've stopped producing in the meantime (note that this might be | ||
# re-entrant after calling write). | ||
if not self._request: | ||
return | ||
|
||
# Get the next chunk and write it to the request. | ||
# | ||
# The output of the JSON encoder is coalesced until min_chunk_size is | ||
# reached. (This is because JSON encoders produce a very small output | ||
# per iteration.) | ||
# | ||
# Note that buffer stores a list of bytes (instead of appending to | ||
# bytes) to hopefully avoid many allocations. | ||
buffer = [] | ||
buffered_bytes = 0 | ||
while buffered_bytes < self.min_chunk_size: | ||
try: | ||
data = next(self._iterator) | ||
buffer.append(data) | ||
buffered_bytes += len(data) | ||
except StopIteration: | ||
# The entire JSON object has been serialized, write any | ||
# remaining data, finalize the producer and the request, and | ||
# clean-up any references. | ||
self._send_data(buffer) | ||
self._request.unregisterProducer() | ||
self._request.finish() | ||
self.stopProducing() | ||
return | ||
|
||
self._send_data(buffer) | ||
self._paused = False | ||
|
||
# Write until there's backpressure telling us to stop. | ||
while not self._paused: | ||
# Get the next chunk and write it to the request. | ||
# | ||
# The output of the JSON encoder is coalesced until min_chunk_size is | ||
# reached. (This is because JSON encoders produce a very small output | ||
# per iteration.) | ||
# | ||
# Note that buffer stores a list of bytes (instead of appending to | ||
# bytes) to hopefully avoid many allocations. | ||
buffer = [] | ||
buffered_bytes = 0 | ||
while buffered_bytes < self.min_chunk_size: | ||
try: | ||
data = next(self._iterator) | ||
buffer.append(data) | ||
buffered_bytes += len(data) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Looking at the code this is basically what
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
The issue is that the
The goal of this code is to avoid
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Oh gah I hate that
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Can you add a comment to that effect please?
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
I expanded the comment above this section in 2afe301. |
||
except StopIteration: | ||
# The entire JSON object has been serialized, write any | ||
# remaining data, finalize the producer and the request, and | ||
# clean-up any references. | ||
self._send_data(buffer) | ||
self._request.unregisterProducer() | ||
self._request.finish() | ||
self.stopProducing() | ||
return | ||
|
||
self._send_data(buffer) | ||
clokep marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def stopProducing(self) -> None: | ||
# Clear a circular reference. | ||
self._request = None | ||
|
||
|
||
|
@@ -620,8 +630,7 @@ def respond_with_json( | |
if send_cors: | ||
set_cors_headers(request) | ||
|
||
producer = _ByteProducer(request, encoder(json_object)) | ||
producer.start() | ||
_ByteProducer(request, encoder(json_object)) | ||
return NOT_DONE_YET | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This definitely looks like an infinite loop, but I don't think it is (see a previous discussion about this description here). I think when we call `write` we give up control to the reactor, which might call `pauseProducing` before control is returned to `resumeProducing`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When you call `write()` you go through the `Request` -> `HTTPChannel` -> `iocpreactor.tcp.Connection` (this is `HTTPChannel.transport`) -> `iocpreactor.abstract.FileHandle`, which might cause `producer.pauseProducing` (which goes back through the chain in the opposite direction, essentially).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does mean that if we write slower than the TCP buffer drains we will write everything in one go. I don't know whether we want to put some limit to the amount written in one reactor tick? Maybe something like 64KB (or even 512KB)? A reactor tick generally takes less than 1ms, so those values would mean maxing out at ~64MB/s (or 512MB/s).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aha, but `FileHandle` has a write buffer of 64KB and doesn't actually pull from the buffer until a reactor tick, so we don't need to worry about it. The flow becomes: 1) write data, 2) pause producing is called if over 64KB of data has been written, 3) we exit the loop until the `FileHandle` buffer has been drained.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That was my understanding as well! 👍 This is some fun spelunking into Twisted.