@@ -13,6 +13,7 @@ use std::fmt;
13
13
use std:: sync:: Arc ;
14
14
#[ cfg( feature = "runtime" ) ] use std:: time:: Duration ;
15
15
16
+ use super :: rewind:: Rewind ;
16
17
use bytes:: Bytes ;
17
18
use futures:: { Async , Future , Poll , Stream } ;
18
19
use futures:: future:: { Either , Executor } ;
@@ -23,6 +24,7 @@ use common::Exec;
23
24
use proto;
24
25
use body:: { Body , Payload } ;
25
26
use service:: { NewService , Service } ;
27
+ use error:: { Kind , Parse } ;
26
28
27
29
#[ cfg( feature = "runtime" ) ] pub use super :: tcp:: AddrIncoming ;
28
30
@@ -74,31 +76,32 @@ pub(super) struct SpawnAll<I, S> {
74
76
///
75
77
/// Polling this future will drive HTTP forward.
76
78
#[ must_use = "futures do nothing unless polled" ]
77
- pub struct Connection < I , S >
79
+ pub struct Connection < T , S >
78
80
where
79
81
S : Service ,
80
82
{
81
- pub ( super ) conn : Either <
83
+ pub ( super ) conn : Option <
84
+ Either <
82
85
proto:: h1:: Dispatcher <
83
86
proto:: h1:: dispatch:: Server < S > ,
84
87
S :: ResBody ,
85
- I ,
88
+ T ,
86
89
proto:: ServerTransaction ,
87
90
> ,
88
91
proto:: h2:: Server <
89
- I ,
92
+ Rewind < T > ,
90
93
S ,
91
94
S :: ResBody ,
92
95
> ,
93
- > ,
96
+ > > ,
94
97
}
95
98
96
99
/// Deconstructed parts of a `Connection`.
97
100
///
98
101
/// This allows taking apart a `Connection` at a later time, in order to
99
102
/// reclaim the IO object, and additional related pieces.
100
103
#[ derive( Debug ) ]
101
- pub struct Parts < T , S > {
104
+ pub struct Parts < T , S > {
102
105
/// The original IO object used in the handshake.
103
106
pub io : T ,
104
107
/// A buffer of bytes that have been read but not processed as HTTP.
@@ -239,12 +242,13 @@ impl Http {
239
242
let sd = proto:: h1:: dispatch:: Server :: new ( service) ;
240
243
Either :: A ( proto:: h1:: Dispatcher :: new ( sd, conn) )
241
244
} else {
242
- let h2 = proto:: h2:: Server :: new ( io, service, self . exec . clone ( ) ) ;
245
+ let rewind_io = Rewind :: new ( io) ;
246
+ let h2 = proto:: h2:: Server :: new ( rewind_io, service, self . exec . clone ( ) ) ;
243
247
Either :: B ( h2)
244
248
} ;
245
249
246
250
Connection {
247
- conn : either,
251
+ conn : Some ( either) ,
248
252
}
249
253
}
250
254
@@ -322,7 +326,7 @@ where
322
326
/// This `Connection` should continue to be polled until shutdown
323
327
/// can finish.
324
328
pub fn graceful_shutdown ( & mut self ) {
325
- match self . conn {
329
+ match * self . conn . as_mut ( ) . unwrap ( ) {
326
330
Either :: A ( ref mut h1) => {
327
331
h1. disable_keep_alive ( ) ;
328
332
} ,
@@ -334,11 +338,12 @@ where
334
338
335
339
/// Return the inner IO object, and additional information.
336
340
///
341
+ /// If the IO object has been "rewound" the io will not contain those bytes rewound.
337
342
/// This should only be called after `poll_without_shutdown` signals
338
343
/// that the connection is "done". Otherwise, it may not have finished
339
344
/// flushing all necessary HTTP bytes.
340
345
pub fn into_parts ( self ) -> Parts < I , S > {
341
- let ( io, read_buf, dispatch) = match self . conn {
346
+ let ( io, read_buf, dispatch) = match self . conn . unwrap ( ) {
342
347
Either :: A ( h1) => {
343
348
h1. into_inner ( )
344
349
} ,
@@ -349,7 +354,7 @@ where
349
354
Parts {
350
355
io : io,
351
356
read_buf : read_buf,
352
- service : dispatch. service ,
357
+ service : dispatch. into_service ( ) ,
353
358
_inner : ( ) ,
354
359
}
355
360
}
@@ -362,14 +367,37 @@ where
362
367
/// but it is not desired to actally shutdown the IO object. Instead you
363
368
/// would take it back using `into_parts`.
364
369
pub fn poll_without_shutdown ( & mut self ) -> Poll < ( ) , :: Error > {
365
- match self . conn {
370
+ match * self . conn . as_mut ( ) . unwrap ( ) {
366
371
Either :: A ( ref mut h1) => {
367
372
try_ready ! ( h1. poll_without_shutdown( ) ) ;
368
373
Ok ( ( ) . into ( ) )
369
374
} ,
370
375
Either :: B ( ref mut h2) => h2. poll ( ) ,
371
376
}
372
377
}
378
+
379
+ fn try_h2 ( & mut self ) -> Poll < ( ) , :: Error > {
380
+ trace ! ( "Trying to upgrade connection to h2" ) ;
381
+ let conn = self . conn . take ( ) ;
382
+
383
+ let ( io, read_buf, dispatch) = match conn. unwrap ( ) {
384
+ Either :: A ( h1) => {
385
+ h1. into_inner ( )
386
+ } ,
387
+ Either :: B ( _h2) => {
388
+ panic ! ( "h2 cannot into_inner" ) ;
389
+ }
390
+ } ;
391
+ let mut rewind_io = Rewind :: new ( io) ;
392
+ rewind_io. rewind ( read_buf) ;
393
+ let mut h2 = proto:: h2:: Server :: new ( rewind_io, dispatch. into_service ( ) , Exec :: Default ) ;
394
+ let pr = h2. poll ( ) ;
395
+
396
+ debug_assert ! ( self . conn. is_none( ) ) ;
397
+ self . conn = Some ( Either :: B ( h2) ) ;
398
+
399
+ pr
400
+ }
373
401
}
374
402
375
403
impl < I , B , S > Future for Connection < I , S >
@@ -384,7 +412,16 @@ where
384
412
type Error = :: Error ;
385
413
386
414
fn poll ( & mut self ) -> Poll < Self :: Item , Self :: Error > {
387
- self . conn . poll ( )
415
+ match self . conn . poll ( ) {
416
+ Ok ( x) => Ok ( x. map ( |o| o. unwrap_or_else ( || ( ) ) ) ) ,
417
+ Err ( e) => {
418
+ debug ! ( "error polling connection protocol: {}" , e) ;
419
+ match * e. kind ( ) {
420
+ Kind :: Parse ( Parse :: VersionH2 ) => self . try_h2 ( ) ,
421
+ _ => Err ( e) ,
422
+ }
423
+ }
424
+ }
388
425
}
389
426
}
390
427
0 commit comments