Skip to content

Commit 1d6dec8

Browse files
: v1/logging: avoid spinning up LogForwardActor mesh when redundant (#1722)
Summary: this diff changes `LoggingMeshClient::spawn` so that log forwarding is no longer unconditional. previously we always spawned a `LogForwardActor` mesh across the `ProcMesh` and stored it in the client. now we read `MESH_ENABLE_LOG_FORWARDING` and only create that mesh if the flag is set. when forwarding is disabled we don’t spawn any `LogForwardActor` actors at all and we record `forwarder_mesh` as `None`. `logger_mesh` (the per-proc `LoggerRuntimeActor` that manages python logging state) is still always spawned. `client_actor` is still spawned locally in the caller's process. because `forwarder_mesh` is now an `Option`, the rest of the API is updated to match the new contract. `set_mode` now either propagates `stream_to_client` to the forwarders when they exist, silently accepts"don’t stream" when they don’t, or returns an error if the caller tries to enable streaming in a configuration where we never created forwarders. `flush` now no-ops successfully in the case where there is no forwarding mesh instead of assuming that those actors are always present. the diff also adds the minimal test harness needed to exercise this logic end-to-end. there is a dedicated bootstrap binary in `monarch_hyperactor` (`monarch_hyperactor_test_bootstrap`) which initializes python and then runs the mesh bootstrap protocol, so remote procs can safely construct `LoggerRuntimeActor `without panicking on `Python::with_gil`. we also add `HostMesh::local_with_bootstrap`, which lets tests stand up a single-host mesh using an explicit `BootstrapCommand` instead of relying on the implicit "current process" path. we also add `ensure_python` and `AwaitPyExt `on the rust side so the tokio tests can stand up a `ProcMesh`, call the python-facing `spawn()`, await the returned `PyPythonTask`, and inspect the resulting `LoggingMeshClient`. finally, there is a test that brings up a tiny mesh, forces the forwarding flag off and on, and in each case asserts that `forwarder_mesh` is `None` or `Some(..)` accordingly. follow-up diffs will extend coverage to `set_mode`, `flush`, and shutdown semantics, now that the plumbing exists. Differential Revision: D85919326
1 parent a89c85f commit 1d6dec8

File tree

7 files changed

+437
-25
lines changed

7 files changed

+437
-25
lines changed

hyperactor_mesh/src/v1/host_mesh.rs

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -216,18 +216,40 @@ enum HostMeshAllocation {
216216
}
217217

218218
impl HostMesh {
219-
/// Fork a new `HostMesh` from this process, returning the new `HostMesh`
220-
/// to the parent (owning) process, while running forever in child processes
221-
/// (i.e., individual procs).
219+
/// Bring up a local single-host mesh and, in the launcher
220+
/// process, return a `HostMesh` handle for it.
222221
///
223-
/// All of the code preceding the call to `local` will run in each child proc;
224-
/// thus it is important to call `local` early in the lifetime of the program,
225-
/// and to ensure that it is reached unconditionally.
222+
/// There are two execution modes:
226223
///
227-
/// This is intended for testing, development, examples.
224+
/// - bootstrap-child mode: if `Bootstrap::get_from_env()` says
225+
/// this process was launched as a bootstrap child, we call
226+
/// `boot.bootstrap().await`, which hands control to the
227+
/// bootstrap logic for this process (as defined by the
228+
/// `BootstrapCommand` the parent used to spawn it). if that
229+
/// call returns, we log the error and terminate. this branch
230+
/// does not produce a `HostMesh`.
231+
///
232+
/// - launcher mode: otherwise, we are the process that is setting
233+
/// up the mesh. we create a `Host`, spawn a `HostMeshAgent` in
234+
/// it, and build a single-host `HostMesh` around that. that
235+
/// `HostMesh` is returned to the caller.
236+
///
237+
/// This API is intended for tests, examples, and local bring-up,
238+
/// not production.
228239
///
229240
/// TODO: fix up ownership
230241
pub async fn local() -> v1::Result<HostMesh> {
242+
Self::local_with_bootstrap(BootstrapCommand::current()?).await
243+
}
244+
245+
/// Same as [`local`], but the caller supplies the
246+
/// `BootstrapCommand` instead of deriving it from the current
247+
/// process.
248+
///
249+
/// The provided `bootstrap_cmd` is used when spawning bootstrap
250+
/// children and determines the behavior of
251+
/// `boot.bootstrap().await` in those children.
252+
pub async fn local_with_bootstrap(bootstrap_cmd: BootstrapCommand) -> v1::Result<HostMesh> {
231253
if let Ok(Some(boot)) = Bootstrap::get_from_env() {
232254
let err = boot.bootstrap().await;
233255
tracing::error!("failed to bootstrap local host mesh process: {}", err);
@@ -236,7 +258,7 @@ impl HostMesh {
236258

237259
let addr = config::global::get_cloned(DEFAULT_TRANSPORT).any();
238260

239-
let manager = BootstrapProcManager::new(BootstrapCommand::current()?)?;
261+
let manager = BootstrapProcManager::new(bootstrap_cmd)?;
240262
let (host, _handle) = Host::serve(manager, addr).await?;
241263
let addr = host.addr().clone();
242264
let host_mesh_agent = host

monarch_hyperactor/Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# @generated by autocargo from //monarch/monarch_hyperactor:[monarch_hyperactor,process_allocator-oss,test_monarch_hyperactor]
1+
# @generated by autocargo from //monarch/monarch_hyperactor:[monarch_hyperactor,monarch_hyperactor_test_bootstrap,process_allocator-oss,test_monarch_hyperactor]
22

33
[package]
44
name = "monarch_hyperactor"
@@ -7,6 +7,11 @@ authors = ["Meta"]
77
edition = "2021"
88
license = "BSD-3-Clause"
99

10+
[[bin]]
11+
name = "monarch_hyperactor_test_bootstrap"
12+
path = "test/bootstrap.rs"
13+
edition = "2024"
14+
1015
[[bin]]
1116
name = "process_allocator"
1217
edition = "2024"
@@ -52,6 +57,7 @@ tokio-util = { version = "0.7.15", features = ["full"] }
5257
tracing = { version = "0.1.41", features = ["attributes", "valuable"] }
5358

5459
[dev-dependencies]
60+
buck-resources = "1"
5561
dir-diff = "0.3"
5662

5763
[features]

monarch_hyperactor/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub mod selection;
3232
pub mod shape;
3333
pub mod supervision;
3434
pub mod telemetry;
35+
mod testresource;
3536
pub mod v1;
3637
pub mod value_mesh;
3738

monarch_hyperactor/src/pytokio.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ use hyperactor::config::CONFIG;
5050
use hyperactor::config::ConfigAttr;
5151
use monarch_types::SerializablePyErr;
5252
use pyo3::IntoPyObjectExt;
53+
#[cfg(test)]
54+
use pyo3::PyClass;
5355
use pyo3::exceptions::PyRuntimeError;
5456
use pyo3::exceptions::PyStopIteration;
5557
use pyo3::exceptions::PyTimeoutError;
@@ -547,3 +549,52 @@ pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResul
547549

548550
Ok(())
549551
}
552+
553+
/// Ensure the embedded Python interpreter is initialized exactly
554+
/// once.
555+
///
556+
/// Safe to call from multiple threads, multiple times.
557+
#[cfg(test)]
558+
pub(crate) fn ensure_python() {
559+
static INIT: std::sync::OnceLock<()> = std::sync::OnceLock::new();
560+
INIT.get_or_init(|| {
561+
pyo3::prepare_freethreaded_python();
562+
});
563+
}
564+
565+
#[cfg(test)]
566+
// Helper: let us "await" a `PyPythonTask` in Rust.
567+
//
568+
// Semantics:
569+
// - consume the `PyPythonTask`,
570+
// - take the inner future,
571+
// - `.await` it on tokio to get `Py<PyAny>`,
572+
// - turn that into `Py<T>`.
573+
pub(crate) trait AwaitPyExt {
574+
async fn await_py<T: PyClass>(self) -> Result<Py<T>, PyErr>;
575+
}
576+
577+
#[cfg(test)]
578+
impl AwaitPyExt for PyPythonTask {
579+
async fn await_py<T: PyClass>(mut self) -> Result<Py<T>, PyErr> {
580+
// Take ownership of the inner future.
581+
let fut = self
582+
.take_task()
583+
.expect("PyPythonTask already consumed in await_py");
584+
585+
// Await a Result<Py<PyAny>, PyErr>.
586+
let py_any: Py<PyAny> = fut.await?;
587+
588+
// Convert Py<PyAny> -> Py<T>.
589+
Python::with_gil(|py| {
590+
let bound_any = py_any.bind(py);
591+
592+
// Try extract a Py<T>.
593+
let obj: Py<T> = bound_any
594+
.extract::<Py<T>>()
595+
.expect("spawn() did not return expected Python type");
596+
597+
Ok(obj)
598+
})
599+
}
600+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
#![cfg(test)]
10+
11+
use std::path::PathBuf;
12+
13+
/// Fetch the named (BUCK) named resource, heuristically falling back on
14+
/// the cargo-built path when possible. Beware! This is not actually a
15+
/// true cargo dependency, so the binaries have to be built independently.
16+
///
17+
/// We should convert these tests to integration tests, so that cargo can
18+
/// also manage the binaries.
19+
pub fn get<S>(name: S) -> PathBuf
20+
where
21+
S: AsRef<str>,
22+
{
23+
let name = name.as_ref().to_owned();
24+
// TODO: actually check if we're running in Buck context or not.
25+
if let Ok(path) = buck_resources::get(name.clone()) {
26+
return path;
27+
}
28+
29+
assert!(
30+
name.starts_with("monarch/monarch_hyperactor/"),
31+
"invalid resource {}: must start with \"monarch/monarch_hyperactor/\"",
32+
name
33+
);
34+
35+
let path: PathBuf = name
36+
.replace(
37+
"monarch/monarch_hyperactor/",
38+
"../target/debug/monarch_hyperactor_test_",
39+
)
40+
.into();
41+
42+
assert!(
43+
path.exists(),
44+
"no cargo-built resource at {}",
45+
path.display()
46+
);
47+
48+
path
49+
}

0 commit comments

Comments
 (0)