diff --git a/src/orion/core/io/experiment_builder.py b/src/orion/core/io/experiment_builder.py index f990a9d73..1e83dd51d 100644 --- a/src/orion/core/io/experiment_builder.py +++ b/src/orion/core/io/experiment_builder.py @@ -99,7 +99,7 @@ NoNameError, RaceCondition, ) -from orion.core.worker.experiment import Experiment +from orion.core.worker.experiment import Experiment, Mode from orion.core.worker.primary_algo import create_algo from orion.storage.base import get_storage, setup_storage @@ -312,7 +312,7 @@ def build_view(name, version=None): return load(name, version=version, mode="r") -def load(name, version=None, mode="r"): +def load(name: str, version: int | None = None, mode: Mode = "r"): """Load experiment from database An experiment view provides all reading operations of standard experiment but prevents the @@ -351,7 +351,9 @@ def load(name, version=None, mode="r"): return create_experiment(mode=mode, **db_config) -def create_experiment(name, version, mode, space, **kwargs): +def create_experiment( + name: str, version: int, mode: Mode, space: Space | dict[str, str], **kwargs +) -> Experiment: """Instantiate the experiment and its attribute objects All unspecified arguments will be replaced by system's defaults (orion.core.config.*). @@ -382,36 +384,39 @@ def create_experiment(name, version, mode, space, **kwargs): Configuration of the storage backend. """ - experiment = Experiment(name=name, version=version, mode=mode) - experiment._id = kwargs.get("_id", None) # pylint:disable=protected-access - experiment.max_trials = kwargs.get( - "max_trials", orion.core.config.experiment.max_trials - ) - experiment.max_broken = kwargs.get( - "max_broken", orion.core.config.experiment.max_broken - ) - experiment.space = _instantiate_space(space) - experiment.algorithms = _instantiate_algo( - experiment.space, - experiment.max_trials, - kwargs.get("algorithms"), + space = _instantiate_space(space) + _id = kwargs.get("_id", None) + max_trials = kwargs.pop("max_trials", orion.core.config.experiment.max_trials) + max_broken = kwargs.pop("max_broken", orion.core.config.experiment.max_broken) + working_dir = kwargs.pop("working_dir", orion.core.config.experiment.working_dir) + algo_config = kwargs.pop("algorithms", None) + algorithms = _instantiate_algo( + space=space, + max_trials=max_trials, + config=algo_config, ignore_unavailable=mode != "x", ) - # TODO: Remove for v0.4 - _instantiate_strategy(kwargs.get("producer", {}).get("strategy")) - experiment.working_dir = kwargs.get( - "working_dir", orion.core.config.experiment.working_dir - ) - experiment.metadata = kwargs.get( - "metadata", {"user": kwargs.get("user", getpass.getuser())} - ) - experiment.refers = kwargs.get( + metadata = kwargs.pop("metadata", {"user": kwargs.pop("user", getpass.getuser())}) + refers: dict = kwargs.pop( "refers", {"parent_id": None, "root_id": None, "adapter": []} ) - experiment.refers["adapter"] = _instantiate_adapters( - experiment.refers.get("adapter", []) + refers["adapter"] = _instantiate_adapters(refers.get("adapter", [])) + # TODO: Remove for v0.4 + strategy_config: dict | None = kwargs.pop("producer", {}).get("strategy") + _instantiate_strategy(strategy_config) + experiment = Experiment( + name=name, + version=version, + mode=mode, + space=space, + _id=_id, + max_trials=max_trials, + max_broken=max_broken, + algorithms=algorithms, + working_dir=working_dir, + metadata=metadata, + refers=refers, ) - log.debug( "Created experiment with config:\n%s", pprint.pformat(experiment.configuration) ) diff --git a/src/orion/core/worker/experiment.py b/src/orion/core/worker/experiment.py index 07ffd1973..769dc4739 100644 --- a/src/orion/core/worker/experiment.py +++ b/src/orion/core/worker/experiment.py @@ -6,6 +6,8 @@ Manage history of trials corresponding to a black box process. """ +from __future__ import annotations + import contextlib import copy import datetime @@ -14,16 +16,21 @@ from dataclasses import dataclass, field import pandas +from typing_extensions import Literal +from orion.algo.base import BaseAlgorithm +from orion.algo.space import Space from orion.core.evc.adapters import BaseAdapter from orion.core.evc.experiment import ExperimentNode from orion.core.io.database import DuplicateKeyError +from orion.core.io.space_builder import SpaceBuilder from orion.core.utils.exceptions import UnsupportedOperation from orion.core.utils.flatten import flatten from orion.core.utils.singleton import update_singletons from orion.storage.base import FailedUpdate, get_storage log = logging.getLogger(__name__) +Mode = Literal["r", "w", "x"] @dataclass @@ -133,22 +140,36 @@ class Experiment: ) non_branching_attrs = ("max_trials", "max_broken") - def __init__(self, name, version=None, mode="r"): - self._id = None + def __init__( + self, + name: str, + space: Space | dict[str, str], + version: int | None = 1, + mode: Mode = "r", + _id: str | int | None = None, + max_trials: int | None = None, + max_broken: int | None = None, + algorithms: BaseAlgorithm | None = None, + working_dir: str | None = None, + metadata: dict | None = None, + refers: dict | None = None, + ): + self._id = _id self.name = name + self.space: Space = ( + space if isinstance(space, Space) else SpaceBuilder().build(space) + ) self.version = version if version else 1 self._mode = mode - self._node = None - self.refers = {} - self.metadata = {} - self.max_trials = None - self.max_broken = None - self.space = None - self.algorithms = None - self.working_dir = None + self.refers = refers or {} + self.metadata = metadata or {} + self.max_trials = max_trials + self.max_broken = max_broken - self._storage = get_storage() + self.algorithms = algorithms + self.working_dir = working_dir + self._storage = get_storage() self._node = ExperimentNode(self.name, self.version, experiment=self) def _check_if_writable(self): @@ -386,7 +407,9 @@ def register_trial(self, trial, status="new"): self._storage.register_trial(trial) @contextlib.contextmanager - def acquire_algorithm_lock(self, timeout=60, retry_interval=1): + def acquire_algorithm_lock( + self, timeout: int | float = 60, retry_interval: int | float = 1 + ): """Acquire lock on algorithm This method should be called using a ``with``-clause. diff --git a/tests/unittests/core/worker/test_experiment.py b/tests/unittests/core/worker/test_experiment.py index 788680460..1efda3af4 100644 --- a/tests/unittests/core/worker/test_experiment.py +++ b/tests/unittests/core/worker/test_experiment.py @@ -16,9 +16,10 @@ import orion.core import orion.core.utils.backward as backward import orion.core.worker.experiment +from orion.algo.space import Space from orion.core.io.space_builder import SpaceBuilder from orion.core.utils.exceptions import UnsupportedOperation -from orion.core.worker.experiment import Experiment +from orion.core.worker.experiment import Experiment, Mode from orion.core.worker.primary_algo import create_algo from orion.core.worker.trial import Trial from orion.storage.base import LockedAlgorithmState, get_storage @@ -180,20 +181,20 @@ class TestReserveTrial: """Calls to interface `Experiment.reserve_trial`.""" @pytest.mark.usefixtures("setup_pickleddb_database") - def test_reserve_none(self): + def test_reserve_none(self, space: Space): """Find nothing, return None.""" with OrionState(experiments=[], trials=[]): - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) trial = exp.reserve_trial() assert trial is None - def test_reserve_success(self, random_dt): + def test_reserve_success(self, random_dt, space: Space): """Successfully find new trials in db and reserve the first one""" storage_config = {"type": "legacy", "database": {"type": "EphemeralDB"}} with OrionState( trials=generate_trials(["new", "reserved"]), storage=storage_config ) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] trial = exp.reserve_trial() @@ -205,17 +206,17 @@ def test_reserve_success(self, random_dt): assert trial.to_dict() == cfg.trials[1] - def test_reserve_when_exhausted(self): + def test_reserve_when_exhausted(self, space: Space): """Return None once all the trials have been allocated""" statuses = ["new", "reserved", "interrupted", "completed", "broken"] with OrionState(trials=generate_trials(statuses)) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] assert exp.reserve_trial() is not None assert exp.reserve_trial() is not None assert exp.reserve_trial() is None - def test_fix_lost_trials(self): + def test_fix_lost_trials(self, space: Space): """Test that a running trial with an old heartbeat is set to interrupted.""" trial = copy.deepcopy(base_trial) trial["status"] = "reserved" @@ -223,14 +224,14 @@ def test_fix_lost_trials(self): seconds=60 * 10 ) with OrionState(trials=[trial]) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] assert len(exp.fetch_trials_by_status("reserved")) == 1 exp.fix_lost_trials() assert len(exp.fetch_trials_by_status("reserved")) == 0 - def test_fix_only_lost_trials(self): + def test_fix_only_lost_trials(self, space: Space): """Test that an old trial is set to interrupted but not a recent one.""" lost_trial, running_trial = generate_trials(["reserved"] * 2) lost_trial["heartbeat"] = datetime.datetime.utcnow() - datetime.timedelta( @@ -239,7 +240,7 @@ def test_fix_only_lost_trials(self): running_trial["heartbeat"] = datetime.datetime.utcnow() with OrionState(trials=[lost_trial, running_trial]) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] assert len(exp.fetch_trials_by_status("reserved")) == 2 @@ -254,7 +255,7 @@ def test_fix_only_lost_trials(self): assert len(failedover_trials) == 1 assert failedover_trials[0].to_dict()["params"] == lost_trial["params"] - def test_fix_lost_trials_race_condition(self, monkeypatch, caplog): + def test_fix_lost_trials_race_condition(self, monkeypatch, caplog, space: Space): """Test that a lost trial fixed by a concurrent process does not cause error.""" trial = copy.deepcopy(base_trial) trial["status"] = "interrupted" @@ -262,7 +263,7 @@ def test_fix_lost_trials_race_condition(self, monkeypatch, caplog): seconds=60 * 10 ) with OrionState(trials=[trial]) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] assert len(exp.fetch_trials_by_status("interrupted")) == 1 @@ -291,7 +292,7 @@ def fetch_lost_trials(self, query): assert len(exp.fetch_trials_by_status("interrupted")) == 1 assert len(exp.fetch_trials_by_status("reserved")) == 0 - def test_fix_lost_trials_configurable_hb(self): + def test_fix_lost_trials_configurable_hb(self, space: Space): """Test that heartbeat is correctly being configured.""" trial = copy.deepcopy(base_trial) trial["status"] = "reserved" @@ -299,7 +300,7 @@ def test_fix_lost_trials_configurable_hb(self): seconds=60 * 2 ) with OrionState(trials=[trial]) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] assert len(exp.fetch_trials_by_status("reserved")) == 1 @@ -318,9 +319,11 @@ def test_fix_lost_trials_configurable_hb(self): class TestAcquireAlgorithmLock: - def test_acquire_algorithm_lock_successful(self, new_config, algorithm): + def test_acquire_algorithm_lock_successful( + self, new_config, algorithm, space: Space + ): with OrionState(experiments=[new_config]) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = 0 exp.algorithms = algorithm @@ -343,9 +346,11 @@ def test_acquire_algorithm_lock_successful(self, new_config, algorithm): with exp.acquire_algorithm_lock(timeout=0.2, retry_interval=0.1): assert algorithm.state_dict == new_state_dict - def test_acquire_algorithm_lock_with_different_config(self, new_config, algorithm): + def test_acquire_algorithm_lock_with_different_config( + self, new_config, algorithm, space: Space + ): with OrionState(experiments=[new_config]) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = 0 algorithm_original_config = algorithm.configuration exp.algorithms = algorithm @@ -360,9 +365,11 @@ def test_acquire_algorithm_lock_with_different_config(self, new_config, algorith with exp.acquire_algorithm_lock(timeout=0.2, retry_interval=0.1): pass - def test_acquire_algorithm_lock_timeout(self, new_config, algorithm, mocker): + def test_acquire_algorithm_lock_timeout( + self, new_config, algorithm, mocker, space: Space + ): with OrionState(experiments=[new_config]) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = 0 exp.algorithms = algorithm @@ -378,10 +385,10 @@ def test_acquire_algorithm_lock_timeout(self, new_config, algorithm, mocker): ) -def test_update_completed_trial(random_dt): +def test_update_completed_trial(random_dt, space: Space): """Successfully push a completed trial into database.""" with OrionState(trials=generate_trials(["new"])) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] trial = exp.reserve_trial() @@ -408,10 +415,10 @@ def test_update_completed_trial(random_dt): @pytest.mark.usefixtures("with_user_tsirif") -def test_register_trials(tmp_path, random_dt): +def test_register_trials(tmp_path, random_dt, space: Space): """Register a list of newly proposed trials/parameters.""" with OrionState(): - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = 0 exp.working_dir = tmp_path @@ -437,11 +444,10 @@ def test_register_trials(tmp_path, random_dt): class TestToPandas: """Test suite for ``Experiment.to_pandas``""" - def test_empty(self, space): + def test_empty(self, space: Space): """Test panda frame creation when there is no trials""" with OrionState(): - exp = Experiment("supernaekei", mode="x") - exp.space = space + exp = Experiment("supernaekei", mode="x", space=space) assert exp.to_pandas().shape == (0, 8) assert list(exp.to_pandas().columns) == [ "id", @@ -454,14 +460,13 @@ def test_empty(self, space): "/index", ] - def test_data(self, space): + def test_data(self, space: Space): """Verify the data in the panda frame is coherent with database""" with OrionState( trials=generate_trials(["new", "reserved", "completed"]) ) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] - exp.space = space df = exp.to_pandas() assert df.shape == (3, 8) assert list(df["id"]) == [trial["id"] for trial in cfg.trials] @@ -480,17 +485,17 @@ def test_data(self, space): assert list(df["/index"]) == [1, 0, 2] -def test_fetch_all_trials(): +def test_fetch_all_trials(space: Space): """Fetch a list of all trials""" with OrionState(trials=generate_trials(["new", "reserved", "completed"])) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] trials = list(map(lambda trial: trial.to_dict(), exp.fetch_trials({}))) assert trials == cfg.trials -def test_fetch_pending_trials(): +def test_fetch_pending_trials(space: Space): """Fetch a list of the trials that are pending trials.status in ['new', 'interrupted', 'suspended'] @@ -498,7 +503,7 @@ def test_fetch_pending_trials(): pending_stati = ["new", "interrupted", "suspended"] statuses = pending_stati + ["completed", "broken", "reserved"] with OrionState(trials=generate_trials(statuses)) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] trials = exp.fetch_pending_trials() @@ -506,7 +511,7 @@ def test_fetch_pending_trials(): assert {trial.status for trial in trials} == set(pending_stati) -def test_fetch_non_completed_trials(): +def test_fetch_non_completed_trials(space: Space): """Fetch a list of the trials that are not completed trials.status in ['new', 'interrupted', 'suspended', 'broken'] @@ -514,7 +519,7 @@ def test_fetch_non_completed_trials(): non_completed_stati = ["new", "interrupted", "suspended", "reserved"] statuses = non_completed_stati + ["completed"] with OrionState(trials=generate_trials(statuses)) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] trials = exp.fetch_noncompleted_trials() @@ -522,12 +527,12 @@ def test_fetch_non_completed_trials(): assert {trial.status for trial in trials} == set(non_completed_stati) -def test_is_done_property_with_pending(algorithm): +def test_is_done_property_with_pending(algorithm, space: Space): """Check experiment stopping conditions when there is pending trials.""" completed = ["completed"] * 10 reserved = ["reserved"] * 5 with OrionState(trials=generate_trials(completed + reserved)) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] exp.algorithms = algorithm @@ -546,12 +551,12 @@ def test_is_done_property_with_pending(algorithm): assert not exp.is_done -def test_is_done_property_no_pending(algorithm): +def test_is_done_property_no_pending(algorithm, space: Space): """Check experiment stopping conditions when there is no pending trials.""" completed = ["completed"] * 10 broken = ["broken"] * 5 with OrionState(trials=generate_trials(completed + broken)) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] exp.algorithms = algorithm @@ -567,13 +572,13 @@ def test_is_done_property_no_pending(algorithm): assert exp.is_done -def test_broken_property(): +def test_broken_property(space: Space): """Check experiment stopping conditions for maximum number of broken.""" MAX_BROKEN = 5 statuses = (["reserved"] * 10) + (["broken"] * (MAX_BROKEN - 1)) with OrionState(trials=generate_trials(statuses)) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] exp.max_broken = MAX_BROKEN @@ -582,7 +587,7 @@ def test_broken_property(): statuses = (["reserved"] * 10) + (["broken"] * (MAX_BROKEN)) with OrionState(trials=generate_trials(statuses)) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] exp.max_broken = MAX_BROKEN @@ -590,13 +595,13 @@ def test_broken_property(): assert exp.is_broken -def test_configurable_broken_property(): +def test_configurable_broken_property(space: Space): """Check if max_broken changes after configuration.""" MAX_BROKEN = 5 statuses = (["reserved"] * 10) + (["broken"] * (MAX_BROKEN)) with OrionState(trials=generate_trials(statuses)) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] exp.max_broken = MAX_BROKEN @@ -608,12 +613,12 @@ def test_configurable_broken_property(): assert not exp.is_broken -def test_experiment_stats(): +def test_experiment_stats(space: Space): """Check that property stats is returning a proper summary of experiment's results.""" NUM_COMPLETED = 3 statuses = (["completed"] * NUM_COMPLETED) + (["reserved"] * 2) with OrionState(trials=generate_trials(statuses)) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] exp.metadata = {"datetime": datetime.datetime.utcnow()} stats = exp.stats @@ -625,11 +630,11 @@ def test_experiment_stats(): assert stats.duration == stats.finish_time - stats.start_time -def test_experiment_pickleable(): +def test_experiment_pickleable(space: Space): """Test experiment instance is pickleable""" with OrionState(trials=generate_trials(["new"])) as cfg: - exp = Experiment("supernaekei", mode="x") + exp = Experiment("supernaekei", mode="x", space=space) exp._id = cfg.trials[0]["experiment"] exp_trials = exp.fetch_trials() @@ -745,13 +750,16 @@ def compare_unsupported(attr_name, restricted_exp, execution_exp): restricted_attr(**kwargs.get(attr_name, {})) -def create_experiment(mode, space, algorithm): - experiment = Experiment("supernaekei", mode=mode) - experiment.space = space - experiment.algorithms = algorithm - experiment.max_broken = 5 - experiment.max_trials = 5 - experiment._id = 1 +def create_experiment(mode: Mode, space: Space, algorithm): + experiment = Experiment( + "supernaekei", + mode=mode, + space=space, + algorithms=algorithm, + max_broken=5, + max_trials=5, + _id=1, + ) return experiment