@@ -267,14 +267,14 @@ where
267267
268268 fn into_future ( self ) -> Self :: IntoFuture {
269269 private:: ServeFuture ( Box :: pin ( async move {
270- self . run ( ) . await ;
270+ do_serve ( self . listener , self . make_service , self . signal ) . await ;
271271 Ok ( ( ) )
272272 } ) )
273273 }
274274}
275275
276276#[ cfg( all( feature = "tokio" , any( feature = "http1" , feature = "http2" ) ) ) ]
277- impl < L , M , S , F > WithGracefulShutdown < L , M , S , F >
277+ async fn do_serve < L , M , F , S > ( mut listener : L , mut make_service : M , signal : F )
278278where
279279 L : Listener ,
280280 L :: Addr : Debug ,
@@ -284,93 +284,84 @@ where
284284 S :: Future : Send ,
285285 F : Future < Output = ( ) > + Send + ' static ,
286286{
287- async fn run ( self ) {
288- let Self {
289- mut listener,
290- mut make_service,
291- signal,
292- _marker : _,
293- } = self ;
294-
295- let ( signal_tx, signal_rx) = watch:: channel ( ( ) ) ;
296- tokio:: spawn ( async move {
297- signal. await ;
298- trace ! ( "received graceful shutdown signal. Telling tasks to shutdown" ) ;
299- drop ( signal_rx) ;
300- } ) ;
301-
302- let ( close_tx, close_rx) = watch:: channel ( ( ) ) ;
303-
304- loop {
305- let ( io, remote_addr) = tokio:: select! {
306- conn = listener. accept( ) => conn,
307- _ = signal_tx. closed( ) => {
308- trace!( "signal received, not accepting new connections" ) ;
309- break ;
310- }
311- } ;
312-
313- let io = TokioIo :: new ( io) ;
314-
315- trace ! ( "connection {remote_addr:?} accepted" ) ;
287+ let ( signal_tx, signal_rx) = watch:: channel ( ( ) ) ;
288+ tokio:: spawn ( async move {
289+ signal. await ;
290+ trace ! ( "received graceful shutdown signal. Telling tasks to shutdown" ) ;
291+ drop ( signal_rx) ;
292+ } ) ;
293+
294+ let ( close_tx, close_rx) = watch:: channel ( ( ) ) ;
295+
296+ loop {
297+ let ( io, remote_addr) = tokio:: select! {
298+ conn = listener. accept( ) => conn,
299+ _ = signal_tx. closed( ) => {
300+ trace!( "signal received, not accepting new connections" ) ;
301+ break ;
302+ }
303+ } ;
316304
317- poll_fn ( |cx| make_service. poll_ready ( cx) )
318- . await
319- . unwrap_or_else ( |err| match err { } ) ;
305+ let io = TokioIo :: new ( io) ;
320306
321- let tower_service = make_service
322- . call ( IncomingStream {
323- io : & io,
324- remote_addr,
325- } )
326- . await
327- . unwrap_or_else ( |err| match err { } )
328- . map_request ( |req : Request < Incoming > | req. map ( Body :: new) ) ;
307+ trace ! ( "connection {remote_addr:?} accepted" ) ;
329308
330- let hyper_service = TowerToHyperService :: new ( tower_service) ;
309+ poll_fn ( |cx| make_service. poll_ready ( cx) )
310+ . await
311+ . unwrap_or_else ( |err| match err { } ) ;
331312
332- let signal_tx = signal_tx. clone ( ) ;
313+ let tower_service = make_service
314+ . call ( IncomingStream {
315+ io : & io,
316+ remote_addr,
317+ } )
318+ . await
319+ . unwrap_or_else ( |err| match err { } )
320+ . map_request ( |req : Request < Incoming > | req. map ( Body :: new) ) ;
333321
334- let close_rx = close_rx . clone ( ) ;
322+ let hyper_service = TowerToHyperService :: new ( tower_service ) ;
335323
336- tokio:: spawn ( async move {
337- #[ allow( unused_mut) ]
338- let mut builder = Builder :: new ( TokioExecutor :: new ( ) ) ;
339- // CONNECT protocol needed for HTTP/2 websockets
340- #[ cfg( feature = "http2" ) ]
341- builder. http2 ( ) . enable_connect_protocol ( ) ;
324+ let signal_tx = signal_tx. clone ( ) ;
342325
343- let mut conn = pin ! ( builder. serve_connection_with_upgrades( io, hyper_service) ) ;
344- let mut signal_closed = pin ! ( signal_tx. closed( ) . fuse( ) ) ;
326+ let close_rx = close_rx. clone ( ) ;
345327
346- loop {
347- tokio:: select! {
348- result = conn. as_mut( ) => {
349- if let Err ( _err) = result {
350- trace!( "failed to serve connection: {_err:#}" ) ;
351- }
352- break ;
353- }
354- _ = & mut signal_closed => {
355- trace!( "signal received in task, starting graceful shutdown" ) ;
356- conn. as_mut( ) . graceful_shutdown( ) ;
328+ tokio:: spawn ( async move {
329+ #[ allow( unused_mut) ]
330+ let mut builder = Builder :: new ( TokioExecutor :: new ( ) ) ;
331+ // CONNECT protocol needed for HTTP/2 websockets
332+ #[ cfg( feature = "http2" ) ]
333+ builder. http2 ( ) . enable_connect_protocol ( ) ;
334+
335+ let mut conn = pin ! ( builder. serve_connection_with_upgrades( io, hyper_service) ) ;
336+ let mut signal_closed = pin ! ( signal_tx. closed( ) . fuse( ) ) ;
337+
338+ loop {
339+ tokio:: select! {
340+ result = conn. as_mut( ) => {
341+ if let Err ( _err) = result {
342+ trace!( "failed to serve connection: {_err:#}" ) ;
357343 }
344+ break ;
345+ }
346+ _ = & mut signal_closed => {
347+ trace!( "signal received in task, starting graceful shutdown" ) ;
348+ conn. as_mut( ) . graceful_shutdown( ) ;
358349 }
359350 }
351+ }
360352
361- drop ( close_rx) ;
362- } ) ;
363- }
353+ drop ( close_rx) ;
354+ } ) ;
355+ }
364356
365- drop ( close_rx) ;
366- drop ( listener) ;
357+ drop ( close_rx) ;
358+ drop ( listener) ;
367359
368- trace ! (
369- "waiting for {} task(s) to finish" ,
370- close_tx. receiver_count( )
371- ) ;
372- close_tx. closed ( ) . await ;
373- }
360+ trace ! (
361+ "waiting for {} task(s) to finish" ,
362+ close_tx. receiver_count( )
363+ ) ;
364+ close_tx. closed ( ) . await ;
374365}
375366
376367/// An incoming stream.
0 commit comments