Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into feature.skip_mode
Browse files Browse the repository at this point in the history
* upstream/master: (25 commits)
  Log subprocpool stdout/stderr at debug level
  Refactor run mode restart check to avoid unclosed DB connection
  Docs: add version-added admonition [skip ci]
  Hilary review 2
  change log entry generated
  Reimplement xtrigger `sequential` arg post-merge & add tests
  hold spawn on multi seq xtrigs
  Update cylc/flow/cfgspec/workflow.py
  hilary review changes
  Integration tests added
  Review fixes 2
  tests added
  review fixes
  handle restart/reload/remove
  add sequential arg option with wall_clock default True
  default reversed, xtrigger argument added
  parentless sequential xtrigger spawning
  Preserve n-window depth on reload
  Fix type annotation & tidy (cylc#6027)
  Remove burninated flake8 code (cylc#6022)
  ...
  • Loading branch information
wxtim committed Mar 28, 2024
2 parents d40d986 + 9eabebb commit 3507a3b
Show file tree
Hide file tree
Showing 37 changed files with 1,137 additions and 287 deletions.
1 change: 1 addition & 0 deletions changes.d/5738.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optionally spawn parentless xtriggered tasks sequentially - i.e., one at a time, after the previous xtrigger is satisfied, instead of all at once out to the runahead limit. The `wall_clock` xtrigger is now sequential by default.
1 change: 1 addition & 0 deletions changes.d/5959.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix an issue where workflow "timeout" events were not fired in all situations when they should have been.
1 change: 1 addition & 0 deletions changes.d/6011.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a `cylc vip` bug causing remote re-invocation to fail if using `--workflow-name` option.
1 change: 1 addition & 0 deletions changes.d/6029.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Workflow graph window extent is now preserved on reload.
18 changes: 16 additions & 2 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -802,9 +802,23 @@ def get_script_common_text(this: str, example: Optional[str] = None):
:ref:`SequentialTasks`.
''')

Conf('sequential xtriggers', VDR.V_BOOLEAN, False,
desc='''
If ``True``, tasks that only depend on xtriggers will not spawn
until the xtrigger of previous (cycle point) instance is satisfied.
Otherwise, they will all spawn at once out to the runahead limit.
This setting can be overridden by the reserved keyword argument
``sequential`` in individual xtrigger declarations.
One sequential xtrigger on a parentless task with multiple
xtriggers will cause sequential spawning.
.. versionadded:: 8.3.0
''')
with Conf('xtriggers', desc='''
This section is for *External Trigger* function declarations -
see :ref:`Section External Triggers`.
This section is for *External Trigger* function declarations -
see :ref:`Section External Triggers`.
'''):
Conf('<xtrigger name>', VDR.V_XTRIGGER, desc='''
Any user-defined event trigger function declarations and
Expand Down
31 changes: 15 additions & 16 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
WorkflowFiles,
check_deprecation,
)
from cylc.flow.workflow_status import RunMode
from cylc.flow.xtrigger_mgr import XtriggerManager

if TYPE_CHECKING:
Expand Down Expand Up @@ -520,6 +521,10 @@ def __init__(

self.process_runahead_limit()

run_mode = self.run_mode()
if run_mode in {RunMode.SIMULATION, RunMode.DUMMY}:
configure_sim_modes(self.taskdefs.values(), run_mode)

self.configure_workflow_state_polling_tasks()

self._check_task_event_handlers()
Expand Down Expand Up @@ -1489,20 +1494,9 @@ def process_config_env(self):
os.environ['PATH'] = os.pathsep.join([
os.path.join(self.fdir, 'bin'), os.environ['PATH']])

def run_mode(self, *reqmodes):
"""Return the run mode.
Combine command line option with configuration setting.
If "reqmodes" is specified, return the boolean (mode in reqmodes).
Otherwise, return the mode as a str.
"""
mode = getattr(self.options, 'run_mode', None)
if not mode:
mode = 'live'
if reqmodes:
return mode in reqmodes
else:
return mode
def run_mode(self) -> str:
    """Return the workflow run mode.

    The mode is resolved from the stored CLI options via
    ``RunMode.get`` (e.g. live/simulation/dummy).
    """
    mode: str = RunMode.get(self.options)
    return mode

def _check_task_event_handlers(self):
"""Check custom event handler templates can be expanded.
Expand Down Expand Up @@ -1700,16 +1694,21 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
self.taskdefs[right].add_dependency(dependency, seq)

validator = XtriggerNameValidator.validate
for label in self.cfg['scheduling']['xtriggers']:
xtrigs = self.cfg['scheduling']['xtriggers']
for label in xtrigs:
valid, msg = validator(label)
if not valid:
raise WorkflowConfigError(
f'Invalid xtrigger name "{label}" - {msg}'
)

if self.xtrigger_mgr is not None:
self.xtrigger_mgr.sequential_xtriggers_default = (
self.cfg['scheduling']['sequential xtriggers']
)
for label in xtrig_labels:
try:
xtrig = self.cfg['scheduling']['xtriggers'][label]
xtrig = xtrigs[label]
except KeyError:
if label != 'wall_clock':
raise WorkflowConfigError(f"xtrigger not defined: {label}")
Expand Down
15 changes: 9 additions & 6 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ class DataStoreMgr:
ERR_PREFIX_JOBID_MATCH = 'No matching jobs found: '
ERR_PREFIX_JOB_NOT_ON_SEQUENCE = 'Invalid cycle point for job: '

def __init__(self, schd):
def __init__(self, schd, n_edge_distance=1):
self.schd = schd
self.id_ = Tokens(
user=self.schd.owner,
Expand All @@ -477,7 +477,7 @@ def __init__(self, schd):
self.updated_state_families = set()
# Update workflow state totals once more post delta application.
self.state_update_follow_on = False
self.n_edge_distance = 1
self.n_edge_distance = n_edge_distance
self.next_n_edge_distance = None
self.latest_state_tasks = {
state: deque(maxlen=LATEST_STATE_TASKS_QUEUE_SIZE)
Expand Down Expand Up @@ -530,7 +530,7 @@ def initiate_data_model(self, reloaded=False):
"""
# Reset attributes/data-store on reload:
if reloaded:
self.__init__(self.schd)
self.__init__(self.schd, self.n_edge_distance)

# Static elements
self.generate_definition_elements()
Expand Down Expand Up @@ -1192,7 +1192,10 @@ def generate_ghost_task(
point,
flow_nums,
submit_num=0,
data_mode=True
data_mode=True,
sequential_xtrigger_labels=(
self.schd.xtrigger_mgr.sequential_xtrigger_labels
),
)

is_orphan = False
Expand Down Expand Up @@ -2110,11 +2113,11 @@ def _family_ascent_point_update(self, fp_id):
self.state_update_families.add(fam_node.first_parent)
self.state_update_families.remove(fp_id)

def set_graph_window_extent(self, n_edge_distance):
def set_graph_window_extent(self, n_edge_distance: int) -> None:
"""Set what the max edge distance will change to.
Args:
n_edge_distance (int):
n_edge_distance:
Maximum edge distance from active node.
"""
Expand Down
13 changes: 7 additions & 6 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -839,24 +839,25 @@ def put_messages(
)
return (True, f'Messages queued: {len(messages)}')

def set_graph_window_extent(
    self, n_edge_distance: int
) -> Tuple[bool, str]:
    """Set data-store graph window to new max edge distance.

    Args:
        n_edge_distance:
            Max edge distance 0..n from active node.

    Returns:
        tuple: (outcome, message)

        outcome
            True if command successfully queued.
        message
            Information about outcome.
    """
    # Reject invalid (negative) extents up front.
    if n_edge_distance < 0:
        return (False, 'Edge distance cannot be negative')
    # Delegate the window resize to the scheduler's data store manager.
    self.schd.data_store_mgr.set_graph_window_extent(n_edge_distance)
    return (True, f'Maximum edge distance set to {n_edge_distance}')
4 changes: 2 additions & 2 deletions cylc/flow/option_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,9 +790,9 @@ def cleanup_sysargv(
new_args[1] = script_name

# replace source path with workflow ID.
if str(source) in sys.argv:
if str(source) in new_args:
new_args.remove(str(source))
if workflow_id not in sys.argv:
if workflow_id not in new_args:
new_args.append(workflow_id)

sys.argv = new_args
Expand Down
12 changes: 6 additions & 6 deletions cylc/flow/pathutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,18 +248,18 @@ def make_symlink_dir(path: Union[Path, str], target: Union[Path, str]) -> bool:
# correct symlink already exists
return False
# symlink name is in use by a physical file or directory
# log and return
LOG.debug(
f"Unable to create symlink to {target}. "
f"The path {path} already exists.")
LOG.warning(
f"Path {path} already exists. Cannot create symlink to {target}."
)
return False
elif path.is_symlink():
# remove a bad symlink.
try:
path.unlink()
except OSError:
except OSError as exc:
raise WorkflowFilesError(
f"Error when symlinking. Failed to unlink bad symlink {path}.")
f"Failed to remove broken symlink {path}\n{exc}"
)
try:
target.mkdir(parents=True, exist_ok=False)
except FileExistsError:
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/run_modes/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union)
from time import time

from metomi.isodatetime.parsers import DurationParser

from cylc.flow import LOG
from cylc.flow.cycling import PointBase
from cylc.flow.cycling.loader import get_point
Expand All @@ -35,7 +37,6 @@
)
from cylc.flow.wallclock import get_unix_time_from_time_string

from metomi.isodatetime.parsers import DurationParser

if TYPE_CHECKING:
from cylc.flow.task_events_mgr import TaskEventsManager
Expand Down
37 changes: 11 additions & 26 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,19 +617,6 @@ def select_workflow_params_restart_count(self):
result = self.connect().execute(stmt).fetchone()
return int(result[0]) if result else 0

def select_workflow_params_run_mode(self):
"""Return original run_mode for workflow_params."""
stmt = rf"""
SELECT
value
FROM
{self.TABLE_WORKFLOW_PARAMS}
WHERE
key == 'run_mode'
""" # nosec (table name is code constant)
result = self.connect().execute(stmt).fetchone()
return result[0] if result else None

def select_workflow_template_vars(self, callback):
"""Select from workflow_template_vars.
Expand Down Expand Up @@ -787,7 +774,7 @@ def select_task_job_platforms(self):

def select_prev_instances(
    self, name: str, point: str
) -> List[Tuple[int, bool, Set[int], str]]:
    """Select task_states table info about previous instances of a task.

    Flow merge results in multiple entries for the same submit number.
    """
    # NOTE(review): table name comes from a code constant, not user input.
    stmt = (
        r"SELECT flow_nums,submit_num,flow_wait,status FROM %(name)s"
        r" WHERE name==? AND cycle==?"
    ) % {"name": self.TABLE_TASK_STATES}
    rows = self.connect().execute(stmt, (name, point,))
    # Repack each DB row as (submit_num, flow_wait, flow_nums, status).
    return [
        (
            db_submit_num,
            db_flow_wait == 1,
            deserialise(db_flow_nums),
            db_status,
        )
        for db_flow_nums, db_submit_num, db_flow_wait, db_status in rows
    ]

def select_latest_flow_nums(self):
"""Return a list of the most recent previous flow numbers."""
Expand Down
Loading

0 comments on commit 3507a3b

Please sign in to comment.