Skip to content
This repository was archived by the owner on Apr 16, 2020. It is now read-only.

Commit d9b3140

Browse files
authored
Respect flow control when flushing writes (#26)
Previously, writing a body did not respect HTTP/2.0 flow control. This resulted in unbounded buffering. This patch updates the library to respect flow control when writing the streaming body. This also adds tests 🎉
1 parent 1ce770c commit d9b3140

File tree

11 files changed

+461
-39
lines changed

11 files changed

+461
-39
lines changed

Diff for: .travis.yml

+4
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,12 @@ matrix:
1313
- rust: nightly
1414

1515
script:
16+
# Run lib and doc tests
1617
- cargo test
1718

19+
# Run integration tests
20+
- cargo test -p tests
21+
1822
deploy:
1923
provider: pages
2024
skip_cleanup: true

Diff for: Cargo.toml

+6
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ authors = [
88
description = "Provides an HTTP/2.0 Tower Service"
99
publish = false
1010

11+
[workspace]
12+
13+
members = [
14+
"tests",
15+
]
16+
1117
[dependencies]
1218
bytes = "0.4"
1319
futures = "0.1"

Diff for: src/buf.rs

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use bytes::Buf;
2+
3+
pub struct SendBuf<T>
4+
{
5+
inner: Option<T>,
6+
}
7+
8+
impl<T: Buf> SendBuf<T> {
9+
pub fn new(buf: T) -> SendBuf<T> {
10+
SendBuf { inner: Some(buf) }
11+
}
12+
13+
pub fn none() -> SendBuf<T> {
14+
SendBuf { inner: None }
15+
}
16+
}
17+
18+
impl<T: Buf> Buf for SendBuf<T> {
19+
fn remaining(&self) -> usize {
20+
match self.inner {
21+
Some(ref v) => v.remaining(),
22+
None => 0,
23+
}
24+
}
25+
26+
fn bytes(&self) -> &[u8] {
27+
match self.inner {
28+
Some(ref v) => v.bytes(),
29+
None => &[],
30+
}
31+
}
32+
33+
fn advance(&mut self, cnt: usize) {
34+
match self.inner {
35+
Some(ref mut v) => v.advance(cnt),
36+
None => {}
37+
}
38+
}
39+
}

Diff for: src/client/background.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use Body;
2+
use buf::SendBuf;
23
use flush::Flush;
34

5+
use bytes::IntoBuf;
46
use futures::{Future, Poll};
57
use h2::client::Connection;
68
use tokio_io::{AsyncRead, AsyncWrite};
@@ -18,7 +20,7 @@ where S: Body,
1820
enum Task<T, S>
1921
where S: Body,
2022
{
21-
Connection(Connection<T, S::Data>),
23+
Connection(Connection<T, SendBuf<<S::Data as IntoBuf>::Buf>>),
2224
Flush(Flush<S>),
2325
}
2426

@@ -27,7 +29,7 @@ where S: Body,
2729
impl<T, S> Background<T, S>
2830
where S: Body,
2931
{
30-
pub(crate) fn connection(connection: Connection<T, S::Data>) -> Self {
32+
pub(crate) fn connection(connection: Connection<T, SendBuf<<S::Data as IntoBuf>::Buf>>) -> Self {
3133
let task = Task::Connection(connection);
3234
Background { task }
3335
}

Diff for: src/client/connection.rs

+12-10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use {Body, RecvBody};
22
use super::Background;
3+
use buf::SendBuf;
34
use flush::Flush;
45

56
use bytes::IntoBuf;
@@ -18,7 +19,7 @@ use std::marker::PhantomData;
1819
pub struct Connection<T, E, S>
1920
where S: Body,
2021
{
21-
client: SendRequest<S::Data>,
22+
client: SendRequest<SendBuf<<S::Data as IntoBuf>::Buf>>,
2223
executor: E,
2324
_p: PhantomData<(T, S)>,
2425
}
@@ -27,7 +28,7 @@ where S: Body,
2728
pub struct Handshake<T, E, S>
2829
where S: Body,
2930
{
30-
inner: h2::client::Handshake<T, S::Data>,
31+
inner: h2::client::Handshake<T, SendBuf<<S::Data as IntoBuf>::Buf>>,
3132
executor: E,
3233
}
3334

@@ -80,7 +81,9 @@ where S: Body,
8081
T: AsyncRead + AsyncWrite,
8182
{
8283
/// Builds Connection on an H2 client connection.
83-
pub fn new(client: SendRequest<S::Data>, executor: E) -> Self {
84+
pub(crate) fn new(client: SendRequest<SendBuf<<S::Data as IntoBuf>::Buf>>, executor: E)
85+
-> Self
86+
{
8487
let _p = PhantomData;
8588

8689
Connection {
@@ -195,7 +198,7 @@ where T: AsyncRead + AsyncWrite,
195198
S: Body,
196199
{
197200
/// Start an HTTP/2.0 handshake with the provided builder
198-
pub fn new(io: T, executor: E, builder: &Builder) -> Self {
201+
pub(crate) fn new(io: T, executor: E, builder: &Builder) -> Self {
199202
let inner = builder.handshake(io);
200203

201204
Handshake {
@@ -254,9 +257,9 @@ impl From<h2::Reason> for Error {
254257
impl fmt::Display for Error {
255258
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
256259
match self.kind {
257-
Kind::Inner(ref h2) =>
260+
Kind::Inner(ref h2) =>
258261
write!(f, "Error caused by underlying HTTP/2 error: {}", h2),
259-
Kind::Spawn =>
262+
Kind::Spawn =>
260263
write!(f, "Error spawning background task"),
261264
}
262265
}
@@ -292,12 +295,12 @@ impl fmt::Display for HandshakeError {
292295
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
293296
match *self {
294297
HandshakeError::Proto(ref h2) =>
295-
write!(f,
298+
write!(f,
296299
"An error occurred while attempting to perform the HTTP/2 \
297-
handshake: {}",
300+
handshake: {}",
298301
h2),
299302
HandshakeError::Execute =>
300-
write!(f,
303+
write!(f,
301304
"An error occurred while attempting to execute a worker \
302305
task."),
303306
}
@@ -321,5 +324,4 @@ impl error::Error for HandshakeError {
321324
"error attempting to execute a worker task",
322325
}
323326
}
324-
325327
}

Diff for: src/flush.rs

+56-22
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use Body;
2+
use buf::SendBuf;
23

4+
use bytes::IntoBuf;
35
use futures::{Future, Poll, Async};
46
use h2::{self, SendStream};
57
use http::HeaderMap;
@@ -8,7 +10,7 @@ use http::HeaderMap;
810
pub(crate) struct Flush<S>
911
where S: Body,
1012
{
11-
h2: SendStream<S::Data>,
13+
h2: SendStream<SendBuf<<S::Data as IntoBuf>::Buf>>,
1214
body: S,
1315
state: FlushState,
1416
}
@@ -29,7 +31,9 @@ enum DataOrTrailers<B> {
2931
impl<S> Flush<S>
3032
where S: Body,
3133
{
32-
pub fn new(src: S, dst: SendStream<S::Data>) -> Self {
34+
pub fn new(src: S, dst: SendStream<SendBuf<<S::Data as IntoBuf>::Buf>>)
35+
-> Self
36+
{
3337
Flush {
3438
h2: dst,
3539
body: src,
@@ -39,41 +43,71 @@ where S: Body,
3943

4044
/// Try to flush the body.
4145
fn poll_complete(&mut self) -> Poll<(), h2::Error> {
42-
let mut first = try_ready!(self.poll_body());
46+
use self::DataOrTrailers::*;
4347

4448
loop {
45-
if let Some(DataOrTrailers::Data(buf)) = first {
46-
let second = self.poll_body()?;
47-
let eos = if let Async::Ready(None) = second {
48-
true
49-
} else {
50-
false
51-
};
52-
self.h2.send_data(buf, eos)?;
53-
if eos {
49+
match try_ready!(self.poll_body()) {
50+
Some(Data(buf)) => {
51+
let eos = self.body.is_end_stream();
52+
53+
self.h2.send_data(SendBuf::new(buf.into_buf()), eos)?;
54+
55+
if eos {
56+
self.state = FlushState::Done;
57+
return Ok(Async::Ready(()));
58+
}
59+
}
60+
Some(Trailers(trailers)) => {
61+
self.h2.send_trailers(trailers)?;
62+
return Ok(Async::Ready(()));
63+
}
64+
None => {
65+
// If this is hit, then an EOS was not reached via the other
66+
// paths. So, we must send an empty data frame with EOS.
67+
self.h2.send_data(SendBuf::none(), true)?;
68+
5469
return Ok(Async::Ready(()));
55-
} else if let Async::Ready(item) = second {
56-
first = item;
57-
} else {
58-
return Ok(Async::NotReady);
5970
}
60-
} else if let Some(DataOrTrailers::Trailers(trailers)) = first {
61-
self.h2.send_trailers(trailers)?;
62-
return Ok(Async::Ready(()));
63-
} else {
64-
return Ok(Async::Ready(()));
6571
}
6672
}
6773
}
6874

6975
/// Get the next message to write, either a data frame or trailers.
70-
fn poll_body(&mut self) -> Poll<Option<DataOrTrailers<S::Data>>, h2::Error> {
76+
fn poll_body(&mut self)
77+
-> Poll<Option<DataOrTrailers<S::Data>>, h2::Error>
78+
{
7179
loop {
7280
match self.state {
7381
FlushState::Data => {
82+
// Before trying to poll the next chunk, we have to see if
83+
// the h2 connection has capacity. We do this by requesting
84+
// a single byte (since we don't know how big the next chunk
85+
// will be.
86+
self.h2.reserve_capacity(1);
87+
88+
if self.h2.capacity() == 0 {
89+
// TODO: The loop should not be needed once
90+
// carllerche/h2#270 is fixed.
91+
loop {
92+
match try_ready!(self.h2.poll_capacity()) {
93+
Some(0) => {}
94+
Some(_) => break,
95+
None => {
96+
debug!("connection closed early");
97+
// The error shouldn't really matter at this
98+
// point as the peer has disconnected, the
99+
// error will be discarded anyway.
100+
return Err(h2::Reason::INTERNAL_ERROR.into());
101+
}
102+
}
103+
}
104+
}
105+
74106
if let Some(data) = try_ready!(self.body.poll_data()) {
75107
return Ok(Async::Ready(Some(DataOrTrailers::Data(data))));
76108
} else {
109+
// Release all capacity back to the connection
110+
self.h2.reserve_capacity(0);
77111
self.state = FlushState::Trailers;
78112
}
79113
}

Diff for: src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub mod client;
1414
pub mod server;
1515

1616
mod body;
17+
mod buf;
1718
mod flush;
1819
mod recv_body;
1920
mod service;

Diff for: src/server/mod.rs

+9-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use {flush, Body, RecvBody};
2+
use buf::SendBuf;
23

34
use tower_service::{NewService, Service};
45

6+
use bytes::IntoBuf;
57
use futures::{Async, Future, Poll, Stream};
68
use futures::future::{Executor, Either, Join, MapErr};
79
use h2::{self, Reason};
@@ -47,17 +49,17 @@ where T: AsyncRead + AsyncWrite,
4749
{
4850
/// Establish the HTTP/2.0 connection and get a service to process inbound
4951
/// requests.
50-
Init(Init<T, B::Data, S::Future, S::InitError>),
52+
Init(Init<T, SendBuf<<B::Data as IntoBuf>::Buf>, S::Future, S::InitError>),
5153

5254
/// Both the HTTP/2.0 connection and the service are ready.
5355
Ready {
54-
connection: Accept<T, B::Data>,
56+
connection: Accept<T, SendBuf<<B::Data as IntoBuf>::Buf>>,
5557
service: S::Service,
5658
},
5759

5860
/// The service has closed, so poll until connection is closed.
5961
GoAway {
60-
connection: Accept<T, B::Data>,
62+
connection: Accept<T, SendBuf<<B::Data as IntoBuf>::Buf>>,
6163
error: Error<S>,
6264
},
6365

@@ -84,7 +86,7 @@ enum BackgroundState<T, B>
8486
where B: Body,
8587
{
8688
Respond {
87-
respond: SendResponse<B::Data>,
89+
respond: SendResponse<SendBuf<<B::Data as IntoBuf>::Buf>>,
8890
response: T,
8991
},
9092
Flush(flush::Flush<B>),
@@ -379,7 +381,9 @@ impl<T, B> Background<T, B>
379381
where T: Future,
380382
B: Body,
381383
{
382-
fn new(respond: SendResponse<B::Data>, response: T) -> Self {
384+
fn new(respond: SendResponse<SendBuf<<B::Data as IntoBuf>::Buf>>, response: T)
385+
-> Self
386+
{
383387
Background {
384388
state: BackgroundState::Respond {
385389
respond,

Diff for: tests/Cargo.toml

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
[package]
2+
name = "tests"
3+
version = "0.1.0"
4+
authors = ["Carl Lerche <[email protected]>"]
5+
publish = false
6+
7+
[dependencies]
8+
bytes = "0.4.7"
9+
futures = "0.1.21"
10+
h2 = "0.1.4"
11+
h2-support = { git = "https://github.com/carllerche/h2" }
12+
http = "0.1.5"
13+
tokio = "0.1.5"
14+
tower-h2 = { path = ".." }
15+
tower-service = { git = "https://github.com/tower-rs/tower" }
16+
tower-util = { git = "https://github.com/tower-rs/tower" }

Diff for: tests/README.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# tower-h2 integration tests
2+
3+
This is a separate crate because it uses a git dependency on h2-support (from
4+
the h2 crate). When deploying to crates.io, there cannot be any git
5+
dependencies. This applies to dev dependencies as well.

0 commit comments

Comments
 (0)