Skip to content

Commit 24b8aa8

Browse files
xtriggers: fix update and add is_retry, is_wallclock and is_xtriggered attrs
* Don't update xtriggers incrementally: * Xtriggers were being updated in the data store incrementally. * This doesn't work with GraphQL subscriptions (see cylc#6307). * This turns off incremental updates ensuring that all xtriggers are included in all updates. * Unblocks cylc/cylc-ui#2103 * Add is_retry, is_wallclock and is_xtriggered task attributes. * We would like to be able to incorporate xtriggers into the task icons used in the GUI and Tui. * These attributes allow clients to access the required information without having to subscribe to and monitor all xtriggers. * Unblocks cylc/cylc-ui#331
1 parent 7284f25 commit 24b8aa8

12 files changed

+235
-85
lines changed

cylc/flow/config.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1937,7 +1937,7 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
19371937
)
19381938

19391939
self.xtrigger_collator.add_trig(label, xtrig, self.fdir)
1940-
self.taskdefs[right].add_xtrig_label(label, seq)
1940+
self.taskdefs[right].add_xtrig_label(label, seq, xtrig.func_name)
19411941

19421942
def get_actual_first_point(self, start_point):
19431943
"""Get actual first cycle point for the workflow
@@ -2679,4 +2679,4 @@ def upgrade_clock_triggers(self):
26792679
# Add it to the task, for each sequence that the task appears in.
26802680
taskdef = self.get_taskdef(task_name)
26812681
for seq in taskdef.sequences:
2682-
taskdef.add_xtrig_label(label, seq)
2682+
taskdef.add_xtrig_label(label, seq, xtrig.func_name)

cylc/flow/data_messages.proto

+3
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,9 @@ message PbTaskProxy {
231231
optional bool flow_wait = 27;
232232
optional PbRuntime runtime = 28;
233233
optional int32 graph_depth = 29;
234+
optional bool is_retry = 30;
235+
optional bool is_wallclock = 31;
236+
optional bool is_xtriggered = 32;
234237
}
235238

236239
message PbFamily {

cylc/flow/data_messages_pb2.py

+34-34
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cylc/flow/data_messages_pb2.pyi

+8-2
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ class PbTrigger(_message.Message):
304304
def __init__(self, id: _Optional[str] = ..., label: _Optional[str] = ..., message: _Optional[str] = ..., satisfied: bool = ..., time: _Optional[float] = ...) -> None: ...
305305

306306
class PbTaskProxy(_message.Message):
307-
__slots__ = ("stamp", "id", "task", "state", "cycle_point", "depth", "job_submits", "outputs", "namespace", "prerequisites", "jobs", "first_parent", "name", "is_held", "edges", "ancestors", "flow_nums", "external_triggers", "xtriggers", "is_queued", "is_runahead", "flow_wait", "runtime", "graph_depth")
307+
__slots__ = ("stamp", "id", "task", "state", "cycle_point", "depth", "job_submits", "outputs", "namespace", "prerequisites", "jobs", "first_parent", "name", "is_held", "edges", "ancestors", "flow_nums", "external_triggers", "xtriggers", "is_queued", "is_runahead", "flow_wait", "runtime", "graph_depth", "is_retry", "is_wallclock", "is_xtriggered")
308308
class OutputsEntry(_message.Message):
309309
__slots__ = ("key", "value")
310310
KEY_FIELD_NUMBER: _ClassVar[int]
@@ -350,6 +350,9 @@ class PbTaskProxy(_message.Message):
350350
FLOW_WAIT_FIELD_NUMBER: _ClassVar[int]
351351
RUNTIME_FIELD_NUMBER: _ClassVar[int]
352352
GRAPH_DEPTH_FIELD_NUMBER: _ClassVar[int]
353+
IS_RETRY_FIELD_NUMBER: _ClassVar[int]
354+
IS_WALLCLOCK_FIELD_NUMBER: _ClassVar[int]
355+
IS_XTRIGGERED_FIELD_NUMBER: _ClassVar[int]
353356
stamp: str
354357
id: str
355358
task: str
@@ -374,7 +377,10 @@ class PbTaskProxy(_message.Message):
374377
flow_wait: bool
375378
runtime: PbRuntime
376379
graph_depth: int
377-
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., task: _Optional[str] = ..., state: _Optional[str] = ..., cycle_point: _Optional[str] = ..., depth: _Optional[int] = ..., job_submits: _Optional[int] = ..., outputs: _Optional[_Mapping[str, PbOutput]] = ..., namespace: _Optional[_Iterable[str]] = ..., prerequisites: _Optional[_Iterable[_Union[PbPrerequisite, _Mapping]]] = ..., jobs: _Optional[_Iterable[str]] = ..., first_parent: _Optional[str] = ..., name: _Optional[str] = ..., is_held: bool = ..., edges: _Optional[_Iterable[str]] = ..., ancestors: _Optional[_Iterable[str]] = ..., flow_nums: _Optional[str] = ..., external_triggers: _Optional[_Mapping[str, PbTrigger]] = ..., xtriggers: _Optional[_Mapping[str, PbTrigger]] = ..., is_queued: bool = ..., is_runahead: bool = ..., flow_wait: bool = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ..., graph_depth: _Optional[int] = ...) -> None: ...
380+
is_retry: bool
381+
is_wallclock: bool
382+
is_xtriggered: bool
383+
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., task: _Optional[str] = ..., state: _Optional[str] = ..., cycle_point: _Optional[str] = ..., depth: _Optional[int] = ..., job_submits: _Optional[int] = ..., outputs: _Optional[_Mapping[str, PbOutput]] = ..., namespace: _Optional[_Iterable[str]] = ..., prerequisites: _Optional[_Iterable[_Union[PbPrerequisite, _Mapping]]] = ..., jobs: _Optional[_Iterable[str]] = ..., first_parent: _Optional[str] = ..., name: _Optional[str] = ..., is_held: bool = ..., edges: _Optional[_Iterable[str]] = ..., ancestors: _Optional[_Iterable[str]] = ..., flow_nums: _Optional[str] = ..., external_triggers: _Optional[_Mapping[str, PbTrigger]] = ..., xtriggers: _Optional[_Mapping[str, PbTrigger]] = ..., is_queued: bool = ..., is_runahead: bool = ..., flow_wait: bool = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ..., graph_depth: _Optional[int] = ..., is_retry: bool = ..., is_wallclock: bool = ..., is_xtriggered: bool = ...) -> None: ...
378384

379385
class PbFamily(_message.Message):
380386
__slots__ = ("stamp", "id", "name", "meta", "depth", "proxies", "parents", "child_tasks", "child_families", "first_parent", "runtime", "descendants")

cylc/flow/data_store_mgr.py

+31-22
Original file line numberDiff line numberDiff line change
@@ -1484,6 +1484,17 @@ def apply_task_proxy_db_history(self):
14841484

14851485
self.db_load_task_proxies.clear()
14861486

1487+
def _populate_xtriggers(self, itask, tproxy):
1488+
"""Transfer xtriggers from the itask onto the PbTaskProxy."""
1489+
for label, (satisfied, _) in itask.state.xtriggers.items():
1490+
sig = self.schd.xtrigger_mgr.get_xtrig_ctx(
1491+
itask, label).get_signature()
1492+
xtrig = tproxy.xtriggers[sig]
1493+
xtrig.id = sig
1494+
xtrig.label = label
1495+
xtrig.satisfied = satisfied
1496+
self.xtrigger_tasks.setdefault(sig, set()).add((tproxy.id, label))
1497+
14871498
def _process_internal_task_proxy(
14881499
self,
14891500
itask: 'TaskProxy',
@@ -1517,14 +1528,7 @@ def _process_internal_task_proxy(
15171528
ext_trig.id = trig
15181529
ext_trig.satisfied = satisfied
15191530

1520-
for label, satisfied in itask.state.xtriggers.items():
1521-
sig = self.schd.xtrigger_mgr.get_xtrig_ctx(
1522-
itask, label).get_signature()
1523-
xtrig = tproxy.xtriggers[sig]
1524-
xtrig.id = sig
1525-
xtrig.label = label
1526-
xtrig.satisfied = satisfied
1527-
self.xtrigger_tasks.setdefault(sig, set()).add((tproxy.id, label))
1531+
self._populate_xtriggers(itask, tproxy)
15281532

15291533
if tproxy.state in self.latest_state_tasks:
15301534
tp_ref = itask.identity
@@ -2034,6 +2038,7 @@ def _family_ascent_point_update(self, fp_id):
20342038
if child_fam_id in self.updated_state_families:
20352039
continue
20362040
self._family_ascent_point_update(child_fam_id)
2041+
# TODO: consider accumulating attribute statuses onto the families
20372042
if fp_id in self.state_update_families:
20382043
fp_updated = self.updated[FAMILY_PROXIES]
20392044
tp_data = self.data[self.workflow_id][TASK_PROXIES]
@@ -2296,7 +2301,14 @@ def delta_task_state(self, itask: 'TaskProxy') -> None:
22962301
tp_id, PbTaskProxy(id=tp_id)
22972302
)
22982303
tp_delta.stamp = f'{tp_id}@{update_time}'
2299-
for field in ('is_held', 'is_queued', 'is_runahead'):
2304+
for field in (
2305+
'is_held',
2306+
'is_queued',
2307+
'is_runahead',
2308+
'is_retry',
2309+
'is_wallclock',
2310+
'is_xtriggered',
2311+
):
23002312
val = getattr(itask.state, field)
23012313
if (
23022314
# only update the fields that have changed compared to store:
@@ -2518,7 +2530,7 @@ def delta_task_ext_trigger(
25182530
ext_trigger.time = update_time
25192531
self.updates_pending = True
25202532

2521-
def delta_task_xtrigger(self, sig, satisfied):
2533+
def delta_task_xtrigger(self, itask):
25222534
"""Create delta for change in task proxy xtrigger.
25232535
25242536
Args:
@@ -2529,18 +2541,15 @@ def delta_task_xtrigger(self, sig, satisfied):
25292541
satisfied (bool): Trigger message.
25302542
25312543
"""
2532-
update_time = time()
2533-
for tp_id, label in self.xtrigger_tasks.get(sig, set()):
2534-
# update task instance
2535-
tp_delta = self.updated[TASK_PROXIES].setdefault(
2536-
tp_id, PbTaskProxy(id=tp_id))
2537-
tp_delta.stamp = f'{tp_id}@{update_time}'
2538-
xtrigger = tp_delta.xtriggers[sig]
2539-
xtrigger.id = sig
2540-
xtrigger.label = label
2541-
xtrigger.satisfied = satisfied
2542-
xtrigger.time = update_time
2543-
self.updates_pending = True
2544+
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
2545+
if not tproxy:
2546+
return
2547+
2548+
# refresh all xtriggers (we can't support incremental update on this
2549+
# field at present see https://github.com/cylc/cylc-flow/issues/6307)
2550+
self._populate_xtriggers(itask, tproxy)
2551+
2552+
self.updates_pending = True
25442553

25452554
def delta_from_task_proxy(self, itask: TaskProxy) -> None:
25462555
"""Create delta from existing pool task proxy.

cylc/flow/network/schema.py

+13
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ class SortArgs(InputObjectType):
197197
'sort': SortArgs(default_value=None),
198198
}
199199

200+
# TODO: consider making these args searchable
200201
PROXY_ARGS = {
201202
'ids': graphene.List(ID, default_value=[]),
202203
'exids': graphene.List(ID, default_value=[]),
@@ -1124,6 +1125,18 @@ class Meta:
11241125
is_runahead = Boolean(
11251126
description='True if this task is held back by the "runahead limit".',
11261127
)
1128+
is_retry = Boolean(
1129+
description='True if this task has a scheduled retry.',
1130+
)
1131+
is_wallclock = Boolean(
1132+
description='True if this task has an unsatisfied wallclock trigger.',
1133+
)
1134+
is_xtriggered = Boolean(
1135+
description=sstrip(
1136+
'True if this task has an unsatisfied xtrigger'
1137+
' (excluding retry and wallclock xtriggers).'
1138+
),
1139+
)
11271140
flow_nums = String(
11281141
description='The flows this task instance belongs to.',
11291142
)

cylc/flow/task_events_mgr.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -1294,7 +1294,12 @@ def _retry_task(self, itask, wallclock_time, submit_retry=False):
12941294
xtrig,
12951295
os.getenv("CYLC_WORKFLOW_RUN_DIR")
12961296
)
1297-
itask.state.add_xtrigger(label)
1297+
itask.state.add_xtrigger(label, label, False)
1298+
1299+
sig = label
1300+
1301+
self.data_store_mgr.xtrigger_tasks.setdefault(sig, set()).add((itask.identity, label))
1302+
self.data_store_mgr.delta_task_xtrigger(itask)
12981303

12991304
if itask.state_reset(TASK_STATUS_WAITING):
13001305
self.data_store_mgr.delta_task_state(itask)

cylc/flow/task_pool.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -772,7 +772,7 @@ def get_or_spawn_task(
772772
xtrig_label in (
773773
self.xtrigger_mgr.xtriggers.sequential_xtrigger_labels)
774774
for sequence, xtrig_labels in tdef.xtrig_labels.items()
775-
for xtrig_label in xtrig_labels
775+
for (xtrig_label, _) in xtrig_labels
776776
if sequence.is_valid(point)
777777
):
778778
is_xtrig_sequential = True

cylc/flow/task_proxy.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -541,8 +541,13 @@ def merge_flows(self, flow_nums: Set) -> None:
541541
)
542542

543543
def state_reset(
544-
self, status=None, is_held=None, is_queued=None, is_runahead=None,
545-
silent=False, forced=False
544+
self,
545+
status=None,
546+
is_held=None,
547+
is_queued=None,
548+
is_runahead=None,
549+
silent=False,
550+
forced=False
546551
) -> bool:
547552
"""Set new state and log the change. Return whether it changed.
548553
@@ -554,7 +559,7 @@ def state_reset(
554559
is_runahead = False
555560

556561
if self.state.reset(
557-
status, is_held, is_queued, is_runahead, forced
562+
status, is_held, is_queued, is_runahead, forced=forced
558563
):
559564
if not silent and not self.transient:
560565
LOG.info(f"[{before}] => {self.state}")

0 commit comments

Comments
 (0)