Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Duplicate pending trials from parent/child for exc #631

Merged
merged 8 commits into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/orion/client/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,15 @@ def fetch_trials_by_status(self, status, with_evc_tree=False):
status, with_evc_tree=with_evc_tree
)

def fetch_pending_trials(self, with_evc_tree=False):
    """Return the trials that are still waiting to be executed.

    Pending trials are those with status new, interrupted or suspended.
    The request is delegated to the wrapped experiment object.

    Trials are sorted based on ``Trial.submit_time``

    :param with_evc_tree: forwarded unchanged to the wrapped experiment
        (presumably widens the search to the whole EVC tree -- confirm
        against the wrapped implementation). Defaults to ``False``.
    :return: list of :class:`orion.core.worker.trial.Trial` objects
    """
    experiment = self._experiment
    pending_trials = experiment.fetch_pending_trials(with_evc_tree=with_evc_tree)
    return pending_trials

def fetch_noncompleted_trials(self, with_evc_tree=False):
"""Fetch non-completed trials of this `Experiment` instance.

Expand Down
49 changes: 49 additions & 0 deletions src/orion/core/worker/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from orion.core.evc.adapters import BaseAdapter
from orion.core.evc.experiment import ExperimentNode
from orion.core.io.database import DuplicateKeyError
from orion.core.utils.exceptions import UnsupportedOperation
from orion.core.utils.flatten import flatten
from orion.core.utils.singleton import update_singletons
Expand Down Expand Up @@ -240,6 +241,8 @@ def reserve_trial(self, score_handle=None):

self.fix_lost_trials()

self.duplicate_pending_trials()

selected_trial = self._storage.reserve_trial(self)
log.debug("reserved trial (trial: %s)", selected_trial)
return selected_trial
Expand All @@ -265,6 +268,43 @@ def fix_lost_trials(self):
except FailedUpdate:
log.debug("failed")

def duplicate_pending_trials(self):
    """Find pending trials in EVC and duplicate them in current experiment.

    An experiment cannot execute trials from parent experiments, otherwise some
    trials may have been executed in different environments of different
    experiments although they belong to the same experiment. Instead, trials that
    are pending in parent and child experiments are copied over to the current
    experiment so that they can be reserved and executed. The parent or child
    experiment will only see their original copy of the trial, and the current
    experiment will only see the new copy of the trial.

    Two trials are considered the same when their hashes, computed while
    ignoring the experiment they belong to, are equal. A trial that is pending
    in several nodes of the EVC tree is therefore copied only once.

    The method starts by calling ``self._check_if_writable()`` (presumably
    raising when the experiment may not be modified -- confirm there).
    """
    self._check_if_writable()
    evc_pending_trials = self._select_evc_call(
        with_evc_tree=True, function="fetch_pending_trials"
    )
    exp_pending_trials = self._select_evc_call(
        with_evc_tree=False, function="fetch_pending_trials"
    )

    # Hashes of the trials this experiment already owns; extended below as
    # copies are registered so the same trial is never submitted twice.
    known_hashes = {
        trial.compute_trial_hash(trial, ignore_experiment=True)
        for trial in exp_pending_trials
    }

    for trial in evc_pending_trials:
        trial_hash = trial.compute_trial_hash(trial, ignore_experiment=True)
        if trial_hash in known_hashes:
            continue
        known_hashes.add(trial_hash)

        # Re-assign the fetched trial to this experiment before storing the copy.
        trial.experiment = self.id
        # Another worker may have registered the same copy between the fetch
        # above and this call; the resulting duplicate-key error is harmless.
        try:
            self._storage.register_trial(trial)
        except DuplicateKeyError:
            log.debug("Race condition while trying to duplicate trial %s", trial.id)

# pylint:disable=unused-argument
def update_completed_trial(self, trial, results_file=None):
"""Inform database about an evaluated `trial` with results.
Expand Down Expand Up @@ -354,6 +394,15 @@ def fetch_trials_by_status(self, status, with_evc_tree=False):
"""
return self._select_evc_call(with_evc_tree, "fetch_trials_by_status", status)

def fetch_pending_trials(self, with_evc_tree=False):
    """Return the trials that are still waiting to be executed.

    Pending trials are those with status new, interrupted or suspended.
    The lookup is routed through ``self._select_evc_call`` under the name
    ``"fetch_pending_trials"``.

    Trials are sorted based on `Trial.submit_time`

    :param with_evc_tree: handed to ``_select_evc_call`` as-is (presumably
        selects between this experiment alone and its whole EVC tree --
        confirm there). Defaults to ``False``.
    :return: list of `Trial` objects
    """
    return self._select_evc_call(
        with_evc_tree=with_evc_tree, function="fetch_pending_trials"
    )

def fetch_noncompleted_trials(self, with_evc_tree=False):
"""Fetch non-completed trials of this `Experiment` instance.

Expand Down
13 changes: 8 additions & 5 deletions src/orion/core/worker/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,11 @@ def configuration(self):
def observe(self, points, results):
"""See BaseParallelStrategy.observe"""
super(MaxParallelStrategy, self).observe(points, results)
self.max_result = max(
results = [
result["objective"] for result in results if result["objective"] is not None
)
]
if results:
self.max_result = max(results)

def lie(self, trial):
"""See BaseParallelStrategy.lie"""
Expand Down Expand Up @@ -175,9 +177,10 @@ def observe(self, points, results):
objective_values = [
result["objective"] for result in results if result["objective"] is not None
]
self.mean_result = sum(value for value in objective_values) / float(
len(objective_values)
)
if objective_values:
self.mean_result = sum(value for value in objective_values) / float(
len(objective_values)
)

def lie(self, trial):
"""See BaseParallelStrategy.lie"""
Expand Down
85 changes: 85 additions & 0 deletions src/orion/testing/evc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import contextlib
import copy

from orion.client import build_experiment, get_experiment


@contextlib.contextmanager
def disable_duplication(monkeypatch):
    """Replace ``Experiment.duplicate_pending_trials`` with a no-op for a block.

    While the ``with`` body runs, the method named by the patch target is
    swapped for a function that does nothing and returns ``None``.

    :param monkeypatch: object exposing ``context()``, which must return a
        context manager yielding something with ``setattr(target, value)``
        (pytest's ``monkeypatch`` fixture has this shape).
    """

    def _do_nothing(self):
        return None

    target = "orion.core.worker.experiment.Experiment.duplicate_pending_trials"
    with monkeypatch.context() as patcher:
        patcher.setattr(target, _do_nothing)
        # Hand control back to the caller while the patch is still in place.
        yield


def generate_trials(exp, trials):
    """Generate trials for each item in trials.

    Items of trials can be either dictionary of valid hyperparameters based on
    exp.space and status or `None`.

    If status not provided, 'new' is used by default.

    For items that are `None` (or that hold nothing but a status), trials are
    suggested with exp.suggest().

    :param exp: experiment client; must provide ``insert(params=...)``,
        ``suggest()`` usable as a context manager, and
        ``_experiment._storage.set_trial_status(trial, status)``.
    :param trials: iterable of dictionaries or `None`. The items are not
        modified; each one is deep-copied before its ``"status"`` key is removed.
    """
    for trial_config in trials:
        # Work on a copy so the caller's definition keeps its "status" entry.
        params = copy.deepcopy(trial_config)
        status = params.pop("status", None) if params else None

        if params:
            trial = exp.insert(params=params)
        else:
            with exp.suggest() as trial:
                # Releases suggested trial when leaving with-clause.
                pass

        if status is not None:
            exp._experiment._storage.set_trial_status(trial, status)


def build_root_experiment(space=None, trials=None):
    """Build a root experiment and generate trials.

    :param space: search space definition passed to ``build_experiment``.
        Defaults to three dimensions ``x``, ``y`` and ``z``, each
        ``uniform(0, 100)``.
    :param trials: trial definitions passed to ``generate_trials``. Defaults
        to four trials with ``x = i``, ``y = 2 * i`` and ``z = i ** 2``.
    :return: the experiment named ``"root"`` returned by ``build_experiment``,
        whose ``max_trials`` is the number of trial definitions.
    """
    if space is None:
        space = {
            "x": "uniform(0, 100)",
            "y": "uniform(0, 100)",
            "z": "uniform(0, 100)",
        }
    if trials is None:
        trials = [{"x": i, "y": i * 2, "z": i ** 2} for i in range(4)]

    root = build_experiment(name="root", max_trials=len(trials), space=space)

    generate_trials(root, trials)

    # Returned so callers can use the experiment without fetching it again.
    return root


def build_child_experiment(space=None, trials=None, name="child", parent="root"):
    """Build a child experiment by branching from `parent` and generate trials.

    :param space: search space passed to ``build_experiment`` (``None`` is
        passed through unchanged).
    :param trials: trial definitions passed to ``generate_trials``. Defaults
        to six `None` items, i.e. six suggested trials.
    :param name: name of the experiment to build.
    :param parent: name of the experiment to branch from.
    :return: the experiment returned by ``build_experiment``. Its
        ``max_trials`` is the parent's ``max_trials`` plus the number of trial
        definitions.
    :raises AssertionError: if the built experiment is not named `name` or its
        version is not 1.
    """
    if trials is None:
        trials = [None] * 6

    max_trials = get_experiment(parent).max_trials + len(trials)

    child = build_experiment(
        name=name,
        space=space,
        max_trials=max_trials,
        branching={"branch_from": parent, "enable": True},
    )
    # Sanity checks: a new experiment must have been created under `name`.
    assert child.name == name
    assert child.version == 1

    generate_trials(child, trials)

    # Returned so callers can use the experiment without fetching it again.
    return child


def build_grand_child_experiment(space=None, trials=None):
    """Build a grand-child experiment by branching from `child` and generate trials.

    Thin wrapper around ``build_child_experiment`` using the name
    ``"grand-child"`` and the parent ``"child"``.

    :param space: search space forwarded to ``build_child_experiment``.
    :param trials: trial definitions forwarded to ``build_child_experiment``.
        Defaults to five `None` items.
    """
    grand_child_trials = [None] * 5 if trials is None else trials

    build_child_experiment(
        name="grand-child",
        parent="child",
        space=space,
        trials=grand_child_trials,
    )
20 changes: 10 additions & 10 deletions tests/functional/branching/test_branching.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,15 +831,15 @@ def test_run_entire_full_x_full_y(init_entire):

orion.core.cli.main(
(
"-vv hunt --max-trials 20 --pool-size 1 -n full_x_full_y "
"-vv hunt --max-trials 30 --pool-size 1 -n full_x_full_y "
"./black_box_with_y.py "
"-x~uniform(-10,10) "
"-y~uniform(-10,10,default_value=1)"
).split(" ")
)

assert len(experiment.fetch_trials(with_evc_tree=True)) == 39
assert len(experiment.fetch_trials()) == 20
assert len(experiment.fetch_trials(with_evc_tree=True)) == 30
assert len(experiment.fetch_trials(with_evc_tree=False)) == 30


def test_run_entire_full_x_full_y_no_args(init_entire):
Expand All @@ -850,11 +850,11 @@ def test_run_entire_full_x_full_y_no_args(init_entire):
assert len(experiment.fetch_trials()) == 4

orion.core.cli.main(
("-vv hunt --max-trials 20 --pool-size 1 -n full_x_full_y").split(" ")
("-vv hunt --max-trials 30 --pool-size 1 -n full_x_full_y").split(" ")
)

assert len(experiment.fetch_trials(with_evc_tree=True)) == 39
assert len(experiment.fetch_trials()) == 20
assert len(experiment.fetch_trials(with_evc_tree=True)) == 30
assert len(experiment.fetch_trials(with_evc_tree=False)) == 30


def test_new_algo(init_full_x_new_algo):
Expand All @@ -872,8 +872,8 @@ def test_new_algo(init_full_x_new_algo):
("-vv hunt --max-trials 20 --pool-size 1 -n full_x_new_algo").split(" ")
)

assert len(experiment.fetch_trials(with_evc_tree=True)) == 21
assert len(experiment.fetch_trials()) == 20
assert len(experiment.fetch_trials(with_evc_tree=True)) == 20
assert len(experiment.fetch_trials(with_evc_tree=False)) == 20


def test_new_algo_not_resolved(init_full_x, capsys):
Expand Down Expand Up @@ -1002,8 +1002,8 @@ def test_new_cli(init_full_x_new_cli):
("-vv hunt --max-trials 20 --pool-size 1 -n full_x_new_cli").split(" ")
)

assert len(experiment.fetch_trials(with_evc_tree=True)) == 21
assert len(experiment.fetch_trials()) == 20
assert len(experiment.fetch_trials(with_evc_tree=True)) == 20
assert len(experiment.fetch_trials(with_evc_tree=False)) == 20


@pytest.mark.usefixtures("init_full_x")
Expand Down
Loading