diff --git a/e2e/tests-dfx/default.nix b/e2e/tests-dfx/default.nix index 2adabcf599..75a4d049c2 100644 --- a/e2e/tests-dfx/default.nix +++ b/e2e/tests-dfx/default.nix @@ -35,7 +35,7 @@ let procps which dfx.standalone - ] ++ lib.optional use_ic_ref ic-ref; + ]; BATSLIB = pkgs.sources.bats-support; USE_IC_REF = use_ic_ref; assets = assets; diff --git a/e2e/tests-dfx/signals.bash b/e2e/tests-dfx/signals.bash index da8d4353f1..5606d83a85 100644 --- a/e2e/tests-dfx/signals.bash +++ b/e2e/tests-dfx/signals.bash @@ -30,7 +30,7 @@ dfx_replica_kills_replica() { DFX_PID=$! # wait for replica to start - assert_file_eventually_exists .dfx/config/port.txt 15s + assert_file_eventually_exists .dfx/replica-configuration/replica-1.port 15s kill -"$signal" "$DFX_PID" diff --git a/e2e/utils/_.bash b/e2e/utils/_.bash index c0deda7a28..8db9f797e0 100644 --- a/e2e/utils/_.bash +++ b/e2e/utils/_.bash @@ -35,23 +35,19 @@ dfx_new() { dfx_start() { if [ "$USE_IC_REF" ] then - ic-ref --pick-port --write-port-to port 3>&- & - echo $! > ic-ref.pid - - sleep 5 - - test -f port - local port=$(cat port) - - cat <<<$(jq .networks.local.bind=\"127.0.0.1:${port}\" dfx.json) >dfx.json - cat dfx.json if [[ "$@" == "" ]]; then - dfx bootstrap --port 0 & # Start on random port for parallel test execution + dfx start --emulator --background --host "127.0.0.1:0" 3>&- # Start on random port for parallel test execution else - dfx bootstrap --port 8000 & + batslib_decorate "no arguments to dfx start --emulator supported yet" + fail fi + + test -f .dfx/ic-ref.port + local port=$(cat .dfx/ic-ref.port) + + # Overwrite the default networks.local.bind 127.0.0.1:8000 with allocated port local webserver_port=$(cat .dfx/webserver-port) - echo $! > dfx-bootstrap.pid + cat <<<$(jq .networks.local.bind=\"127.0.0.1:${webserver_port}\" dfx.json) >dfx.json else # Bats creates a FD 3 for test output, but child processes inherit it and Bats will # wait for it to close. Because `dfx start` leaves child processes running, we need @@ -82,25 +78,12 @@ dfx_start() { # Stop the replica and verify it is very very stopped. dfx_stop() { - if [ "$USE_IC_REF" ] - then - test -f ic-ref.pid - printf "Killing ic-ref at pid: %u\n" "$(cat ic-ref.pid)" - kill $(cat ic-ref.pid) - rm -f ic-ref.pid - - test -f dfx-bootstrap.pid - printf "Killing dfx bootstrap at pid: %u\n" "$(cat dfx-bootstrap.pid)" - kill $(cat dfx-bootstrap.pid) - rm -f dfx-bootstrap.pid - else - dfx stop - local dfx_root=.dfx/ - rm -rf $dfx_root + dfx stop + local dfx_root=.dfx/ + rm -rf $dfx_root - # Verify that processes are killed. - assert_no_dfx_start_or_replica_processes - fi + # Verify that processes are killed. + assert_no_dfx_start_or_replica_processes } # Create a canister to make sure we have a wallet on the local network, then diff --git a/e2e/utils/assertions.bash b/e2e/utils/assertions.bash index 66d0e5df7f..705cfe750b 100644 --- a/e2e/utils/assertions.bash +++ b/e2e/utils/assertions.bash @@ -149,7 +149,10 @@ assert_process_exits() { # Asserts that `dfx start` and `replica` are no longer running assert_no_dfx_start_or_replica_processes() { ! ( ps | grep "[/[:space:]]dfx start" ) - ! ( kill -0 $(cat .dfx/replica-configuration/replica-pid) 2>/dev/null ) + if [ -e .dfx/replica-configuration/replica-pid ]; + then + ! ( kill -0 $(cat .dfx/replica-configuration/replica-pid) 2>/dev/null ) + fi } assert_file_eventually_exists() { diff --git a/src/dfx/src/actors.rs b/src/dfx/src/actors.rs deleted file mode 100644 index b748f45c03..0000000000 --- a/src/dfx/src/actors.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod replica; -pub mod replica_webserver_coordinator; -pub mod shutdown_controller; diff --git a/src/dfx/src/actors/emulator.rs b/src/dfx/src/actors/emulator.rs new file mode 100644 index 0000000000..743fa6ba13 --- /dev/null +++ b/src/dfx/src/actors/emulator.rs @@ -0,0 +1,289 @@ +use crate::actors::replica_webserver_coordinator::signals::{PortReadySignal, PortReadySubscribe}; +use crate::actors::shutdown_controller::signals::outbound::Shutdown; +use crate::actors::shutdown_controller::signals::ShutdownSubscribe; +use crate::actors::shutdown_controller::ShutdownController; +use crate::lib::error::{DfxError, DfxResult}; + +use actix::{ + Actor, ActorContext, ActorFuture, Addr, AsyncContext, Context, Handler, Recipient, + ResponseActFuture, Running, WrapFuture, +}; +use anyhow::anyhow; +use crossbeam::channel::{unbounded, Receiver, Sender}; +use delay::{Delay, Waiter}; +use slog::{debug, info, Logger}; +use std::path::{Path, PathBuf}; +use std::thread::JoinHandle; +use std::time::Duration; + +pub mod signals { + use actix::prelude::*; + + /// A message sent to the Emulator when the process is restarted. Since we're + /// restarting inside our own actor, this message should not be exposed. + #[derive(Message)] + #[rtype(result = "()")] + pub(super) struct EmulatorRestarted { + pub port: u16, + } +} + +/// The configuration for the emulator actor. +#[derive(Clone)] +pub struct Config { + pub ic_ref_path: PathBuf, + pub write_port_to: PathBuf, + pub shutdown_controller: Addr, + pub logger: Option, +} + +/// A emulator actor. Starts the emulator, can subscribe to a Ready signal and a +/// Killed signal. +/// This starts a thread that monitors the process and send signals to any subscriber +/// listening for restarts. The message contains the port the emulator is listening to. +/// +/// Signals +/// - PortReadySubscribe +/// Subscribe a recipient (address) to receive a EmulatorReadySignal message when +/// the emulator is ready to listen to a port. The message can be sent multiple +/// times (e.g. if the emulator crashes). +/// If a emulator is already started and another actor sends this message, a +/// EmulatorReadySignal will be sent free of charge in the same thread. +pub struct Emulator { + logger: Logger, + config: Config, + + // We keep the port to send to subscribers on subscription. + port: Option, + stop_sender: Option>, + thread_join: Option>, + + /// Ready Signal subscribers. + ready_subscribers: Vec>, +} + +impl Emulator { + pub fn new(config: Config) -> Self { + let logger = + (config.logger.clone()).unwrap_or_else(|| Logger::root(slog::Discard, slog::o!())); + Emulator { + config, + port: None, + stop_sender: None, + thread_join: None, + ready_subscribers: Vec::new(), + logger, + } + } + + fn wait_for_port_file(file_path: &Path) -> DfxResult { + // Use a Waiter for waiting for the file to be created. + let mut waiter = Delay::builder() + .throttle(Duration::from_millis(100)) + .timeout(Duration::from_secs(30)) + .build(); + + waiter.start(); + loop { + if let Ok(content) = std::fs::read_to_string(file_path) { + if let Ok(port) = content.parse::() { + return Ok(port); + } + } + waiter + .wait() + .map_err(|err| anyhow!("Cannot start ic-ref: {:?}", err))?; + } + } + + fn start_emulator(&mut self, addr: Addr) -> DfxResult { + let logger = self.logger.clone(); + + let (sender, receiver) = unbounded(); + + let handle = emulator_start_thread(logger, self.config.clone(), addr, receiver)?; + + self.thread_join = Some(handle); + self.stop_sender = Some(sender); + Ok(()) + } + + fn send_ready_signal(&self, port: u16) { + for sub in &self.ready_subscribers { + let _ = sub.do_send(PortReadySignal { port }); + } + } +} + +impl Actor for Emulator { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + self.start_emulator(ctx.address()) + .expect("Could not start the emulator"); + + self.config + .shutdown_controller + .do_send(ShutdownSubscribe(ctx.address().recipient::())); + } + + fn stopping(&mut self, _ctx: &mut Self::Context) -> Running { + info!(self.logger, "Stopping ic-ref..."); + if let Some(sender) = self.stop_sender.take() { + let _ = sender.send(()); + } + + if let Some(join) = self.thread_join.take() { + let _ = join.join(); + } + + info!(self.logger, "Stopped."); + Running::Stop + } +} + +impl Handler for Emulator { + type Result = (); + + fn handle(&mut self, msg: PortReadySubscribe, _: &mut Self::Context) { + // If we have a port, send that we're already ready! Yeah! + if let Some(port) = self.port { + let _ = msg.0.do_send(PortReadySignal { port }); + } + + self.ready_subscribers.push(msg.0); + } +} + +impl Handler for Emulator { + type Result = (); + + fn handle( + &mut self, + msg: signals::EmulatorRestarted, + _ctx: &mut Self::Context, + ) -> Self::Result { + self.port = Some(msg.port); + self.send_ready_signal(msg.port); + } +} + +impl Handler for Emulator { + type Result = ResponseActFuture>; + + fn handle(&mut self, _msg: Shutdown, _ctx: &mut Self::Context) -> Self::Result { + // This is just the example for ResponseActFuture but stopping the context + Box::pin( + async {} + .into_actor(self) // converts future to ActorFuture + .map(|_, _act, ctx| { + ctx.stop(); + Ok(()) + }), + ) + } +} + +enum ChildOrReceiver { + Child, + Receiver, +} + +/// Function that waits for a child or a receiver to stop. This encapsulate the polling so +/// it is easier to maintain. +fn wait_for_child_or_receiver( + child: &mut std::process::Child, + receiver: &Receiver<()>, +) -> ChildOrReceiver { + loop { + // Check if either the child exited or a shutdown has been requested. + // These can happen in either order in response to Ctrl-C, so increase the chance + // to notice a shutdown request even if the emulator exited quickly. + let child_try_wait = child.try_wait(); + let receiver_signalled = receiver.recv_timeout(std::time::Duration::from_millis(100)); + + match (receiver_signalled, child_try_wait) { + (Ok(()), _) => { + // Prefer to indicate the shutdown request + return ChildOrReceiver::Receiver; + } + (Err(_), Ok(Some(_))) => { + return ChildOrReceiver::Child; + } + _ => {} + }; + } +} + +#[allow(clippy::too_many_arguments)] +fn emulator_start_thread( + logger: Logger, + config: Config, + addr: Addr, + receiver: Receiver<()>, +) -> DfxResult> { + let thread_handler = move || { + // Use a Waiter for waiting for the file to be created. + let mut waiter = Delay::builder() + .throttle(Duration::from_millis(1000)) + .exponential_backoff(Duration::from_secs(1), 1.2) + .build(); + waiter.start(); + + // Start the process, then wait for the file. + let ic_ref_path = config.ic_ref_path.as_os_str(); + + // form the ic-start command here similar to emulator command + let mut cmd = std::process::Command::new(ic_ref_path); + cmd.args(&["--pick-port"]); + cmd.args(&[ + "--write-port-to", + &config.write_port_to.to_string_lossy().to_string(), + ]); + cmd.stdout(std::process::Stdio::inherit()); + cmd.stderr(std::process::Stdio::inherit()); + + let mut done = false; + while !done { + let _ = std::fs::remove_file(&config.write_port_to); + let last_start = std::time::Instant::now(); + debug!(logger, "Starting emulator..."); + let mut child = cmd.spawn().expect("Could not start emulator."); + + let port = Emulator::wait_for_port_file(&config.write_port_to).unwrap(); + addr.do_send(signals::EmulatorRestarted { port }); + + // This waits for the child to stop, or the receiver to receive a message. + // We don't restart the emulator if done = true. + match wait_for_child_or_receiver(&mut child, &receiver) { + ChildOrReceiver::Receiver => { + debug!(logger, "Got signal to stop. Killing emulator process..."); + let _ = child.kill(); + let _ = child.wait(); + done = true; + } + ChildOrReceiver::Child => { + debug!(logger, "Emulator process failed."); + // Reset waiter if last start was over 2 seconds ago, and do not wait. + if std::time::Instant::now().duration_since(last_start) + >= Duration::from_secs(2) + { + debug!( + logger, + "Last emulator seemed to have been healthy, not waiting..." + ); + waiter.start(); + } else { + // Wait before we start it again. + let _ = waiter.wait(); + } + } + } + } + }; + + std::thread::Builder::new() + .name("emulator-actor".to_owned()) + .spawn(thread_handler) + .map_err(DfxError::from) +} diff --git a/src/dfx/src/actors/mod.rs b/src/dfx/src/actors/mod.rs new file mode 100644 index 0000000000..a50a51ed7f --- /dev/null +++ b/src/dfx/src/actors/mod.rs @@ -0,0 +1,89 @@ +use crate::actors; +use crate::actors::emulator::Emulator; +use crate::actors::replica::Replica; +use crate::actors::shutdown_controller::ShutdownController; +use crate::lib::environment::Environment; +use crate::lib::error::DfxResult; +use crate::lib::replica_config::ReplicaConfig; + +use actix::{Actor, Addr}; +use std::fs; +use std::path::PathBuf; + +pub mod emulator; +pub mod replica; +pub mod replica_webserver_coordinator; +pub mod shutdown_controller; + +pub fn start_shutdown_controller(env: &dyn Environment) -> DfxResult> { + let actor_config = shutdown_controller::Config { + logger: Some(env.get_logger().clone()), + }; + Ok(ShutdownController::new(actor_config).start()) +} + +pub fn start_emulator_actor( + env: &dyn Environment, + shutdown_controller: Addr, +) -> DfxResult> { + let ic_ref_path = env.get_cache().get_binary_command_path("ic-ref")?; + + let temp_dir = env.get_temp_dir(); + let emulator_port_path = temp_dir.join("ic-ref.port"); + + // Touch the port file. This ensures it is empty prior to + // handing it over to ic-ref. If we read the file and it has + // contents we shall assume it is due to our spawned ic-ref + // process. + std::fs::write(&emulator_port_path, "")?; + + let actor_config = actors::emulator::Config { + ic_ref_path, + write_port_to: emulator_port_path, + shutdown_controller, + logger: Some(env.get_logger().clone()), + }; + Ok(actors::emulator::Emulator::new(actor_config).start()) +} + +fn setup_replica_env(env: &dyn Environment, replica_config: &ReplicaConfig) -> DfxResult { + // create replica config dir + let replica_configuration_dir = env.get_temp_dir().join("replica-configuration"); + fs::create_dir_all(&replica_configuration_dir)?; + + if let Some(replica_port_path) = &replica_config.http_handler.write_port_to { + // Touch the replica port file. This ensures it is empty prior to + // handing it over to the replica. If we read the file and it has + // contents we shall assume it is due to our spawned replica + // process. + std::fs::write(&replica_port_path, "")?; + } + + // create replica state dir + let state_dir = env.get_state_dir().join("replicated_state"); + fs::create_dir_all(&state_dir)?; + + Ok(replica_configuration_dir) +} + +pub fn start_replica_actor( + env: &dyn Environment, + replica_config: ReplicaConfig, + shutdown_controller: Addr, +) -> DfxResult> { + // get binary path + let replica_path = env.get_cache().get_binary_command_path("replica")?; + let ic_starter_path = env.get_cache().get_binary_command_path("ic-starter")?; + + let replica_configuration_dir = setup_replica_env(env, &replica_config)?; + + let actor_config = replica::Config { + ic_starter_path, + replica_config, + replica_path, + shutdown_controller, + logger: Some(env.get_logger().clone()), + replica_configuration_dir, + }; + Ok(Replica::new(actor_config).start()) +} diff --git a/src/dfx/src/actors/replica.rs b/src/dfx/src/actors/replica.rs index c92d1ff722..a51e3cd799 100644 --- a/src/dfx/src/actors/replica.rs +++ b/src/dfx/src/actors/replica.rs @@ -1,4 +1,5 @@ use crate::actors::replica::signals::ReplicaRestarted; +use crate::actors::replica_webserver_coordinator::signals::{PortReadySignal, PortReadySubscribe}; use crate::actors::shutdown_controller::signals::outbound::Shutdown; use crate::actors::shutdown_controller::signals::ShutdownSubscribe; use crate::actors::shutdown_controller::ShutdownController; @@ -20,20 +21,6 @@ use std::time::Duration; pub mod signals { use actix::prelude::*; - pub mod outbound { - use super::*; - - #[derive(Message)] - #[rtype(result = "()")] - pub struct ReplicaReadySignal { - pub port: u16, - } - } - - #[derive(Message)] - #[rtype(result = "()")] - pub struct PortReadySubscribe(pub Recipient); - /// A message sent to the Replica when the process is restarted. Since we're /// restarting inside our own actor, this message should not be exposed. #[derive(Message)] @@ -60,11 +47,11 @@ pub struct Config { /// /// Signals /// - PortReadySubscribe -/// Subscribe a recipient (address) to receive a ReplicaReadySignal message when +/// Subscribe a recipient (address) to receive a PortReadySignal message when /// the replica is ready to listen to a port. The message can be sent multiple /// times (e.g. if the replica crashes). /// If a replica is already started and another actor sends this message, a -/// ReplicaReadySignal will be sent free of charge in the same thread. +/// PortReadySignal will be sent free of charge in the same thread. pub struct Replica { logger: Logger, config: Config, @@ -75,7 +62,7 @@ pub struct Replica { thread_join: Option>, /// Ready Signal subscribers. - ready_subscribers: Vec>, + ready_subscribers: Vec>, } impl Replica { @@ -145,7 +132,7 @@ impl Replica { fn send_ready_signal(&self, port: u16) { for sub in &self.ready_subscribers { - let _ = sub.do_send(signals::outbound::ReplicaReadySignal { port }); + let _ = sub.do_send(PortReadySignal { port }); } } } @@ -177,15 +164,13 @@ impl Actor for Replica { } } -impl Handler for Replica { +impl Handler for Replica { type Result = (); - fn handle(&mut self, msg: signals::PortReadySubscribe, _: &mut Self::Context) { + fn handle(&mut self, msg: PortReadySubscribe, _: &mut Self::Context) { // If we have a port, send that we're already ready! Yeah! if let Some(port) = self.port { - let _ = msg - .0 - .do_send(signals::outbound::ReplicaReadySignal { port }); + let _ = msg.0.do_send(PortReadySignal { port }); } self.ready_subscribers.push(msg.0); diff --git a/src/dfx/src/actors/replica_webserver_coordinator.rs b/src/dfx/src/actors/replica_webserver_coordinator.rs index ea4a997c6a..a4d4f7ae3e 100644 --- a/src/dfx/src/actors/replica_webserver_coordinator.rs +++ b/src/dfx/src/actors/replica_webserver_coordinator.rs @@ -1,6 +1,3 @@ -use crate::actors::replica::signals::outbound::ReplicaReadySignal; -use crate::actors::replica::signals::PortReadySubscribe; -use crate::actors::replica::Replica; use crate::actors::shutdown_controller::signals::outbound::Shutdown; use crate::actors::shutdown_controller::signals::ShutdownSubscribe; use crate::actors::shutdown_controller::ShutdownController; @@ -10,7 +7,7 @@ use crate::lib::webserver::run_webserver; use actix::clock::{delay_for, Duration}; use actix::fut::wrap_future; -use actix::{Actor, Addr, AsyncContext, Context, Handler, ResponseFuture}; +use actix::{Actor, Addr, AsyncContext, Context, Handler, Recipient, ResponseFuture}; use actix_server::Server; use futures::future; use futures::future::FutureExt; @@ -18,9 +15,23 @@ use slog::{debug, error, info, Logger}; use std::net::SocketAddr; use std::path::PathBuf; +pub mod signals { + use actix::prelude::*; + + #[derive(Message)] + #[rtype(result = "()")] + pub struct PortReadySignal { + pub port: u16, + } + + #[derive(Message)] + #[rtype(result = "()")] + pub struct PortReadySubscribe(pub Recipient); +} + pub struct Config { pub logger: Option, - pub replica_addr: Addr, + pub port_ready_subscribe: Recipient, pub shutdown_controller: Addr, pub bind: SocketAddr, pub providers: Vec, @@ -59,7 +70,7 @@ impl ReplicaWebserverCoordinator { providers.push(replica_api_uri); info!( self.logger, - "Starting webserver on port {} for replica at {:?}", port, ic_replica_bind_addr + "Starting webserver for replica at {:?}", ic_replica_bind_addr ); run_webserver( @@ -76,19 +87,20 @@ impl Actor for ReplicaWebserverCoordinator { type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { - self.config - .replica_addr - .do_send(PortReadySubscribe(ctx.address().recipient())); + let _ = self + .config + .port_ready_subscribe + .do_send(signals::PortReadySubscribe(ctx.address().recipient())); self.config .shutdown_controller .do_send(ShutdownSubscribe(ctx.address().recipient::())); } } -impl Handler for ReplicaWebserverCoordinator { +impl Handler for ReplicaWebserverCoordinator { type Result = (); - fn handle(&mut self, msg: ReplicaReadySignal, ctx: &mut Self::Context) { + fn handle(&mut self, msg: signals::PortReadySignal, ctx: &mut Self::Context) { debug!(self.logger, "replica ready on {}", msg.port); if let Some(server) = &self.server { diff --git a/src/dfx/src/commands/replica.rs b/src/dfx/src/commands/replica.rs index 8da68dc9f2..c36232a81a 100644 --- a/src/dfx/src/commands/replica.rs +++ b/src/dfx/src/commands/replica.rs @@ -1,13 +1,12 @@ -use crate::actors; -use crate::actors::shutdown_controller; use crate::actors::shutdown_controller::ShutdownController; +use crate::actors::{start_emulator_actor, start_replica_actor, start_shutdown_controller}; use crate::config::dfinity::ConfigDefaultsReplica; use crate::error_invalid_argument; use crate::lib::environment::Environment; use crate::lib::error::DfxResult; use crate::lib::replica_config::{HttpHandlerConfig, ReplicaConfig, SchedulerConfig}; -use actix::Actor; +use actix::Addr; use clap::Clap; use std::default::Default; @@ -25,6 +24,10 @@ pub struct ReplicaOpts { /// Specifies the maximum number of cycles a single round can consume. #[clap(long, hidden = true)] round_gas_limit: Option, + + /// Runs a dedicated emulator instead of the replica + #[clap(long)] + emulator: bool, } /// Gets the configuration options for the Internet Computer replica. @@ -33,13 +36,16 @@ fn get_config(env: &dyn Environment, opts: ReplicaOpts) -> DfxResult DfxResult, +) -> DfxResult { + let replica_config = get_config(env, opts)?; + start_replica_actor(env, replica_config, shutdown_controller)?; + Ok(()) +} + /// Start the Internet Computer locally. Spawns a proxy to forward and /// manage browser requests. Responsible for running the network (one /// replica at the moment) and the proxy. pub fn exec(env: &dyn Environment, opts: ReplicaOpts) -> DfxResult { - let replica_pathbuf = env.get_cache().get_binary_command_path("replica")?; - let ic_starter_pathbuf = env.get_cache().get_binary_command_path("ic-starter")?; - let system = actix::System::new("dfx-replica"); - let config = get_config(env, opts)?; - - let shutdown_controller = ShutdownController::new(shutdown_controller::Config { - logger: Some(env.get_logger().clone()), - }) - .start(); - - let replica_configuration_dir = env.get_temp_dir().join("replica-configuration"); - std::fs::create_dir_all(&replica_configuration_dir)?; - - let _replica_addr = actors::replica::Replica::new(actors::replica::Config { - ic_starter_path: ic_starter_pathbuf, - replica_config: config, - replica_path: replica_pathbuf, - shutdown_controller, - logger: Some(env.get_logger().clone()), - replica_configuration_dir, - }) - .start(); - + let shutdown_controller = start_shutdown_controller(env)?; + if opts.emulator { + start_emulator_actor(env, shutdown_controller)?; + } else { + start_replica(env, opts, shutdown_controller)?; + } system.run()?; - Ok(()) } diff --git a/src/dfx/src/commands/start.rs b/src/dfx/src/commands/start.rs index 361139a5de..ca55f4fd6d 100644 --- a/src/dfx/src/commands/start.rs +++ b/src/dfx/src/commands/start.rs @@ -1,8 +1,8 @@ use crate::actors; -use crate::actors::replica::Replica; +use crate::actors::replica_webserver_coordinator::signals::PortReadySubscribe; use crate::actors::replica_webserver_coordinator::ReplicaWebserverCoordinator; -use crate::actors::shutdown_controller; use crate::actors::shutdown_controller::ShutdownController; +use crate::actors::{start_emulator_actor, start_replica_actor, start_shutdown_controller}; use crate::config::dfinity::Config; use crate::lib::environment::Environment; use crate::lib::error::{DfxError, DfxResult}; @@ -11,7 +11,7 @@ use crate::lib::provider::get_network_descriptor; use crate::lib::replica_config::ReplicaConfig; use crate::util::get_reusable_socket_addr; -use actix::{Actor, Addr}; +use actix::{Actor, Addr, Recipient}; use anyhow::{anyhow, bail, Context}; use clap::Clap; use delay::{Delay, Waiter}; @@ -38,6 +38,10 @@ pub struct StartOpts { /// Cleans the state of the current project. #[clap(long)] clean: bool, + + /// Runs a dedicated emulator instead of the replica + #[clap(long)] + emulator: bool, } fn ping_and_wait(frontend_url: &str) -> DfxResult { @@ -142,14 +146,27 @@ pub fn exec(env: &dyn Environment, opts: StartOpts) -> DfxResult { let shutdown_controller = start_shutdown_controller(env)?; - let replica = start_replica(env, &state_root, shutdown_controller.clone())?; + let port_ready_subscribe: Recipient = if opts.emulator { + let emulator = start_emulator_actor(env, shutdown_controller.clone())?; + emulator.recipient() + } else { + let replica_port_path = env + .get_temp_dir() + .join("replica-configuration") + .join("replica-1.port"); + + let replica_config = + ReplicaConfig::new(&env.get_state_dir()).with_random_port(&replica_port_path); + let replica = start_replica_actor(env, replica_config, shutdown_controller.clone())?; + replica.recipient() + }; let _webserver_coordinator = start_webserver_coordinator( env, network_descriptor, address_and_port, build_output_root, - replica, + port_ready_subscribe, shutdown_controller, )?; @@ -178,52 +195,12 @@ fn clean_state(temp_dir: &Path, state_root: &Path) -> DfxResult { Ok(()) } -fn start_shutdown_controller(env: &dyn Environment) -> DfxResult> { - let actor_config = shutdown_controller::Config { - logger: Some(env.get_logger().clone()), - }; - Ok(ShutdownController::new(actor_config).start()) -} - -fn start_replica( - env: &dyn Environment, - state_root: &Path, - shutdown_controller: Addr, -) -> DfxResult> { - let replica_path = env.get_cache().get_binary_command_path("replica")?; - let ic_starter_path = env.get_cache().get_binary_command_path("ic-starter")?; - - let temp_dir = env.get_temp_dir(); - let replica_configuration_dir = temp_dir.join("replica-configuration"); - fs::create_dir_all(&replica_configuration_dir)?; - let state_dir = temp_dir.join("state/replicated_state"); - fs::create_dir_all(&state_dir)?; - let replica_port_path = replica_configuration_dir.join("replica-1.port"); - - // Touch the replica port file. This ensures it is empty prior to - // handing it over to the replica. If we read the file and it has - // contents we shall assume it is due to our spawned replica - // process. - std::fs::write(&replica_port_path, "")?; - - let replica_config = ReplicaConfig::new(state_root).with_random_port(&replica_port_path); - let actor_config = actors::replica::Config { - ic_starter_path, - replica_config, - replica_path, - shutdown_controller, - logger: Some(env.get_logger().clone()), - replica_configuration_dir, - }; - Ok(actors::replica::Replica::new(actor_config).start()) -} - fn start_webserver_coordinator( env: &dyn Environment, network_descriptor: NetworkDescriptor, bind: SocketAddr, build_output_root: PathBuf, - replica_addr: Addr, + port_ready_subscribe: Recipient, shutdown_controller: Addr, ) -> DfxResult> { // By default we reach to no external IC nodes. @@ -231,7 +208,7 @@ fn start_webserver_coordinator( let actor_config = actors::replica_webserver_coordinator::Config { logger: Some(env.get_logger().clone()), - replica_addr, + port_ready_subscribe, shutdown_controller, bind, providers,