Skip to content

Commit

Permalink
Added test, more cleanup
Browse files Browse the repository at this point in the history
Specifically:
  - made config values immutable
  - cleaned up state stored in FlowSpec
  - added a test exercising configs in various places
  • Loading branch information
romain-intel committed Sep 10, 2024
1 parent 5f354b1 commit bcdd987
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 38 deletions.
77 changes: 65 additions & 12 deletions metaflow/config_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from metaflow._vendor import click

from .exception import MetaflowException, MetaflowInternalError

from .parameters import (
DeployTimeField,
Parameter,
Expand Down Expand Up @@ -45,8 +46,11 @@


def dump_config_values(flow: "FlowSpec"):
if flow._user_configs:
return {"user_configs": flow._user_configs}
from .flowspec import _FlowState # Prevent circular import

configs = flow._flow_state.get(_FlowState.CONFIGS)
if configs:
return {"user_configs": configs}
return {}


Expand Down Expand Up @@ -76,10 +80,34 @@ class ConfigValue(collections.abc.Mapping):
def __init__(self, data: Dict[Any, Any]):
self._data = data

for key, value in data.items():
if isinstance(value, dict):
value = ConfigValue(value)
setattr(self, key, value)
def __getattr__(self, key: str) -> Any:
"""
Access an element of this configuration
Parameters
----------
key : str
Element to access
Returns
-------
Any
Element of the configuration
"""
if key == "_data":
# Called during unpickling. Special case to not run into infinite loop
# below.
raise AttributeError(key)

if key in self._data:
return self[key]
raise AttributeError(key)

def __setattr__(self, name: str, value: Any) -> None:
# Prevent configuration modification
if name == "_data":
return super().__setattr__(name, value)
raise TypeError("ConfigValue is immutable")

def __getitem__(self, key: Any) -> Any:
"""
Expand Down Expand Up @@ -209,14 +237,15 @@ def get_config(cls, config_name: str) -> Optional[Dict[Any, Any]]:

def process_configs(self, ctx, param, value):
from .cli import echo_always, echo_dev_null # Prevent circular import
from .flowspec import _FlowState # Prevent circular import

flow_cls = getattr(current_flow, "flow_cls", None)
if flow_cls is None:
# This is an error
raise MetaflowInternalError(
"Config values should be processed for a FlowSpec"
)

flow_cls._flow_state[_FlowState.CONFIGS] = {}
# This function is called by click when processing all the --config options.
# The value passed in is a list of tuples (name, value).
# Click will provide:
Expand Down Expand Up @@ -277,7 +306,7 @@ def process_configs(self, ctx, param, value):
raise click.UsageError(
"Could not find configuration '%s' in INFO file" % val
)
flow_cls._user_configs[name] = read_value
flow_cls._flow_state[_FlowState.CONFIGS][name] = read_value
to_return[name] = ConfigValue(read_value)
else:
if self._parsers[name]:
Expand All @@ -290,7 +319,7 @@ def process_configs(self, ctx, param, value):
"Configuration value for '%s' is not valid JSON" % name
) from e
# TODO: Support YAML
flow_cls._user_configs[name] = read_value
flow_cls._flow_state[_FlowState.CONFIGS][name] = read_value
to_return[name] = ConfigValue(read_value)

if missing_configs.intersection(self._req_configs):
Expand Down Expand Up @@ -331,6 +360,23 @@ def __repr__(self):
ConfigArgType = Union[str, Dict[Any, Any]]


class MultipleTuple(click.Tuple):
# Small wrapper around a click.Tuple to allow the environment variable for
# configurations to be a JSON string. Otherwise the default behavior is splitting
# by whitespace which is totally not what we want
# You can now pass multiple configuration options through an environment variable
# using something like:
# METAFLOW_FLOW_CONFIG='{"config1": "filenameforconfig1.json", "config2": {"key1": "value1"}}'

def split_envvar_value(self, rv):
loaded = json.loads(rv)
return list(
item if isinstance(item, str) else json.dumps(item)
for pair in loaded.items()
for item in pair
)


class DelayEvaluator:
"""
Small wrapper that allows the evaluation of a Config() value in a delayed manner.
Expand All @@ -356,6 +402,8 @@ def __getattr__(self, name):
return self

def __call__(self, ctx=None, deploy_time=False):
from .flowspec import _FlowState # Prevent circular import

# Two additional arguments are only used by DeployTimeField which will call
# this function with those two additional arguments. They are ignored.
flow_cls = getattr(current_flow, "flow_cls", None)
Expand All @@ -372,7 +420,10 @@ def __call__(self, ctx=None, deploy_time=False):
return eval(
self._config_expr,
globals(),
{k: ConfigValue(v) for k, v in flow_cls._user_configs.items()},
{
k: ConfigValue(v)
for k, v in flow_cls._flow_state.get(_FlowState.CONFIGS, {}).items()
},
)


Expand Down Expand Up @@ -443,7 +494,9 @@ class MyFlow(FlowSpec):
"""

def _wrapper(flow_spec: "FlowSpec"):
flow_spec._config_funcs.append(f)
from .flowspec import _FlowState

flow_spec._flow_state.setdefault(_FlowState.CONFIG_FUNCS, []).append(f)
return flow_spec

return _wrapper
Expand Down Expand Up @@ -565,7 +618,7 @@ def config_options(cmd):
["--config", "config_options"],
nargs=2,
multiple=True,
type=click.Tuple([click.Choice(config_seen), PathOrStr()]),
type=MultipleTuple([click.Choice(config_seen), PathOrStr()]),
callback=ConfigInput(required_names, defaults, parsers).process_configs,
help=help_str,
envvar="METAFLOW_FLOW_CONFIG",
Expand Down
50 changes: 27 additions & 23 deletions metaflow/flowspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import traceback
import reprlib

from enum import Enum
from itertools import islice
from types import FunctionType, MethodType
from typing import TYPE_CHECKING, Any, Callable, Generator, List, Optional, Tuple
Expand Down Expand Up @@ -62,23 +63,26 @@ def __getitem__(self, item):
return item or 0 # item is None for the control task, but it is also split 0


class _FlowState(Enum):
CONFIGS = 1
CONFIG_FUNCS = 2
CACHED_PARAMETERS = 3


class _FlowSpecMeta(type):
def __new__(cls, name, bases, dct):
f = super().__new__(cls, name, bases, dct)
# This makes sure to give _flow_decorators to each
# child class (and not share it with the FlowSpec base
# class). This is important to not make a "global"
# _flow_decorators. Same deal with user configurations
f._flow_decorators = {}
f._user_configs = {}
# We store some state in the flow class itself. This is primarily used to
# attach global state to a flow. It is *not* an actual global because of
# Runner/NBRunner. This is also created here in the meta class to avoid it being
# shared between different children classes.

# We also cache parameter names to avoid having to recompute what is a parameter
# in the dir of a flow
f._cached_parameters = None
# We should move _flow_decorators into this structure as well but keeping it
# out to limit the changes for now.
f._flow_decorators = {}

# Finally attach all functions that need to be evaluated once user configurations
# are available
f._config_funcs = []
# Keys are _FlowState enum values
f._flow_state = {}

return f

Expand Down Expand Up @@ -109,8 +113,8 @@ def start(self):
Tuple[str, ConfigValue]
Iterates over the configurations of the flow
"""
# When configs are parsed, they are loaded in _user_configs
for name, value in cls._user_configs.items():
# When configs are parsed, they are loaded in _flow_state[_FlowState.CONFIGS]
for name, value in cls._flow_state.get(_FlowState.CONFIGS, {}).items():
yield name, ConfigValue(value)

@property
Expand Down Expand Up @@ -149,9 +153,7 @@ class FlowSpec(metaclass=_FlowSpecMeta):
"_cached_input",
"_graph",
"_flow_decorators",
"_user_configs",
"_cached_parameters",
"_config_funcs",
"_flow_state",
"_steps",
"index",
"input",
Expand Down Expand Up @@ -220,7 +222,7 @@ def _process_config_funcs(self, config_options):
current_cls = self.__class__

# Fast path for no user configurations
if not self._config_funcs:
if not self._flow_state.get(_FlowState.CONFIG_FUNCS):
return self

# We need to convert all the user configurations from DelayedEvaluationParameters
Expand All @@ -242,7 +244,7 @@ def _process_config_funcs(self, config_options):

# Run all the functions. They will now be able to access the configuration
# values directly from the class
for func in self._config_funcs:
for func in self._flow_state[_FlowState.CONFIG_FUNCS]:
current_cls = func(current_cls)

# Reset all configs that were already present in the class.
Expand All @@ -253,7 +255,8 @@ def _process_config_funcs(self, config_options):

# We reset cached_parameters on the very off chance that the user added
# more configurations based on the configuration
current_cls._cached_parameters = None
if _FlowState.CACHED_PARAMETERS in current_cls._flow_state:
del current_cls._flow_state[_FlowState.CACHED_PARAMETERS]

# Set the current flow class we are in (the one we just created)
parameters.replace_flow_context(current_cls)
Expand Down Expand Up @@ -323,8 +326,9 @@ def _set_constants(self, graph, kwargs, config_options):

@classmethod
def _get_parameters(cls):
if cls._cached_parameters is not None:
for var in cls._cached_parameters:
cached = cls._flow_state.get(_FlowState.CACHED_PARAMETERS)
if cached is not None:
for var in cached:
yield var, getattr(cls, var)
return
build_list = []
Expand All @@ -338,7 +342,7 @@ def _get_parameters(cls):
if isinstance(val, Parameter):
build_list.append(var)
yield var, val
cls._cached_parameters = build_list
cls._flow_state[_FlowState.CACHED_PARAMETERS] = build_list

def _set_datastore(self, datastore):
self._datastore = datastore
Expand Down
6 changes: 4 additions & 2 deletions metaflow/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from .datastore import TaskDataStoreSet
from .debug import debug
from .decorators import flow_decorators
from .flowspec import _FlowState
from .mflog import mflog, RUNTIME_LOG_SOURCE
from .util import to_unicode, compress_list, unicode_type
from .clone_util import clone_task_helper
Expand Down Expand Up @@ -1468,9 +1469,10 @@ def __init__(self, task):
# We also pass configuration options using the kv.<name> syntax which will cause
# the configuration options to be loaded from the CONFIG file (or local-config-file
# in the case of the local runtime)
if self.task.flow._user_configs:
configs = self.task.flow._flow_state.get(_FlowState.CONFIGS)
if configs:
self.top_level_options["config"] = [
(k, ConfigInput.make_key_name(k)) for k in self.task.flow._user_configs
(k, ConfigInput.make_key_name(k)) for k in configs
]

self.commands = ["step"]
Expand Down
2 changes: 1 addition & 1 deletion test/core/metaflow_test/formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def _flow_lines(self):
tags.extend(tag.split("(")[0] for tag in step.tags)

yield 0, "# -*- coding: utf-8 -*-"
yield 0, "from metaflow import Config, FlowSpec, step, Parameter, project, IncludeFile, JSONType, current, parallel"
yield 0, "from metaflow import Config, config_expr, eval_config, FlowSpec, step, Parameter, project, IncludeFile, JSONType, current, parallel"
yield 0, "from metaflow_test import assert_equals, assert_equals_metadata, assert_exception, ExpectationFailed, is_resumed, ResumeFromHere, TestRetry, try_to_get_card"
if tags:
yield 0, "from metaflow import %s" % ",".join(tags)
Expand Down
Loading

0 comments on commit bcdd987

Please sign in to comment.