From 3ab928ccb61a8e1e23cc481ee305f5a036956beb Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Mon, 29 Jan 2024 15:12:40 +0000 Subject: [PATCH] stop after cycle point: support offsets * Closes #5939 * Support offsets (measured from the ICP) for the `stop after cycle point` to mirror the behaviour of the `final cycle point`. * Add integration test to lock-down `stop after cycle point` interactions. --- cylc/flow/cfgspec/workflow.py | 21 ++- cylc/flow/config.py | 11 +- cylc/flow/parsec/validate.py | 28 ++++ cylc/flow/scheduler.py | 1 + cylc/flow/task_pool.py | 2 +- .../test_stop_after_cycle_point.py | 132 ++++++++++++++++++ tests/unit/test_config.py | 115 ++++++++------- 7 files changed, 255 insertions(+), 55 deletions(-) create mode 100644 tests/integration/test_stop_after_cycle_point.py diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index de919c27c0f..7ae7cf014b2 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -539,7 +539,7 @@ def get_script_common_text(this: str, example: Optional[str] = None): ''') # NOTE: final cycle point is not a V_CYCLE_POINT to allow expressions # such as '+P1Y' (relative to initial cycle point) - Conf('final cycle point', VDR.V_STRING, desc=''' + Conf('final cycle point', VDR.V_CYCLE_POINT_WITH_OFFSETS, desc=''' The (optional) last cycle point at which tasks are run. Once all tasks have reached this cycle point, the @@ -547,6 +547,12 @@ def get_script_common_text(this: str, example: Optional[str] = None): This item can be overridden on the command line using ``cylc play --final-cycle-point`` or ``--fcp``. + + Examples: + + - ``2000`` - Shorthand for ``2000-01-01T00:00``. + - ``+P1D`` - The initial cycle point plus one day. + - ``2000 +P1D +P1Y`` - The year ``2000`` plus one year and one day. ''') Conf('initial cycle point constraints', VDR.V_STRING_LIST, desc=''' Rules to allow only some initial datetime cycle points. @@ -599,7 +605,7 @@ def get_script_common_text(this: str, example: Optional[str] = None): {REPLACES}``[scheduling]hold after point``. ''') - Conf('stop after cycle point', VDR.V_CYCLE_POINT, desc=''' + Conf('stop after cycle point', VDR.V_CYCLE_POINT_WITH_OFFSETS, desc=''' Shut down the workflow after all tasks pass this cycle point. The stop cycle point can be overridden on the command line using @@ -612,7 +618,18 @@ def get_script_common_text(this: str, example: Optional[str] = None): choosing not to run that part of the graph. You can play the workflow and continue. + Examples: + + - ``2000`` - Shorthand for ``2000-01-01T00:00``. + - ``+P1D`` - The initial cycle point plus one day. + - ``2000 +P1D +P1Y`` - The year ``2000`` plus one year and one day. + .. versionadded:: 8.0.0 + + .. versionchanged:: 8.3.0 + + This now supports offsets (e.g. ``+P1D``) in the same way the + :cylc:conf:`[..]final cycle point` does. ''') Conf('cycling mode', VDR.V_STRING, Calendar.MODE_GREGORIAN, options=list(Calendar.MODES) + ['integer'], desc=''' diff --git a/cylc/flow/config.py b/cylc/flow/config.py index d80456266bf..c3b085af091 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -852,8 +852,15 @@ def process_stop_cycle_point(self) -> None: stopcp_str = self.cfg['scheduling']['stop after cycle point'] if stopcp_str is not None: - self.stop_point = get_point(stopcp_str).standardise() - if self.final_point and (self.stop_point > self.final_point): + self.stop_point = get_point_relative( + stopcp_str, + self.initial_point, + ).standardise() + if ( + self.final_point is not None + and self.stop_point is not None + and self.stop_point > self.final_point + ): LOG.warning( f"Stop cycle point '{self.stop_point}' will have no " "effect as it is after the final cycle " diff --git a/cylc/flow/parsec/validate.py b/cylc/flow/parsec/validate.py index 4ef66e19862..6e0e604d0c5 100644 --- a/cylc/flow/parsec/validate.py +++ b/cylc/flow/parsec/validate.py @@ -657,6 +657,7 @@ class CylcConfigValidator(ParsecValidator): V_CYCLE_POINT = 'V_CYCLE_POINT' V_CYCLE_POINT_FORMAT = 'V_CYCLE_POINT_FORMAT' V_CYCLE_POINT_TIME_ZONE = 'V_CYCLE_POINT_TIME_ZONE' + V_CYCLE_POINT_WITH_OFFSETS = 'V_CYCLE_POINT_WITH_OFFSETS' V_INTERVAL = 'V_INTERVAL' V_INTERVAL_LIST = 'V_INTERVAL_LIST' V_PARAMETER_LIST = 'V_PARAMETER_LIST' @@ -699,6 +700,30 @@ class CylcConfigValidator(ParsecValidator): '-0830': 'UTC minus 8 hours and 30 minutes.' } ), + V_CYCLE_POINT_WITH_OFFSETS: ( + 'cycle point with support for offsets', + 'An integer of date-time cycle point.', + { + '1': 'An integer cycle point.', + '1 +P5': ( + 'An integer cycle point with offsets.', + ' (this evaluates as ``6``)' + ), + '+P5': ( + 'An integer cycle point offset.' + ' This offset is added to the initial cycle point' + ), + '2000-01-01T00:00Z': 'A date-time cycle point.', + '2000-01-01T00:00Z +P1D +P1M': ( + 'A date-time cycle point with offsets.' + ' (this evaluates as ``2000-02-02T00:00Z``' + ), + '2000-01-01T00:00Z +P1D': ( + 'A date-time offset.' + ' This offset is added to the initial cycle point' + ), + } + ), V_INTERVAL: ( 'time interval', 'An ISO8601 duration.', @@ -751,6 +776,9 @@ def __init__(self): self.V_CYCLE_POINT: self.coerce_cycle_point, self.V_CYCLE_POINT_FORMAT: self.coerce_cycle_point_format, self.V_CYCLE_POINT_TIME_ZONE: self.coerce_cycle_point_time_zone, + # NOTE: This type exists for documentation reasons + # it doesn't actually process offsets, that happens later + self.V_CYCLE_POINT_WITH_OFFSETS: self.coerce_str, self.V_INTERVAL: self.coerce_interval, self.V_INTERVAL_LIST: self.coerce_interval_list, self.V_PARAMETER_LIST: self.coerce_parameter_list, diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 0e02f9384d9..ec3ba6f0c58 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -993,6 +993,7 @@ def command_stop( point = TaskID.get_standardised_point(cycle_point) if point is not None and self.pool.set_stop_point(point): self.options.stopcp = str(point) + self.config.stop_point = point self.workflow_db_mgr.put_workflow_stop_cycle_point( self.options.stopcp) elif clock_time is not None: diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index a13ba34d3cb..daa4d8fe00e 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1026,7 +1026,7 @@ def set_stop_point(self, stop_point: 'PointBase') -> bool: LOG.info(f"Stop point unchanged: {stop_point}") return False - LOG.info("Setting stop point: {stop_point}") + LOG.info(f"Setting stop point: {stop_point}") self.stop_point = stop_point if ( diff --git a/tests/integration/test_stop_after_cycle_point.py b/tests/integration/test_stop_after_cycle_point.py new file mode 100644 index 00000000000..663e03649f8 --- /dev/null +++ b/tests/integration/test_stop_after_cycle_point.py @@ -0,0 +1,132 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Test logic pertaining to the stop after cycle points. + +This may be defined in different ways: +* In the workflow configuration. +* On the command line. +* Or loaded from the database. + +When the workflow hits the "stop after" point, it should be wiped (i.e. set +to None). +""" + +from typing import Optional + +from cylc.flow.cycling.integer import IntegerPoint +from cylc.flow.id import Tokens +from cylc.flow.workflow_status import StopMode + + +async def test_stop_after_cycle_point( + flow, + scheduler, + run, + reflog, + complete, +): + """Test the stop after cycle point. + + This ensures: + * The stop after point gets loaded from the config. + * The workflow stops when it hits this point. + * The point gets wiped when the workflow hits this point. + * The point is stored/retrieved from the DB as appropriate. + + """ + async def stops_after_cycle(schd) -> Optional[str]: + """Run the workflow until it stops and return the cycle point.""" + triggers = reflog(schd) + await complete(schd, timeout=2) + assert len(triggers) == 1 # only one task (i.e. cycle) should be run + return Tokens(list(triggers)[0][0], relative=True)['cycle'] + + def get_db_value(schd) -> Optional[str]: + """Return the cycle point value stored in the DB.""" + with schd.workflow_db_mgr.get_pri_dao() as pri_dao: + return dict(pri_dao.select_workflow_params())['stopcp'] + + config = { + 'scheduling': { + 'cycling mode': 'integer', + 'initial cycle point': '1', + 'stop after cycle point': '1', + 'graph': { + 'P1': 'a[-P1] => a', + }, + }, + } + id_ = flow(config) + schd = scheduler(id_, paused_start=False) + async with run(schd): + # the cycle point should be loaded from the workflow configuration + assert schd.config.stop_point == IntegerPoint('1') + + # this value should *not* be written to the database + assert get_db_value(schd) is None + + # the workflow should stop after cycle 1 + assert await stops_after_cycle(schd) == '1' + + # change the configured cycle point to "2" + config['scheduling']['stop after cycle point'] = '2' + id_ = flow(config, id_=id_) + schd = scheduler(id_, paused_start=False) + async with run(schd): + # the cycle point should be reloaded from the workflow configuration + assert schd.config.stop_point == IntegerPoint('2') + + # this value should not be written to the database + assert get_db_value(schd) is None + + # the workflow should stop after cycle 2 + assert await stops_after_cycle(schd) == '2' + + # override the configured value via the CLI option + schd = scheduler(id_, paused_start=False, **{'stopcp': '3'}) + async with run(schd): + # the CLI should take precedence over the config + assert schd.config.stop_point == IntegerPoint('3') + + # this value *should* be written to the database + assert get_db_value(schd) == '3' + + # the workflow should stop after cycle 3 + assert await stops_after_cycle(schd) == '3' + + # once the workflow hits this point, it should get cleared + assert get_db_value(schd) is None + + schd = scheduler(id_, paused_start=False) + async with run(schd): + # the workflow should fall back to the configured value + assert schd.config.stop_point == IntegerPoint('2') + + # override this value whilst the workflow is running + schd.command_stop( + cycle_point=IntegerPoint('4'), + mode=StopMode.REQUEST_CLEAN, + ) + assert schd.config.stop_point == IntegerPoint('4') + + # the new *should* be written to the database + assert get_db_value(schd) == '4' + + schd = scheduler(id_, paused_start=False) + async with run(schd): + # the workflow should stop after cycle 4 + assert await stops_after_cycle(schd) == '4' diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 67f5f0f559a..57b06433181 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -114,7 +114,7 @@ def test_xfunction_imports( """ flow_file.write_text(flow_config) workflow_config = WorkflowConfig( - workflow="name_a_tree", fpath=flow_file, options=Mock(spec=[]), + workflow="name_a_tree", fpath=flow_file, options=SimpleNamespace(spec=[]), xtrigger_mgr=xtrigger_mgr ) assert 'tree' in workflow_config.xtrigger_mgr.functx_map @@ -148,7 +148,7 @@ def test_xfunction_import_error(self, mock_glbl_cfg, tmp_path): WorkflowConfig( workflow="caiman_workflow", fpath=flow_file, - options=Mock(spec=[]) + options=SimpleNamespace(spec=[]) ) assert "not found" in str(excinfo.value) @@ -179,7 +179,7 @@ def test_xfunction_attribute_error(self, mock_glbl_cfg, tmp_path): flow_file.write_text(flow_config) with pytest.raises(XtriggerConfigError) as excinfo: WorkflowConfig(workflow="capybara_workflow", fpath=flow_file, - options=Mock(spec=[])) + options=SimpleNamespace(spec=[])) assert "not found" in str(excinfo.value) def test_xfunction_not_callable(self, mock_glbl_cfg, tmp_path): @@ -211,7 +211,7 @@ def test_xfunction_not_callable(self, mock_glbl_cfg, tmp_path): WorkflowConfig( workflow="workflow_with_not_callable", fpath=flow_file, - options=Mock(spec=[]) + options=SimpleNamespace(spec=[]) ) assert "callable" in str(excinfo.value) @@ -378,14 +378,16 @@ def test_process_icp( expected_err: Exception class expected to be raised plus the message. """ set_cycling_type(cycling_type, time_zone="+0530") - mocked_config = Mock(cycling_type=cycling_type) - mocked_config.cfg = { - 'scheduling': { - 'initial cycle point constraints': [], - **scheduling_cfg - } - } - mocked_config.options.icp = None + mocked_config = SimpleNamespace( + cycling_type=cycling_type, + options=SimpleNamespace(icp=None), + cfg={ + 'scheduling': { + 'initial cycle point constraints': [], + **scheduling_cfg + }, + }, + ) monkeypatch.setattr('cylc.flow.config.get_current_time_string', lambda: '20050102T0615+0530') @@ -461,9 +463,10 @@ def test_process_startcp( expected_err: Expected exception. """ set_cycling_type(ISO8601_CYCLING_TYPE, time_zone="+0530") - mocked_config = Mock(initial_point='18990501T0000+0530') - mocked_config.options.startcp = startcp - mocked_config.options.starttask = starttask + mocked_config = SimpleNamespace( + initial_point='18990501T0000+0530', + options=SimpleNamespace(startcp=startcp, starttask=starttask), + ) monkeypatch.setattr('cylc.flow.config.get_current_time_string', lambda: '20050102T0615+0530') if expected_err is not None: @@ -662,17 +665,20 @@ def test_process_fcp( expected_err: Exception class expected to be raised plus the message. """ set_cycling_type(cycling_type, time_zone='+0530') - mocked_config = Mock(cycling_type=cycling_type) - mocked_config.cfg = { - 'scheduling': { - 'final cycle point constraints': [], - **scheduling_cfg - } - } - mocked_config.initial_point = loader.get_point( - scheduling_cfg['initial cycle point']).standardise() - mocked_config.final_point = None - mocked_config.options.fcp = options_fcp + mocked_config = SimpleNamespace( + cycling_type=cycling_type, + cfg={ + 'scheduling': { + 'final cycle point constraints': [], + **scheduling_cfg, + }, + }, + initial_point=loader.get_point( + scheduling_cfg['initial cycle point'] + ).standardise(), + final_point = None, + options=SimpleNamespace(fcp=options_fcp), + ) if expected_err: err, msg = expected_err @@ -692,24 +698,28 @@ def test_process_fcp( [ pytest.param( None, None, None, None, None, - id="No stopcp" + id="no-stopcp" ), pytest.param( '1993', None, '1993', None, None, - id="From config by default" + id="stopcp" ), pytest.param( '1993', '1066', '1066', '1066', None, - id="From options" + id="stop-cp-and-cli-option" ), pytest.param( '1993', 'reload', '1993', None, None, - id="From cfg if --stopcp=reload on restart" + id="stop-cp-and-cli-reload-option" ), pytest.param( '3000', None, None, None, "will have no effect as it is after the final cycle point", - id="stopcp > fcp" + id="stopcp-beyond-fcp" + ), + pytest.param( + '+P12Y -P2Y', None, '2000', None, None, + id="stopcp-relative-to-icp" ), ] ) @@ -734,12 +744,13 @@ def test_process_stop_cycle_point( set_cycling_type(ISO8601_CYCLING_TYPE, dump_format='CCYY') caplog.set_level(logging.WARNING, CYLC_LOG) fcp = loader.get_point('2012').standardise() - mock_config = Mock( + mock_config = SimpleNamespace( cfg={ 'scheduling': { 'stop after cycle point': cfg_stopcp } }, + initial_point=ISO8601Point('1990'), final_point=fcp, stop_point=None, options=RunOptions(stopcp=options_stopcp), @@ -887,7 +898,7 @@ def test_prelim_process_graph( processing. expected_err: Exception class expected to be raised plus the message. """ - mock_config = Mock(cfg={ + mock_config = SimpleNamespace(cfg={ 'scheduling': scheduling_cfg }) @@ -913,13 +924,15 @@ def _test(utc_mode, expected, expected_warnings=0): UTC mode = {utc_mode['glbl']} ''' ) - mock_config = Mock() - mock_config.cfg = { - 'scheduler': { - 'UTC mode': utc_mode['workflow'] - } - } - mock_config.options.utc_mode = utc_mode['stored'] + mock_config = SimpleNamespace( + cfg={ + 'scheduler': { + 'UTC mode': utc_mode['workflow'] + } + }, + options=SimpleNamespace(utc_mode=utc_mode['stored']), + ) + WorkflowConfig.process_utc_mode(mock_config) assert mock_config.cfg['scheduler']['UTC mode'] is expected assert get_utc_mode() is expected @@ -963,13 +976,15 @@ def test_cycle_point_tz(caplog, monkeypatch): def _test(cp_tz, utc_mode, expected, expected_warnings=0): set_utc_mode(utc_mode) - mock_config = Mock() - mock_config.cfg = { - 'scheduler': { - 'cycle point time zone': cp_tz['workflow'] - } - } - mock_config.options.cycle_point_tz = cp_tz['stored'] + mock_config = SimpleNamespace( + cfg={ + 'scheduler': { + 'cycle point time zone': cp_tz['workflow'], + }, + }, + options=SimpleNamespace(cycle_point_tz=cp_tz['stored']), + ) + WorkflowConfig.process_cycle_point_tz(mock_config) assert mock_config.cfg['scheduler'][ 'cycle point time zone'] == expected @@ -1148,7 +1163,7 @@ def test_process_runahead_limit( set_cycling_type: Callable ) -> None: set_cycling_type(cycling_type) - mock_config = Mock(cycling_type=cycling_type) + mock_config = SimpleNamespace(cycling_type=cycling_type) mock_config.cfg = { 'scheduling': { 'runahead limit': runahead_limit @@ -1170,7 +1185,7 @@ def test_check_circular(opt, monkeypatch, caplog, tmp_flow_config): # ----- Setup ----- caplog.set_level(logging.WARNING, CYLC_LOG) - options = Mock(spec=[], is_validate=True) + options = SimpleNamespace(spec=[], is_validate=True) if opt: setattr(options, opt, True) @@ -1739,7 +1754,7 @@ def test_cylc_env_at_parsing( # Parse the workflow config then check the environment. WorkflowConfig( - workflow="name", fpath=flow_file, options=Mock(spec=[]), + workflow="name", fpath=flow_file, options=SimpleNamespace(spec=[]), run_dir=run_dir )