diff --git a/launch/dynamo-run/README.md b/launch/dynamo-run/README.md
index 5503832807..4db701039b 100644
--- a/launch/dynamo-run/README.md
+++ b/launch/dynamo-run/README.md
@@ -211,6 +211,34 @@ async def generate(request):
     yield {"id":"1","choices":[{"index":0,"delta":{"content":"","role":"assistant"},"finish_reason":"stop"}],"created":1841762283,"model":"Llama-3.2-1B-Instruct","system_fingerprint":"local","object":"chat.completion.chunk"}
 ```
 
+Command line arguments are passed to the Python engine like this:
+```
+dynamo-run out=pystr:my_python_engine.py -- -n 42 --custom-arg Orange --yes
+```
+
+The Python engine receives the arguments in `sys.argv`. The list starts with some standard arguments, followed by everything after the `--`.
+
+This input:
+```
+dynamo-run out=pystr:my_engine.py /opt/models/Llama-3.2-3B-Instruct/ --model-name llama3.2 --tensor-parallel-size 4 -- -n 1
+```
+
+is read like this:
+```
+async def generate(request):
+    .. as before ..
+
+if __name__ == "__main__":
+    print(f"MAIN: {sys.argv}")
+```
+
+and produces this output:
+```
+MAIN: ['my_engine.py', '--model-path', '/opt/models/Llama-3.2-3B-Instruct/', '--model-name', 'llama3.2', '--http-port', '8080', '--tensor-parallel-size', '4', '--base-gpu-id', '0', '--num-nodes', '1', '--node-rank', '0', '-n', '1']
+```
+
+This allows quick iteration on the engine setup. Note how the `-n 1` is included at the end. The `--leader-addr` and `--model-config` flags will also be added if they were provided to `dynamo-run`.
+
 ### Dynamo does the pre-processing
 
 If the Python engine wants to receive and return tokens - the prompt templating and tokenization is already done - run it like this:
@@ -250,6 +278,8 @@ async def generate(request):
     yield {"token_ids":[13]}
 ```
 
+`pytok` supports the same ways of passing command line arguments as `pystr`: `initialize` or `main` with `sys.argv`.
+
 ## trtllm
 
 TensorRT-LLM. Requires `clang` and `libclang-dev`.
diff --git a/launch/dynamo-run/src/flags.rs b/launch/dynamo-run/src/flags.rs
index dcf7863d8b..6a8fd18b7e 100644
--- a/launch/dynamo-run/src/flags.rs
+++ b/launch/dynamo-run/src/flags.rs
@@ -93,8 +93,7 @@ pub struct Flags {
     /// Internal use only.
     // Start the python vllm engine sub-process.
-    #[arg(long)]
-    #[clap(hide = true, default_value = "false")]
+    #[arg(long, hide = true, default_value = "false")]
     pub internal_vllm_process: bool,
 
     /// Internal use only.
@@ -104,9 +103,52 @@ pub struct Flags {
     /// - the node rank (0 for first host, 1 for second host, etc)
     /// - the workers' rank (globally unique)
     /// - the GPU to use (locally unique)
-    #[arg(long)]
-    #[clap(hide = true, value_parser = parse_sglang_flags)]
+    #[arg(long, hide = true, value_parser = parse_sglang_flags)]
     pub internal_sglang_process: Option<SgLangFlags>,
+
+    /// Everything after a `--`.
+    /// These are the command line arguments to the Python engine when using `pystr` or `pytok`.
+    #[arg(index = 2, last = true, hide = true, allow_hyphen_values = true)]
+    pub last: Vec<String>,
+}
+
+impl Flags {
+    /// Convert the flags back to a command line: only the non-null values are included,
+    /// plus the defaults. The model path is canonicalized and the model name normalized.
+    ///
+    /// Used to pass arguments to Python engines via `pystr` and `pytok`.
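+    ///
+    /// For example (values illustrative): with default flags and `-- -n 1` on the command
+    /// line, `as_vec("/opt/models/Llama-3.2-3B-Instruct/", "llama3.2")` returns
+    /// `["--model-path", "/opt/models/Llama-3.2-3B-Instruct/", "--model-name", "llama3.2",
+    /// "--http-port", "8080", "--tensor-parallel-size", "1", "--base-gpu-id", "0",
+    /// "--num-nodes", "1", "--node-rank", "0", "-n", "1"]`.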
+    pub fn as_vec(&self, path: &str, name: &str) -> Vec<String> {
+        let mut out = vec![
+            "--model-path".to_string(),
+            path.to_string(),
+            "--model-name".to_string(),
+            name.to_string(),
+            "--http-port".to_string(),
+            self.http_port.to_string(),
+            // Default 1
+            "--tensor-parallel-size".to_string(),
+            self.tensor_parallel_size.to_string(),
+            // Default 0
+            "--base-gpu-id".to_string(),
+            self.base_gpu_id.to_string(),
+            // Default 1
+            "--num-nodes".to_string(),
+            self.num_nodes.to_string(),
+            // Default 0
+            "--node-rank".to_string(),
+            self.node_rank.to_string(),
+        ];
+        if let Some(model_config_path) = self.model_config.as_ref() {
+            out.push("--model-config".to_string());
+            out.push(model_config_path.display().to_string());
+        }
+        if let Some(leader) = self.leader_addr.as_ref() {
+            out.push("--leader-addr".to_string());
+            out.push(leader.to_string());
+        }
+        out.extend(self.last.clone());
+        out
+    }
 }
 
 #[derive(Debug, Clone, Copy)]
diff --git a/launch/dynamo-run/src/lib.rs b/launch/dynamo-run/src/lib.rs
index 4270c3092b..9e7218ceb1 100644
--- a/launch/dynamo-run/src/lib.rs
+++ b/launch/dynamo-run/src/lib.rs
@@ -82,7 +82,8 @@ pub async fn run(
     // Turn relative paths into absolute paths
     let model_path = flags
         .model_path_pos
-        .or(flags.model_path_flag)
+        .clone()
+        .or(flags.model_path_flag.clone())
         .and_then(|p| {
             if p.exists() {
                 p.canonicalize().ok()
@@ -93,6 +94,7 @@ pub async fn run(
     // Serve the model under the name provided, or the name of the GGUF file or HF repo.
     let model_name = flags
         .model_name
+        .clone()
         .or_else(|| {
             model_path
                 .as_ref()
@@ -338,8 +340,9 @@ pub async fn run(
             let Some(model_name) = model_name else {
                 anyhow::bail!("Provide model service name as `--model-name <name>`");
             };
+            let py_args = flags.as_vec(&path_str, &model_name);
             let p = std::path::PathBuf::from(path_str);
-            let engine = python::make_string_engine(cancel_token.clone(), &p).await?;
+            let engine = python::make_string_engine(cancel_token.clone(), &p, py_args).await?;
             EngineConfig::StaticFull {
                 service_name: model_name,
                 engine,
@@ -354,8 +357,9 @@ pub async fn run(
             let Some(model_name) = model_name else {
                 unreachable!("If we have a card we must have a model name");
             };
+            let py_args = flags.as_vec(&path_str, &model_name);
             let p = std::path::PathBuf::from(path_str);
-            let engine = python::make_token_engine(cancel_token.clone(), &p).await?;
+            let engine = python::make_token_engine(cancel_token.clone(), &p, py_args).await?;
             EngineConfig::StaticCore {
                 service_name: model_name.clone(),
                 engine,
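The `python.rs` changes below load the engine file as `__main__` and inject `sys.argv`. As standalone Python, the mechanism is roughly the following; the `file_path` and `sys_argv` values here are illustrative, in the real code they come from the Rust side (with `sys_argv` built by `Flags::as_vec`):

```python
# A sketch of what the PY_IMPORT snippet below does, as plain Python.
import importlib.util
import sys

file_path = "my_engine.py"  # illustrative; supplied by the Rust caller
sys_argv = ["my_engine.py", "--model-path", "/opt/models/Llama-3.2-3B-Instruct/", "-n", "1"]

# Register the engine file under the name __main__ and give it our argv.
spec = importlib.util.spec_from_file_location("__main__", file_path)
module = importlib.util.module_from_spec(spec)
sys.argv = sys_argv
sys.modules["__main__"] = module
# Executing the module now runs its `if __name__ == "__main__":` block,
# which sees the injected sys.argv.
spec.loader.exec_module(module)
```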
diff --git a/lib/llm/src/engines/python.rs b/lib/llm/src/engines/python.rs
index acc35647e5..4d7df7d966 100644
--- a/lib/llm/src/engines/python.rs
+++ b/lib/llm/src/engines/python.rs
@@ -16,6 +16,7 @@ use std::ffi::CStr;
 use std::{path::Path, sync::Arc};
 
+use anyhow::Context;
 use dynamo_runtime::pipeline::error as pipeline_error;
 pub use dynamo_runtime::{
     error,
@@ -43,12 +44,11 @@ const PY_IMPORT: &CStr = cr#"
 import importlib.util
 import sys
 
-module_name = file_path.split("/")[-1].replace(".py", "")
-spec = importlib.util.spec_from_file_location(module_name, file_path)
-
+spec = importlib.util.spec_from_file_location("__main__", file_path)
 module = importlib.util.module_from_spec(spec)
-sys.modules[module_name] = module
+sys.argv = sys_argv
+sys.modules["__main__"] = module
 spec.loader.exec_module(module)
 "#;
 
@@ -56,10 +56,11 @@ spec.loader.exec_module(module)
 pub async fn make_string_engine(
     cancel_token: CancellationToken,
     py_file: &Path,
+    py_args: Vec<String>,
 ) -> pipeline_error::Result<OpenAIChatCompletionsStreamingEngine> {
     pyo3::prepare_freethreaded_python();
 
-    let engine = new_engine(cancel_token, py_file).await?;
+    let engine = new_engine(cancel_token, py_file, py_args).await?;
     let engine: OpenAIChatCompletionsStreamingEngine = Arc::new(engine);
     Ok(engine)
 }
@@ -68,10 +69,11 @@ pub async fn make_string_engine(
 pub async fn make_token_engine(
     cancel_token: CancellationToken,
     py_file: &Path,
+    py_args: Vec<String>,
 ) -> pipeline_error::Result<ExecutionContext> {
     pyo3::prepare_freethreaded_python();
 
-    let engine = new_engine(cancel_token, py_file).await?;
+    let engine = new_engine(cancel_token, py_file, py_args).await?;
     let engine: ExecutionContext = Arc::new(engine);
     Ok(engine)
 }
@@ -86,13 +88,30 @@ pub struct PythonServerStreamingEngine {
 async fn new_engine(
     cancel_token: CancellationToken,
     py_file: &Path,
+    py_args: Vec<String>,
 ) -> anyhow::Result<PythonServerStreamingEngine> {
     let (tx, rx) = tokio::sync::oneshot::channel();
     tokio::task::spawn_blocking(move || run_asyncio(tx));
     let event_loop = rx.await?;
-    let user_module = python_file_to_module(py_file)?;
-    let generator = Python::with_gil(|py| user_module.getattr(py, "generate").unwrap());
+    let user_module =
+        python_file_to_module(py_file, py_args).with_context(|| py_file.display().to_string())?;
+    let generator = Python::with_gil(|py| {
+        /* Leave commented, `initialize` may be needed to match Triton
+        if let Ok(initialize) = user_module.getattr(py, "initialize") {
+            initialize
+                .call1(py, (py_args,))
+                .inspect_err(|err| {
+                    println!();
+                    err.display(py);
+                })
+                .with_context(|| "Failed calling python engine's initialize(args)")?;
+        };
+        */
+        user_module
+            .getattr(py, "generate")
+            .with_context(|| "generate")
+    })?;
 
     Ok(PythonServerStreamingEngine::new(
         cancel_token,
         Arc::new(generator),
@@ -127,16 +146,25 @@ fn run_asyncio(tx: Sender<Arc<PyObject>>) {
     });
 }
 
-fn python_file_to_module(p: &Path) -> Result<PyObject, anyhow::Error> {
+fn python_file_to_module(p: &Path, mut py_args: Vec<String>) -> Result<PyObject, anyhow::Error> {
+    if let Some(filename) = p.file_name() {
+        py_args.insert(0, filename.to_string_lossy().to_string());
+    };
     let module: PyObject = Python::with_gil(|py| {
-        let globals = [("file_path", p.display().to_string())]
+        let py_file_path: PyObject = p.display().to_string().into_pyobject(py).unwrap().into();
+        let py_sys_argv: PyObject = py_args.into_pyobject(py).unwrap().into();
+        let globals = [("file_path", py_file_path), ("sys_argv", py_sys_argv)]
             .into_py_dict(py)
-            .unwrap();
+            .context("into_py_dict")?;
         let locals = PyDict::new(py);
-        py.run(PY_IMPORT, Some(&globals), Some(&locals)).unwrap();
-        let module = locals.get_item("module").unwrap().unwrap();
-        module.extract().unwrap()
-    });
+        py.run(PY_IMPORT, Some(&globals), Some(&locals))
+            .context("PY_IMPORT")?;
+        let module = locals
+            .get_item("module")
+            .unwrap()
+            .context("get module after import")?;
+        module.extract().context("extract")
+    })?;
 
     Ok(module)
 }
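Taken together, a minimal `pystr` engine that exercises this argument plumbing end to end could look like the sketch below. It is illustrative only; the chunk shape mirrors the README example above.

```python
# my_engine.py -- sketch of a pystr engine; run for example as:
#   dynamo-run out=pystr:my_engine.py /opt/models/Llama-3.2-3B-Instruct/ -- -n 1
import sys

async def generate(request):
    # Stream a single empty chunk with a stop reason, in the OpenAI
    # chat-completion chunk format shown in the README.
    yield {
        "id": "1",
        "choices": [{"index": 0,
                     "delta": {"content": "", "role": "assistant"},
                     "finish_reason": "stop"}],
        "model": "Llama-3.2-3B-Instruct",
        "object": "chat.completion.chunk",
    }

if __name__ == "__main__":
    # Runs when dynamo-run executes this file as __main__: the standard flags
    # built by Flags::as_vec plus everything after `--` arrive in sys.argv.
    print(f"MAIN: {sys.argv}")
```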