Skip to content

Commit f3e17d0

Browse files
committed
perf(http1): implement an adaptive read buffer strategy
The default read strategy for HTTP/1 connections is now adaptive. It increases or decreases the size of the read buffer depending on the number of bytes that are received in a `read` call. If a transport continuously fills the read buffer, the buffer will continue to grow (up to the `max_buf_size`), allowing for faster reads. If the transport consistently fills only a portion of the read buffer, the buffer will be shrunk. This doesn't provide much benefit to small requests/responses, but benchmarks show a noticeable improvement in throughput when streaming larger bodies. Closes #1708
1 parent a6fff13 commit f3e17d0

File tree

2 files changed

+202
-29
lines changed

2 files changed

+202
-29
lines changed

Diff for: benches/end_to_end.rs

+28
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,26 @@ fn http1_post(b: &mut test::Bencher) {
2929
.bench(b)
3030
}
3131

32+
#[bench]
33+
fn http1_body_100kb(b: &mut test::Bencher) {
34+
let body = &[b'x'; 1024 * 100];
35+
opts()
36+
.method(Method::POST)
37+
.request_body(body)
38+
.response_body(body)
39+
.bench(b)
40+
}
41+
42+
#[bench]
43+
fn http1_body_10mb(b: &mut test::Bencher) {
44+
let body = &[b'x'; 1024 * 1024 * 10];
45+
opts()
46+
.method(Method::POST)
47+
.request_body(body)
48+
.response_body(body)
49+
.bench(b)
50+
}
51+
3252
#[bench]
3353
fn http1_get_parallel(b: &mut test::Bencher) {
3454
opts()
@@ -96,6 +116,11 @@ impl Opts {
96116
self
97117
}
98118

119+
fn response_body(mut self, body: &'static [u8]) -> Self {
120+
self.response_body = body;
121+
self
122+
}
123+
99124
fn parallel(mut self, cnt: u32) -> Self {
100125
assert!(cnt > 0, "parallel count must be larger than 0");
101126
self.parallel_cnt = cnt;
@@ -105,6 +130,9 @@ impl Opts {
105130
fn bench(self, b: &mut test::Bencher) {
106131
let _ = pretty_env_logger::try_init();
107132
let mut rt = Runtime::new().unwrap();
133+
134+
b.bytes = self.response_body.len() as u64 + self.request_body.map(|b| b.len()).unwrap_or(0) as u64;
135+
108136
let addr = spawn_hello(&mut rt, self.response_body);
109137

110138
let connector = HttpConnector::new(1);

Diff for: src/proto/h1/io.rs

+174-29
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::cell::Cell;
2+
use std::cmp;
23
use std::collections::VecDeque;
34
use std::fmt;
45
use std::io;
@@ -60,9 +61,7 @@ where
6061
io: io,
6162
read_blocked: false,
6263
read_buf: BytesMut::with_capacity(0),
63-
read_buf_strategy: ReadStrategy::Adaptive {
64-
max: DEFAULT_MAX_BUFFER_SIZE,
65-
},
64+
read_buf_strategy: ReadStrategy::default(),
6665
write_buf: WriteBuf::new(),
6766
}
6867
}
@@ -81,9 +80,7 @@ where
8180
"The max_buf_size cannot be smaller than {}.",
8281
MINIMUM_MAX_BUFFER_SIZE,
8382
);
84-
self.read_buf_strategy = ReadStrategy::Adaptive {
85-
max,
86-
};
83+
self.read_buf_strategy = ReadStrategy::with_max(max);
8784
self.write_buf.max_buf_size = max;
8885
}
8986

@@ -149,18 +146,11 @@ where
149146
debug!("parsed {} headers", msg.head.headers.len());
150147
return Ok(Async::Ready(msg))
151148
},
152-
None => match self.read_buf_strategy {
153-
ReadStrategy::Adaptive { max } => {
154-
if self.read_buf.len() >= max {
155-
debug!("max_buf_size ({}) reached, closing", max);
156-
return Err(::Error::new_too_large());
157-
}
158-
},
159-
ReadStrategy::Exact(exact) => {
160-
if self.read_buf.len() >= exact {
161-
debug!("exact buf size ({}) filled, closing", exact);
162-
return Err(::Error::new_too_large());
163-
}
149+
None => {
150+
let max = self.read_buf_strategy.max();
151+
if self.read_buf.len() >= max {
152+
debug!("max_buf_size ({}) reached, closing", max);
153+
return Err(::Error::new_too_large());
164154
}
165155
},
166156
}
@@ -177,22 +167,15 @@ where
177167
pub fn read_from_io(&mut self) -> Poll<usize, io::Error> {
178168
use bytes::BufMut;
179169
self.read_blocked = false;
180-
match self.read_buf_strategy {
181-
ReadStrategy::Adaptive { .. } => {
182-
if self.read_buf.remaining_mut() < INIT_BUFFER_SIZE {
183-
self.read_buf.reserve(INIT_BUFFER_SIZE);
184-
}
185-
},
186-
ReadStrategy::Exact(exact) => {
187-
if self.read_buf.capacity() < exact {
188-
self.read_buf.reserve(exact);
189-
}
190-
},
170+
let next = self.read_buf_strategy.next();
171+
if self.read_buf.remaining_mut() < next {
172+
self.read_buf.reserve(next);
191173
}
192174
self.io.read_buf(&mut self.read_buf).map(|ok| {
193175
match ok {
194176
Async::Ready(n) => {
195177
debug!("read {} bytes", n);
178+
self.read_buf_strategy.record(n);
196179
Async::Ready(n)
197180
},
198181
Async::NotReady => {
@@ -285,11 +268,82 @@ where
285268
/// How the read buffer is sized before each `read` call.
#[derive(Clone, Copy, Debug)]
enum ReadStrategy {
    /// Always reserve exactly this many bytes.
    Exact(usize),
    /// Grow or shrink the buffer based on how full recent reads left it.
    Adaptive {
        /// Armed by one under-sized read; a second consecutive one shrinks `next`.
        decrease_now: bool,
        /// Number of bytes to reserve for the next read.
        next: usize,
        /// Hard upper bound on the buffer size.
        max: usize,
    },
}
292277

278+
impl ReadStrategy {
279+
fn with_max(max: usize) -> ReadStrategy {
280+
ReadStrategy::Adaptive {
281+
decrease_now: false,
282+
next: INIT_BUFFER_SIZE,
283+
max,
284+
}
285+
}
286+
287+
fn next(&self) -> usize {
288+
match *self {
289+
ReadStrategy::Adaptive { next, .. } => next,
290+
ReadStrategy::Exact(exact) => exact,
291+
}
292+
}
293+
294+
fn max(&self) -> usize {
295+
match *self {
296+
ReadStrategy::Adaptive { max, .. } => max,
297+
ReadStrategy::Exact(exact) => exact,
298+
}
299+
}
300+
301+
fn record(&mut self, bytes_read: usize) {
302+
match *self {
303+
ReadStrategy::Adaptive { ref mut decrease_now, ref mut next, max, .. } => {
304+
if bytes_read >= *next {
305+
*next = cmp::min(incr_power_of_two(*next), max);
306+
*decrease_now = false;
307+
} else {
308+
let decr_to = prev_power_of_two(*next);
309+
if bytes_read < decr_to {
310+
if *decrease_now {
311+
*next = cmp::max(decr_to, INIT_BUFFER_SIZE);
312+
*decrease_now = false;
313+
} else {
314+
// Decreasing is a two "record" process.
315+
*decrease_now = true;
316+
}
317+
} else {
318+
// A read within the current range should cancel
319+
// a potential decrease, since we just saw proof
320+
// that we still need this size.
321+
*decrease_now = false;
322+
}
323+
}
324+
},
325+
_ => (),
326+
}
327+
}
328+
}
329+
330+
/// Doubles `n`, saturating at `usize::MAX` instead of overflowing.
///
/// Despite the name, this doubles any value; callers appear to only pass
/// powers of two, which keeps the result a power of two until saturation.
fn incr_power_of_two(n: usize) -> usize {
    // n + n saturates to the same value as n * 2 for every usize.
    n.saturating_add(n)
}
333+
334+
/// For `n` in `[2^k, 2^(k+1))`, returns `2^(k-1)` — one power of two below
/// `n`'s highest set bit.
fn prev_power_of_two(n: usize) -> usize {
    // Only defined for n >= 4: below that, `used_bits - 2` would underflow
    // (equivalently, the original formulation would shift past the bit width).
    debug_assert!(n >= 4);
    let used_bits = usize::max_value().count_ones() - n.leading_zeros();
    1usize << (used_bits - 2)
}
340+
341+
impl Default for ReadStrategy {
    /// The default read strategy is adaptive, capped at
    /// `DEFAULT_MAX_BUFFER_SIZE`.
    fn default() -> ReadStrategy {
        ReadStrategy::with_max(DEFAULT_MAX_BUFFER_SIZE)
    }
}
346+
293347
#[derive(Clone)]
294348
pub struct Cursor<T> {
295349
bytes: T,
@@ -637,6 +691,97 @@ mod tests {
637691
assert!(buffered.io.blocked());
638692
}
639693

694+
#[test]
fn read_strategy_adaptive_increments() {
    let mut s = ReadStrategy::default();
    assert_eq!(s.next(), 8192);

    // A read that fills the whole buffer doubles the next size.
    s.record(8192);
    assert_eq!(s.next(), 16384);
    s.record(16384);
    assert_eq!(s.next(), 32768);

    // Even an enormous read only steps up one power of two.
    s.record(::std::usize::MAX);
    assert_eq!(s.next(), 65536);

    // Saturate the strategy at its configured maximum.
    let cap = s.max();
    while s.next() < cap {
        s.record(cap);
    }
    assert_eq!(s.next(), cap, "never goes over max");

    s.record(cap + 1);
    assert_eq!(s.next(), cap, "never goes over max");
}
719+
720+
#[test]
fn read_strategy_adaptive_decrements() {
    let mut strategy = ReadStrategy::default();
    strategy.record(8192);
    assert_eq!(strategy.next(), 16384);

    // Shrinking takes two consecutive small reads.
    strategy.record(1);
    assert_eq!(strategy.next(), 16384, "first smaller record doesn't decrement yet");
    // (message typo fixed: "with range" -> "within range")
    strategy.record(8192);
    assert_eq!(strategy.next(), 16384, "record was within range");

    // An in-range read disarms a pending decrease.
    strategy.record(1);
    assert_eq!(strategy.next(), 16384, "in-range record should make this the 'first' again");

    strategy.record(1);
    assert_eq!(strategy.next(), 8192, "second smaller record decrements");

    // Never shrinks below the initial buffer size.
    strategy.record(1);
    assert_eq!(strategy.next(), 8192, "first doesn't decrement");
    strategy.record(1);
    assert_eq!(strategy.next(), 8192, "doesn't decrement under minimum");
}
742+
743+
#[test]
fn read_strategy_adaptive_stays_the_same() {
    let mut s = ReadStrategy::default();
    s.record(8192);
    assert_eq!(s.next(), 16384);

    // 8193 is under `next` (16384) but not under its previous power of two
    // (8192), so it should neither grow nor arm a decrease.
    s.record(8193);
    assert_eq!(s.next(), 16384, "first smaller record doesn't decrement yet");
    s.record(8193);
    assert_eq!(s.next(), 16384, "with current step does not decrement");
}
755+
756+
#[test]
fn read_strategy_adaptive_max_fuzz() {
    // Grow a strategy up to `max`, then shrink it back down, checking that
    // every decremented step lands on a clean power of two.
    fn fuzz(max: usize) {
        let mut s = ReadStrategy::with_max(max);
        while s.next() < max {
            s.record(::std::usize::MAX);
        }
        let mut size = s.next();
        while size > 8192 {
            // Two small records are required to trigger each decrease.
            s.record(1);
            s.record(1);
            size = s.next();
            assert!(
                size.is_power_of_two(),
                "decrement should be powers of two: {} (max = {})",
                size,
                max,
            );
        }
    }

    // Sweep a spread of maximums (x1.5 each step), including non-powers of two.
    let mut max = 8192;
    while max < ::std::usize::MAX {
        fuzz(max);
        max = (max / 2).saturating_mul(3);
    }
    fuzz(::std::usize::MAX);
}
784+
640785
#[test]
641786
#[should_panic]
642787
fn write_buf_requires_non_empty_bufs() {

0 commit comments

Comments
 (0)