Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Framer's sequential reads of frame header then payload can leave underlying async stream's @read_buffer in a corrupted state #14

Open
fables-tales opened this issue Nov 3, 2023 · 16 comments · Fixed by socketry/async-io#72 or #15 · May be fixed by #19
Assignees
Labels
bug Something isn't working

Comments

@fables-tales
Copy link

fables-tales commented Nov 3, 2023

Consider a call flow like:

  1. @read_buffer in our underlying async/io/stream contains exactly 9 bytes.
  2. read_frame (takes 9 bytes off the underlying @read_buffer in consume_read_buffer) and this completely drains @read_buffer
  3. successfully gets the header
  4. reads payload, times out, @read_buffer is still empty, we do not parse the frame, and exit the call flow.
  5. retry read_frame with a higher timeout
  6. we enter read_header again, which will call fill_read_buffer (fills buffer with ~thousands of bytes)
  7. @read_buffer in the underlying stream now contains the payload of the previous frame, instead of a valid frame header, and we get a protocol error.

I think in this case the "right" thing to do is put the 9 bytes back in the read buffer, or hold the frame header and retry reading the payload, instead of trying to read the header out of what is certainly payload.

def read_frame(maximum_frame_size = MAXIMUM_ALLOWED_FRAME_SIZE)
  # Read the header:
  length, type, flags, stream_id = read_header <- second time we come here, we're reading payload bytes, not header bytes
				
  # Async.logger.debug(self) {"read_frame: length=#{length} type=#{type} flags=#{flags} stream_id=#{stream_id} -> klass=#{@frames[type].inspect}"}
				
  # Allocate the frame:
  klass = @frames[type] || Frame
  frame = klass.new(stream_id, flags, type, length)
				
  # Read the payload:
  frame.read(@stream, maximum_frame_size) <- timeout occurs here
				
  # Async.logger.debug(self, name: "read") {frame.inspect}
				
  return frame
end
@fables-tales fables-tales changed the title Framer's sequential reads of frame header then payload can leave input buffer in a corrupted state Framer's sequential reads of frame header then payload can leave underlying async stream's @read_buffer in a corrupted state Nov 3, 2023
@penelope-stripe
Copy link

require "protocol/http2/data_frame"
require "stringio"
require "protocol/http2/framer"
require "async/reactor"

class FunkyIO
  def initialize
    @f = Protocol::HTTP2::DataFrame.new(401, 0, Protocol::HTTP2::DataFrame::TYPE, 13, "a" * 13)
    @sio = StringIO.new
    @f.write_header(@sio)
    @sio.rewind

    @state = :yield_first_header
  end

  def read(size, buf = nil)
    case @state
    when :yield_first_header
      res = @sio.read(size, buf)
      @state = :now_timeout
      res
    when :now_timeout
      @state = :write_payload
      raise Async::TimeoutError
    when :write_payload
      @sio = StringIO.new
      @f.write_payload(@sio)
      @sio.rewind
      @sio.read(size, buf)
    end
  end
end

f = Protocol::HTTP2::Framer.new(FunkyIO.new)
Async::Reactor.run do
  begin
    p(f.read_frame)
  rescue Async::TimeoutError
    # try again
  end

  p(f.read_frame)
end

this script minimally reproduces the bug

@ioquatix
Copy link
Member

ioquatix commented Nov 3, 2023

Wow, nice find, I'll sort this out right away! Thanks!

@ioquatix
Copy link
Member

retry read_frame with a higher timeout

Do you mind explaining in what situation you are retrying? I would assume that if the operation failed, you'd give up completely.

@maruth-stripe
Copy link

Do you mind explaining in what situation you are retrying? I would assume that if the operation failed, you'd give up completely

If the operation fails we would have to throw away the connection since the connection is left in a corrupted state (there's now a H/2 payload on the wire with no header, which is garbage for all intents and purposes.).

Currently, Request 1 timing out on reading a payload off the wire means any future reads off the wire are wrecked. We want to use the same connection for as long as possible. Having to re-establish a connection every time a read times out is quite toilsome.

Shopify has also seen this precise bug occur while using async-http (cc @dwdking)

We have had the patches in the PRs I've made deployed at Stripe for a couple of months now. Before the patch we were experiencing an incredibly high number of errors from this issue every day, the patch brought it down to ~0.

@ioquatix
Copy link
Member

ioquatix commented Jan 23, 2024

That makes sense and I understand the value of the related PRs. However, if timeout is a problem, why not increase the timeout too? It sounds like you are having a timeout while reading the frame, then retrying if timeout occurs. Maybe it would be more logical to increase the timeout, e.g. in your case 2x or 3x? At least my intention with the timeout is as a last ditch effort and retrying the operation would not make sense after a timeout occurred (the connection could be in a funky state as you've correctly outlined).

@maruth-stripe
Copy link

Increasing the timeout is not always feasible, since the timeout is determined by set of constraints the system must meet.

There are a couple of use-cases for wanting the connection to remain in a healthy state after timeout:

  1. Multiple streams: If we have multiple streams on a connection, one stream exceeding its timeout should not result in the connection being abandoned -- hence penalizing all the other streams which may have been completed within their respective timeouts.
  2. Correctness, Strong invariants: Makes reasoning significantly easier from a correctness point of view since you get a strong invariant regarding connection corruption. (I’ll come back to this again in a bit)
  3. Retry with backoff: Sometimes one may want to retry a request after some backoff in case the server we’re making requests to is experiencing pressure. Ideally, without re-establishing a connection

The current behavior basically ends up necessitating throwing away the connection upon timeout. However, this is currently (1) not explicit in documentation, (2) not clear from the API, and (3) not something the library protects against. What we end up with is having every callsite to read_frame become

begin
  stream.read_frame
rescue Async::TimeoutError
  # throw away connection
end

which is (a) very toilsome and ugly, (b) non-trivial to get right.

In summary, the problems faced were the following:

  • We experienced correctness issues that took significant developer effort to debug. The error manifested as FrameSizeErrors, and tracing that back to a corrupted connection from timeouts is not obvious.
  • Always giving up on the connection is not the best for performance
  • timeout’s being (implicitly) fatal makes development and debugging difficult

The proposed fixes provide a strong correctness invariant, while allowing connections to persist.

@ioquatix
Copy link
Member

Thanks for the clear explanation, it makes sense. I agree, the invariant makes sense.

@ioquatix
Copy link
Member

Okay, I'm planning to work on this here: #19

@ioquatix ioquatix linked a pull request Jun 10, 2024 that will close this issue
3 tasks
@ioquatix ioquatix reopened this Jun 10, 2024
@ioquatix
Copy link
Member

@maruth-stripe are you using async-http on top of protocol-http2 or something else?

@fables-tales
Copy link
Author

(I work with maruth) we built our own wrapper around protocol-http2 that uses async for a gRPC client that has some fairly specific constraints because the Stripe codebase is 30 million lines of Ruby with about 2000 active ruby developers.

@ioquatix
Copy link
Member

Thanks @fables-tales for the clarification.

What I'm trying to understand is what scenario you are re-entering the Framer#read_frame.

In Async::HTTP::Protocol::HTTP2::Connection, we have a single background task reading frames and invoking the correct logic.

Can you help answer a few questions for me?

  1. Are you assuming Framer#read_frame is re-entrant or safe to call from multiple tasks?
  2. Do you have a single task invoking Framer#read_frame like https://github.com/socketry/async-http/blob/d0894a0e1c9d7af40cbf2fa82716dea8b422c4ba/lib/async/http/protocol/http2/connection.rb#L89-L92 or are you doing something different?
  3. Is it only timeouts causing the problem, or are there other issues?
  4. Even if we read full frames (I agree, if we can do so efficiently, it's a good idea), does it matter that Headers + Continuation frames may be read in separate operations?

@fable-stripe
Copy link

@maruth-stripe keep me honest here but I don't think we are re-entering read frame, we specifically added code to check against that.

the core behind our code looks something like:

c = grpc_connection(some_hostname)
handle = c.some_rpc(some_data)
handle.blocking_response_iterator(timeout) do |decoded_message|
...
end

blocking_response_iterator then starts a task within a reactor that calls read_frame and yields messages. We do not capture the return value of read_frame, but instead use the process_* family of methods to update state, and blocking_response iterator will yield a message (or timeout) if a data frame is read.

@ioquatix
Copy link
Member

Okay, so for my understanding, you aren't multiplexing requests on a single connection and instead depending on sequential processing of frames for each stream until the stream is done?

@fables-tales
Copy link
Author

multiplexing is possible:

c = connect(some_host)
a = c.rpc_1(data)
b = c.rpc_2(data)
a.blocking_response_iterator.each.take(3) do
end
b.blocking_response_iterator.each do
  ...
end
a.blocking_response_iterator.each do
end

is a pattern we support

@ioquatix
Copy link
Member

What do you do if you receive a frame for a different stream? Are you multiplexing using a queue for each stream or something like that?

@fables-tales
Copy link
Author

yes, if b's blocking response iterator is running and we receive a message for a we store it in a queue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment