Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions hyperactor_mesh/src/v1/host_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,18 +216,40 @@ enum HostMeshAllocation {
}

impl HostMesh {
/// Fork a new `HostMesh` from this process, returning the new `HostMesh`
/// to the parent (owning) process, while running forever in child processes
/// (i.e., individual procs).
/// Bring up a local single-host mesh and, in the launcher
/// process, return a `HostMesh` handle for it.
///
/// All of the code preceding the call to `local` will run in each child proc;
/// thus it is important to call `local` early in the lifetime of the program,
/// and to ensure that it is reached unconditionally.
/// There are two execution modes:
///
/// This is intended for testing, development, examples.
/// - bootstrap-child mode: if `Bootstrap::get_from_env()` says
/// this process was launched as a bootstrap child, we call
/// `boot.bootstrap().await`, which hands control to the
/// bootstrap logic for this process (as defined by the
/// `BootstrapCommand` the parent used to spawn it). if that
/// call returns, we log the error and terminate. this branch
/// does not produce a `HostMesh`.
///
/// - launcher mode: otherwise, we are the process that is setting
/// up the mesh. we create a `Host`, spawn a `HostMeshAgent` in
/// it, and build a single-host `HostMesh` around that. that
/// `HostMesh` is returned to the caller.
///
/// This API is intended for tests, examples, and local bring-up,
/// not production.
///
/// TODO: fix up ownership
pub async fn local() -> v1::Result<HostMesh> {
Self::local_with_bootstrap(BootstrapCommand::current()?).await
}

/// Same as [`local`], but the caller supplies the
/// `BootstrapCommand` instead of deriving it from the current
/// process.
///
/// The provided `bootstrap_cmd` is used when spawning bootstrap
/// children and determines the behavior of
/// `boot.bootstrap().await` in those children.
pub async fn local_with_bootstrap(bootstrap_cmd: BootstrapCommand) -> v1::Result<HostMesh> {
if let Ok(Some(boot)) = Bootstrap::get_from_env() {
let err = boot.bootstrap().await;
tracing::error!("failed to bootstrap local host mesh process: {}", err);
Expand All @@ -236,7 +258,7 @@ impl HostMesh {

let addr = config::global::get_cloned(DEFAULT_TRANSPORT).any();

let manager = BootstrapProcManager::new(BootstrapCommand::current()?)?;
let manager = BootstrapProcManager::new(bootstrap_cmd)?;
let (host, _handle) = Host::serve(manager, addr).await?;
let addr = host.addr().clone();
let host_mesh_agent = host
Expand Down
8 changes: 7 additions & 1 deletion monarch_hyperactor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# @generated by autocargo from //monarch/monarch_hyperactor:[monarch_hyperactor,process_allocator-oss,test_monarch_hyperactor]
# @generated by autocargo from //monarch/monarch_hyperactor:[monarch_hyperactor,monarch_hyperactor_test_bootstrap,process_allocator-oss,test_monarch_hyperactor]

[package]
name = "monarch_hyperactor"
Expand All @@ -7,6 +7,11 @@ authors = ["Meta"]
edition = "2021"
license = "BSD-3-Clause"

[[bin]]
name = "monarch_hyperactor_test_bootstrap"
path = "test/bootstrap.rs"
edition = "2024"

[[bin]]
name = "process_allocator"
edition = "2024"
Expand Down Expand Up @@ -52,6 +57,7 @@ tokio-util = { version = "0.7.15", features = ["full"] }
tracing = { version = "0.1.41", features = ["attributes", "valuable"] }

[dev-dependencies]
buck-resources = "1"
dir-diff = "0.3"

[features]
Expand Down
1 change: 1 addition & 0 deletions monarch_hyperactor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod selection;
pub mod shape;
pub mod supervision;
pub mod telemetry;
mod testresource;
pub mod v1;
pub mod value_mesh;

Expand Down
71 changes: 71 additions & 0 deletions monarch_hyperactor/src/pytokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ use hyperactor::config::CONFIG;
use hyperactor::config::ConfigAttr;
use monarch_types::SerializablePyErr;
use pyo3::IntoPyObjectExt;
#[cfg(test)]
use pyo3::PyClass;
use pyo3::exceptions::PyRuntimeError;
use pyo3::exceptions::PyStopIteration;
use pyo3::exceptions::PyTimeoutError;
Expand Down Expand Up @@ -547,3 +549,72 @@ pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResul

Ok(())
}

/// Ensure the embedded Python interpreter is initialized exactly
/// once.
///
/// Safe to call from multiple threads, multiple times.
#[cfg(test)]
pub(crate) fn ensure_python() {
static INIT: std::sync::OnceLock<()> = std::sync::OnceLock::new();
INIT.get_or_init(|| {
pyo3::prepare_freethreaded_python();
});
}

#[cfg(test)]
// Helper: let us "await" a `PyPythonTask` in Rust.
//
// Semantics:
// - consume the `PyPythonTask`,
// - take the inner future,
// - `.await` it on tokio to get `Py<PyAny>`,
// - turn that into `Py<T>`.
pub(crate) trait AwaitPyExt {
async fn await_py<T: PyClass>(self) -> Result<Py<T>, PyErr>;

// For tasks whose future just resolves to (), i.e. no object,
// just "did it work?"
async fn await_unit(self) -> Result<(), PyErr>;
}

#[cfg(test)]
impl AwaitPyExt for PyPythonTask {
async fn await_py<T: PyClass>(mut self) -> Result<Py<T>, PyErr> {
// Take ownership of the inner future.
let fut = self
.take_task()
.expect("PyPythonTask already consumed in await_py");

// Await a Result<Py<PyAny>, PyErr>.
let py_any: Py<PyAny> = fut.await?;

// Convert Py<PyAny> -> Py<T>.
Python::with_gil(|py| {
let bound_any = py_any.bind(py);

// Try extract a Py<T>.
let obj: Py<T> = bound_any
.extract::<Py<T>>()
.expect("spawn() did not return expected Python type");

Ok(obj)
})
}

async fn await_unit(mut self) -> Result<(), PyErr> {
let fut = self
.take_task()
.expect("PyPythonTask already consumed in await_unit");

// Await it. This still gives us a Py<PyAny> because
// Python-side return values are always materialized as 'some
// object'. For "no value" / None, that's just a PyAny(None).
let py_any: Py<PyAny> = fut.await?;

// We don't need to extract anything. Just drop it.
drop(py_any);

Ok(())
}
}
49 changes: 49 additions & 0 deletions monarch_hyperactor/src/testresource.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#![cfg(test)]

use std::path::PathBuf;

/// Fetch the named (BUCK) named resource, heuristically falling back on
/// the cargo-built path when possible. Beware! This is not actually a
/// true cargo dependency, so the binaries have to be built independently.
///
/// We should convert these tests to integration tests, so that cargo can
/// also manage the binaries.
pub fn get<S>(name: S) -> PathBuf
where
S: AsRef<str>,
{
let name = name.as_ref().to_owned();
// TODO: actually check if we're running in Buck context or not.
if let Ok(path) = buck_resources::get(name.clone()) {
return path;
}

assert!(
name.starts_with("monarch/monarch_hyperactor/"),
"invalid resource {}: must start with \"monarch/monarch_hyperactor/\"",
name
);

let path: PathBuf = name
.replace(
"monarch/monarch_hyperactor/",
"../target/debug/monarch_hyperactor_test_",
)
.into();

assert!(
path.exists(),
"no cargo-built resource at {}",
path.display()
);

path
}
Loading
Loading