Skip to content

Commit

Permalink
test(server): remove unneeded mutex for TestService
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Dec 6, 2018
1 parent fdd0413 commit 138b1f8
Showing 1 changed file with 29 additions and 28 deletions.
57 changes: 29 additions & 28 deletions tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::net::{TcpStream, Shutdown, SocketAddr};
use std::io::{self, Read, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::sync::{Arc};
use std::net::{TcpListener as StdTcpListener};
use std::thread;
use std::time::Duration;
Expand Down Expand Up @@ -701,7 +701,7 @@ fn header_connection_close() {
}

#[test]
fn expect_continue() {
fn expect_continue_sends_100() {
let server = serve();
let mut req = connect(server.addr());
server.reply();
Expand Down Expand Up @@ -1655,7 +1655,7 @@ impl Drop for Serve {

#[derive(Clone)]
struct TestService {
tx: Arc<Mutex<mpsc::Sender<Msg>>>,
tx: mpsc::Sender<Msg>,
reply: spmc::Receiver<Reply>,
}

Expand All @@ -1670,51 +1670,52 @@ enum Reply {

#[derive(Debug)]
enum Msg {
//Head(Request),
Chunk(Vec<u8>),
Error(hyper::Error),
End,
}

impl TestService {
// Box is needed until we can return `impl Future` from a fn
fn call(&self, req: Request<Body>) -> Box<Future<Item=Response<Body>, Error=hyper::Error> + Send> {
let tx1 = self.tx.clone();
let tx2 = self.tx.clone();

let replies = self.reply.clone();
Box::new(req.into_body().for_each(move |chunk| {
tx1.lock().unwrap().send(Msg::Chunk(chunk.to_vec())).unwrap();
tx1.send(Msg::Chunk(chunk.to_vec())).unwrap();
Ok(())
}).then(move |result| {
let msg = match result {
Ok(()) => Msg::End,
Err(e) => Msg::Error(e),
};
tx2.lock().unwrap().send(msg).unwrap();
tx2.send(msg).unwrap();
Ok(())
}).map(move |_| {
let mut res = Response::new(Body::empty());
while let Ok(reply) = replies.try_recv() {
match reply {
Reply::Status(s) => {
*res.status_mut() = s;
},
Reply::Version(v) => {
*res.version_mut() = v;
},
Reply::Header(name, value) => {
res.headers_mut().insert(name, value);
},
Reply::Body(body) => {
*res.body_mut() = body;
},
Reply::End => break,
}
}
res
TestService::build_reply(replies)
}))
}

fn build_reply(replies: spmc::Receiver<Reply>) -> Response<Body> {
let mut res = Response::new(Body::empty());
while let Ok(reply) = replies.try_recv() {
match reply {
Reply::Status(s) => {
*res.status_mut() = s;
},
Reply::Version(v) => {
*res.version_mut() = v;
},
Reply::Header(name, value) => {
res.headers_mut().insert(name, value);
},
Reply::Body(body) => {
*res.body_mut() = body;
},
Reply::End => break,
}
}
res
}
}

const HELLO: &'static str = "hello";
Expand Down Expand Up @@ -1806,7 +1807,7 @@ impl ServeOptions {
.http1_pipeline_flush(options.pipeline)
.serve(move || {
let ts = TestService {
tx: Arc::new(Mutex::new(msg_tx.clone())),
tx: msg_tx.clone(),
reply: reply_rx.clone(),
};
service_fn(move |req| ts.call(req))
Expand Down

0 comments on commit 138b1f8

Please sign in to comment.