Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core-change] Top level options from step decorators. #2002

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,7 @@ def version(obj):

@tracing.cli_entrypoint("cli/start")
@decorators.add_decorator_options
@decorators.add_step_decorator_options
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Top level CLI Options added for only statically set decorators in code.

@click.command(
cls=click.CommandCollection,
sources=[cli] + plugins.get_plugin_cli(),
Expand Down Expand Up @@ -1009,6 +1010,8 @@ def start(
deco_options,
)

decorators._inject_step_decorator_options(ctx.obj.flow, deco_options)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: fixme : give better name


# In the case of run/resume, we will want to apply the TL decospecs
# *after* the run decospecs so that they don't take precedence. In other
# words, for the same decorator, we want `myflow.py run --with foo` to
Expand Down
60 changes: 60 additions & 0 deletions metaflow/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ def add_decorator_options(cmd):
flow_cls = getattr(current_flow, "flow_cls", None)
if flow_cls is None:
return cmd

for deco in flow_decorators(flow_cls):
for option, kwargs in deco.options.items():
if option in seen:
Expand All @@ -220,6 +221,29 @@ def add_decorator_options(cmd):
else:
seen[option] = deco.name
cmd.params.insert(0, click.Option(("--" + option,), **kwargs))

return cmd


def add_step_decorator_options(cmd):
step_deco_dict = _get_all_step_decos()
seen = set()
step_deco_names = getattr(current_flow, "unique_step_decos_in_flow", None)
if step_deco_names is None:
return cmd

for deco_name in step_deco_names:
deco = step_deco_dict[deco_name]
for option, kwargs in deco.cli_options.items():
if option in seen:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would need to be global (across step and flow level decorators)

msg = (
"Step decorator '%s' uses an option '%s' which is also "
"used by another step decorator. " % (deco.name, option)
)
raise MetaflowInternalError(msg)
else:
seen.add(option)
cmd.params.insert(0, click.Option(("--" + option,), **kwargs))
return cmd


Expand Down Expand Up @@ -257,6 +281,34 @@ class MyDecorator(StepDecorator):
pass them around with every lifecycle call.
"""

cli_options = {}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

named it cli_options because options seemed a like a fairly common attributes that user defined step decorators could also have

# `cli_options` is similar to the one the flow decorator. It will be used to
# pass options to the step decorator from the cli.

@classmethod
def step_options_init(cls, flow, options_dict):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can probably be combined with the config thing (ie: we probably don't need to add two new hooks)

"""
Called right after `flow_init` to pass down the options set in the cli.
Since step decorators can inject options via `cli_options`, this callback
helps set these options for statically set decorators since it is called before
the dynamically set decorators are attached (ie. decorators set via `--with`).

Its a class method to ensure that any decorator even attached via `--with`
(given its statically present in the code too) can access the options set in the cli.
"""
pass

def get_top_level_options(self):
"""
Return a list of option-value pairs that correspond to top-level
options that should be passed to subprocesses (tasks). The option
names should be a subset of the keys in self.options.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: fix me


If the decorator has a non-empty set of options in `self.cli_options`, you
probably want to return the assigned values in this method.
"""
return []

def step_init(
self, flow, graph, step_name, decorators, environment, flow_datastore, logger
):
Expand Down Expand Up @@ -510,6 +562,14 @@ def _attach_decorators_to_step(step, decospecs):
step.decorators.append(deco)


def _inject_step_decorator_options(flow, deco_options):
for step in flow:
for deco in step.decorators:
deco.step_options_init(flow, deco_options)

return


def _init_flow_decorators(
flow, graph, environment, flow_datastore, metadata, logger, echo, deco_options
):
Expand Down
13 changes: 13 additions & 0 deletions metaflow/parameters.py
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed all of this so that click can set step decorator based Options in cli.py

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@
current_flow = local()


def _figure_step_decos_for_current_step(flow_cls):
deco_set = set()
for attr_name in dir(flow_cls):
attr = getattr(flow_cls, attr_name)
if hasattr(attr, "is_step"):
for deco in attr.decorators:
deco_set.add(deco.name)
return list(deco_set)


@contextmanager
def flow_context(flow_cls):
"""
Expand All @@ -58,6 +68,9 @@ def flow_context(flow_cls):
current_flow.flow_cls_stack = getattr(current_flow, "flow_cls_stack", [])
current_flow.flow_cls_stack.insert(0, flow_cls)
current_flow.flow_cls = current_flow.flow_cls_stack[0]
current_flow.unique_step_decos_in_flow = _figure_step_decos_for_current_step(
flow_cls
)
try:
yield
finally:
Expand Down
3 changes: 3 additions & 0 deletions metaflow/plugins/airflow/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,9 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
for deco in flow_decorators(self.flow):
top_opts_dict.update(deco.get_top_level_options())

for decorator in node.decorators:
top_opts_dict.update(deco.get_top_level_options())

top_opts = list(dict_to_cli_options(top_opts_dict))

top_level = top_opts + [
Expand Down
3 changes: 3 additions & 0 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1454,6 +1454,9 @@ def _container_templates(self):
for deco in flow_decorators(self.flow):
top_opts_dict.update(deco.get_top_level_options())

for decorator in node.decorators:
top_opts_dict.update(deco.get_top_level_options())

top_level = list(dict_to_cli_options(top_opts_dict)) + [
"--quiet",
"--metadata=%s" % self.metadata.TYPE,
Expand Down
3 changes: 3 additions & 0 deletions metaflow/plugins/aws/step_functions/step_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,9 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
for deco in flow_decorators(self.flow):
top_opts_dict.update(deco.get_top_level_options())

for decorator in node.decorators:
top_opts_dict.update(deco.get_top_level_options())

top_opts = list(dict_to_cli_options(top_opts_dict))

top_level = top_opts + [
Expand Down
4 changes: 2 additions & 2 deletions metaflow/runner/click_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
UUIDParameterType,
)
from metaflow._vendor.typeguard import TypeCheckError, check_type
from metaflow.decorators import add_decorator_options
from metaflow.decorators import add_decorator_options, add_step_decorator_options
from metaflow.exception import MetaflowException
from metaflow.includefile import FilePathClass
from metaflow.parameters import JSONTypeClass, flow_context
Expand Down Expand Up @@ -186,7 +186,7 @@ def from_cli(cls, flow_file: str, cli_collection: Callable) -> Callable:
flow_cls = extract_flow_class_from_file(flow_file)
flow_parameters = [p for _, p in flow_cls._get_parameters()]
with flow_context(flow_cls) as _:
add_decorator_options(cli_collection)
add_step_decorator_options(add_decorator_options(cli_collection))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added this here because i assumed that low level API will also need it .


class_dict = {"__module__": "metaflow", "_API_NAME": flow_file}
command_groups = cli_collection.sources
Expand Down
4 changes: 4 additions & 0 deletions metaflow/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -1448,6 +1448,10 @@ def __init__(self, task):
for deco in flow_decorators(self.task.flow):
self.top_level_options.update(deco.get_top_level_options())

# Extract the deco.get_top_level_options() equivalent for the step decorators.
for deco in self.task.decos:
self.top_level_options.update(deco.get_top_level_options())

self.commands = ["step"]
self.command_args = [self.task.step]
self.command_options = {
Expand Down
Loading