Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/dfx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ serde_cbor = "0.10"
serde_json = "1.0.40"
serde_repr = "0.1.5"
signal-hook = "0.1.13"
slog = "2.5.2"
slog-term = "2.5.0"
slog = { version = "2.5.2", features = ["max_level_trace"] }
slog-async = "2.4.0"
slog-term = "2.5.0"
sysinfo = "0.9.6"
tar = "0.4.26"
tempfile = "3.1.0"
Expand Down
5 changes: 4 additions & 1 deletion src/dfx/src/commands/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::lib::error::{DfxError, DfxResult};
use crate::lib::message::UserMessage;
use crate::lib::webserver::webserver;
use clap::{App, Arg, ArgMatches, SubCommand};
use slog::info;
use std::default::Default;
use std::fs;
use std::io::{Error, ErrorKind};
Expand Down Expand Up @@ -51,11 +52,13 @@ pub fn construct() -> App<'static, 'static> {

/// Runs the bootstrap server.
pub fn exec(env: &dyn Environment, args: &ArgMatches<'_>) -> DfxResult {
let logger = env.get_logger();
let config = get_config(env, args)?;

let (sender, receiver) = crossbeam::unbounded();

webserver(
logger.clone(),
SocketAddr::new(config.ip.unwrap(), config.port.unwrap()),
config
.providers
Expand All @@ -76,7 +79,7 @@ pub fn exec(env: &dyn Environment, args: &ArgMatches<'_>) -> DfxResult {
let _ = receiver.recv().expect("Failed to receive server...");

// Tell the user.
eprintln!("Webserver started...");
info!(logger, "Webserver started...");

// And then wait forever.
#[allow(clippy::empty_loop)]
Expand Down
1 change: 1 addition & 0 deletions src/dfx/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ pub fn exec(env: &dyn Environment, args: &ArgMatches<'_>) -> DfxResult {
let providers = Vec::new();

let proxy_config = ProxyConfig {
logger: env.get_logger().clone(),
client_api_port: address_and_port.port(),
bind: address_and_port,
serve_dir: bootstrap_dir,
Expand Down
71 changes: 71 additions & 0 deletions src/dfx/src/lib/logger.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
use crate::config::dfx_version_str;
use slog::{Drain, Level, Logger};
use slog_async;
use slog_term;
use std::fs::File;
use std::path::PathBuf;

/// The logging mode to use.
pub enum LoggingMode {
/// The default mode for logging; output without any decoration, to STDERR.
Stderr,

/// Tee logging to a file (in addition to STDERR). This mimics the verbose flag.
/// So it would be similar to `dfx ... |& tee /some/file.txt
Tee(PathBuf),

/// Output Debug logs and up to a file, regardless of verbosity, keep the STDERR output
/// the same (with verbosity).
File(PathBuf),
}

/// A Slog formatter that writes to a term decorator, without any formatting.
pub struct PlainFormat<D>
where
Expand Down Expand Up @@ -33,3 +54,53 @@ impl<D: slog_term::Decorator> slog::Drain for PlainFormat<D> {
})
}
}

/// Create a log drain.
fn create_drain(mode: LoggingMode) -> Logger {
match mode {
LoggingMode::Stderr => Logger::root(
PlainFormat::new(slog_term::PlainSyncDecorator::new(std::io::stderr())).fuse(),
slog::o!(),
),
LoggingMode::File(out) => {
let file = File::create(out).expect("Couldn't open log file");
let decorator = slog_term::PlainDecorator::new(file);
let drain = slog_term::FullFormat::new(decorator).build().fuse();
Logger::root(slog_async::Async::new(drain).build().fuse(), slog::o!())
}
// A Tee mode is basically 2 drains duplicated.
LoggingMode::Tee(out) => Logger::root(
slog::Duplicate::new(
create_drain(LoggingMode::Stderr),
create_drain(LoggingMode::File(out)),
)
.fuse(),
slog::o!(),
),
}
}

/// Create a root logger.
/// The verbose_level can be negative, in which case it's a quiet mode which removes warnings,
/// then errors entirely.
pub fn create_root_logger(verbose_level: i64, mode: LoggingMode) -> Logger {
let log_level = match verbose_level {
-3 => Level::Critical,
-2 => Level::Error,
-1 => Level::Warning,
0 => Level::Info,
1 => Level::Debug,
x => {
if x > 0 {
Level::Trace
} else {
return Logger::root(slog::Discard, slog::o!());
}
}
};

let drain = slog::LevelFilter::new(create_drain(mode), log_level).fuse();
let drain = slog_async::Async::new(drain).build().fuse();

Logger::root(drain, slog::o!("version" => dfx_version_str()))
}
4 changes: 3 additions & 1 deletion src/dfx/src/lib/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ pub struct Proxy {

/// Provide basic information to the proxy about the API port, the
/// address and the serve directory.
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug)]
pub struct ProxyConfig {
pub client_api_port: u16,
pub bind: SocketAddr,
pub serve_dir: PathBuf,
pub providers: Vec<url::Url>,
pub logger: slog::Logger,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -91,6 +92,7 @@ impl Proxy {
eprintln!("client address: {:?}", ic_client_bind_addr);

run_webserver(
self.config.logger.clone(),
self.config.bind,
providers,
self.config.serve_dir.clone(),
Expand Down
100 changes: 78 additions & 22 deletions src/dfx/src/lib/webserver.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use actix::dev::Stream;
use actix::System;
use actix_cors::Cors;
use actix_server::Server;
use actix_web::client::Client;
use actix_web::{http, middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web::{
http, middleware, web, App, Error, HttpMessage, HttpRequest, HttpResponse, HttpServer,
};
use crossbeam::channel::Sender;
use futures::Future;
use slog::{debug, info, trace, Logger};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
Expand All @@ -17,6 +21,7 @@ const FORWARD_REQUEST_TIMEOUT_IN_SECS: u64 = 20;

struct ForwardActixData {
pub providers: Vec<Url>,
pub logger: slog::Logger,
pub counter: usize,
}

Expand All @@ -30,12 +35,12 @@ fn forward(
data.counter += 1;
let count = data.counter;

let mut new_url = data.providers[count % data.providers.len()].clone();
new_url.set_path(req.uri().path());
new_url.set_query(req.uri().query());
let mut url = data.providers[count % data.providers.len()].clone();
url.set_path(req.uri().path());
url.set_query(req.uri().query());

let forwarded_req = client
.request_from(new_url.as_str(), req.head())
.request_from(url.as_str(), req.head())
.no_decompress()
.timeout(std::time::Duration::from_secs(
FORWARD_REQUEST_TIMEOUT_IN_SECS,
Expand All @@ -46,40 +51,90 @@ fn forward(
forwarded_req
};

forwarded_req
.send_stream(payload)
let logger = data.logger.clone();

// TODO(hansl): move this all to async/await. Jeez....
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks to Google, I prefer filing an issue to TODOs. I don't know how it was on your team, but on ours, they would stay in there until the entire code became obsolete and got deleted. Recording it on buganizer meant you either got extra credit for doing the thing, or the issue magically and quietly disappeared on the next declaration of bug bankruptcy. Win-win!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a tab in my IDE that shows TODOs. And we had talks and work into making a TODO bot that just adds GitHub issues based on TODOs.

Both systems have pros and cons. I've seen many P2-4 just get ignored and closed after 2 months due to inactivity. And a refactor TODO like this one would definitely not be P0-1 ;)

When G+ launched we even had "G0-4" priorities which were basically grouping priorities for projects (so a G0/P4 could potentially be more important than a G1/P0 which was weird). In the end the concept was trashed because everyone thought their jobs were G0... :/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// (PS: the reason I don't do this yet is moving actix to async/await is a bit more
// involved than replacing this single function)
payload
.map_err(Error::from)
.map(|res| {
let mut client_resp = HttpResponse::build(res.status());
for (header_name, header_value) in res
.headers()
.iter()
.filter(|(h, _)| *h != "connection" && *h != "content-length")
{
client_resp.header(header_name.clone(), header_value.clone());
}
client_resp.streaming(res)
.fold(web::BytesMut::new(), move |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok::<_, Error>(body)
})
.and_then(move |req_body| {
// We streamed the whole body in memory. Let's log some informations.
debug!(
logger,
"Request ({}) to replica ({})",
indicatif::HumanBytes(req_body.len() as u64),
url,
);
trace!(logger, " type {}", req.content_type());
trace!(logger, " body {}", hex::encode(&req_body));

forwarded_req
.send_body(req_body)
.map_err(Error::from)
.and_then(|mut res| {
res.take_payload()
.map_err(Error::from)
.fold(web::BytesMut::new(), move |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok::<_, Error>(body)
})
.and_then(move |res_body| {
let mut client_resp = HttpResponse::build(res.status());
for (header_name, header_value) in res
.headers()
.iter()
.filter(|(h, _)| *h != "connection" && *h != "content-length")
{
client_resp.header(header_name.clone(), header_value.clone());
}

debug!(
logger,
"Response ({}) with status code {}",
indicatif::HumanBytes(res_body.len() as u64),
res.status().as_u16()
);
trace!(logger, " type {}", res.content_type());
trace!(logger, " body {}", hex::encode(&res_body));

client_resp.body(res_body)
})
})
})
}

/// Run the webserver in the current thread.
pub fn run_webserver(
logger: Logger,
bind: SocketAddr,
providers: Vec<url::Url>,
serve_dir: PathBuf,
inform_parent: Sender<Server>,
) -> Result<(), std::io::Error> {
eprintln!("binding to: {:?}", bind);
info!(logger, "binding to: {:?}", bind);

const SHUTDOWN_WAIT_TIME: u64 = 60;

eprint!("client(s): ");
providers.iter().for_each(|uri| eprint!("{} ", uri));
eprint!("\n");
info!(
logger,
"client(s): {}",
providers
.iter()
.map(|x| x.clone().into_string())
.collect::<Vec<String>>()
.as_slice()
.join(", ")
);

let _sys = System::new("dfx-frontend-http-server");
let forward_data = Arc::new(Mutex::new(ForwardActixData {
providers,
logger: logger.clone(),
counter: 0,
}));

Expand Down Expand Up @@ -114,13 +169,14 @@ pub fn run_webserver(
}

pub fn webserver(
logger: Logger,
bind: SocketAddr,
clients_api_uri: Vec<url::Url>,
serve_dir: &Path,
inform_parent: Sender<Server>,
) -> std::io::Result<std::thread::JoinHandle<()>> {
std::thread::Builder::new().name("Frontend".into()).spawn({
let serve_dir = serve_dir.to_path_buf();
move || run_webserver(bind, clients_api_uri, serve_dir, inform_parent).unwrap()
move || run_webserver(logger, bind, clients_api_uri, serve_dir, inform_parent).unwrap()
})
}
Loading