Skip to content

Commit 8399b30

Browse files
committed
Event for tasks that are late w.r.t. cycle point
Add event settings to a task to report itself as "late" with respect to its (date-time) cycle point.
1 parent 2e054d1 commit 8399b30

File tree

32 files changed

+516
-199
lines changed

32 files changed

+516
-199
lines changed

doc/src/suite-design-guide/roadmap.tex

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ \subsection{List Item Override In Site Include-Files}
1212
\begin{lstlisting}
1313
[scheduling]
1414
[[special tasks]]
15-
clock-triggered = get-data-a, get-data-b
15+
clock-trigger = get-data-a, get-data-b
1616
#...
1717
#...
1818
\end{lstlisting}

lib/cylc/broadcast_mgr.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ def load_db_broadcast_states(self, row_idx, row):
179179

180180
def match_ext_trigger(self, itask):
181181
"""Match external triggers for a waiting task proxy."""
182-
if not self.ext_triggers:
182+
if not self.ext_triggers or not itask.state.external_triggers:
183183
return
184184
has_changed = False
185185
for trig, satisfied in itask.state.external_triggers.items():

lib/cylc/cfgspec/suite.py

+2
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,8 @@ def _coerce_parameter_list(value, keys, _):
397397
'submission timeout': vdr(vtype='interval'),
398398

399399
'expired handler': vdr(vtype='string_list'),
400+
'late offset': vdr(vtype='interval'),
401+
'late handler': vdr(vtype='string_list'),
400402
'submitted handler': vdr(vtype='string_list'),
401403
'started handler': vdr(vtype='string_list'),
402404
'succeeded handler': vdr(vtype='string_list'),

lib/cylc/rundb.py

+14-2
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ class CylcSuiteDAO(object):
187187
TABLE_TASK_EVENTS = "task_events"
188188
TABLE_TASK_ACTION_TIMERS = "task_action_timers"
189189
TABLE_CHECKPOINT_ID = "checkpoint_id"
190+
TABLE_TASK_LATE_FLAGS = "task_late_flags"
190191
TABLE_TASK_OUTPUTS = "task_outputs"
191192
TABLE_TASK_POOL = "task_pool"
192193
TABLE_TASK_POOL_CHECKPOINTS = "task_pool_checkpoints"
@@ -272,6 +273,11 @@ class CylcSuiteDAO(object):
272273
["event"],
273274
["message"],
274275
],
276+
TABLE_TASK_LATE_FLAGS: [
277+
["cycle", {"is_primary_key": True}],
278+
["name", {"is_primary_key": True}],
279+
["value", {"datatype": "INTEGER"}],
280+
],
275281
TABLE_TASK_OUTPUTS: [
276282
["cycle", {"is_primary_key": True}],
277283
["name", {"is_primary_key": True}],
@@ -664,8 +670,8 @@ def select_task_pool_for_restart(self, callback, id_key=None):
664670
"""Select from task_pool+task_states+task_jobs for restart.
665671
666672
Invoke callback(row_idx, row) on each row, where each row contains:
667-
[cycle, name, spawned, status, hold_swap, submit_num, try_num,
668-
user_at_host, time_submit, time_run, timeout]
673+
[cycle, name, spawned, is_late, status, hold_swap, submit_num,
674+
try_num, user_at_host, time_submit, time_run, timeout, outputs]
669675
670676
If id_key is specified,
671677
select from task_pool table if id_key == CHECKPOINT_LATEST_ID.
@@ -676,6 +682,7 @@ def select_task_pool_for_restart(self, callback, id_key=None):
676682
%(task_pool)s.cycle,
677683
%(task_pool)s.name,
678684
%(task_pool)s.spawned,
685+
%(task_late_flags)s.value,
679686
%(task_pool)s.status,
680687
%(task_pool)s.hold_swap,
681688
%(task_states)s.submit_num,
@@ -691,6 +698,10 @@ def select_task_pool_for_restart(self, callback, id_key=None):
691698
%(task_states)s
692699
ON %(task_pool)s.cycle == %(task_states)s.cycle AND
693700
%(task_pool)s.name == %(task_states)s.name
701+
LEFT OUTER JOIN
702+
%(task_late_flags)s
703+
ON %(task_pool)s.cycle == %(task_late_flags)s.cycle AND
704+
%(task_pool)s.name == %(task_late_flags)s.name
694705
LEFT OUTER JOIN
695706
%(task_jobs)s
696707
ON %(task_pool)s.cycle == %(task_jobs)s.cycle AND
@@ -708,6 +719,7 @@ def select_task_pool_for_restart(self, callback, id_key=None):
708719
form_data = {
709720
"task_pool": self.TABLE_TASK_POOL,
710721
"task_states": self.TABLE_TASK_STATES,
722+
"task_late_flags": self.TABLE_TASK_LATE_FLAGS,
711723
"task_timeout_timers": self.TABLE_TASK_TIMEOUT_TIMERS,
712724
"task_jobs": self.TABLE_TASK_JOBS,
713725
"task_outputs": self.TABLE_TASK_OUTPUTS,

lib/cylc/scheduler.py

+53-52
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,13 @@
5959
from cylc.task_job_mgr import TaskJobManager
6060
from cylc.task_pool import TaskPool
6161
from cylc.task_proxy import TaskProxy, TaskProxySequenceBoundsError
62-
from cylc.task_state import TASK_STATUSES_ACTIVE, TASK_STATUS_FAILED
62+
from cylc.task_state import (
63+
TASK_STATUSES_ACTIVE, TASK_STATUSES_NEVER_ACTIVE, TASK_STATUS_FAILED)
6364
from cylc.templatevars import load_template_vars
6465
from cylc.version import CYLC_VERSION
6566
from cylc.wallclock import (
66-
get_current_time_string, get_seconds_as_interval_string)
67+
get_current_time_string, get_seconds_as_interval_string,
68+
get_time_string_from_unix_time as time2str)
6769
from cylc.profiler import Profiler
6870

6971

@@ -133,15 +135,15 @@ def __init__(self, is_restart, options, args):
133135
self.config = None
134136

135137
self.is_restart = is_restart
136-
self._cli_initial_point_string = None
137-
self._cli_start_point_string = None
138+
self.cli_initial_point_string = None
139+
self.cli_start_point_string = None
138140
start_point_str = None
139141
if len(args) > 1:
140142
start_point_str = args[1]
141143
if getattr(self.options, 'warm', None):
142-
self._cli_start_point_string = start_point_str
144+
self.cli_start_point_string = start_point_str
143145
else:
144-
self._cli_initial_point_string = start_point_str
146+
self.cli_initial_point_string = start_point_str
145147
self.template_vars = load_template_vars(
146148
self.options.templatevars, self.options.templatevars_file)
147149

@@ -382,16 +384,7 @@ def configure(self):
382384
self.load_tasks_for_run()
383385
self.profiler.log_memory("scheduler.py: after load_tasks")
384386

385-
self.suite_db_mgr.put_suite_params(
386-
CYLC_VERSION,
387-
self.task_job_mgr.task_remote_mgr.uuid_str,
388-
self.run_mode,
389-
str(cylc.flags.utc),
390-
self.initial_point,
391-
self.final_point,
392-
self.pool.is_held,
393-
self.config.cfg['cylc']['cycle point format'],
394-
self._cli_start_point_string)
387+
self.suite_db_mgr.put_suite_params(self)
395388
self.suite_db_mgr.put_suite_template_vars(self.template_vars)
396389
self.suite_db_mgr.put_runtime_inheritance(self.config)
397390
self.configure_suite_environment()
@@ -446,8 +439,8 @@ def load_tasks_for_restart(self):
446439
"""Load tasks for restart."""
447440
self.suite_db_mgr.pri_dao.select_suite_params(
448441
self._load_suite_params_2, self.options.checkpoint)
449-
if self._cli_start_point_string:
450-
self.start_point = self._cli_start_point_string
442+
if self.cli_start_point_string:
443+
self.start_point = self.cli_start_point_string
451444
self.suite_db_mgr.pri_dao.select_broadcast_states(
452445
self.task_events_mgr.broadcast_mgr.load_db_broadcast_states,
453446
self.options.checkpoint)
@@ -901,15 +894,7 @@ def command_reload_suite(self):
901894
self.configure_suite_environment()
902895
if self.options.genref or self.options.reftest:
903896
self.configure_reftest(recon=True)
904-
self.suite_db_mgr.put_suite_params(
905-
CYLC_VERSION,
906-
self.task_job_mgr.task_remote_mgr.uuid_str,
907-
self.run_mode,
908-
str(cylc.flags.utc),
909-
self.initial_point,
910-
self.final_point,
911-
self.pool.is_held,
912-
self.config.cfg['cylc']['cycle point format'])
897+
self.suite_db_mgr.put_suite_params(self)
913898
cylc.flags.iflag = True
914899

915900
def set_suite_timer(self):
@@ -993,8 +978,8 @@ def load_suiterc(self, is_reload=False):
993978
self.config = SuiteConfig(
994979
self.suite, self.suiterc, self.template_vars,
995980
run_mode=self.run_mode,
996-
cli_initial_point_string=self._cli_initial_point_string,
997-
cli_start_point_string=self._cli_start_point_string,
981+
cli_initial_point_string=self.cli_initial_point_string,
982+
cli_start_point_string=self.cli_start_point_string,
998983
cli_final_point_string=self.options.final_point_string,
999984
is_reload=is_reload,
1000985
mem_log_func=self.profiler.log_memory,
@@ -1044,17 +1029,17 @@ def load_suiterc(self, is_reload=False):
10441029
self.run_mode = self.config.run_mode
10451030

10461031
def _load_suite_params_1(self, _, row):
1047-
"""Load previous initial/start cycle point.
1032+
"""Load previous initial cycle point or (warm) start cycle point.
10481033
1049-
For restart, it may be missing from "suite.rc", but was specified as a
1050-
command line argument on cold/warm start.
1034+
For restart, these may be missing from "suite.rc", but were specified as
1035+
a command line argument on cold/warm start.
10511036
"""
10521037
key, value = row
1053-
if key == 'initial_point':
1054-
self._cli_initial_point_string = value
1038+
if key == "initial_point":
1039+
self.cli_initial_point_string = value
10551040
self.task_events_mgr.pflag = True
1056-
elif key == 'warm_point':
1057-
self._cli_start_point_string = value
1041+
elif key in ["start_point", "warm_point"]:
1042+
self.cli_start_point_string = value
10581043
self.task_events_mgr.pflag = True
10591044
elif key == 'uuid_str':
10601045
self.task_job_mgr.task_remote_mgr.uuid_str = str(value)
@@ -1245,6 +1230,22 @@ def database_health_check(self):
12451230
# Something has to be very wrong here, so stop the suite
12461231
raise SchedulerError(str(exc))
12471232

1233+
def late_tasks_check(self):
1234+
"""Report tasks that are late for their clock triggers."""
1235+
now = time()
1236+
for itask in self.pool.get_tasks():
1237+
if (not itask.is_late and itask.get_late_time() and
1238+
itask.state.status in TASK_STATUSES_NEVER_ACTIVE and
1239+
now > itask.get_late_time()):
1240+
msg = '%s (late-time=%s)' % (
1241+
self.task_events_mgr.EVENT_LATE,
1242+
time2str(itask.get_late_time()))
1243+
itask.is_late = True
1244+
LOG.warning(msg, itask=itask)
1245+
self.task_events_mgr.setup_event_handlers(
1246+
itask, self.task_events_mgr.EVENT_LATE, msg)
1247+
self.suite_db_mgr.put_insert_task_late_flags(itask)
1248+
12481249
def timeout_check(self):
12491250
"""Check suite and task timers."""
12501251
self.check_suite_timer()
@@ -1373,8 +1374,9 @@ def run(self):
13731374

13741375
# PROCESS ALL TASKS whenever something has changed that might
13751376
# require renegotiation of dependencies, etc.
1376-
if self.process_tasks():
1377+
if self.should_process_tasks():
13771378
self.process_task_pool()
1379+
self.late_tasks_check()
13781380

13791381
self.process_queued_task_messages()
13801382
self.process_command_queue()
@@ -1481,22 +1483,13 @@ def check_suite_stalled(self):
14811483
if self._get_events_conf(self.EVENT_TIMEOUT):
14821484
self.set_suite_timer()
14831485

1484-
def process_tasks(self):
1486+
def should_process_tasks(self):
14851487
"""Return True if waiting tasks are ready."""
14861488
# do we need to do a pass through the main task processing loop?
14871489
process = False
14881490

1489-
# External triggers must be matched now. If any are matched pflag
1490-
# is set to tell process_tasks() that task processing is required.
1491-
broadcast_mgr = self.task_events_mgr.broadcast_mgr
1492-
broadcast_mgr.add_ext_triggers(self.ext_trigger_queue)
1493-
for itask in self.pool.get_tasks():
1494-
if (itask.state.external_triggers and
1495-
broadcast_mgr.match_ext_trigger(itask)):
1496-
process = True
1497-
14981491
if self.task_events_mgr.pflag:
1499-
# this flag is turned on by commands that change task state
1492+
# This flag is turned on by commands that change task state
15001493
process = True
15011494
self.task_events_mgr.pflag = False # reset
15021495

@@ -1505,10 +1498,18 @@ def process_tasks(self):
15051498
process = True
15061499
self.task_job_mgr.task_remote_mgr.ready = False # reset
15071500

1508-
self.pool.set_expired_tasks()
1509-
if self.pool.waiting_tasks_ready():
1510-
process = True
1511-
1501+
broadcast_mgr = self.task_events_mgr.broadcast_mgr
1502+
broadcast_mgr.add_ext_triggers(self.ext_trigger_queue)
1503+
now = time()
1504+
for itask in self.pool.get_tasks():
1505+
# External trigger matching and task expiry must be done
1506+
# regardless, so they need to be in separate "if ..." blocks.
1507+
if broadcast_mgr.match_ext_trigger(itask):
1508+
process = True
1509+
if self.pool.set_expired_task(itask, now):
1510+
process = True
1511+
if itask.is_ready(now):
1512+
process = True
15121513
if self.run_mode == 'simulation' and self.pool.sim_time_check(
15131514
self.message_queue):
15141515
process = True

lib/cylc/suite_db_mgr.py

+28-36
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232
from tempfile import mkstemp
3333

3434
from cylc.broadcast_report import get_broadcast_change_iter
35+
import cylc.flags
3536
from cylc.rundb import CylcSuiteDAO
3637
from cylc.suite_logging import ERR, LOG
38+
from cylc.version import CYLC_VERSION
3739
from cylc.wallclock import get_current_time_string
3840

3941

@@ -251,44 +253,33 @@ def put_runtime_inheritance(self, config):
251253
"namespace": namespace,
252254
"inheritance": json.dumps(value)})
253255

254-
def put_suite_params(
255-
self, cylc_version, uuid_str, run_mode, UTC_mode, initial_point,
256-
final_point, is_held, cycle_point_format=None, warm_point=None):
257-
"""Put run mode, cylc version, UTC mode & initial & final cycle
258-
points in runtime database.
256+
def put_suite_params(self, schd):
257+
"""Put various suite parameters from schd in runtime database.
259258
260259
This method queues the relevant insert statements.
261260
262261
Arguments:
263-
cylc_version (str): Cylc version of the running suite.
264-
uuid_str (str): UUID of the suite that persists for restarts.
265-
run_mode (str): "live", "dummy", "simulation", etc.
266-
UTC_mode (boolean): Is UTC mode?
267-
initial_point (str): Initial cycle point.
268-
final_point (str): Final cycle point.
269-
is_held (boolean): Is suite held?
270-
cycle_point_format (str): Format string for cycle points.
271-
warm_point (str): Warn start cycle point.
262+
schd (cylc.scheduler.Scheduler): scheduler object.
272263
"""
273264
self.db_inserts_map[self.TABLE_SUITE_PARAMS].extend([
274-
{"key": "uuid_str", "value": uuid_str},
275-
{"key": "run_mode", "value": run_mode},
276-
{"key": "cylc_version", "value": cylc_version},
277-
{"key": "UTC_mode", "value": UTC_mode},
278-
{"key": "initial_point", "value": str(initial_point)},
279-
{"key": "final_point", "value": str(final_point)},
265+
{"key": "uuid_str",
266+
"value": schd.task_job_mgr.task_remote_mgr.uuid_str},
267+
{"key": "run_mode", "value": schd.run_mode},
268+
{"key": "cylc_version", "value": CYLC_VERSION},
269+
{"key": "UTC_mode", "value": cylc.flags.utc},
270+
{"key": "initial_point", "value": str(schd.initial_point)},
271+
{"key": "final_point", "value": str(schd.final_point)},
280272
])
281-
if cycle_point_format:
282-
self.db_inserts_map[self.TABLE_SUITE_PARAMS].append(
283-
{"key": "cycle_point_format", "value": str(cycle_point_format)}
284-
)
285-
if is_held:
286-
self.db_inserts_map[self.TABLE_SUITE_PARAMS].append(
287-
{"key": "is_held", "value": 1})
288-
if warm_point:
289-
self.db_inserts_map[self.TABLE_SUITE_PARAMS].append(
290-
{"key": "warm_point", "value": str(warm_point)}
291-
)
273+
if schd.config.cfg['cylc']['cycle point format']:
274+
self.db_inserts_map[self.TABLE_SUITE_PARAMS].append({
275+
"key": "cycle_point_format",
276+
"value": schd.config.cfg['cylc']['cycle point format']})
277+
if schd.pool.is_held:
278+
self.db_inserts_map[self.TABLE_SUITE_PARAMS].append({
279+
"key": "is_held", "value": 1})
280+
if schd.cli_start_point_string:
281+
self.db_inserts_map[self.TABLE_SUITE_PARAMS].append({
282+
"key": "start_point", "value": schd.cli_start_point_string})
292283

293284
def put_suite_template_vars(self, template_vars):
294285
"""Put template_vars in runtime database.
@@ -377,6 +368,12 @@ def put_insert_task_events(self, itask, args):
377368
"""Put INSERT statement for task_events table."""
378369
self._put_insert_task_x(CylcSuiteDAO.TABLE_TASK_EVENTS, itask, args)
379370

371+
def put_insert_task_late_flags(self, itask):
372+
"""If itask is late, put INSERT statement to task_late_flags table."""
373+
if itask.is_late:
374+
self._put_insert_task_x(
375+
CylcSuiteDAO.TABLE_TASK_LATE_FLAGS, itask, {"value": True})
376+
380377
def put_insert_task_jobs(self, itask, args):
381378
"""Put INSERT statement for task_jobs table."""
382379
self._put_insert_task_x(CylcSuiteDAO.TABLE_TASK_JOBS, itask, args)
@@ -413,11 +410,6 @@ def put_update_task_outputs(self, itask):
413410
CylcSuiteDAO.TABLE_TASK_OUTPUTS,
414411
itask, {"outputs": json.dumps(items)})
415412

416-
def put_update_task_states(self, itask, set_args):
417-
"""Put UPDATE statement for task_states table."""
418-
self._put_update_task_x(
419-
CylcSuiteDAO.TABLE_TASK_STATES, itask, set_args)
420-
421413
def _put_update_task_x(self, table_name, itask, set_args):
422414
"""Put UPDATE statement for a task_* table."""
423415
where_args = {

0 commit comments

Comments
 (0)