From 3affe2a0af445a01acb75181b16e71eb9fef4ae2 Mon Sep 17 00:00:00 2001
From: Sean McArthur <sean@seanmonstar.com>
Date: Tue, 5 Jun 2018 17:27:09 -0700
Subject: [PATCH] fix(http2): send trailers if Payload includes them

---
 src/proto/h2/mod.rs | 111 +++++++++++++++++++++++++++-----------------
 1 file changed, 68 insertions(+), 43 deletions(-)

diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs
index 4baad8a400..2cfb475f23 100644
--- a/src/proto/h2/mod.rs
+++ b/src/proto/h2/mod.rs
@@ -30,6 +30,7 @@ where
     S: Payload,
 {
     body_tx: SendStream<SendBuf<S::Data>>,
+    data_done: bool,
     stream: S,
 }
 
@@ -40,9 +41,23 @@ where
     fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> {
         PipeToSendStream {
             body_tx: tx,
+            data_done: false,
             stream: stream,
         }
     }
+
+    fn on_err(&mut self, err: S::Error) -> ::Error {
+        let err = ::Error::new_user_body(err);
+        trace!("send body user stream error: {}", err);
+        self.body_tx.send_reset(Reason::INTERNAL_ERROR);
+        err
+    }
+
+    fn send_eos_frame(&mut self) -> ::Result<()> {
+        trace!("send body eos");
+        self.body_tx.send_data(SendBuf(None), true)
+            .map_err(::Error::new_body_write)
+    }
 }
 
 impl<S> Future for PipeToSendStream<S>
@@ -54,49 +69,59 @@ where
 
     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
         loop {
-            // TODO: make use of flow control on SendStream
-            // If you're looking at this and thinking of trying to fix this TODO,
-            // you may want to look at:
-            // https://docs.rs/h2/0.1.*/h2/struct.SendStream.html
-            //
-            // With that doc open, we'd want to do these things:
-            // - check self.body_tx.capacity() to see if we can send *any* data
-            // - if > 0:
-            //   - poll self.stream
-            //   - reserve chunk.len() more capacity (because its about to be used)?
-            //   - send the chunk
-            // - else:
-            //   - try reserve a smallish amount of capacity
-            //   - call self.body_tx.poll_capacity(), return if NotReady
-            match self.stream.poll_data() {
-                Ok(Async::Ready(Some(chunk))) => {
-                    let is_eos = self.stream.is_end_stream();
-                    trace!(
-                        "send body chunk: {} bytes, eos={}",
-                        chunk.remaining(),
-                        is_eos,
-                    );
-
-                    let buf = SendBuf(Some(chunk));
-                    self.body_tx.send_data(buf, is_eos)
-                        .map_err(::Error::new_body_write)?;
-
-                    if is_eos {
-                        return Ok(Async::Ready(()))
-                    }
-                },
-                Ok(Async::Ready(None)) => {
-                    trace!("send body eos");
-                    self.body_tx.send_data(SendBuf(None), true)
-                        .map_err(::Error::new_body_write)?;
-                    return Ok(Async::Ready(()));
-                },
-                Ok(Async::NotReady) => return Ok(Async::NotReady),
-                Err(err) => {
-                    let err = ::Error::new_user_body(err);
-                    trace!("send body user stream error: {}", err);
-                    self.body_tx.send_reset(Reason::INTERNAL_ERROR);
-                    return Err(err);
+            if !self.data_done {
+                // TODO: make use of flow control on SendStream
+                // If you're looking at this and thinking of trying to fix this TODO,
+                // you may want to look at:
+                // https://docs.rs/h2/0.1.*/h2/struct.SendStream.html
+                //
+                // With that doc open, we'd want to do these things:
+                // - check self.body_tx.capacity() to see if we can send *any* data
+                // - if > 0:
+                //   - poll self.stream
+                //   - reserve chunk.len() more capacity (because its about to be used)?
+                //   - send the chunk
+                // - else:
+                //   - try reserve a smallish amount of capacity
+                //   - call self.body_tx.poll_capacity(), return if NotReady
+                match try_ready!(self.stream.poll_data().map_err(|e| self.on_err(e))) {
+                    Some(chunk) => {
+                        let is_eos = self.stream.is_end_stream();
+                        trace!(
+                            "send body chunk: {} bytes, eos={}",
+                            chunk.remaining(),
+                            is_eos,
+                        );
+
+                        let buf = SendBuf(Some(chunk));
+                        self.body_tx.send_data(buf, is_eos)
+                            .map_err(::Error::new_body_write)?;
+
+                        if is_eos {
+                            return Ok(Async::Ready(()))
+                        }
+                    },
+                    None => {
+                        let is_eos = self.stream.is_end_stream();
+                        if is_eos {
+                            return self.send_eos_frame().map(Async::Ready);
+                        } else {
+                            self.data_done = true;
+                            // loop again to poll_trailers
+                        }
+                    },
+                }
+            } else {
+                match try_ready!(self.stream.poll_trailers().map_err(|e| self.on_err(e))) {
+                    Some(trailers) => {
+                        self.body_tx.send_trailers(trailers)
+                            .map_err(::Error::new_body_write)?;
+                        return Ok(Async::Ready(()));
+                    },
+                    None => {
+                        // There were no trailers, so send an empty DATA frame...
+                        return self.send_eos_frame().map(Async::Ready);
+                    },
                 }
             }
         }
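
Note (not part of the patch above): a minimal sketch of a Payload implementation that exercises the new trailers path, assuming the hyper 0.12-era Payload trait (poll_data, poll_trailers, is_end_stream) and futures 0.1 types; the TrailersBody name, its fields, and the import paths are hypothetical. Once poll_data yields None while is_end_stream() is still false, PipeToSendStream sets data_done and polls poll_trailers, forwarding a returned HeaderMap via send_trailers instead of writing an empty end-of-stream DATA frame.

extern crate futures;
extern crate hyper;

use std::io::{self, Cursor};

use futures::{Async, Poll};
use hyper::body::Payload;
use hyper::HeaderMap;

// Hypothetical body type: one buffered chunk followed by optional trailers.
struct TrailersBody {
    data: Option<Vec<u8>>,
    trailers: Option<HeaderMap>,
}

impl Payload for TrailersBody {
    type Data = Cursor<Vec<u8>>;
    type Error = io::Error;

    // Yield the single chunk, then report that the data phase is finished.
    fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
        Ok(Async::Ready(self.data.take().map(Cursor::new)))
    }

    // Polled once data is done; Some becomes an HTTP/2 trailers frame,
    // None falls back to the empty end-of-stream DATA frame.
    fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, Self::Error> {
        Ok(Async::Ready(self.trailers.take()))
    }

    // Stay "not ended" while trailers are pending so the pipe reaches the
    // poll_trailers branch added by this patch.
    fn is_end_stream(&self) -> bool {
        self.data.is_none() && self.trailers.is_none()
    }
}

fn main() {
    // Hypothetical usage: a body carrying a grpc-status trailer.
    let mut trailers = HeaderMap::new();
    trailers.insert("grpc-status", "0".parse().unwrap());
    let _body = TrailersBody {
        data: Some(b"hello".to_vec()),
        trailers: Some(trailers),
    };
}

Keeping is_end_stream() false while trailers remain pending matters here: if it reported true after the last data chunk, the pipe would set the end-of-stream flag on that DATA frame and the trailers would never be written.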