diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs
index ee800676a..8078228ef 100644
--- a/hyperactor_mesh/src/v1/host_mesh.rs
+++ b/hyperactor_mesh/src/v1/host_mesh.rs
@@ -216,18 +216,40 @@ enum HostMeshAllocation {
 }
 
 impl HostMesh {
-    /// Fork a new `HostMesh` from this process, returning the new `HostMesh`
-    /// to the parent (owning) process, while running forever in child processes
-    /// (i.e., individual procs).
+    /// Bring up a local single-host mesh and, in the launcher
+    /// process, return a `HostMesh` handle for it.
     ///
-    /// All of the code preceding the call to `local` will run in each child proc;
-    /// thus it is important to call `local` early in the lifetime of the program,
-    /// and to ensure that it is reached unconditionally.
+    /// There are two execution modes:
     ///
-    /// This is intended for testing, development, examples.
+    /// - bootstrap-child mode: if `Bootstrap::get_from_env()` says
+    ///   this process was launched as a bootstrap child, we call
+    ///   `boot.bootstrap().await`, which hands control to the
+    ///   bootstrap logic for this process (as defined by the
+    ///   `BootstrapCommand` the parent used to spawn it). If that
+    ///   call returns, we log the error and terminate. This branch
+    ///   does not produce a `HostMesh`.
+    ///
+    /// - launcher mode: otherwise, we are the process that is setting
+    ///   up the mesh. We create a `Host`, spawn a `HostMeshAgent` in
+    ///   it, and build a single-host `HostMesh` around that. That
+    ///   `HostMesh` is returned to the caller.
+    ///
+    /// This API is intended for tests, examples, and local bring-up,
+    /// not production.
     ///
     /// TODO: fix up ownership
     pub async fn local() -> v1::Result<HostMesh> {
+        Self::local_with_bootstrap(BootstrapCommand::current()?).await
+    }
+
+    /// Same as [`local`], but the caller supplies the
+    /// `BootstrapCommand` instead of deriving it from the current
+    /// process.
+    ///
+    /// The provided `bootstrap_cmd` is used when spawning bootstrap
+    /// children and determines the behavior of
+    /// `boot.bootstrap().await` in those children.
+    pub async fn local_with_bootstrap(bootstrap_cmd: BootstrapCommand) -> v1::Result<HostMesh> {
         if let Ok(Some(boot)) = Bootstrap::get_from_env() {
             let err = boot.bootstrap().await;
             tracing::error!("failed to bootstrap local host mesh process: {}", err);
@@ -236,7 +258,7 @@ impl HostMesh {
 
         let addr = config::global::get_cloned(DEFAULT_TRANSPORT).any();
-        let manager = BootstrapProcManager::new(BootstrapCommand::current()?)?;
+        let manager = BootstrapProcManager::new(bootstrap_cmd)?;
         let (host, _handle) = Host::serve(manager, addr).await?;
         let addr = host.addr().clone();
         let host_mesh_agent = host
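
Reviewer note: a minimal sketch of the intended call pattern, for orientation. `bring_up` and `cmd_path` are illustrative names, not part of this diff, and the `PathBuf -> BootstrapCommand` conversion via `.into()` is borrowed from the tests added below rather than guaranteed API:

    use std::path::PathBuf;

    use hyperactor_mesh::v1;
    use hyperactor_mesh::v1::host_mesh::HostMesh;

    // Launcher-mode usage: in a bootstrap child the same calls never
    // return; the process enters `boot.bootstrap().await` instead.
    async fn bring_up(cmd_path: Option<PathBuf>) -> v1::Result<HostMesh> {
        match cmd_path {
            // Derive the bootstrap command from the current process.
            None => HostMesh::local().await,
            // Or point bootstrap children at a dedicated binary.
            Some(path) => HostMesh::local_with_bootstrap(path.into()).await,
        }
    }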
diff --git a/monarch_hyperactor/Cargo.toml b/monarch_hyperactor/Cargo.toml
index 928cb8726..bb7afedf2 100644
--- a/monarch_hyperactor/Cargo.toml
+++ b/monarch_hyperactor/Cargo.toml
@@ -1,4 +1,4 @@
-# @generated by autocargo from //monarch/monarch_hyperactor:[monarch_hyperactor,process_allocator-oss,test_monarch_hyperactor]
+# @generated by autocargo from //monarch/monarch_hyperactor:[monarch_hyperactor,monarch_hyperactor_test_bootstrap,process_allocator-oss,test_monarch_hyperactor]
 
 [package]
 name = "monarch_hyperactor"
@@ -7,6 +7,11 @@ authors = ["Meta"]
 edition = "2021"
 license = "BSD-3-Clause"
 
+[[bin]]
+name = "monarch_hyperactor_test_bootstrap"
+path = "test/bootstrap.rs"
+edition = "2024"
+
 [[bin]]
 name = "process_allocator"
 edition = "2024"
@@ -52,6 +57,7 @@ tokio-util = { version = "0.7.15", features = ["full"] }
 tracing = { version = "0.1.41", features = ["attributes", "valuable"] }
 
 [dev-dependencies]
+buck-resources = "1"
 dir-diff = "0.3"
 
 [features]
diff --git a/monarch_hyperactor/src/lib.rs b/monarch_hyperactor/src/lib.rs
index 658982c64..14e4fa445 100644
--- a/monarch_hyperactor/src/lib.rs
+++ b/monarch_hyperactor/src/lib.rs
@@ -32,6 +32,7 @@ pub mod selection;
 pub mod shape;
 pub mod supervision;
 pub mod telemetry;
+mod testresource;
 pub mod v1;
 pub mod value_mesh;
 
diff --git a/monarch_hyperactor/src/pytokio.rs b/monarch_hyperactor/src/pytokio.rs
index b0fd6daef..da2da08a7 100644
--- a/monarch_hyperactor/src/pytokio.rs
+++ b/monarch_hyperactor/src/pytokio.rs
@@ -50,6 +52,8 @@ use hyperactor::config::CONFIG;
 use hyperactor::config::ConfigAttr;
 use monarch_types::SerializablePyErr;
 use pyo3::IntoPyObjectExt;
+#[cfg(test)]
+use pyo3::PyClass;
 use pyo3::exceptions::PyRuntimeError;
 use pyo3::exceptions::PyStopIteration;
 use pyo3::exceptions::PyTimeoutError;
@@ -547,3 +549,72 @@ pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
 
     Ok(())
 }
+
+/// Ensure the embedded Python interpreter is initialized exactly
+/// once.
+///
+/// Safe to call from multiple threads, multiple times.
+#[cfg(test)]
+pub(crate) fn ensure_python() {
+    static INIT: std::sync::OnceLock<()> = std::sync::OnceLock::new();
+    INIT.get_or_init(|| {
+        pyo3::prepare_freethreaded_python();
+    });
+}
+
+#[cfg(test)]
+// Helper: let us "await" a `PyPythonTask` in Rust.
+//
+// Semantics:
+// - consume the `PyPythonTask`,
+// - take the inner future,
+// - `.await` it on tokio to get `Py<PyAny>`,
+// - turn that into `Py<T>`.
+pub(crate) trait AwaitPyExt {
+    async fn await_py<T: PyClass>(self) -> Result<Py<T>, PyErr>;
+
+    // For tasks whose future just resolves to (), i.e. no object,
+    // just "did it work?"
+    async fn await_unit(self) -> Result<(), PyErr>;
+}
+
+#[cfg(test)]
+impl AwaitPyExt for PyPythonTask {
+    async fn await_py<T: PyClass>(mut self) -> Result<Py<T>, PyErr> {
+        // Take ownership of the inner future.
+        let fut = self
+            .take_task()
+            .expect("PyPythonTask already consumed in await_py");
+
+        // Await a Result<Py<PyAny>, PyErr>.
+        let py_any: Py<PyAny> = fut.await?;
+
+        // Convert Py<PyAny> -> Py<T>.
+        Python::with_gil(|py| {
+            let bound_any = py_any.bind(py);
+
+            // Try to extract a Py<T>.
+            let obj: Py<T> = bound_any
+                .extract::<Py<T>>()
+                .expect("spawn() did not return expected Python type");
+
+            Ok(obj)
+        })
+    }
+
+    async fn await_unit(mut self) -> Result<(), PyErr> {
+        let fut = self
+            .take_task()
+            .expect("PyPythonTask already consumed in await_unit");
+
+        // Await it. This still gives us a Py<PyAny> because
+        // Python-side return values are always materialized as 'some
+        // object'. For "no value" / None, that's just a PyAny(None).
+        let py_any: Py<PyAny> = fut.await?;
+
+        // We don't need to extract anything. Just drop it.
+        drop(py_any);
+
+        Ok(())
+    }
+}
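
Reviewer note: how the logging tests below consume these helpers — a sketch, assuming a `PyInstance` / `PyProcMesh` pair already in scope and the `LoggingMeshClient` from `v1/logging.rs`:

    ensure_python();

    // Typed await: resolve the task, then downcast to the expected pyclass.
    let task: PyPythonTask = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)?;
    let client: Py<LoggingMeshClient> = task.await_py().await?;

    // Unit await: resolve a task whose Python-side value is just None.
    let flush_task = Python::with_gil(|py| client.borrow(py).flush(&py_instance))?;
    flush_task.await_unit().await?;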
diff --git a/monarch_hyperactor/src/testresource.rs b/monarch_hyperactor/src/testresource.rs
new file mode 100644
index 000000000..20aef8156
--- /dev/null
+++ b/monarch_hyperactor/src/testresource.rs
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree.
+ */
+
+#![cfg(test)]
+
+use std::path::PathBuf;
+
+/// Fetch the named (Buck) resource, heuristically falling back on
+/// the cargo-built path when possible. Beware! This is not actually a
+/// true cargo dependency, so the binaries have to be built independently.
+///
+/// We should convert these tests to integration tests, so that cargo can
+/// also manage the binaries.
+pub fn get<S>(name: S) -> PathBuf
+where
+    S: AsRef<str>,
+{
+    let name = name.as_ref().to_owned();
+    // TODO: actually check if we're running in Buck context or not.
+    if let Ok(path) = buck_resources::get(name.clone()) {
+        return path;
+    }
+
+    assert!(
+        name.starts_with("monarch/monarch_hyperactor/"),
+        "invalid resource {}: must start with \"monarch/monarch_hyperactor/\"",
+        name
+    );
+
+    let path: PathBuf = name
+        .replace(
+            "monarch/monarch_hyperactor/",
+            "../target/debug/monarch_hyperactor_test_",
+        )
+        .into();
+
+    assert!(
+        path.exists(),
+        "no cargo-built resource at {}",
+        path.display()
+    );
+
+    path
+}
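
Reviewer note: the cargo fallback above is a pure string rewrite. Concretely, for the one resource name the logging tests pass:

    // "monarch/monarch_hyperactor/bootstrap"
    //   -> "../target/debug/monarch_hyperactor_test_bootstrap"
    let path = crate::testresource::get("monarch/monarch_hyperactor/bootstrap");
    // Outside Buck this only succeeds after a prior
    // `cargo build --bin monarch_hyperactor_test_bootstrap`.
    assert!(path.exists());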
diff --git a/monarch_hyperactor/src/v1/logging.rs b/monarch_hyperactor/src/v1/logging.rs
index c6c39269d..4274ef4ab 100644
--- a/monarch_hyperactor/src/v1/logging.rs
+++ b/monarch_hyperactor/src/v1/logging.rs
@@ -12,6 +12,7 @@ use std::ops::Deref;
 
 use hyperactor::ActorHandle;
 use hyperactor::context;
+use hyperactor_mesh::bootstrap::MESH_ENABLE_LOG_FORWARDING;
 use hyperactor_mesh::logging::LogClientActor;
 use hyperactor_mesh::logging::LogClientMessage;
 use hyperactor_mesh::logging::LogForwardActor;
@@ -31,20 +32,109 @@ use crate::logging::LoggerRuntimeMessage;
 use crate::pytokio::PyPythonTask;
 use crate::v1::proc_mesh::PyProcMesh;
 
+/// `LoggingMeshClient` is the Python-facing handle for distributed
+/// logging over a `ProcMesh`.
+///
+/// Calling `spawn(...)` builds three pieces of logging infra:
+///
+/// - `client_actor`: a single `LogClientActor` running in the
+///   *local* process. It aggregates forwarded stdout/stderr,
+///   batches it, and coordinates sync flush barriers.
+///
+/// - `forwarder_mesh`: (optional) an `ActorMesh<LogForwardActor>`
+///   with one actor per remote proc. Each `LogForwardActor` sits in
+///   that proc and forwards its stdout/stderr back to the client.
+///   This mesh only exists if `MESH_ENABLE_LOG_FORWARDING` was `true`
+///   at startup; otherwise it's `None` and we never spawn any
+///   forwarders.
+///
+/// - `logger_mesh`: an `ActorMesh<LoggerRuntimeActor>` with one
+///   actor per remote proc. Each `LoggerRuntimeActor` controls that
+///   proc's Python logging runtime (log level, handlers, etc.).
+///   This mesh is always created, even if forwarding is disabled.
+///
+/// The Python object you get back holds references to all of this so
+/// that you can:
+/// - toggle streaming vs "stay quiet" (`set_mode(...)`),
+/// - adjust the per-proc Python log level (`set_mode(...)`),
+/// - force a sync flush of forwarded output and wait for completion
+///   (`flush(...)`).
+///
+/// Drop semantics:
+/// Dropping the Python handle runs `Drop` on this Rust struct,
+/// which drains/stops the local `LogClientActor` but does *not*
+/// synchronously tear down the per-proc meshes. The remote
+/// `LogForwardActor` / `LoggerRuntimeActor` instances keep running
+/// until the remote procs themselves are shut down (e.g. via
+/// `host_mesh.shutdown(...)` in tests).
 #[pyclass(
     frozen,
     name = "LoggingMeshClient",
     module = "monarch._rust_bindings.monarch_hyperactor.v1.logging"
 )]
 pub struct LoggingMeshClient {
-    // handles remote process log forwarding; no python runtime
-    forwarder_mesh: ActorMesh<LogForwardActor>,
-    // handles python logger; has python runtime
+    // Per-proc LogForwardActor mesh (optional). When enabled, each
+    // remote proc forwards its stdout/stderr back to the client. This
+    // actor does not interact with the embedded Python runtime.
+    forwarder_mesh: Option<ActorMesh<LogForwardActor>>,
+
+    // Per-proc LoggerRuntimeActor mesh. One LoggerRuntimeActor runs
+    // on every proc in the mesh and is responsible for driving that
+    // proc's Python logging configuration (log level, handlers,
+    // etc.).
+    //
+    // `set_mode(..)` always broadcasts the requested log level to
+    // this mesh, regardless of whether stdout/stderr forwarding is
+    // enabled.
+    //
+    // Even on a proc that isn't meaningfully running Python code, we
+    // still spawn LoggerRuntimeActor and it will still apply the new
+    // level to that proc's Python logger. In that case, updating the
+    // level may have no visible effect simply because nothing on that
+    // proc ever emits logs through Python's `logging` module.
     logger_mesh: ActorMesh<LoggerRuntimeActor>,
+
+    // Client-side LogClientActor. Lives in the client process;
+    // receives forwarded output, aggregates and buffers it, and
+    // coordinates sync flush barriers.
     client_actor: ActorHandle<LogClientActor>,
 }
 
 impl LoggingMeshClient {
+    /// Drive a synchronous "drain all logs now" barrier across the
+    /// mesh.
+    ///
+    /// Protocol:
+    /// 1. Tell the local `LogClientActor` we're starting a sync
+    ///    flush. We give it:
+    ///    - how many procs we expect to hear from
+    ///      (`expected_procs`),
+    ///    - a `reply` port it will use to signal completion,
+    ///    - a `version` port it will use to hand us a flush version
+    ///      token. After this send, the client_actor is now in "sync
+    ///      flush vN" mode.
+    ///
+    /// 2. Wait for that version token from the client. This tells
+    ///    us which flush epoch we're coordinating
+    ///    (`version_rx.recv()`).
+    ///
+    /// 3. Broadcast `ForceSyncFlush { version }` to every
+    ///    `LogForwardActor` in the `forwarder_mesh`. Each forwarder
+    ///    tells its proc-local logger/forwarding loop: "flush
+    ///    everything you have for this version now, then report
+    ///    back."
+    ///
+    /// 4. Wait on `reply_rx`. The `LogClientActor` only replies
+    ///    once it has:
+    ///    - received the per-proc sync points for this version from
+    ///      all forwarders,
+    ///    - emitted/forwarded their buffered output,
+    ///    - and finished flushing its own buffers.
+    ///
+    /// When this returns `Ok(())`, all stdout/stderr that existed at
+    /// the moment we kicked off the flush has been forwarded to the
+    /// client and drained. This is used by
+    /// `LoggingMeshClient.flush()`.
     async fn flush_internal(
         cx: &impl context::Actor,
         client_actor: ActorHandle<LogClientActor>,
@@ -62,10 +152,12 @@ impl LoggingMeshClient {
 
         let version = version_rx.recv().await?;
 
-        // Then ask all the flushers to ask the log forwarders to sync flush
+        // Then ask all the flushers to ask the log forwarders to sync
+        // flush
         forwarder_mesh.cast(cx, LogForwardMessage::ForceSyncFlush { version })?;
 
-        // Finally the forwarder will send sync point back to the client, flush, and return.
+        // Finally the forwarder will send sync point back to the
+        // client, flush, and return.
         reply_rx.recv().await?;
 
         Ok(())
@@ -74,11 +166,40 @@ impl LoggingMeshClient {
 
 #[pymethods]
 impl LoggingMeshClient {
+    /// Initialize logging for a `ProcMesh` and return a
+    /// `LoggingMeshClient`.
+    ///
+    /// This wires up three pieces of logging infrastructure:
+    ///
+    /// 1. A single `LogClientActor` in the *client* process. This
+    ///    actor receives forwarded stdout/stderr, buffers and
+    ///    aggregates it, and coordinates sync flush barriers.
+    ///
+    /// 2. (Optional) A `LogForwardActor` on every remote proc in the
+    ///    mesh. These forwarders read that proc's stdout/stderr and
+    ///    stream it back to the client. We only spawn this mesh if
+    ///    `MESH_ENABLE_LOG_FORWARDING` was `true` in the config. If
+    ///    forwarding is disabled at startup, we do not spawn these
+    ///    actors and `forwarder_mesh` will be `None`.
+    ///
+    /// 3. A `LoggerRuntimeActor` on every remote proc in the mesh.
+    ///    This actor controls the Python logging runtime (log level,
+    ///    handlers, etc.) in that process. This is always spawned,
+    ///    even if log forwarding is disabled.
+    ///
+    /// The returned `LoggingMeshClient` holds handles to those
+    /// actors. Later, `set_mode(...)` can adjust per-proc log level
+    /// and (if forwarding was enabled) toggle whether remote output
+    /// is actually streamed back to the client. If forwarding was
+    /// disabled by config, requests to enable streaming will fail.
     #[staticmethod]
     fn spawn(instance: &PyInstance, proc_mesh: &PyProcMesh) -> PyResult<PyPythonTask> {
         let proc_mesh = proc_mesh.mesh_ref()?;
         let instance = instance.clone();
+
         PyPythonTask::new(async move {
+            // 1. Spawn the client-side coordinator actor (lives in
+            //    the caller's process).
             let client_actor: ActorHandle<LogClientActor> =
                 instance_dispatch!(instance, async move |cx_instance| {
                     cx_instance
                         .await
                 })?;
             let client_actor_ref = client_actor.bind();
-            let forwarder_mesh = instance_dispatch!(instance, async |cx_instance| {
-                proc_mesh
-                    .spawn(cx_instance, "log_forwarder", &client_actor_ref)
-                    .await
-            })
-            .map_err(anyhow::Error::from)?;
+
+            // Read config to decide if we stand up per-proc
+            // stdout/stderr forwarding.
+            let forwarding_enabled = hyperactor::config::global::get(MESH_ENABLE_LOG_FORWARDING);
+
+            // 2. Optionally spawn the per-proc `LogForwardActor` mesh
+            //    (stdout/stderr forwarders).
+            let forwarder_mesh = if forwarding_enabled {
+                // Spawn a `LogForwardActor` on every proc.
+                let mesh = instance_dispatch!(instance, async |cx_instance| {
+                    proc_mesh
+                        .spawn(cx_instance, "log_forwarder", &client_actor_ref)
+                        .await
+                })
+                .map_err(anyhow::Error::from)?;
+
+                Some(mesh)
+            } else {
+                None
+            };
+
+            // 3. Always spawn a `LoggerRuntimeActor` on every proc.
         let logger_mesh = instance_dispatch!(instance, async |cx_instance| {
                 proc_mesh.spawn(cx_instance, "logger", &()).await
             })
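
Reviewer note: the resulting contract, mirroring `spawn_respects_forwarding_flag` in the tests below (`py_instance` / `py_proc_mesh` assumed in scope):

    let lock = hyperactor::config::global::lock();
    let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, false);

    let client: Py<LoggingMeshClient> = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)?
        .await_py()
        .await?;

    // Forwarding was disabled at spawn time, so no forwarder mesh exists
    // and it cannot be enabled later via set_mode(..).
    Python::with_gil(|py| assert!(client.borrow(py).forwarder_mesh.is_none()));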
@@ -106,6 +243,34 @@ impl LoggingMeshClient {
         })
     }
 
+    /// Update logging behavior for this mesh.
+    ///
+    /// `stream_to_client` controls whether remote procs actively
+    /// stream their stdout/stderr back to the client process.
+    ///
+    /// - If log forwarding was enabled at startup, `forwarder_mesh`
+    ///   is `Some` and we propagate this flag to every per-proc
+    ///   `LogForwardActor`.
+    /// - If log forwarding was disabled at startup, `forwarder_mesh`
+    ///   is `None`. In that case:
+    ///   * requesting `stream_to_client = false` is a no-op
+    ///     (accepted),
+    ///   * requesting `stream_to_client = true` is rejected,
+    ///     because we did not spawn forwarders and we don't
+    ///     dynamically create them later.
+    ///
+    /// `aggregate_window_sec` configures how the client-side
+    /// `LogClientActor` batches forwarded output. It is only
+    /// meaningful when streaming is enabled. Calling this with
+    /// `Some(..)` while `stream_to_client == false` is invalid and
+    /// returns an error.
+    ///
+    /// `level` is the desired Python logging level. We always
+    /// broadcast this to the per-proc `LoggerRuntimeActor` mesh so
+    /// each remote process can update its own Python logger
+    /// configuration, regardless of whether stdout/stderr forwarding
+    /// is active.
     fn set_mode(
         &self,
         instance: &PyInstance,
@@ -113,24 +278,52 @@ impl LoggingMeshClient {
         aggregate_window_sec: Option<u64>,
         level: u8,
     ) -> PyResult<()> {
+        // We can't ask for an aggregation window if we're not
+        // streaming.
         if aggregate_window_sec.is_some() && !stream_to_client {
             return Err(PyErr::new::<PyRuntimeError, _>(
                 "cannot set aggregate window without streaming to client".to_string(),
             ));
         }
 
-        instance_dispatch!(instance, |cx_instance| {
-            self.forwarder_mesh
-                .cast(cx_instance, LogForwardMessage::SetMode { stream_to_client })
-        })
-        .map_err(|e| PyErr::new::<PyRuntimeError, _>(e.to_string()))?;
+        // Handle the forwarder side (stdout/stderr streaming back to
+        // client).
+        match (&self.forwarder_mesh, stream_to_client) {
+            // Forwarders exist (config enabled at startup). We can
+            // toggle live.
+            (Some(fwd_mesh), _) => {
+                instance_dispatch!(instance, |cx_instance| {
+                    fwd_mesh.cast(cx_instance, LogForwardMessage::SetMode { stream_to_client })
+                })
+                .map_err(|e| PyErr::new::<PyRuntimeError, _>(e.to_string()))?;
+            }
+
+            // Forwarders were never spawned (global forwarding
+            // disabled) and the caller is asking NOT to stream.
+            // That's effectively a no-op so we silently accept.
+            (None, false) => {
+                // Nothing to do.
+            }
+
+            // Forwarders were never spawned, but caller is asking to
+            // stream. We can't satisfy this request without
+            // re-spawning infra, which we deliberately don't do at
+            // runtime.
+            (None, true) => {
+                return Err(PyErr::new::<PyRuntimeError, _>(
+                    "log forwarding disabled by config at startup; cannot enable stream_to_client",
+                ));
+            }
+        }
+
+        // Always update the per-proc Python logging level.
         instance_dispatch!(instance, |cx_instance| {
             self.logger_mesh
                 .cast(cx_instance, LoggerRuntimeMessage::SetLogging { level })
         })
         .map_err(|e| PyErr::new::<PyRuntimeError, _>(e.to_string()))?;
 
+        // Always update the client actor's aggregation window.
         self.client_actor
             .send(LogClientMessage::SetAggregate {
                 aggregate_window_sec,
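
Reviewer note: the `set_mode` decision table, as exercised by `set_mode_behaviors` below (sketch; `client` is a borrowed `LoggingMeshClient` spawned while `MESH_ENABLE_LOG_FORWARDING` was `false`):

    client.set_mode(&py_instance, false, None, 10)?; // Ok: nothing to toggle
    // Err: an aggregation window requires streaming, forwarders or not.
    assert!(client.set_mode(&py_instance, false, Some(1), 10).is_err());
    // Err: streaming can't be enabled when forwarders were never spawned.
    assert!(client.set_mode(&py_instance, true, None, 10).is_err());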
@@ -140,13 +333,28 @@ impl LoggingMeshClient {
         Ok(())
     }
 
-    // A sync flush mechanism for the client make sure all the stdout/stderr are streamed back and flushed.
+    /// Force a sync flush of remote stdout/stderr back to the client,
+    /// and wait for completion.
+    ///
+    /// If log forwarding was disabled at startup (so we never spawned
+    /// any `LogForwardActor`s), this becomes a no-op success: there's
+    /// nothing to flush from remote procs in that mode, and we don't
+    /// try to manufacture it dynamically.
     fn flush(&self, instance: &PyInstance) -> PyResult<PyPythonTask> {
-        let forwarder_mesh = self.forwarder_mesh.deref().clone();
+        let forwarder_mesh_opt = self
+            .forwarder_mesh
+            .as_ref()
+            .map(|mesh| mesh.deref().clone());
         let client_actor = self.client_actor.clone();
         let instance = instance.clone();
 
         PyPythonTask::new(async move {
+            // If there's no forwarder mesh (forwarding disabled by
+            // config), we just succeed immediately.
+            let Some(forwarder_mesh) = forwarder_mesh_opt else {
+                return Ok(());
+            };
+
             instance_dispatch!(instance, async move |cx_instance| {
                 Self::flush_internal(cx_instance, client_actor, forwarder_mesh).await
             })
@@ -155,6 +363,47 @@ impl LoggingMeshClient {
     }
 }
 
+// NOTE ON LIFECYCLE / CLEANUP
+//
+// `LoggingMeshClient` is a thin owner for three pieces of logging
+// infra:
+//
+// - `client_actor`: a single `LogClientActor` in the *local*
+//   process.
+// - `forwarder_mesh`: (optional) an `ActorMesh<LogForwardActor>`
+//   with one actor per remote proc in the `ProcMesh`, responsible for
+//   forwarding that proc's stdout/stderr back to the client.
+// - `logger_mesh`: an `ActorMesh<LoggerRuntimeActor>` with one
+//   actor per remote proc, responsible for driving that proc's Python
+//   logging configuration.
+//
+// The Python-facing handle we hand back to callers is a
+// `Py<LoggingMeshClient>`. When that handle is dropped (or goes out
+// of scope in a test), PyO3 will run `Drop` for `LoggingMeshClient`.
+//
+// Important:
+//
+// - In `Drop` we *only* call `drain_and_stop()` on the local
+//   `LogClientActor`. This asks the client-side aggregator to
+//   flush/stop so we don't leave a local task running.
+// - We do NOT synchronously tear down the per-proc meshes here.
+//   Dropping `forwarder_mesh` / `logger_mesh` just releases our
+//   handles; the actual `LogForwardActor` / `LoggerRuntimeActor`
+//   instances keep running on the remote procs until those procs are
+//   shut down.
+//
+// This is fine in tests because we always shut the world down
+// afterward via `host_mesh.shutdown(&instance)`, which tears down the
+// spawned procs and all actors running in them. In other words:
+//
+//   drop(Py<LoggingMeshClient>)
+//     → stops the local `LogClientActor`, drops mesh handles
+//   host_mesh.shutdown(...)
+//     → kills the remote procs, which takes out the per-proc actors
+//
+// If you reuse this type outside tests, keep in mind that simply
+// dropping `LoggingMeshClient` does *not* on its own tear down the
+// remote logging actors; it only stops the local client actor.
 impl Drop for LoggingMeshClient {
     fn drop(&mut self) {
         match self.client_actor.drain_and_stop() {
@@ -167,7 +416,320 @@ impl Drop for LoggingMeshClient {
     }
 }
 
+/// Register the Python-facing types for this module.
+///
+/// `pyo3` calls this when building `monarch._rust_bindings...`. We
+/// expose `LoggingMeshClient` so that Python can construct it and
+/// call its methods (`spawn`, `set_mode`, `flush`, ...).
 pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
     module.add_class::<LoggingMeshClient>()?;
 
     Ok(())
 }
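
Reviewer note: the lifecycle contract above, compressed into the shutdown order every test below follows (`client_py`, `host_mesh`, and `instance` as bound in those tests):

    drop(client_py); // runs Drop: drain_and_stop() on the local LogClientActor only
    host_mesh.shutdown(&instance).await?; // tears down remote procs and their actors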
+#[cfg(test)]
+mod tests {
+    use anyhow::Result;
+    use hyperactor::Instance;
+    use hyperactor::channel::ChannelTransport;
+    use hyperactor::proc::Proc;
+    use hyperactor_mesh::v1::ProcMesh;
+    use hyperactor_mesh::v1::host_mesh::HostMesh;
+    use ndslice::Extent;
+    use ndslice::View; // .region(), .num_ranks(), etc.
+
+    use super::*;
+    use crate::pytokio::AwaitPyExt;
+    use crate::pytokio::ensure_python;
+
+    /// Bring up a minimal "world" suitable for integration-style
+    /// tests.
+    pub async fn test_world() -> Result<(Proc, Instance<()>, HostMesh, ProcMesh)> {
+        ensure_python();
+
+        let proc = Proc::direct(ChannelTransport::Unix.any(), "root".to_string())
+            .await
+            .expect("failed to start root Proc");
+
+        let (instance, _handle) = proc
+            .instance("client")
+            .expect("failed to create proc Instance");
+
+        let host_mesh = HostMesh::local_with_bootstrap(
+            crate::testresource::get("monarch/monarch_hyperactor/bootstrap").into(),
+        )
+        .await
+        .expect("failed to bootstrap HostMesh");
+
+        let proc_mesh = host_mesh
+            .spawn(&instance, "p0", Extent::unity())
+            .await
+            .expect("failed to spawn ProcMesh");
+
+        Ok((proc, instance, host_mesh, proc_mesh))
+    }
+
+    #[tokio::test]
+    async fn test_world_smoke() {
+        let (proc, instance, host_mesh, proc_mesh) = test_world().await.expect("world failed");
+
+        assert_eq!(
+            host_mesh.region().num_ranks(),
+            1,
+            "should allocate exactly one host"
+        );
+        assert_eq!(
+            proc_mesh.region().num_ranks(),
+            1,
+            "should spawn exactly one proc"
+        );
+        assert_eq!(
+            instance.self_id().proc_id(),
+            proc.proc_id(),
+            "returned Instance<()> should be bound to the root Proc"
+        );
+
+        host_mesh.shutdown(&instance).await.expect("host shutdown");
+    }
+
+    #[tokio::test]
+    async fn spawn_respects_forwarding_flag() {
+        let (_, instance, host_mesh, proc_mesh) = test_world().await.expect("world failed");
+
+        let py_instance = PyInstance::from(&instance);
+        let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);
+        let lock = hyperactor::config::global::lock();
+
+        // Case 1: forwarding disabled => `forwarder_mesh` should be `None`.
+        {
+            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, false);
+
+            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
+                .expect("spawn PyPythonTask (forwarding disabled)");
+
+            let client_py: Py<LoggingMeshClient> = client_task
+                .await_py()
+                .await
+                .expect("spawn failed (forwarding disabled)");
+
+            Python::with_gil(|py| {
+                let client_ref = client_py.borrow(py);
+                assert!(
+                    client_ref.forwarder_mesh.is_none(),
+                    "forwarder_mesh should be None when forwarding disabled"
+                );
+            });
+
+            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP".
+        }
+
+        // Case 2: forwarding enabled => `forwarder_mesh` should be `Some`.
+        {
+            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, true);
+
+            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
+                .expect("spawn PyPythonTask (forwarding enabled)");
+
+            let client_py: Py<LoggingMeshClient> = client_task
+                .await_py()
+                .await
+                .expect("spawn failed (forwarding enabled)");
+
+            Python::with_gil(|py| {
+                let client_ref = client_py.borrow(py);
+                assert!(
+                    client_ref.forwarder_mesh.is_some(),
+                    "forwarder_mesh should be Some(..) when forwarding is enabled"
+                );
+            });
+
+            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP".
+        }
+
+        host_mesh.shutdown(&instance).await.expect("host shutdown");
+    }
+
+    #[tokio::test]
+    async fn set_mode_behaviors() {
+        let (_proc, instance, host_mesh, proc_mesh) = test_world().await.expect("world failed");
+
+        let py_instance = PyInstance::from(&instance);
+        let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);
+        let lock = hyperactor::config::global::lock();
+
+        // Case 1: forwarding disabled => `forwarder_mesh.is_none()`.
+        {
+            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, false);
+
+            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
+                .expect("spawn PyPythonTask (forwarding disabled)");
+
+            let client_py: Py<LoggingMeshClient> = client_task
+                .await_py()
+                .await
+                .expect("spawn failed (forwarding disabled)");
+
+            Python::with_gil(|py| {
+                let client_ref = client_py.borrow(py);
+
+                // (a) stream_to_client = false, no aggregate window
+                //     -> Ok.
+                let res = client_ref.set_mode(&py_instance, false, None, 10);
+                assert!(res.is_ok(), "expected Ok(..), got {res:?}");
+
+                // (b) stream_to_client = false,
+                //     aggregate_window_sec = Some(..) -> Err.
+                let res = client_ref.set_mode(&py_instance, false, Some(1), 10);
+                assert!(
+                    res.is_err(),
+                    "expected Err(..) for window without streaming"
+                );
+                if let Err(e) = res {
+                    let msg = e.to_string();
+                    assert!(
+                        msg.contains("cannot set aggregate window without streaming to client"),
+                        "unexpected err for aggregate_window without streaming: {msg}"
+                    );
+                }
+
+                // (c) stream_to_client = true when forwarders were
+                //     never spawned -> Err.
+                let res = client_ref.set_mode(&py_instance, true, None, 10);
+                assert!(
+                    res.is_err(),
+                    "expected Err(..) when enabling streaming but no forwarders"
+                );
+                if let Err(e) = res {
+                    let msg = e.to_string();
+                    assert!(
+                        msg.contains("log forwarding disabled by config at startup"),
+                        "unexpected err when enabling streaming with no forwarders: {msg}"
+                    );
+                }
+            });
+
+            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP".
+        }
+
+        // Case 2: forwarding enabled => `forwarder_mesh.is_some()`.
+        {
+            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, true);
+
+            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
+                .expect("spawn PyPythonTask (forwarding enabled)");
+
+            let client_py: Py<LoggingMeshClient> = client_task
+                .await_py()
+                .await
+                .expect("spawn failed (forwarding enabled)");
+
+            Python::with_gil(|py| {
+                let client_ref = client_py.borrow(py);
+
+                // (d) stream_to_client = true, aggregate_window_sec =
+                //     Some(..) -> Ok. Now that we *do* have forwarders,
+                //     enabling streaming should succeed.
+                let res = client_ref.set_mode(&py_instance, true, Some(2), 20);
+                assert!(
+                    res.is_ok(),
+                    "expected Ok(..) enabling streaming w/ window: {res:?}"
+                );
+
+                // (e) aggregate_window_sec = Some(..) but
+                //     stream_to_client = false -> still Err (this
+                //     rule doesn't care about forwarding being
+                //     enabled or not).
+                let res = client_ref.set_mode(&py_instance, false, Some(2), 20);
+                assert!(
+                    res.is_err(),
+                    "expected Err(..) for window without streaming even w/ forwarders"
+                );
+                if let Err(e) = res {
+                    let msg = e.to_string();
+                    assert!(
+                        msg.contains("cannot set aggregate window without streaming to client"),
+                        "unexpected err when setting window but disabling streaming: {msg}"
+                    );
+                }
+            });
+
+            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP".
+        }
+
+        host_mesh.shutdown(&instance).await.expect("host shutdown");
+    }
+
+    #[tokio::test]
+    async fn flush_behaviors() {
+        let (_proc, instance, host_mesh, proc_mesh) = test_world().await.expect("world failed");
+
+        let py_instance = PyInstance::from(&instance);
+        let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);
+        let lock = hyperactor::config::global::lock();
+
+        // Case 1: forwarding disabled => `forwarder_mesh.is_none()`.
+        {
+            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, false);
+
+            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
+                .expect("spawn PyPythonTask (forwarding disabled)");
+
+            let client_py: Py<LoggingMeshClient> = client_task
+                .await_py()
+                .await
+                .expect("spawn failed (forwarding disabled)");
+
+            // Call flush() and bring the PyPythonTask back out.
+            let flush_task = Python::with_gil(|py| {
+                let client_ref = client_py.borrow(py);
+                client_ref
+                    .flush(&py_instance)
+                    .expect("flush() PyPythonTask (forwarding disabled)")
+            });
+
+            // Await the returned PyPythonTask's future outside the
+            // GIL.
+            flush_task
+                .await_unit()
+                .await
+                .expect("flush failed (forwarding disabled)");
+
+            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP".
+        }
+
+        // Case 2: forwarding enabled => `forwarder_mesh.is_some()`.
+        {
+            let _guard = lock.override_key(MESH_ENABLE_LOG_FORWARDING, true);
+
+            let client_task = LoggingMeshClient::spawn(&py_instance, &py_proc_mesh)
+                .expect("spawn PyPythonTask (forwarding enabled)");
+
+            let client_py: Py<LoggingMeshClient> = client_task
+                .await_py()
+                .await
+                .expect("spawn failed (forwarding enabled)");
+
+            // Call flush() to exercise the barrier path, and pull the
+            // PyPythonTask out.
+            let flush_task = Python::with_gil(|py| {
+                client_py
+                    .borrow(py)
+                    .flush(&py_instance)
+                    .expect("flush() PyPythonTask (forwarding enabled)")
+            });
+
+            // Await the returned PyPythonTask's future outside the
+            // GIL.
+            flush_task
+                .await_unit()
+                .await
+                .expect("flush failed (forwarding enabled)");
+
+            drop(client_py); // See "NOTE ON LIFECYCLE / CLEANUP".
+        }
+
+        host_mesh.shutdown(&instance).await.expect("host shutdown");
+    }
+}
diff --git a/monarch_hyperactor/test/bootstrap.rs b/monarch_hyperactor/test/bootstrap.rs
new file mode 100644
index 000000000..d141fc92c
--- /dev/null
+++ b/monarch_hyperactor/test/bootstrap.rs
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree.
+ */
+
+use monarch_hyperactor as _; // Avoid "unused dependency" lint.
+
+#[tokio::main]
+async fn main() {
+    // `initialize_with_current_runtime` causes folly to intercept SIGTERM;
+    // in '@fbcode//mode/dev-nosan' that translates into SEGFAULTs.
+    hyperactor::initialize_with_current_runtime();
+    // SAFETY: Does not dereference pointers or rely on undefined
+    // memory. No other threads are likely to be modifying it
+    // concurrently.
+    unsafe {
+        libc::signal(libc::SIGTERM, libc::SIG_DFL);
+    }
+
+    // Initialize the embedded Python interpreter before any actor
+    // code runs. Some per-proc actors (e.g. LoggerRuntimeActor) call
+    // into Python during `new()`. If Python isn't initialized yet,
+    // PyO3 will panic ("The Python interpreter is not initialized").
+    pyo3::prepare_freethreaded_python();
+
+    // Enter the hyperactor-mesh bootstrap protocol.
+    hyperactor_mesh::bootstrap_or_die().await;
+}