Skip to content

Commit

Permalink
More WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
romain-intel committed Aug 21, 2024
1 parent 3a037a4 commit b03a453
Show file tree
Hide file tree
Showing 14 changed files with 590 additions and 198 deletions.
2 changes: 2 additions & 0 deletions metaflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ class and related decorators.

from .parameters import Parameter, JSONTypeClass

from .user_configs import Config, FlowConfig, config_expr, eval_config

JSONType = JSONTypeClass()

# data layer
Expand Down
42 changes: 37 additions & 5 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import inspect
import json
import os
import sys
import traceback
Expand Down Expand Up @@ -46,6 +47,7 @@
resolve_identity,
write_latest_run_id,
)
from .user_configs import LocalFileInput

ERASE_TO_EOL = "\033[K"
HIGHLIGHT = "red"
Expand Down Expand Up @@ -522,7 +524,7 @@ def init(obj, run_id=None, task_id=None, tags=None, **kwargs):
obj.monitor,
run_id=run_id,
)
obj.flow._set_constants(obj.graph, kwargs)
obj.flow._set_constants(obj.graph, kwargs, obj.config_options)
runtime.persist_constants(task_id=task_id)


Expand Down Expand Up @@ -771,7 +773,7 @@ def run(
write_latest_run_id(obj, runtime.run_id)
write_file(run_id_file, runtime.run_id)

obj.flow._set_constants(obj.graph, kwargs)
obj.flow._set_constants(obj.graph, kwargs, obj.config_options)
runtime.print_workflow_info()
runtime.persist_constants()
write_file(
Expand All @@ -783,6 +785,7 @@ def run(
"/".join((obj.flow.name, runtime.run_id)),
),
)

runtime.execute()


Expand Down Expand Up @@ -842,7 +845,7 @@ def version(obj):


@tracing.cli_entrypoint("cli/start")
@decorators.add_decorator_options
@decorators.add_decorator_and_config_options
@click.command(
cls=click.CommandCollection,
sources=[cli] + plugins.get_plugin_cli(),
Expand Down Expand Up @@ -910,6 +913,15 @@ def version(obj):
type=click.Choice(MONITOR_SIDECARS),
help="Monitoring backend type",
)
@click.option(
"--local-info-file",
type=LocalFileInput(exists=True, readable=True, dir_okay=False, resolve_path=True),
required=False,
default=None,
help="A filename containing a subset of the INFO file. Internal use only.",
hidden=True,
is_eager=True,
)
@click.pass_context
def start(
ctx,
Expand All @@ -923,7 +935,8 @@ def start(
pylint=None,
event_logger=None,
monitor=None,
**deco_options
local_info_file=None,
**deco_and_config_options
):
global echo
if quiet:
Expand All @@ -940,11 +953,17 @@ def start(
echo(" executing *%s*" % ctx.obj.flow.name, fg="magenta", nl=False)
echo(" for *%s*" % resolve_identity(), fg="magenta")

# At this point, we are able to resolve the user-configuration options so we can
# process all those decorators that the user added that will modify the flow based
# on those configurations. It is important to do this as early as possible since it
# actually modifies the flow itself
ctx.obj.flow = ctx.obj.flow._process_config_funcs(deco_and_config_options)

cli_args._set_top_kwargs(ctx.params)
ctx.obj.echo = echo
ctx.obj.echo_always = echo_always
ctx.obj.is_quiet = quiet
ctx.obj.graph = FlowGraph(ctx.obj.flow.__class__)
ctx.obj.graph = ctx.obj.flow._graph
ctx.obj.logger = logger
ctx.obj.check = _check
ctx.obj.pylint = pylint
Expand Down Expand Up @@ -996,6 +1015,19 @@ def start(
ctx.obj.monitor,
)

ctx.obj.config_options = {
k: v
for k, v in deco_and_config_options.items()
if k in ctx.command.config_options
}
deco_options = {
k: v
for k, v in deco_and_config_options.items()
if k not in ctx.command.config_options
}

decorators._resolve_configs(ctx.obj.flow)

# It is important to initialize flow decorators early as some of the
# things they provide may be used by some of the objects initialized after.
decorators._init_flow_decorators(
Expand Down
62 changes: 60 additions & 2 deletions metaflow/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
)

from .parameters import current_flow
from .user_configs import DelayEvaluator

from metaflow._vendor import click

Expand Down Expand Up @@ -123,6 +124,30 @@ def __init__(self, attributes=None, statically_defined=False):
else:
raise InvalidDecoratorAttribute(self.name, k, self.defaults)

def resolve_configs(self):
"""
Resolve any configuration options that may be set in the decorator's attributes
"""

def _resolve_delayed_evaluator(v):
if isinstance(v, DelayEvaluator):
return v()
if isinstance(v, dict):
return {
_resolve_delayed_evaluator(k): _resolve_delayed_evaluator(v)
for k, v in v.items()
}
if isinstance(v, list):
return [_resolve_delayed_evaluator(x) for x in v]
if isinstance(v, tuple):
return tuple(_resolve_delayed_evaluator(x) for x in v)
if isinstance(v, set):
return {_resolve_delayed_evaluator(x) for x in v}
return v

for k, v in self.attributes.items():
self.attributes[k] = _resolve_delayed_evaluator(v)

@classmethod
def _parse_decorator_spec(cls, deco_spec):
if len(deco_spec) == 0:
Expand Down Expand Up @@ -202,11 +227,28 @@ def get_top_level_options(self):


# compare this to parameters.add_custom_parameters
def add_decorator_options(cmd):
seen = {}
def add_decorator_and_config_options(cmd):
config_seen = {}
flow_cls = getattr(current_flow, "flow_cls", None)
if flow_cls is None:
return cmd

parameters = [p for _, p in flow_cls._get_parameters() if p.IS_FLOW_PARAMETER]
# Add configuration options
for arg in parameters[::-1]:
kwargs = arg.option_kwargs(False)
if arg.name in config_seen:
msg = (
"Multiple configurations use the same name '%s'. Please change the "
"names of some of your configurations" % arg.name
)
raise MetaflowException(msg)
config_seen[arg.name] = arg
cmd.params.insert(0, click.Option(("--" + arg.name,), **kwargs))

cmd.config_options = set(config_seen.keys())
seen = {}
# Add decorator options
for deco in flow_decorators(flow_cls):
for option, kwargs in deco.options.items():
if option in seen:
Expand All @@ -217,6 +259,12 @@ def add_decorator_options(cmd):
% (deco.name, option, seen[option])
)
raise MetaflowInternalError(msg)
elif option in config_seen:
msg = (
"Flow decorator '%s' uses an option '%s' which is also "
"used by a configuration. Please change the name of the "
"configuration" % (deco.name, option)
)
else:
seen[option] = deco.name
cmd.params.insert(0, click.Option(("--" + option,), **kwargs))
Expand Down Expand Up @@ -510,6 +558,16 @@ def _attach_decorators_to_step(step, decospecs):
step.decorators.append(deco)


def _resolve_configs(flow):
# We get the datastore for the _parameters step which can contain
for decorators in flow._flow_decorators.values():
for deco in decorators:
deco.resolve_configs()
for step in flow:
for deco in step.decorators:
deco.resolve_configs()


def _init_flow_decorators(
flow, graph, environment, flow_datastore, metadata, logger, echo, deco_options
):
Expand Down
Loading

0 comments on commit b03a453

Please sign in to comment.