19
19
20
20
from __future__ import print_function , unicode_literals
21
21
22
+ import signal
22
23
import sys
23
24
import os
24
25
import subprocess
@@ -82,6 +83,31 @@ def _process_exists(pid):
82
83
return True
83
84
84
85
86
+ def _job_pid (name ):
87
+ """Get PID of job or `None` if job does not exist.
88
+
89
+ Args:
90
+ name (str): Name of job.
91
+
92
+ Returns:
93
+ int: PID of job process (or `None` if job doesn't exist).
94
+ """
95
+ pidfile = _pid_file (name )
96
+ if not os .path .exists (pidfile ):
97
+ return
98
+
99
+ with open (pidfile , 'rb' ) as fp :
100
+ pid = int (fp .read ())
101
+
102
+ if _process_exists (pid ):
103
+ return pid
104
+
105
+ try :
106
+ os .unlink (pidfile )
107
+ except Exception : # pragma: no cover
108
+ pass
109
+
110
+
85
111
def is_running (name ):
86
112
"""Test whether task ``name`` is currently running.
87
113
@@ -91,26 +117,18 @@ def is_running(name):
91
117
:rtype: bool
92
118
93
119
"""
94
- pidfile = _pid_file (name )
95
- if not os .path .exists (pidfile ):
96
- return False
97
-
98
- with open (pidfile , 'rb' ) as file_obj :
99
- pid = int (file_obj .read ().strip ())
100
-
101
- if _process_exists (pid ):
120
+ if _job_pid (name ) is not None :
102
121
return True
103
122
104
- elif os .path .exists (pidfile ):
105
- os .unlink (pidfile )
106
-
107
123
return False
108
124
109
125
110
- def _background (stdin = '/dev/null' , stdout = '/dev/null' ,
126
+ def _background (pidfile , stdin = '/dev/null' , stdout = '/dev/null' ,
111
127
stderr = '/dev/null' ): # pragma: no cover
112
128
"""Fork the current process into a background daemon.
113
129
130
+ :param pidfile: file to write PID of daemon process to.
131
+ :type pidfile: filepath
114
132
:param stdin: where to read input
115
133
:type stdin: filepath
116
134
:param stdout: where to write stdout output
@@ -119,24 +137,31 @@ def _background(stdin='/dev/null', stdout='/dev/null',
119
137
:type stderr: filepath
120
138
121
139
"""
122
- def _fork_and_exit_parent (errmsg ):
140
+ def _fork_and_exit_parent (errmsg , wait = False , write = False ):
123
141
try :
124
142
pid = os .fork ()
125
143
if pid > 0 :
144
+ if write : # write PID of child process to `pidfile`
145
+ tmp = pidfile + '.tmp'
146
+ with open (tmp , 'wb' ) as fp :
147
+ fp .write (str (pid ))
148
+ os .rename (tmp , pidfile )
149
+ if wait : # wait for child process to exit
150
+ os .waitpid (pid , 0 )
126
151
os ._exit (0 )
127
152
except OSError as err :
128
153
_log ().critical ('%s: (%d) %s' , errmsg , err .errno , err .strerror )
129
154
raise err
130
155
131
- # Do first fork.
132
- _fork_and_exit_parent ('fork #1 failed' )
156
+ # Do first fork and wait for second fork to finish .
157
+ _fork_and_exit_parent ('fork #1 failed' , wait = True )
133
158
134
159
# Decouple from parent environment.
135
160
os .chdir (wf ().workflowdir )
136
161
os .setsid ()
137
162
138
- # Do second fork.
139
- _fork_and_exit_parent ('fork #2 failed' )
163
+ # Do second fork and write PID to pidfile .
164
+ _fork_and_exit_parent ('fork #2 failed' , write = True )
140
165
141
166
# Now I am a daemon!
142
167
# Redirect standard file descriptors.
@@ -151,10 +176,30 @@ def _fork_and_exit_parent(errmsg):
151
176
os .dup2 (se .fileno (), sys .stderr .fileno ())
152
177
153
178
179
+ def kill (name , sig = signal .SIGTERM ):
180
+ """Send a signal to job ``name`` via :func:`os.kill`.
181
+
182
+ .. versionadded:: 1.29
183
+
184
+ Args:
185
+ name (str): Name of the job
186
+ sig (int, optional): Signal to send (default: SIGTERM)
187
+
188
+ Returns:
189
+ bool: `False` if job isn't running, `True` if signal was sent.
190
+ """
191
+ pid = _job_pid (name )
192
+ if pid is None :
193
+ return False
194
+
195
+ os .kill (pid , sig )
196
+ return True
197
+
198
+
154
199
def run_in_background (name , args , ** kwargs ):
155
200
r"""Cache arguments then call this script again via :func:`subprocess.call`.
156
201
157
- :param name: name of task
202
+ :param name: name of job
158
203
:type name: unicode
159
204
:param args: arguments passed as first argument to :func:`subprocess.call`
160
205
:param \**kwargs: keyword arguments to :func:`subprocess.call`
@@ -183,18 +228,20 @@ def run_in_background(name, args, **kwargs):
183
228
argcache = _arg_cache (name )
184
229
185
230
# Cache arguments
186
- with open (argcache , 'wb' ) as file_obj :
187
- pickle .dump ({'args' : args , 'kwargs' : kwargs }, file_obj )
231
+ with open (argcache , 'wb' ) as fp :
232
+ pickle .dump ({'args' : args , 'kwargs' : kwargs }, fp )
188
233
_log ().debug ('[%s] command cached: %s' , name , argcache )
189
234
190
235
# Call this script
191
236
cmd = ['/usr/bin/python' , __file__ , name ]
192
237
_log ().debug ('[%s] passing job to background runner: %r' , name , cmd )
193
238
retcode = subprocess .call (cmd )
239
+
194
240
if retcode : # pragma: no cover
195
- _log ().error ('[%s] background runner failed with %d' , retcode )
241
+ _log ().error ('[%s] background runner failed with %d' , name , retcode )
196
242
else :
197
243
_log ().debug ('[%s] background job started' , name )
244
+
198
245
return retcode
199
246
200
247
@@ -209,12 +256,17 @@ def main(wf): # pragma: no cover
209
256
name = wf .args [0 ]
210
257
argcache = _arg_cache (name )
211
258
if not os .path .exists (argcache ):
212
- log .critical ('[%s] command cache not found: %r' , name , argcache )
213
- return 1
259
+ msg = '[{0}] command cache not found: {1}' .format (name , argcache )
260
+ log .critical (msg )
261
+ raise IOError (msg )
262
+
263
+ # Fork to background and run command
264
+ pidfile = _pid_file (name )
265
+ _background (pidfile )
214
266
215
267
# Load cached arguments
216
- with open (argcache , 'rb' ) as file_obj :
217
- data = pickle .load (file_obj )
268
+ with open (argcache , 'rb' ) as fp :
269
+ data = pickle .load (fp )
218
270
219
271
# Cached arguments
220
272
args = data ['args' ]
@@ -223,28 +275,18 @@ def main(wf): # pragma: no cover
223
275
# Delete argument cache file
224
276
os .unlink (argcache )
225
277
226
- pidfile = _pid_file (name )
227
-
228
- # Fork to background
229
- _background ()
230
-
231
- # Write PID to file
232
- with open (pidfile , 'wb' ) as file_obj :
233
- file_obj .write (str (os .getpid ()))
234
-
235
- # Run the command
236
278
try :
279
+ # Run the command
237
280
log .debug ('[%s] running command: %r' , name , args )
238
281
239
282
retcode = subprocess .call (args , ** kwargs )
240
283
241
284
if retcode :
242
285
log .error ('[%s] command failed with status %d' , name , retcode )
243
-
244
286
finally :
245
- if os .path . exists (pidfile ):
246
- os . unlink ( pidfile )
247
- log .debug ('[%s] job complete' , name )
287
+ os .unlink (pidfile )
288
+
289
+ log .debug ('[%s] job complete' , name )
248
290
249
291
250
292
if __name__ == '__main__' : # pragma: no cover
0 commit comments