Skip to content

Commit 463f504

Browse files
xtriggers: fix update and add is_retry, is_wallclock and is_xtriggered attrs
* Don't update xtriggers incrementally: * Xtriggers were being updated in the data store incrementally. * This doesn't work with GraphQL subscriptions (see cylc#6307). * This turns off incremental updates ensuring that all xtriggers are included in all updates. * Unblocks cylc/cylc-ui#2103 * Add is_retry, is_wallclock and is_xtriggered task attributes. * We would like to be able to incorporate xtriggers into the task icons used in the GUI and Tui. * These attributes allow clients to access the required information without having to subscribe to and monitor all xtriggers. * Unblocks cylc/cylc-ui#331
1 parent 3352164 commit 463f504

14 files changed

+343
-116
lines changed

cylc/flow/config.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1937,7 +1937,7 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
19371937
)
19381938

19391939
self.xtrigger_collator.add_trig(label, xtrig, self.fdir)
1940-
self.taskdefs[right].add_xtrig_label(label, seq)
1940+
self.taskdefs[right].add_xtrig_label(label, seq, xtrig.func_name)
19411941

19421942
def get_actual_first_point(self, start_point):
19431943
"""Get actual first cycle point for the workflow
@@ -2679,4 +2679,4 @@ def upgrade_clock_triggers(self):
26792679
# Add it to the task, for each sequence that the task appears in.
26802680
taskdef = self.get_taskdef(task_name)
26812681
for seq in taskdef.sequences:
2682-
taskdef.add_xtrig_label(label, seq)
2682+
taskdef.add_xtrig_label(label, seq, xtrig.func_name)

cylc/flow/data_messages.proto

+6
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,9 @@ message PbTaskProxy {
231231
optional bool flow_wait = 27;
232232
optional PbRuntime runtime = 28;
233233
optional int32 graph_depth = 29;
234+
optional bool is_retry = 30;
235+
optional bool is_wallclock = 31;
236+
optional bool is_xtriggered = 32;
234237
}
235238

236239
message PbFamily {
@@ -270,6 +273,9 @@ message PbFamilyProxy {
270273
optional int32 is_runahead_total = 20;
271274
optional PbRuntime runtime = 21;
272275
optional int32 graph_depth = 22;
276+
optional bool is_retry = 23;
277+
optional bool is_wallclock = 24;
278+
optional bool is_xtriggered = 25;
273279
}
274280

275281
message PbEdge {

cylc/flow/data_messages_pb2.py

+34-34
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cylc/flow/data_messages_pb2.pyi

+16-4
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ class PbTrigger(_message.Message):
304304
def __init__(self, id: _Optional[str] = ..., label: _Optional[str] = ..., message: _Optional[str] = ..., satisfied: bool = ..., time: _Optional[float] = ...) -> None: ...
305305

306306
class PbTaskProxy(_message.Message):
307-
__slots__ = ("stamp", "id", "task", "state", "cycle_point", "depth", "job_submits", "outputs", "namespace", "prerequisites", "jobs", "first_parent", "name", "is_held", "edges", "ancestors", "flow_nums", "external_triggers", "xtriggers", "is_queued", "is_runahead", "flow_wait", "runtime", "graph_depth")
307+
__slots__ = ("stamp", "id", "task", "state", "cycle_point", "depth", "job_submits", "outputs", "namespace", "prerequisites", "jobs", "first_parent", "name", "is_held", "edges", "ancestors", "flow_nums", "external_triggers", "xtriggers", "is_queued", "is_runahead", "flow_wait", "runtime", "graph_depth", "is_retry", "is_wallclock", "is_xtriggered")
308308
class OutputsEntry(_message.Message):
309309
__slots__ = ("key", "value")
310310
KEY_FIELD_NUMBER: _ClassVar[int]
@@ -350,6 +350,9 @@ class PbTaskProxy(_message.Message):
350350
FLOW_WAIT_FIELD_NUMBER: _ClassVar[int]
351351
RUNTIME_FIELD_NUMBER: _ClassVar[int]
352352
GRAPH_DEPTH_FIELD_NUMBER: _ClassVar[int]
353+
IS_RETRY_FIELD_NUMBER: _ClassVar[int]
354+
IS_WALLCLOCK_FIELD_NUMBER: _ClassVar[int]
355+
IS_XTRIGGERED_FIELD_NUMBER: _ClassVar[int]
353356
stamp: str
354357
id: str
355358
task: str
@@ -374,7 +377,10 @@ class PbTaskProxy(_message.Message):
374377
flow_wait: bool
375378
runtime: PbRuntime
376379
graph_depth: int
377-
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., task: _Optional[str] = ..., state: _Optional[str] = ..., cycle_point: _Optional[str] = ..., depth: _Optional[int] = ..., job_submits: _Optional[int] = ..., outputs: _Optional[_Mapping[str, PbOutput]] = ..., namespace: _Optional[_Iterable[str]] = ..., prerequisites: _Optional[_Iterable[_Union[PbPrerequisite, _Mapping]]] = ..., jobs: _Optional[_Iterable[str]] = ..., first_parent: _Optional[str] = ..., name: _Optional[str] = ..., is_held: bool = ..., edges: _Optional[_Iterable[str]] = ..., ancestors: _Optional[_Iterable[str]] = ..., flow_nums: _Optional[str] = ..., external_triggers: _Optional[_Mapping[str, PbTrigger]] = ..., xtriggers: _Optional[_Mapping[str, PbTrigger]] = ..., is_queued: bool = ..., is_runahead: bool = ..., flow_wait: bool = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ..., graph_depth: _Optional[int] = ...) -> None: ...
380+
is_retry: bool
381+
is_wallclock: bool
382+
is_xtriggered: bool
383+
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., task: _Optional[str] = ..., state: _Optional[str] = ..., cycle_point: _Optional[str] = ..., depth: _Optional[int] = ..., job_submits: _Optional[int] = ..., outputs: _Optional[_Mapping[str, PbOutput]] = ..., namespace: _Optional[_Iterable[str]] = ..., prerequisites: _Optional[_Iterable[_Union[PbPrerequisite, _Mapping]]] = ..., jobs: _Optional[_Iterable[str]] = ..., first_parent: _Optional[str] = ..., name: _Optional[str] = ..., is_held: bool = ..., edges: _Optional[_Iterable[str]] = ..., ancestors: _Optional[_Iterable[str]] = ..., flow_nums: _Optional[str] = ..., external_triggers: _Optional[_Mapping[str, PbTrigger]] = ..., xtriggers: _Optional[_Mapping[str, PbTrigger]] = ..., is_queued: bool = ..., is_runahead: bool = ..., flow_wait: bool = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ..., graph_depth: _Optional[int] = ..., is_retry: bool = ..., is_wallclock: bool = ..., is_xtriggered: bool = ...) -> None: ...
378384

379385
class PbFamily(_message.Message):
380386
__slots__ = ("stamp", "id", "name", "meta", "depth", "proxies", "parents", "child_tasks", "child_families", "first_parent", "runtime", "descendants")
@@ -405,7 +411,7 @@ class PbFamily(_message.Message):
405411
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., name: _Optional[str] = ..., meta: _Optional[_Union[PbMeta, _Mapping]] = ..., depth: _Optional[int] = ..., proxies: _Optional[_Iterable[str]] = ..., parents: _Optional[_Iterable[str]] = ..., child_tasks: _Optional[_Iterable[str]] = ..., child_families: _Optional[_Iterable[str]] = ..., first_parent: _Optional[str] = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ..., descendants: _Optional[_Iterable[str]] = ...) -> None: ...
406412

407413
class PbFamilyProxy(_message.Message):
408-
__slots__ = ("stamp", "id", "cycle_point", "name", "family", "state", "depth", "first_parent", "child_tasks", "child_families", "is_held", "ancestors", "states", "state_totals", "is_held_total", "is_queued", "is_queued_total", "is_runahead", "is_runahead_total", "runtime", "graph_depth")
414+
__slots__ = ("stamp", "id", "cycle_point", "name", "family", "state", "depth", "first_parent", "child_tasks", "child_families", "is_held", "ancestors", "states", "state_totals", "is_held_total", "is_queued", "is_queued_total", "is_runahead", "is_runahead_total", "runtime", "graph_depth", "is_retry", "is_wallclock", "is_xtriggered")
409415
class StateTotalsEntry(_message.Message):
410416
__slots__ = ("key", "value")
411417
KEY_FIELD_NUMBER: _ClassVar[int]
@@ -434,6 +440,9 @@ class PbFamilyProxy(_message.Message):
434440
IS_RUNAHEAD_TOTAL_FIELD_NUMBER: _ClassVar[int]
435441
RUNTIME_FIELD_NUMBER: _ClassVar[int]
436442
GRAPH_DEPTH_FIELD_NUMBER: _ClassVar[int]
443+
IS_RETRY_FIELD_NUMBER: _ClassVar[int]
444+
IS_WALLCLOCK_FIELD_NUMBER: _ClassVar[int]
445+
IS_XTRIGGERED_FIELD_NUMBER: _ClassVar[int]
437446
stamp: str
438447
id: str
439448
cycle_point: str
@@ -455,7 +464,10 @@ class PbFamilyProxy(_message.Message):
455464
is_runahead_total: int
456465
runtime: PbRuntime
457466
graph_depth: int
458-
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., cycle_point: _Optional[str] = ..., name: _Optional[str] = ..., family: _Optional[str] = ..., state: _Optional[str] = ..., depth: _Optional[int] = ..., first_parent: _Optional[str] = ..., child_tasks: _Optional[_Iterable[str]] = ..., child_families: _Optional[_Iterable[str]] = ..., is_held: bool = ..., ancestors: _Optional[_Iterable[str]] = ..., states: _Optional[_Iterable[str]] = ..., state_totals: _Optional[_Mapping[str, int]] = ..., is_held_total: _Optional[int] = ..., is_queued: bool = ..., is_queued_total: _Optional[int] = ..., is_runahead: bool = ..., is_runahead_total: _Optional[int] = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ..., graph_depth: _Optional[int] = ...) -> None: ...
467+
is_retry: bool
468+
is_wallclock: bool
469+
is_xtriggered: bool
470+
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., cycle_point: _Optional[str] = ..., name: _Optional[str] = ..., family: _Optional[str] = ..., state: _Optional[str] = ..., depth: _Optional[int] = ..., first_parent: _Optional[str] = ..., child_tasks: _Optional[_Iterable[str]] = ..., child_families: _Optional[_Iterable[str]] = ..., is_held: bool = ..., ancestors: _Optional[_Iterable[str]] = ..., states: _Optional[_Iterable[str]] = ..., state_totals: _Optional[_Mapping[str, int]] = ..., is_held_total: _Optional[int] = ..., is_queued: bool = ..., is_queued_total: _Optional[int] = ..., is_runahead: bool = ..., is_runahead_total: _Optional[int] = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ..., graph_depth: _Optional[int] = ..., is_retry: bool = ..., is_wallclock: bool = ..., is_xtriggered: bool = ...) -> None: ...
459471

460472
class PbEdge(_message.Message):
461473
__slots__ = ("stamp", "id", "source", "target", "suicide", "cond")

cylc/flow/data_store_mgr.py

+66-25
Original file line numberDiff line numberDiff line change
@@ -1484,6 +1484,17 @@ def apply_task_proxy_db_history(self):
14841484

14851485
self.db_load_task_proxies.clear()
14861486

1487+
def _populate_xtriggers(self, itask, tproxy):
1488+
"""Transfer xtriggers from the itask onto the PbTaskProxy."""
1489+
for label, (satisfied, _) in itask.state.xtriggers.items():
1490+
sig = self.schd.xtrigger_mgr.get_xtrig_ctx(
1491+
itask, label).get_signature()
1492+
xtrig = tproxy.xtriggers[sig]
1493+
xtrig.id = sig
1494+
xtrig.label = label
1495+
xtrig.satisfied = satisfied
1496+
self.xtrigger_tasks.setdefault(sig, set()).add((tproxy.id, label))
1497+
14871498
def _process_internal_task_proxy(
14881499
self,
14891500
itask: 'TaskProxy',
@@ -1517,14 +1528,7 @@ def _process_internal_task_proxy(
15171528
ext_trig.id = trig
15181529
ext_trig.satisfied = satisfied
15191530

1520-
for label, satisfied in itask.state.xtriggers.items():
1521-
sig = self.schd.xtrigger_mgr.get_xtrig_ctx(
1522-
itask, label).get_signature()
1523-
xtrig = tproxy.xtriggers[sig]
1524-
xtrig.id = sig
1525-
xtrig.label = label
1526-
xtrig.satisfied = satisfied
1527-
self.xtrigger_tasks.setdefault(sig, set()).add((tproxy.id, label))
1531+
self._populate_xtriggers(itask, tproxy)
15281532

15291533
if tproxy.state in self.latest_state_tasks:
15301534
tp_ref = itask.identity
@@ -2034,6 +2038,7 @@ def _family_ascent_point_update(self, fp_id):
20342038
if child_fam_id in self.updated_state_families:
20352039
continue
20362040
self._family_ascent_point_update(child_fam_id)
2041+
20372042
if fp_id in self.state_update_families:
20382043
fp_updated = self.updated[FAMILY_PROXIES]
20392044
tp_data = self.data[self.workflow_id][TASK_PROXIES]
@@ -2044,6 +2049,9 @@ def _family_ascent_point_update(self, fp_id):
20442049
is_held_total = 0
20452050
is_queued_total = 0
20462051
is_runahead_total = 0
2052+
is_retry = False
2053+
is_wallclock = False
2054+
is_xtriggered = False
20472055
graph_depth = self.n_edge_distance
20482056
for child_id in fam_node.child_families:
20492057
child_node = fp_updated.get(child_id, fp_data.get(child_id))
@@ -2082,12 +2090,38 @@ def _family_ascent_point_update(self, fp_id):
20822090
is_queued_total += 1
20832091

20842092
tp_runahead = tp_delta
2085-
if (tp_runahead is None
2086-
or not tp_runahead.HasField('is_runahead')):
2093+
if (
2094+
tp_runahead is None
2095+
or not tp_runahead.HasField('is_runahead')
2096+
):
20872097
tp_runahead = tp_node
20882098
if tp_runahead.is_runahead:
20892099
is_runahead_total += 1
20902100

2101+
tp_retry = tp_delta
2102+
if tp_retry is None or not tp_retry.HasField('is_retry'):
2103+
tp_retry = tp_node
2104+
if tp_retry.is_retry:
2105+
is_retry = True
2106+
2107+
tp_wallclock = tp_delta
2108+
if (
2109+
tp_wallclock is None
2110+
or not tp_wallclock.HasField('is_wallclock')
2111+
):
2112+
tp_wallclock = tp_node
2113+
if tp_wallclock.is_wallclock:
2114+
is_wallclock = True
2115+
2116+
tp_xtriggered = tp_delta
2117+
if (
2118+
tp_xtriggered is None
2119+
or not tp_xtriggered.HasField('is_xtriggered')
2120+
):
2121+
tp_xtriggered = tp_node
2122+
if tp_xtriggered.is_xtriggered:
2123+
is_xtriggered = True
2124+
20912125
tp_depth = tp_delta
20922126
if tp_depth is None or not tp_depth.HasField('graph_depth'):
20932127
tp_depth = tp_node
@@ -2106,7 +2140,10 @@ def _family_ascent_point_update(self, fp_id):
21062140
is_queued_total=is_queued_total,
21072141
is_runahead=(is_runahead_total > 0),
21082142
is_runahead_total=is_runahead_total,
2109-
graph_depth=graph_depth
2143+
is_retry=is_retry,
2144+
is_wallclock=is_wallclock,
2145+
is_xtriggered=is_xtriggered,
2146+
graph_depth=graph_depth,
21102147
)
21112148
fp_delta.states[:] = state_counter.keys()
21122149
# Use all states to clean up pruned counts
@@ -2296,7 +2333,14 @@ def delta_task_state(self, itask: 'TaskProxy') -> None:
22962333
tp_id, PbTaskProxy(id=tp_id)
22972334
)
22982335
tp_delta.stamp = f'{tp_id}@{update_time}'
2299-
for field in ('is_held', 'is_queued', 'is_runahead'):
2336+
for field in (
2337+
'is_held',
2338+
'is_queued',
2339+
'is_runahead',
2340+
'is_retry',
2341+
'is_wallclock',
2342+
'is_xtriggered',
2343+
):
23002344
val = getattr(itask.state, field)
23012345
if (
23022346
# only update the fields that have changed compared to store:
@@ -2518,7 +2562,7 @@ def delta_task_ext_trigger(
25182562
ext_trigger.time = update_time
25192563
self.updates_pending = True
25202564

2521-
def delta_task_xtrigger(self, sig, satisfied):
2565+
def delta_task_xtrigger(self, itask):
25222566
"""Create delta for change in task proxy xtrigger.
25232567
25242568
Args:
@@ -2529,18 +2573,15 @@ def delta_task_xtrigger(self, sig, satisfied):
25292573
satisfied (bool): Trigger message.
25302574
25312575
"""
2532-
update_time = time()
2533-
for tp_id, label in self.xtrigger_tasks.get(sig, set()):
2534-
# update task instance
2535-
tp_delta = self.updated[TASK_PROXIES].setdefault(
2536-
tp_id, PbTaskProxy(id=tp_id))
2537-
tp_delta.stamp = f'{tp_id}@{update_time}'
2538-
xtrigger = tp_delta.xtriggers[sig]
2539-
xtrigger.id = sig
2540-
xtrigger.label = label
2541-
xtrigger.satisfied = satisfied
2542-
xtrigger.time = update_time
2543-
self.updates_pending = True
2576+
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
2577+
if not tproxy:
2578+
return
2579+
2580+
# refresh all xtriggers (we can't support incremental update on this
2581+
# field at present see https://github.com/cylc/cylc-flow/issues/6307)
2582+
self._populate_xtriggers(itask, tproxy)
2583+
2584+
self.updates_pending = True
25442585

25452586
def delta_from_task_proxy(self, itask: TaskProxy) -> None:
25462587
"""Create delta from existing pool task proxy.

cylc/flow/network/schema.py

+29
Original file line numberDiff line numberDiff line change
@@ -1124,6 +1124,18 @@ class Meta:
11241124
is_runahead = Boolean(
11251125
description='True if this task is held back by the "runahead limit".',
11261126
)
1127+
is_retry = Boolean(
1128+
description='True if this task has a scheduled retry.',
1129+
)
1130+
is_wallclock = Boolean(
1131+
description='True if this task has an unsatisfied wallclock trigger.',
1132+
)
1133+
is_xtriggered = Boolean(
1134+
description=sstrip(
1135+
'True if this task has an unsatisfied xtrigger'
1136+
' (excluding retry and wallclock xtriggers).'
1137+
),
1138+
)
11271139
flow_nums = String(
11281140
description='The flows this task instance belongs to.',
11291141
)
@@ -1304,6 +1316,23 @@ class Meta:
13041316
is_queued_total = Int()
13051317
is_runahead = Boolean()
13061318
is_runahead_total = Int()
1319+
is_retry = Boolean(
1320+
description=(
1321+
'True if this family contains a task that has a scheduled retry.'
1322+
),
1323+
)
1324+
is_wallclock = Boolean(
1325+
description=(
1326+
'True if this family contains a task that has an'
1327+
' unsatisfied wallclock trigger.'
1328+
),
1329+
)
1330+
is_xtriggered = Boolean(
1331+
description=sstrip(
1332+
'True if this family contains a task that has an unsatisfied'
1333+
' xtrigger (excluding retry and wallclock xtriggers).'
1334+
),
1335+
)
13071336
depth = Int()
13081337
graph_depth = Int(
13091338
description=sstrip('''

cylc/flow/task_events_mgr.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -1280,7 +1280,7 @@ def _retry_task(self, itask, wallclock_time, submit_retry=False):
12801280
if label in itask.state.xtriggers:
12811281
# retry xtrigger already exists from a previous retry, modify it
12821282
self.xtrigger_mgr.mutate_trig(label, kwargs)
1283-
itask.state.xtriggers[label] = False
1283+
itask.state.update_xtrigger(label, False)
12841284
else:
12851285
# create a new retry xtrigger
12861286
xtrig = SubFuncContext(
@@ -1294,7 +1294,16 @@ def _retry_task(self, itask, wallclock_time, submit_retry=False):
12941294
xtrig,
12951295
os.getenv("CYLC_WORKFLOW_RUN_DIR")
12961296
)
1297-
itask.state.add_xtrigger(label)
1297+
itask.state.add_xtrigger(label, label, False)
1298+
1299+
sig = label
1300+
1301+
(
1302+
self.data_store_mgr.xtrigger_tasks
1303+
.setdefault(sig, set())
1304+
.add((itask.identity, label))
1305+
)
1306+
self.data_store_mgr.delta_task_xtrigger(itask)
12981307

12991308
if itask.state_reset(TASK_STATUS_WAITING):
13001309
self.data_store_mgr.delta_task_state(itask)

cylc/flow/task_pool.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -772,7 +772,7 @@ def get_or_spawn_task(
772772
xtrig_label in (
773773
self.xtrigger_mgr.xtriggers.sequential_xtrigger_labels)
774774
for sequence, xtrig_labels in tdef.xtrig_labels.items()
775-
for xtrig_label in xtrig_labels
775+
for (xtrig_label, _) in xtrig_labels
776776
if sequence.is_valid(point)
777777
):
778778
is_xtrig_sequential = True

cylc/flow/task_proxy.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -541,8 +541,13 @@ def merge_flows(self, flow_nums: Set) -> None:
541541
)
542542

543543
def state_reset(
544-
self, status=None, is_held=None, is_queued=None, is_runahead=None,
545-
silent=False, forced=False
544+
self,
545+
status=None,
546+
is_held=None,
547+
is_queued=None,
548+
is_runahead=None,
549+
silent=False,
550+
forced=False
546551
) -> bool:
547552
"""Set new state and log the change. Return whether it changed.
548553
@@ -554,7 +559,7 @@ def state_reset(
554559
is_runahead = False
555560

556561
if self.state.reset(
557-
status, is_held, is_queued, is_runahead, forced
562+
status, is_held, is_queued, is_runahead, forced=forced
558563
):
559564
if not silent and not self.transient:
560565
LOG.info(f"[{before}] => {self.state}")

0 commit comments

Comments
 (0)