Skip to content

Commit

Permalink
stop after cycle point: support offsets
Browse files Browse the repository at this point in the history
* Closes cylc#5939
* Support offsets (measured from the ICP) for the `stop after cycle point`
  to mirror the behaviour of the `final cycle point`.
* Add integration test to lock-down `stop after cycle point`
  interactions.
  • Loading branch information
oliver-sanders committed Jan 29, 2024
1 parent efb3016 commit 3ab928c
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 55 deletions.
21 changes: 19 additions & 2 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,14 +539,20 @@ def get_script_common_text(this: str, example: Optional[str] = None):
''')
# NOTE: final cycle point is not a V_CYCLE_POINT to allow expressions
# such as '+P1Y' (relative to initial cycle point)
Conf('final cycle point', VDR.V_STRING, desc='''
Conf('final cycle point', VDR.V_CYCLE_POINT_WITH_OFFSETS, desc='''
The (optional) last cycle point at which tasks are run.
Once all tasks have reached this cycle point, the
workflow will shut down.
This item can be overridden on the command line using
``cylc play --final-cycle-point`` or ``--fcp``.
Examples:
- ``2000`` - Shorthand for ``2000-01-01T00:00``.
- ``+P1D`` - The initial cycle point plus one day.
- ``2000 +P1D +P1Y`` - The year ``2000`` plus one year and one day.
''')
Conf('initial cycle point constraints', VDR.V_STRING_LIST, desc='''
Rules to allow only some initial datetime cycle points.
Expand Down Expand Up @@ -599,7 +605,7 @@ def get_script_common_text(this: str, example: Optional[str] = None):
{REPLACES}``[scheduling]hold after point``.
''')
Conf('stop after cycle point', VDR.V_CYCLE_POINT, desc='''
Conf('stop after cycle point', VDR.V_CYCLE_POINT_WITH_OFFSETS, desc='''
Shut down the workflow after all tasks pass this cycle point.
The stop cycle point can be overridden on the command line using
Expand All @@ -612,7 +618,18 @@ def get_script_common_text(this: str, example: Optional[str] = None):
choosing not to run that part of the graph. You can play
the workflow and continue.
Examples:
- ``2000`` - Shorthand for ``2000-01-01T00:00``.
- ``+P1D`` - The initial cycle point plus one day.
- ``2000 +P1D +P1Y`` - The year ``2000`` plus one year and one day.
.. versionadded:: 8.0.0
.. versionchanged:: 8.3.0
This now supports offsets (e.g. ``+P1D``) in the same way the
:cylc:conf:`[..]final cycle point` does.
''')
Conf('cycling mode', VDR.V_STRING, Calendar.MODE_GREGORIAN,
options=list(Calendar.MODES) + ['integer'], desc='''
Expand Down
11 changes: 9 additions & 2 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -852,8 +852,15 @@ def process_stop_cycle_point(self) -> None:
stopcp_str = self.cfg['scheduling']['stop after cycle point']

if stopcp_str is not None:
self.stop_point = get_point(stopcp_str).standardise()
if self.final_point and (self.stop_point > self.final_point):
self.stop_point = get_point_relative(
stopcp_str,
self.initial_point,
).standardise()
if (
self.final_point is not None
and self.stop_point is not None
and self.stop_point > self.final_point
):
LOG.warning(
f"Stop cycle point '{self.stop_point}' will have no "
"effect as it is after the final cycle "
Expand Down
28 changes: 28 additions & 0 deletions cylc/flow/parsec/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,7 @@ class CylcConfigValidator(ParsecValidator):
V_CYCLE_POINT = 'V_CYCLE_POINT'
V_CYCLE_POINT_FORMAT = 'V_CYCLE_POINT_FORMAT'
V_CYCLE_POINT_TIME_ZONE = 'V_CYCLE_POINT_TIME_ZONE'
V_CYCLE_POINT_WITH_OFFSETS = 'V_CYCLE_POINT_WITH_OFFSETS'
V_INTERVAL = 'V_INTERVAL'
V_INTERVAL_LIST = 'V_INTERVAL_LIST'
V_PARAMETER_LIST = 'V_PARAMETER_LIST'
Expand Down Expand Up @@ -699,6 +700,30 @@ class CylcConfigValidator(ParsecValidator):
'-0830': 'UTC minus 8 hours and 30 minutes.'
}
),
V_CYCLE_POINT_WITH_OFFSETS: (
'cycle point with support for offsets',
'An integer of date-time cycle point.',
{
'1': 'An integer cycle point.',
'1 +P5': (
'An integer cycle point with offsets.',
' (this evaluates as ``6``)'
),
'+P5': (
'An integer cycle point offset.'
' This offset is added to the initial cycle point'
),
'2000-01-01T00:00Z': 'A date-time cycle point.',
'2000-01-01T00:00Z +P1D +P1M': (
'A date-time cycle point with offsets.'
' (this evaluates as ``2000-02-02T00:00Z``'
),
'2000-01-01T00:00Z +P1D': (
'A date-time offset.'
' This offset is added to the initial cycle point'
),
}
),
V_INTERVAL: (
'time interval',
'An ISO8601 duration.',
Expand Down Expand Up @@ -751,6 +776,9 @@ def __init__(self):
self.V_CYCLE_POINT: self.coerce_cycle_point,
self.V_CYCLE_POINT_FORMAT: self.coerce_cycle_point_format,
self.V_CYCLE_POINT_TIME_ZONE: self.coerce_cycle_point_time_zone,
# NOTE: This type exists for documentation reasons
# it doesn't actually process offsets, that happens later
self.V_CYCLE_POINT_WITH_OFFSETS: self.coerce_str,
self.V_INTERVAL: self.coerce_interval,
self.V_INTERVAL_LIST: self.coerce_interval_list,
self.V_PARAMETER_LIST: self.coerce_parameter_list,
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,7 @@ def command_stop(
point = TaskID.get_standardised_point(cycle_point)
if point is not None and self.pool.set_stop_point(point):
self.options.stopcp = str(point)
self.config.stop_point = point
self.workflow_db_mgr.put_workflow_stop_cycle_point(
self.options.stopcp)
elif clock_time is not None:
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,7 @@ def set_stop_point(self, stop_point: 'PointBase') -> bool:
LOG.info(f"Stop point unchanged: {stop_point}")
return False

LOG.info("Setting stop point: {stop_point}")
LOG.info(f"Setting stop point: {stop_point}")
self.stop_point = stop_point

if (
Expand Down
132 changes: 132 additions & 0 deletions tests/integration/test_stop_after_cycle_point.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Test logic pertaining to the stop after cycle points.
This may be defined in different ways:
* In the workflow configuration.
* On the command line.
* Or loaded from the database.
When the workflow hits the "stop after" point, it should be wiped (i.e. set
to None).
"""

from typing import Optional

from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.id import Tokens
from cylc.flow.workflow_status import StopMode


async def test_stop_after_cycle_point(
flow,
scheduler,
run,
reflog,
complete,
):
"""Test the stop after cycle point.
This ensures:
* The stop after point gets loaded from the config.
* The workflow stops when it hits this point.
* The point gets wiped when the workflow hits this point.
* The point is stored/retrieved from the DB as appropriate.
"""
async def stops_after_cycle(schd) -> Optional[str]:
"""Run the workflow until it stops and return the cycle point."""
triggers = reflog(schd)
await complete(schd, timeout=2)
assert len(triggers) == 1 # only one task (i.e. cycle) should be run
return Tokens(list(triggers)[0][0], relative=True)['cycle']

def get_db_value(schd) -> Optional[str]:
"""Return the cycle point value stored in the DB."""
with schd.workflow_db_mgr.get_pri_dao() as pri_dao:
return dict(pri_dao.select_workflow_params())['stopcp']

config = {
'scheduling': {
'cycling mode': 'integer',
'initial cycle point': '1',
'stop after cycle point': '1',
'graph': {
'P1': 'a[-P1] => a',
},
},
}
id_ = flow(config)
schd = scheduler(id_, paused_start=False)
async with run(schd):
# the cycle point should be loaded from the workflow configuration
assert schd.config.stop_point == IntegerPoint('1')

# this value should *not* be written to the database
assert get_db_value(schd) is None

# the workflow should stop after cycle 1
assert await stops_after_cycle(schd) == '1'

# change the configured cycle point to "2"
config['scheduling']['stop after cycle point'] = '2'
id_ = flow(config, id_=id_)
schd = scheduler(id_, paused_start=False)
async with run(schd):
# the cycle point should be reloaded from the workflow configuration
assert schd.config.stop_point == IntegerPoint('2')

# this value should not be written to the database
assert get_db_value(schd) is None

# the workflow should stop after cycle 2
assert await stops_after_cycle(schd) == '2'

# override the configured value via the CLI option
schd = scheduler(id_, paused_start=False, **{'stopcp': '3'})
async with run(schd):
# the CLI should take precedence over the config
assert schd.config.stop_point == IntegerPoint('3')

# this value *should* be written to the database
assert get_db_value(schd) == '3'

# the workflow should stop after cycle 3
assert await stops_after_cycle(schd) == '3'

# once the workflow hits this point, it should get cleared
assert get_db_value(schd) is None

schd = scheduler(id_, paused_start=False)
async with run(schd):
# the workflow should fall back to the configured value
assert schd.config.stop_point == IntegerPoint('2')

# override this value whilst the workflow is running
schd.command_stop(
cycle_point=IntegerPoint('4'),
mode=StopMode.REQUEST_CLEAN,
)
assert schd.config.stop_point == IntegerPoint('4')

# the new *should* be written to the database
assert get_db_value(schd) == '4'

schd = scheduler(id_, paused_start=False)
async with run(schd):
# the workflow should stop after cycle 4
assert await stops_after_cycle(schd) == '4'
Loading

0 comments on commit 3ab928c

Please sign in to comment.