Description
I would like to resolve settings from CLI params, configuration files, and environment variables using Pydantic Settings. Is there a way to use that framework to resolve settings across multiple sources in conjunction with Metaflow?
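For concreteness, this is the kind of multi-source resolution I mean (a minimal sketch; ExampleSettings is just illustrative, and the priority order is Pydantic Settings' documented behavior: init kwargs, then environment variables, then .env/secrets files, then field defaults):

from pydantic_settings import BaseSettings

class ExampleSettings(BaseSettings):
    foo: str = "default"  # resolved from init kwargs, env var FOO, or this default
    bar: int = 1

# FOO=from-env python example.py  ->  ExampleSettings().foo == "from-env"
# ExampleSettings(foo="from-cli").foo == "from-cli"  (init kwargs beat env vars)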
This Hydra example looks to me like a reasonable place to start, with the most salient part being:
@hydra.main(config_name="config", config_path=".", version_base=None)
def benchmark(cfg: DictConfig) -> None:
    print("Tagging all runs as", TAG)
    dict_conf = OmegaConf.to_container(cfg, resolve=True)
    json_conf = json.dumps({"config": dict_conf})
    env = os.environ.copy()
    env.update({"METAFLOW_FLOW_CONFIG_VALUE": json_conf})
    ...
    with Runner(...).run(...)
Taking this same approach, where Pydantic Settings resolves everything before a final JSON config value is passed to Metaflow, makes sense. What I am struggling with, though, is figuring out the best way to incorporate CLI params as well.
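Without the CLI piece, that would look something like the following (a minimal sketch; the flow file name and settings class are placeholders):

import json
import os

from metaflow import Runner
from pydantic_settings import BaseSettings

class MyConfig(BaseSettings):
    foo: str = "default"
    bar: int = 1

# Resolve env vars, .env files, and defaults via Pydantic Settings,
# then hand Metaflow the final JSON config, as in the Hydra example.
settings = MyConfig()
env = os.environ.copy()
env.update({"METAFLOW_FLOW_CONFIG_VALUE": json.dumps({"config": settings.model_dump()})})

with Runner("basic_flow_example.py", env=env).run() as running:
    print(running.run)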
I don't want to clobber or interfere with the existing CLIs exposed by FlowSpec, if possible. What I would like to do is something more like this instead:
import os
import sys
import json

from metaflow import FlowSpec, step, Config
from pydantic_settings import BaseSettings


class BasicFlow(FlowSpec):
    config = Config("config")

    @step
    def start(self):
        print(f"Processing with config: {self.config}")
        assert self.config.foo == "override"
        assert self.config.bar == 2
        self.next(self.end)

    @step
    def end(self):
        print("Flow completed successfully!")


class MyConfig(BaseSettings):
    foo: str = "default"
    bar: int = 1


if __name__ == "__main__":
    # Invocation:
    #   python basic_flow_example.py foo=override bar=2 -- run
    # Note: this main block actually gets invoked for every task in the flow,
    # so this logic needs to only modify sys.argv on the first invocation.
    split_index = sys.argv.index("--") if "--" in sys.argv else None
    if split_index is not None:
        overrides = sys.argv[1:split_index]
        settings = MyConfig(**dict(arg.split("=", 1) for arg in overrides))
        os.environ["METAFLOW_FLOW_CONFIG_VALUE"] = json.dumps({"config": settings.model_dump()})
        # Rebuild sys.argv without the custom config overrides
        sys.argv = [sys.argv[0]] + sys.argv[split_index + 1:]
    # Launch the Flow CLI rather than executing programmatically with Runner
    print("Running CLI")
    BasicFlow(use_cli=True)
Notably, this launches the flow's CLI as it usually would be launched, with BasicFlow(use_cli=True) rather than Runner, in order to preserve the whole CLI interface. I would rather do it that way.
This isn't a particularly robust example (for brevity), but it does seem workable. Might there be a better way to do it? Or are there other examples like this somewhere?
Here are the logs from running this flow:
% python basic_flow_example.py foo=override bar=2 -- run
Running CLI <--- *** First call ***
2025-08-13 11:51:34.427 Workflow starting (run-id 1755100294426481):
2025-08-13 11:51:34.434 [1755100294426481/start/1 (pid 92209)] Task is starting.
2025-08-13 11:51:34.542 [1755100294426481/start/1 (pid 92209)] Running CLI <--- *** Second call ***
2025-08-13 11:51:34.598 [1755100294426481/start/1 (pid 92209)] Processing with config: {'foo': 'override', 'bar': 2}
2025-08-13 11:51:34.620 [1755100294426481/start/1 (pid 92209)] Task finished successfully.
2025-08-13 11:51:34.623 [1755100294426481/end/2 (pid 92217)] Task is starting.
2025-08-13 11:51:34.730 [1755100294426481/end/2 (pid 92217)] Running CLI <--- *** Third call ***
2025-08-13 11:51:34.784 [1755100294426481/end/2 (pid 92217)] Flow completed successfully!
2025-08-13 11:51:34.808 [1755100294426481/end/2 (pid 92217)] Task finished successfully.
2025-08-13 11:51:34.809 Done!
That the main block is invoked for every step wasn't something I was expecting, but I can see why the design probably works that way to support Kubernetes/Batch steps. That certainly makes modifying the behavior of the Flow CLI a bit more complicated.
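For what it's worth, one way to make the override logic first-invocation-only is to key off the environment variable itself. This is just a sketch, and it assumes locally launched task subprocesses inherit the parent process's environment (which matches the logs above, but would not carry over to remote Kubernetes/Batch steps as written):

import json
import os
import sys

if __name__ == "__main__":
    # Only parse overrides on the first invocation: task subprocesses inherit
    # the parent's environment locally, so the presence of
    # METAFLOW_FLOW_CONFIG_VALUE signals the config was already resolved.
    if "METAFLOW_FLOW_CONFIG_VALUE" not in os.environ and "--" in sys.argv:
        split_index = sys.argv.index("--")
        overrides = sys.argv[1:split_index]
        settings = MyConfig(**dict(arg.split("=", 1) for arg in overrides))
        os.environ["METAFLOW_FLOW_CONFIG_VALUE"] = json.dumps({"config": settings.model_dump()})
        sys.argv = [sys.argv[0]] + sys.argv[split_index + 1:]
    BasicFlow(use_cli=True)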