|
8 | 8 | import warnings
|
9 | 9 | import weakref
|
10 | 10 | from pathlib import Path
|
| 11 | +from typing import Callable |
11 | 12 |
|
12 | 13 | import numpy as np
|
13 | 14 | import parse_cm.paths
|
|
16 | 17 | from cxotime import CxoTime
|
17 | 18 | from ska_helpers import retry
|
18 | 19 |
|
| 20 | +from kadi.config import conf |
19 | 21 | from kadi.paths import IDX_CMDS_PATH, PARS_DICT_PATH
|
20 | 22 |
|
21 | 23 | __all__ = [
|
22 | 24 | "read_backstop",
|
23 | 25 | "get_cmds_from_backstop",
|
24 | 26 | "CommandTable",
|
25 | 27 | "add_observations_to_cmds",
|
| 28 | + "filter_cmd_events_by_event", |
| 29 | + "SCS107_EVENTS", |
| 30 | + "filter_scs107_events", |
26 | 31 | ]
|
27 | 32 |
|
| 33 | + |
| 34 | +# Events to filter for getting as-planned commands after SCS-107 |
| 35 | +SCS107_EVENTS = ["SCS-107", "Observing not run", "Obsid", "RTS"] |
| 36 | + |
28 | 37 | logger = logging.getLogger(__name__)
|
29 | 38 |
|
30 | 39 |
|
@@ -1130,3 +1139,58 @@ def get_cxotime_now() -> str | None:
|
1130 | 1139 | )
|
1131 | 1140 |
|
1132 | 1141 | return cxotime_now
|
| 1142 | + |
| 1143 | + |
| 1144 | +def filter_cmd_events_date_stop(date_stop): |
| 1145 | + def func(cmd_events): |
| 1146 | + stop = CxoTime(date_stop) |
| 1147 | + # Filter table based on stop date. Need to use CxoTime on each event separately |
| 1148 | + # because the date format could be inconsistent. |
| 1149 | + ok = [CxoTime(cmd_event["Date"]).date <= stop.date for cmd_event in cmd_events] |
| 1150 | + logger.debug( |
| 1151 | + f"Filtering cmd_events to stop date {stop.date} " |
| 1152 | + f"({np.count_nonzero(ok)} vs {len(cmd_events)})" |
| 1153 | + ) |
| 1154 | + return ok |
| 1155 | + |
| 1156 | + return func |
| 1157 | + |
| 1158 | + |
| 1159 | +def filter_cmd_events_by_event(event_events: list[str]) -> Callable: |
| 1160 | + def func(cmd_events: Table) -> np.ndarray[bool]: |
| 1161 | + ok = ~np.isin(cmd_events["Event"], event_events) |
| 1162 | + logger.info( |
| 1163 | + f"Filtering cmd_events['Event'] that match {event_events} " |
| 1164 | + f"({np.count_nonzero(ok)} vs {len(cmd_events)})" |
| 1165 | + ) |
| 1166 | + return ok |
| 1167 | + |
| 1168 | + return func |
| 1169 | + |
| 1170 | + |
| 1171 | +filter_scs107_events = filter_cmd_events_by_event(SCS107_EVENTS) |
| 1172 | +filter_scs107_events.__doc__ = "Filter SCS-107 related events from command history." |
| 1173 | +filter_scs107_events.__name__ = "filter_scs107_events" |
| 1174 | + |
| 1175 | + |
| 1176 | +def filter_cmd_events_state(cmd_events: Table) -> np.ndarray[bool]: |
| 1177 | + """ |
| 1178 | + Filters command events based on State. |
| 1179 | +
|
| 1180 | + Parameters |
| 1181 | + ---------- |
| 1182 | + cmd_events : Table |
| 1183 | + Command events, where each event has a "State" attribute. |
| 1184 | +
|
| 1185 | + Returns |
| 1186 | + ------- |
| 1187 | + numpy.ndarray |
| 1188 | + A boolean array indicating which command events have allowed states. |
| 1189 | + """ |
| 1190 | + allowed_states = ["Predictive", "Definitive"] |
| 1191 | + # In-work can be used to validate the new event vs telemetry prior to make it |
| 1192 | + # operational. |
| 1193 | + if conf.include_in_work_command_events: |
| 1194 | + allowed_states.append("In-work") |
| 1195 | + ok = np.isin(cmd_events["State"], allowed_states) |
| 1196 | + return ok |
0 commit comments