Skip to content

Commit

Permalink
Merge pull request #56 from betagouv/only_force
Browse files Browse the repository at this point in the history
remove front keepalive, better timeout, fix empty body ...
  • Loading branch information
LeSim authored Nov 19, 2020
2 parents 13948bd + c43baa2 commit d035ba3
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 26 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ futures = "0"
futures-core = "0"
actix-web = { version="3", features = ["openssl"] }
actix-rt = "1"
actix-http = "*"
bytes = "0"
docopt = "1"
serde = { version = "1", features = ["derive"] }
Expand Down
105 changes: 82 additions & 23 deletions src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use log::error;
use sodiumoxide::crypto::secretstream::xchacha20poly1305::{ABYTES, HEADERBYTES};
use std::time::Duration;

const TIMEOUT_DURATION: Duration = Duration::from_secs(60 * 60);
const CONNECT_TIMEOUT: Duration = Duration::from_secs(1);
const RESPONSE_TIMEOUT: Duration = Duration::from_secs(5);
const UPLOAD_TIMEOUT: Duration = Duration::from_secs(3 * 60);

static FORWARD_REQUEST_HEADERS_TO_REMOVE: [header::HeaderName; 4] = [
// Connection settings (keepalived) must not be resend
Expand Down Expand Up @@ -82,7 +84,8 @@ async fn forward(

let mut forwarded_req = client
.request_from(put_url.as_str(), req.head())
.timeout(TIMEOUT_DURATION);
.force_close()
.timeout(UPLOAD_TIMEOUT);

let forward_length: Option<usize> = content_length(req.headers()).map(|content_length| {
if config.noop {
Expand All @@ -96,7 +99,7 @@ async fn forward(
forwarded_req.headers_mut().remove(header);
}

let stream_to_send: Box<dyn Stream<Item = _> + Unpin> = if config.noop {
let stream: Box<dyn Stream<Item = _> + Unpin> = if config.noop {
Box::new(payload)
} else {
Box::new(Encoder::new(
Expand All @@ -106,23 +109,27 @@ async fn forward(
))
};

let mut res = if let Some(length) = forward_length {
let req_copy = req.clone();
let stream_to_send = stream.map_err(move |e| {
error!("forward error with stream {:?}, {:?}", e, req_copy);
Error::from(e)
});

let res_e = if let Some(length) = forward_length {
forwarded_req
.send_body(SizedStream::new(
length as u64,
stream_to_send.map_err(Error::from),
))
.send_body(SizedStream::new(length as u64, stream_to_send))
.await
.map_err(Error::from)?
} else {
forwarded_req
.send_stream(stream_to_send.map_err(Error::from))
.await
.map_err(Error::from)?
forwarded_req.send_stream(stream_to_send).await
};

let mut res = res_e.map_err(|e| {
error!("forward fwk error {:?}, {:?}", e, req);
Error::from(e)
})?;

if res.status().is_client_error() || res.status().is_server_error() {
error!("forward error {:?} {:?}", req, res);
error!("forward status error {:?} {:?}", req, res);
}

let mut client_resp = HttpResponse::build(res.status());
Expand All @@ -148,16 +155,19 @@ async fn fetch(

let mut fetch_req = client
.request_from(get_url.as_str(), req.head())
.timeout(TIMEOUT_DURATION);
.force_close();

for header in &FETCH_REQUEST_HEADERS_TO_REMOVE {
fetch_req.headers_mut().remove(header);
}

let res = fetch_req.send_body(body).await.map_err(Error::from)?;
let res = fetch_req.send_body(body).await.map_err(|e| {
error!("fetch fwk error {:?}, {:?}", e, req);
Error::from(e)
})?;

if res.status().is_client_error() || res.status().is_server_error() {
error!("fetch error {:?} {:?}", req, res);
error!("fetch status error {:?} {:?}", req, res);
}

let mut client_resp = HttpResponse::build(res.status());
Expand Down Expand Up @@ -204,9 +214,7 @@ async fn simple_proxy(
) -> Result<HttpResponse, Error> {
let url = config.create_url(&req.uri());

let mut proxied_req = client
.request_from(url.as_str(), req.head())
.timeout(TIMEOUT_DURATION);
let mut proxied_req = client.request_from(url.as_str(), req.head()).force_close();

for header in &FETCH_REQUEST_HEADERS_TO_REMOVE {
proxied_req.headers_mut().remove(header);
Expand All @@ -215,10 +223,13 @@ async fn simple_proxy(
proxied_req
.send_stream(payload)
.await
.map_err(Error::from)
.map_err(|e| {
error!("simple proxy fwk error {:?}, {:?}", e, req);
Error::from(e)
})
.map(|res| {
if res.status().is_client_error() || res.status().is_server_error() {
error!("simple proxy error {:?} {:?}", req, res);
error!("simple proxy status error {:?} {:?}", req, res);
}

let mut client_resp = HttpResponse::build(res.status());
Expand All @@ -243,6 +254,10 @@ fn content_length(headers: &HeaderMap) -> Option<usize> {
}

fn encrypted_content_length(clear_length: usize, chunk_size: usize) -> usize {
if clear_length == 0 {
return 0;
}

let nb_chunk = clear_length / chunk_size;
let remainder = clear_length % chunk_size;

Expand All @@ -254,6 +269,10 @@ fn encrypted_content_length(clear_length: usize, chunk_size: usize) -> usize {
}

fn decrypted_content_length(encrypted_length: usize, decipher: DecipherType) -> usize {
if encrypted_length == 0 {
return 0;
}

match decipher {
DecipherType::Encrypted { chunk_size } => {
// encrypted = header_ds + header_crypto + n ( abytes + chunk ) + a (abytes + remainder)
Expand Down Expand Up @@ -285,9 +304,20 @@ pub async fn main(config: Config) -> std::io::Result<()> {
let address = config.address.unwrap();
let max_conn = config.max_connections;

use actix_http;

HttpServer::new(move || {
App::new()
.data(actix_web::client::Client::new())
.data(
actix_web::client::ClientBuilder::new()
.connector(
actix_web::client::Connector::new()
.timeout(CONNECT_TIMEOUT) // max time to connect to remote host including dns name resolution
.finish(),
)
.timeout(RESPONSE_TIMEOUT) // the total time before a response must be received
.finish(),
)
.data(config.clone())
.wrap(middleware::Logger::default())
.service(web::resource("/ping").guard(guard::Get()).to(ping))
Expand All @@ -296,6 +326,7 @@ pub async fn main(config: Config) -> std::io::Result<()> {
.default_service(web::route().to(simple_proxy))
})
.max_connections(max_conn)
.keep_alive(actix_http::KeepAlive::Disabled)
.bind_uds("/tmp/actix-uds.socket")?
.bind(address)?
.run()
Expand All @@ -306,6 +337,22 @@ pub async fn main(config: Config) -> std::io::Result<()> {
mod tests {
use super::*;

#[test]
fn test_decrypt_content_length_from_0() {
let original_length = 0;
let chunk_size = 16;
let encrypted_length = 0;

let decrypted_length = decrypted_content_length(
encrypted_length,
DecipherType::Encrypted {
chunk_size: chunk_size,
},
);

assert_eq!(original_length, decrypted_length);
}

#[test]
fn test_decrypt_content_length_without_remainder() {
let original_length = 32;
Expand Down Expand Up @@ -354,6 +401,18 @@ mod tests {
assert_eq!(original_length, decrypted_length);
}

#[test]
fn test_encrypted_content_length_from_0() {
let original_length = 0;
let chunk_size = 16;
let encrypted_length = 0;

assert_eq!(
encrypted_length,
encrypted_content_length(original_length, chunk_size)
);
}

#[test]
fn test_encrypted_content_length_without_remainder() {
let original_length = 32;
Expand Down
16 changes: 13 additions & 3 deletions tests/fixtures/server-static/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,20 @@ let last_put_headers = {};
app.put('*', function(req, res) {
last_put_headers = req.headers;

req.pipe(fs.createWriteStream(__dirname + '/uploads/' +req.url));
writeStream = fs.createWriteStream(__dirname + '/uploads/' +req.url);

res.writeHead(200, {'Content-Type': 'text/plain'});
res.end('OK!');
req.pipe(writeStream);

// After all the data is saved, respond Ok
req.on('end', function () {
res.writeHead(200, {"content-type":"text/html"});
res.end('Ok!');
});

// This is here incase any errors occur
writeStream.on('error', function (err) {
console.log(err);
});
});

// Add extra latency to all requests
Expand Down

0 comments on commit d035ba3

Please sign in to comment.