From 3aecddf1e6beacec60ec86cb76478b2856f31d54 Mon Sep 17 00:00:00 2001 From: Valay Dave Date: Wed, 28 Aug 2024 11:22:51 -0700 Subject: [PATCH] [cli-options-from-step-decorators] Top level options from step decorators. - CLI options injected by step decorators in the Top level CLI - Step decorators exposing an additional hooks in the lifecycle to accept options passed down from top level - top level option injection is only done by step decorators that are statically set in the code --- metaflow/cli.py | 3 + metaflow/decorators.py | 60 +++++++++++++++++++ metaflow/parameters.py | 13 ++++ metaflow/plugins/airflow/airflow.py | 3 + metaflow/plugins/argo/argo_workflows.py | 3 + .../aws/step_functions/step_functions.py | 3 + metaflow/runner/click_api.py | 4 +- metaflow/runtime.py | 4 ++ 8 files changed, 91 insertions(+), 2 deletions(-) diff --git a/metaflow/cli.py b/metaflow/cli.py index 29c6701cc7..4d9cd88ff1 100644 --- a/metaflow/cli.py +++ b/metaflow/cli.py @@ -843,6 +843,7 @@ def version(obj): @tracing.cli_entrypoint("cli/start") @decorators.add_decorator_options +@decorators.add_step_decorator_options @click.command( cls=click.CommandCollection, sources=[cli] + plugins.get_plugin_cli(), @@ -1009,6 +1010,8 @@ def start( deco_options, ) + decorators._inject_step_decorator_options(ctx.obj.flow, deco_options) + # In the case of run/resume, we will want to apply the TL decospecs # *after* the run decospecs so that they don't take precedence. In other # words, for the same decorator, we want `myflow.py run --with foo` to diff --git a/metaflow/decorators.py b/metaflow/decorators.py index 262102551e..180c003aad 100644 --- a/metaflow/decorators.py +++ b/metaflow/decorators.py @@ -207,6 +207,7 @@ def add_decorator_options(cmd): flow_cls = getattr(current_flow, "flow_cls", None) if flow_cls is None: return cmd + for deco in flow_decorators(flow_cls): for option, kwargs in deco.options.items(): if option in seen: @@ -220,6 +221,29 @@ def add_decorator_options(cmd): else: seen[option] = deco.name cmd.params.insert(0, click.Option(("--" + option,), **kwargs)) + + return cmd + + +def add_step_decorator_options(cmd): + step_deco_dict = _get_all_step_decos() + seen = set() + step_deco_names = getattr(current_flow, "unique_step_decos_in_flow", None) + if step_deco_names is None: + return cmd + + for deco_name in step_deco_names: + deco = step_deco_dict[deco_name] + for option, kwargs in deco.cli_options.items(): + if option in seen: + msg = ( + "Step decorator '%s' uses an option '%s' which is also " + "used by another step decorator. " % (deco.name, option) + ) + raise MetaflowInternalError(msg) + else: + seen.add(option) + cmd.params.insert(0, click.Option(("--" + option,), **kwargs)) return cmd @@ -257,6 +281,34 @@ class MyDecorator(StepDecorator): pass them around with every lifecycle call. """ + cli_options = {} + # `cli_options` is similar to the one the flow decorator. It will be used to + # pass options to the step decorator from the cli. + + @classmethod + def step_options_init(cls, flow, options_dict): + """ + Called right after `flow_init` to pass down the options set in the cli. + Since step decorators can inject options via `cli_options`, this callback + helps set these options for statically set decorators since it is called before + the dynamically set decorators are attached (ie. decorators set via `--with`). + + Its a class method to ensure that any decorator even attached via `--with` + (given its statically present in the code too) can access the options set in the cli. + """ + pass + + def get_top_level_options(self): + """ + Return a list of option-value pairs that correspond to top-level + options that should be passed to subprocesses (tasks). The option + names should be a subset of the keys in self.options. + + If the decorator has a non-empty set of options in `self.cli_options`, you + probably want to return the assigned values in this method. + """ + return [] + def step_init( self, flow, graph, step_name, decorators, environment, flow_datastore, logger ): @@ -510,6 +562,14 @@ def _attach_decorators_to_step(step, decospecs): step.decorators.append(deco) +def _inject_step_decorator_options(flow, deco_options): + for step in flow: + for deco in step.decorators: + deco.step_options_init(flow, deco_options) + + return + + def _init_flow_decorators( flow, graph, environment, flow_datastore, metadata, logger, echo, deco_options ): diff --git a/metaflow/parameters.py b/metaflow/parameters.py index eca634e7f6..8e27ce31d8 100644 --- a/metaflow/parameters.py +++ b/metaflow/parameters.py @@ -46,6 +46,16 @@ current_flow = local() +def _figure_step_decos_for_current_step(flow_cls): + deco_set = set() + for attr_name in dir(flow_cls): + attr = getattr(flow_cls, attr_name) + if hasattr(attr, "is_step"): + for deco in attr.decorators: + deco_set.add(deco.name) + return list(deco_set) + + @contextmanager def flow_context(flow_cls): """ @@ -58,6 +68,9 @@ def flow_context(flow_cls): current_flow.flow_cls_stack = getattr(current_flow, "flow_cls_stack", []) current_flow.flow_cls_stack.insert(0, flow_cls) current_flow.flow_cls = current_flow.flow_cls_stack[0] + current_flow.unique_step_decos_in_flow = _figure_step_decos_for_current_step( + flow_cls + ) try: yield finally: diff --git a/metaflow/plugins/airflow/airflow.py b/metaflow/plugins/airflow/airflow.py index 0d18f1f9c5..bf15b75cb9 100644 --- a/metaflow/plugins/airflow/airflow.py +++ b/metaflow/plugins/airflow/airflow.py @@ -542,6 +542,9 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries): for deco in flow_decorators(self.flow): top_opts_dict.update(deco.get_top_level_options()) + for decorator in node.decorators: + top_opts_dict.update(deco.get_top_level_options()) + top_opts = list(dict_to_cli_options(top_opts_dict)) top_level = top_opts + [ diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index d27f3be8d6..d631907cfa 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -1454,6 +1454,9 @@ def _container_templates(self): for deco in flow_decorators(self.flow): top_opts_dict.update(deco.get_top_level_options()) + for decorator in node.decorators: + top_opts_dict.update(deco.get_top_level_options()) + top_level = list(dict_to_cli_options(top_opts_dict)) + [ "--quiet", "--metadata=%s" % self.metadata.TYPE, diff --git a/metaflow/plugins/aws/step_functions/step_functions.py b/metaflow/plugins/aws/step_functions/step_functions.py index 0534cd2179..54040396b4 100644 --- a/metaflow/plugins/aws/step_functions/step_functions.py +++ b/metaflow/plugins/aws/step_functions/step_functions.py @@ -884,6 +884,9 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries): for deco in flow_decorators(self.flow): top_opts_dict.update(deco.get_top_level_options()) + for decorator in node.decorators: + top_opts_dict.update(deco.get_top_level_options()) + top_opts = list(dict_to_cli_options(top_opts_dict)) top_level = top_opts + [ diff --git a/metaflow/runner/click_api.py b/metaflow/runner/click_api.py index 90569c8b4a..fe3b6ee190 100644 --- a/metaflow/runner/click_api.py +++ b/metaflow/runner/click_api.py @@ -32,7 +32,7 @@ UUIDParameterType, ) from metaflow._vendor.typeguard import TypeCheckError, check_type -from metaflow.decorators import add_decorator_options +from metaflow.decorators import add_decorator_options, add_step_decorator_options from metaflow.exception import MetaflowException from metaflow.includefile import FilePathClass from metaflow.parameters import JSONTypeClass, flow_context @@ -186,7 +186,7 @@ def from_cli(cls, flow_file: str, cli_collection: Callable) -> Callable: flow_cls = extract_flow_class_from_file(flow_file) flow_parameters = [p for _, p in flow_cls._get_parameters()] with flow_context(flow_cls) as _: - add_decorator_options(cli_collection) + add_step_decorator_options(add_decorator_options(cli_collection)) class_dict = {"__module__": "metaflow", "_API_NAME": flow_file} command_groups = cli_collection.sources diff --git a/metaflow/runtime.py b/metaflow/runtime.py index d5fbc0b683..e0a8435487 100644 --- a/metaflow/runtime.py +++ b/metaflow/runtime.py @@ -1448,6 +1448,10 @@ def __init__(self, task): for deco in flow_decorators(self.task.flow): self.top_level_options.update(deco.get_top_level_options()) + # Extract the deco.get_top_level_options() equivalent for the step decorators. + for deco in self.task.decos: + self.top_level_options.update(deco.get_top_level_options()) + self.commands = ["step"] self.command_args = [self.task.step] self.command_options = {