Skip to content

Commit e2a4f2b

Browse files
AdamGlusteinCarreau
authored andcommitted
Fix interrupt handling issues in csp: ensure first node is stopped and reset signaled flag across runs (#206)
* Fix various interrupt handling issues in csp Signed-off-by: Adam Glustein <[email protected]> * Add comment explaining signal handling in multiple engine threads Signed-off-by: Adam Glustein <[email protected]> --------- Signed-off-by: Adam Glustein <[email protected]>
1 parent 64559cb commit e2a4f2b

File tree

5 files changed

+114
-15
lines changed

5 files changed

+114
-15
lines changed

cpp/csp/engine/RootEngine.cpp

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,24 @@
1111
namespace csp
1212
{
1313

14-
static volatile bool g_SIGNALED = false;
14+
static volatile int g_SIGNAL_COUNT = 0;
15+
/*
16+
The signal count variable is maintained to ensure that multiple engine threads shutdown properly.
17+
18+
An interrupt should cause all running engines to stop, but should not affect future runs in the same process.
19+
Thus, each root engine keeps track of the signal count when its created. When an interrupt occurs, one engine thread
20+
handles the interrupt by incrementing the count. Then, all other root engines detect the signal by comparing their
21+
initial count to the current count.
22+
23+
Future runs after the interrupt remain unaffected since they are initialized with the updated signal count, and will
24+
only consider themselves "interupted" if another signal is received during their execution.
25+
*/
26+
1527
static struct sigaction g_prevSIGTERMaction;
1628

1729
static void handle_SIGTERM( int signum )
1830
{
19-
g_SIGNALED = true;
31+
g_SIGNAL_COUNT++;
2032
if( g_prevSIGTERMaction.sa_handler )
2133
(*g_prevSIGTERMaction.sa_handler)( signum );
2234
}
@@ -58,6 +70,7 @@ RootEngine::RootEngine( const Dictionary & settings ) : Engine( m_cycleStepTable
5870
m_cycleCount( 0 ),
5971
m_settings( settings ),
6072
m_inRealtime( false ),
73+
m_initSignalCount( g_SIGNAL_COUNT ),
6174
m_pushEventQueue( m_settings.queueWaitTime > TimeDelta::ZERO() )
6275
{
6376
if( settings.get<bool>( "profile", false ) )
@@ -78,7 +91,7 @@ RootEngine::~RootEngine()
7891

7992
bool RootEngine::interrupted() const
8093
{
81-
return g_SIGNALED;
94+
return g_SIGNAL_COUNT != m_initSignalCount;
8295
}
8396

8497
void RootEngine::preRun( DateTime start, DateTime end )
@@ -131,7 +144,7 @@ void RootEngine::processEndCycle()
131144
void RootEngine::runSim( DateTime end )
132145
{
133146
m_inRealtime = false;
134-
while( m_scheduler.hasEvents() && m_state == State::RUNNING && !g_SIGNALED )
147+
while( m_scheduler.hasEvents() && m_state == State::RUNNING && !interrupted() )
135148
{
136149
m_now = m_scheduler.nextTime();
137150
if( m_now > end )
@@ -161,7 +174,7 @@ void RootEngine::runRealtime( DateTime end )
161174

162175
m_inRealtime = true;
163176
bool haveEvents = false;
164-
while( m_state == State::RUNNING && !g_SIGNALED )
177+
while( m_state == State::RUNNING && !interrupted() )
165178
{
166179
TimeDelta waitTime;
167180
if( !m_pendingPushEvents.hasEvents() )

cpp/csp/engine/RootEngine.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ class RootEngine : public Engine
127127
PendingPushEvents m_pendingPushEvents;
128128
Settings m_settings;
129129
bool m_inRealtime;
130+
int m_initSignalCount;
130131

131132
PushEventQueue m_pushEventQueue;
132133

cpp/csp/python/PyNode.cpp

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -212,18 +212,17 @@ void PyNode::start()
212212

213213
void PyNode::stop()
214214
{
215-
PyObjectPtr rv = PyObjectPtr::own( PyObject_CallMethod( m_gen.ptr(), "close", nullptr ) );
216-
if( !rv.ptr() )
215+
if( this -> rootEngine() -> interrupted() && PyErr_CheckSignals() == -1 )
217216
{
218-
if( PyErr_Occurred() == PyExc_KeyboardInterrupt )
219-
{
220-
PyErr_Clear();
221-
rv = PyObjectPtr::own( PyObject_CallMethod( m_gen.ptr(), "close", nullptr ) );
222-
}
223-
224-
if( !rv.ptr() )
225-
CSP_THROW( PythonPassthrough, "" );
217+
// When an interrupt occurs a KeyboardInterrupt exception is raised in Python, which we need to clear
218+
// before calling "close" on the generator. Else, the close method will fail due to the unhandled
219+
// exception, and we lose the state of the generator before the "finally" block that calls stop() is executed.
220+
PyErr_Clear();
226221
}
222+
223+
PyObjectPtr rv = PyObjectPtr::own( PyObject_CallMethod( m_gen.ptr(), "close", nullptr ) );
224+
if( !rv.ptr() )
225+
CSP_THROW( PythonPassthrough, "" );
227226
}
228227

229228
PyNode * PyNode::create( PyEngine * pyengine, PyObject * inputs, PyObject * outputs, PyObject * gen )

cpp/csp/python/csptestlibimpl.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,45 @@ EXPORT_CPPNODE( start_n2_throw );
6666

6767
}
6868

69+
namespace interrupt_stop_test
70+
{
71+
72+
using namespace csp::python;
73+
74+
void setStatus( const DialectGenericType & obj_, int64_t idx )
75+
{
76+
PyObjectPtr obj = PyObjectPtr::own( toPython( obj_ ) );
77+
PyObjectPtr list = PyObjectPtr::own( PyObject_GetAttrString( obj.get(), "stopped" ) );
78+
PyList_SET_ITEM( list.get(), idx, Py_True );
79+
}
80+
81+
DECLARE_CPPNODE( set_stop_index )
82+
{
83+
INIT_CPPNODE( set_stop_index ) {}
84+
85+
SCALAR_INPUT( DialectGenericType, obj_ );
86+
SCALAR_INPUT( int64_t, idx );
87+
88+
START() {}
89+
INVOKE() {}
90+
91+
STOP()
92+
{
93+
setStatus( obj_, idx );
94+
}
95+
};
96+
EXPORT_CPPNODE( set_stop_index );
97+
98+
}
99+
69100
}
70101

71102
}
72103

73104
// Test nodes
74105
REGISTER_CPPNODE( csp::cppnodes::testing::stop_start_test, start_n1_set_value );
75106
REGISTER_CPPNODE( csp::cppnodes::testing::stop_start_test, start_n2_throw );
107+
REGISTER_CPPNODE( csp::cppnodes::testing::interrupt_stop_test, set_stop_index );
76108

77109
static PyModuleDef _csptestlibimpl_module = {
78110
PyModuleDef_HEAD_INIT,

csp/tests/test_engine.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2064,6 +2064,60 @@ def g() -> ts[int]:
20642064
csp.run(g, starttime=datetime(2020, 1, 1), endtime=timedelta())
20652065
self.assertTrue(status["started"] and status["stopped"])
20662066

2067+
def test_interrupt_stops_all_nodes(self):
2068+
@csp.node
2069+
def n(l: list, idx: int):
2070+
with csp.stop():
2071+
l[idx] = True
2072+
2073+
@csp.node
2074+
def raise_interrupt():
2075+
with csp.alarms():
2076+
a = csp.alarm(bool)
2077+
with csp.start():
2078+
csp.schedule_alarm(a, timedelta(seconds=1), True)
2079+
if csp.ticked(a):
2080+
import signal
2081+
os.kill(os.getpid(), signal.SIGINT)
2082+
2083+
# Python nodes
2084+
@csp.graph
2085+
def g(l: list):
2086+
n(l, 0)
2087+
n(l, 1)
2088+
n(l, 2)
2089+
raise_interrupt()
2090+
2091+
stopped = [False, False, False]
2092+
with self.assertRaises(KeyboardInterrupt):
2093+
csp.run(g, stopped, starttime=datetime.utcnow(), endtime=timedelta(seconds=60), realtime=True)
2094+
2095+
for element in stopped:
2096+
self.assertTrue(element)
2097+
2098+
# C++ nodes
2099+
class RTI:
2100+
def __init__(self):
2101+
self.stopped = [False, False, False]
2102+
2103+
@csp.node(cppimpl=_csptestlibimpl.set_stop_index)
2104+
def n2(obj_: object, idx: int):
2105+
return
2106+
2107+
@csp.graph
2108+
def g2(rti: RTI):
2109+
n2(rti, 0)
2110+
n2(rti, 1)
2111+
n2(rti, 2)
2112+
raise_interrupt()
2113+
2114+
rti = RTI()
2115+
with self.assertRaises(KeyboardInterrupt):
2116+
csp.run(g2, rti, starttime=datetime.utcnow(), endtime=timedelta(seconds=60), realtime=True)
2117+
2118+
for element in rti.stopped:
2119+
self.assertTrue(element)
2120+
20672121

20682122
if __name__ == "__main__":
20692123
unittest.main()

0 commit comments

Comments
 (0)