30 changes: 30 additions & 0 deletions launch/dynamo-run/README.md
@@ -211,6 +211,34 @@ async def generate(request):
yield {"id":"1","choices":[{"index":0,"delta":{"content":"","role":"assistant"},"finish_reason":"stop"}],"created":1841762283,"model":"Llama-3.2-1B-Instruct","system_fingerprint":"local","object":"chat.completion.chunk"}
```

Command line arguments are passed to the Python engine like this:
```
dynamo-run out=pystr:my_python_engine.py -- -n 42 --custom-arg Orange --yes
```

The Python engine receives the arguments in `sys.argv`. The argument list includes a set of standard flags as well as everything after the `--`.

This input:
```
dynamo-run out=pystr:my_engine.py /opt/models/Llama-3.2-3B-Instruct/ --model-name llama3.2 --tensor-parallel-size 4 -- -n 1
```

is read like this:
```
import sys

async def generate(request):
    ...  # as before

if __name__ == "__main__":
    print(f"MAIN: {sys.argv}")
```

and produces this output:
```
MAIN: ['my_engine.py', '--model-path', '/opt/models/Llama-3.2-3B-Instruct/', '--model-name', 'llama3.2', '--http-port', '8080', '--tensor-parallel-size', '4', '--base-gpu-id', '0', '--num-nodes', '1', '--node-rank', '0', '-n', '1']
```

This allows quick iteration on the engine setup. Note how the `-n 1` from after the `--` is included. The `--leader-addr` and `--model-config` flags are also added if they were provided to `dynamo-run`.
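For quick experiments the engine can pull out the flags it cares about with `argparse`. This is a minimal sketch, not part of `dynamo-run` itself; the flag subset and the `-n` option are just the ones from the example above, and undeclared flags are deliberately ignored:
```
import argparse
import sys

def parse_engine_args(argv):
    # Read some of the standard flags dynamo-run passes, plus our custom `-n`.
    parser = argparse.ArgumentParser(description="Toy engine argument parsing")
    parser.add_argument("--model-path")
    parser.add_argument("--model-name")
    parser.add_argument("--tensor-parallel-size", type=int, default=1)
    parser.add_argument("-n", type=int, default=0)
    # parse_known_args ignores flags we did not declare (e.g. --leader-addr).
    args, _unknown = parser.parse_known_args(argv)
    return args

if __name__ == "__main__":
    args = parse_engine_args(sys.argv[1:])
    print(f"Loading {args.model_path} as {args.model_name}, tp={args.tensor_parallel_size}")
```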

### Dynamo does the pre-processing

If the Python engine should receive and return tokens (the prompt templating and tokenization are already done), run it like this:
@@ -250,6 +278,8 @@ async def generate(request):
yield {"token_ids":[13]}
```

`pytok` supports the same way of passing command line arguments as `pystr`: a `__main__` block reading `sys.argv`.
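As a rough sketch, assuming the same `__main__` convention as the `pystr` example above, a minimal `pytok` engine could look like this:
```
import sys

async def generate(request):
    # Tokens in, tokens out; pre-processing already happened in Dynamo.
    yield {"token_ids": [13]}

if __name__ == "__main__":
    print(f"MAIN: {sys.argv}")
```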

## trtllm

TensorRT-LLM. Requires `clang` and `libclang-dev`.
50 changes: 46 additions & 4 deletions launch/dynamo-run/src/flags.rs
@@ -93,8 +93,7 @@ pub struct Flags {

/// Internal use only.
// Start the python vllm engine sub-process.
#[arg(long)]
#[clap(hide = true, default_value = "false")]
#[arg(long, hide = true, default_value = "false")]
pub internal_vllm_process: bool,

/// Internal use only.
@@ -104,9 +103,52 @@
/// - the node rank (0 for first host, 1 for second host, etc)
/// - the workers' rank (globally unique)
/// - the GPU to use (locally unique)
#[arg(long)]
#[clap(hide = true, value_parser = parse_sglang_flags)]
#[arg(long, hide = true, value_parser = parse_sglang_flags)]
pub internal_sglang_process: Option<SgLangFlags>,

/// Everything after a `--`.
/// These are the command line arguments to the python engine when using `pystr` or `pytok`.
#[arg(index = 2, last = true, hide = true, allow_hyphen_values = true)]
pub last: Vec<String>,
}

impl Flags {
    /// Convert the flags back to a command line. Includes only the non-null values,
    /// plus the defaults. Includes the canonicalized model path and normalized model name.
    ///
    /// Used to pass arguments to python engines via `pystr` and `pytok`.
    pub fn as_vec(&self, path: &str, name: &str) -> Vec<String> {
        let mut out = vec![
            "--model-path".to_string(),
            path.to_string(),
            "--model-name".to_string(),
            name.to_string(),
            "--http-port".to_string(),
            self.http_port.to_string(),
            // Default 1
            "--tensor-parallel-size".to_string(),
            self.tensor_parallel_size.to_string(),
            // Default 0
            "--base-gpu-id".to_string(),
            self.base_gpu_id.to_string(),
            // Default 1
            "--num-nodes".to_string(),
            self.num_nodes.to_string(),
            // Default 0
            "--node-rank".to_string(),
            self.node_rank.to_string(),
        ];
        if let Some(model_config_path) = self.model_config.as_ref() {
            out.push("--model-config".to_string());
            out.push(model_config_path.display().to_string());
        }
        if let Some(leader) = self.leader_addr.as_ref() {
            out.push("--leader-addr".to_string());
            out.push(leader.to_string());
        }
        out.extend(self.last.clone());
        out
    }
}

#[derive(Debug, Clone, Copy)]
10 changes: 7 additions & 3 deletions launch/dynamo-run/src/lib.rs
@@ -82,7 +82,8 @@ pub async fn run(
// Turn relative paths into absolute paths
let model_path = flags
.model_path_pos
.or(flags.model_path_flag)
.clone()
.or(flags.model_path_flag.clone())
.and_then(|p| {
if p.exists() {
p.canonicalize().ok()
@@ -93,6 +94,7 @@
// Serve the model under the name provided, or the name of the GGUF file or HF repo.
let model_name = flags
.model_name
.clone()
.or_else(|| {
model_path
.as_ref()
@@ -338,8 +340,9 @@ pub async fn run(
let Some(model_name) = model_name else {
anyhow::bail!("Provide model service name as `--model-name <this>`");
};
let py_args = flags.as_vec(&path_str, &model_name);
let p = std::path::PathBuf::from(path_str);
let engine = python::make_string_engine(cancel_token.clone(), &p).await?;
let engine = python::make_string_engine(cancel_token.clone(), &p, py_args).await?;
EngineConfig::StaticFull {
service_name: model_name,
engine,
@@ -354,8 +357,9 @@
let Some(model_name) = model_name else {
unreachable!("If we have a card we must have a model name");
};
let py_args = flags.as_vec(&path_str, &model_name);
let p = std::path::PathBuf::from(path_str);
let engine = python::make_token_engine(cancel_token.clone(), &p).await?;
let engine = python::make_token_engine(cancel_token.clone(), &p, py_args).await?;
EngineConfig::StaticCore {
service_name: model_name.clone(),
engine,
58 changes: 43 additions & 15 deletions lib/llm/src/engines/python.rs
@@ -16,6 +16,7 @@
use std::ffi::CStr;
use std::{path::Path, sync::Arc};

use anyhow::Context;
use dynamo_runtime::pipeline::error as pipeline_error;
pub use dynamo_runtime::{
error,
@@ -43,23 +44,23 @@ const PY_IMPORT: &CStr = cr#"
import importlib.util
import sys

module_name = file_path.split("/")[-1].replace(".py", "")
spec = importlib.util.spec_from_file_location(module_name, file_path)

spec = importlib.util.spec_from_file_location("__main__", file_path)
module = importlib.util.module_from_spec(spec)

sys.modules[module_name] = module
sys.argv = sys_argv
sys.modules["__main__"] = module
spec.loader.exec_module(module)
"#;

/// An engine that takes and returns strings, feeding them to a python written engine
pub async fn make_string_engine(
cancel_token: CancellationToken,
py_file: &Path,
py_args: Vec<String>,
) -> pipeline_error::Result<OpenAIChatCompletionsStreamingEngine> {
pyo3::prepare_freethreaded_python();

let engine = new_engine(cancel_token, py_file).await?;
let engine = new_engine(cancel_token, py_file, py_args).await?;
let engine: OpenAIChatCompletionsStreamingEngine = Arc::new(engine);
Ok(engine)
}
@@ -68,10 +69,11 @@ pub async fn make_string_engine(
pub async fn make_token_engine(
cancel_token: CancellationToken,
py_file: &Path,
py_args: Vec<String>,
) -> pipeline_error::Result<ExecutionContext> {
pyo3::prepare_freethreaded_python();

let engine = new_engine(cancel_token, py_file).await?;
let engine = new_engine(cancel_token, py_file, py_args).await?;
let engine: ExecutionContext = Arc::new(engine);
Ok(engine)
}
@@ -86,13 +88,30 @@ pub struct PythonServerStreamingEngine {
async fn new_engine(
cancel_token: CancellationToken,
py_file: &Path,
py_args: Vec<String>,
) -> anyhow::Result<PythonServerStreamingEngine> {
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::task::spawn_blocking(move || run_asyncio(tx));
let event_loop = rx.await?;

let user_module = python_file_to_module(py_file)?;
let generator = Python::with_gil(|py| user_module.getattr(py, "generate").unwrap());
let user_module =
python_file_to_module(py_file, py_args).with_context(|| py_file.display().to_string())?;
let generator = Python::with_gil(|py| {
/* Leave commented, `initialize` may be needed to match Triton
if let Ok(initialize) = user_module.getattr(py, "initialize") {
initialize
.call1(py, (py_args,))
.inspect_err(|err| {
println!();
err.display(py);
})
.with_context(|| "Failed calling python engine's initialize(args)")?;
};
*/
user_module
.getattr(py, "generate")
.with_context(|| "generate")
})?;
Ok(PythonServerStreamingEngine::new(
cancel_token,
Arc::new(generator),
@@ -127,16 +146,25 @@ fn run_asyncio(tx: Sender<Arc<PyObject>>) {
});
}

fn python_file_to_module(p: &Path) -> Result<PyObject> {
fn python_file_to_module(p: &Path, mut py_args: Vec<String>) -> Result<PyObject> {
if let Some(filename) = p.file_name() {
py_args.insert(0, filename.to_string_lossy().to_string());
};
let module: PyObject = Python::with_gil(|py| {
let globals = [("file_path", p.display().to_string())]
let py_file_path: PyObject = p.display().to_string().into_pyobject(py).unwrap().into();
let py_sys_argv: PyObject = py_args.into_pyobject(py).unwrap().into();
let globals = [("file_path", py_file_path), ("sys_argv", py_sys_argv)]
.into_py_dict(py)
.unwrap();
.context("into_py_dict")?;
let locals = PyDict::new(py);
py.run(PY_IMPORT, Some(&globals), Some(&locals)).unwrap();
let module = locals.get_item("module").unwrap().unwrap();
module.extract().unwrap()
});
py.run(PY_IMPORT, Some(&globals), Some(&locals))
.context("PY_IMPORT")?;
let module = locals
.get_item("module")
.unwrap()
.context("get module after import")?;
module.extract().context("extract")
})?;
Ok(module)
}
