From 039281b89cf1ab54a0ecc10c5e7fee56d4da0cf4 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 30 Oct 2019 13:59:24 -0700 Subject: [PATCH] fix(client): fix polling dispatch channel after it has closed --- src/proto/h1/dispatch.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index ff103a5c01..9e344c9b02 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -37,6 +37,7 @@ pub struct Server, B> { pub struct Client { callback: Option, Response>>, rx: ClientRx, + rx_closed: bool, } type ClientRx = crate::client::dispatch::Receiver, Response>; @@ -490,7 +491,8 @@ impl Client { pub fn new(rx: ClientRx) -> Client { Client { callback: None, - rx: rx, + rx, + rx_closed: false, } } } @@ -505,6 +507,7 @@ where type RecvItem = ResponseHead; fn poll_msg(&mut self, cx: &mut task::Context<'_>) -> Poll>> { + debug_assert!(!self.rx_closed); match self.rx.poll_next(cx) { Poll::Ready(Some((req, mut cb))) => { // check that future hasn't been canceled already @@ -526,8 +529,9 @@ where } }, Poll::Ready(None) => { - trace!("client tx closed"); // user has dropped sender handle + trace!("client tx closed"); + self.rx_closed = true; Poll::Ready(None) }, Poll::Pending => Poll::Pending, @@ -555,7 +559,7 @@ where if let Some(cb) = self.callback.take() { let _ = cb.send(Err((err, None))); Ok(()) - } else { + } else if !self.rx_closed { self.rx.close(); if let Some((req, cb)) = self.rx.try_recv() { trace!("canceling queued request with connection error: {}", err); @@ -566,6 +570,8 @@ where } else { Err(err) } + } else { + Err(err) } } }