1
1
from base import *
2
2
from utils .logger import SubprocessLogger
3
3
import shlex
4
+ from concurrent .futures import ThreadPoolExecutor , as_completed
4
5
5
6
6
7
class ProcessHandler :
@@ -154,66 +155,62 @@ def stop_process(self, process_name, key_type=None):
154
155
process_description = (
155
156
f"{ process_name } w/ { key_type } " if key_type else process_name
156
157
)
157
- self .logger .info (f"Stopping { process_description } " )
158
+ self .logger .info (f"Initiating shutdown for { process_description } " )
158
159
159
160
process = self .process_names .get (process_name )
160
- self .logger .debug (f"Process { process_name } found: { process } " )
161
161
if process :
162
+ self .logger .debug (f"Process { process_name } found: { process } " )
162
163
process .terminate ()
163
- time .sleep (2 )
164
- try :
165
- for attempt in range (3 ):
166
- self .logger .debug (
167
- f"Waiting for { process_description } to terminate (attempt { attempt + 1 } )..."
168
- )
164
+ max_attempts = 1 if process_name == "riven_backend" else 3
165
+ attempt = 0
166
+ while attempt < max_attempts :
167
+ self .logger .debug (
168
+ f"Waiting for { process_description } to terminate (attempt { attempt + 1 } )..."
169
+ )
170
+ try :
169
171
process .wait (timeout = 10 )
170
- self .logger .debug (f"Wating on process: { process } " )
171
172
if process .poll () is None :
172
173
self .logger .info (
173
174
f"{ process_description } process terminated gracefully."
174
175
)
175
176
break
176
- else :
177
+ except subprocess . TimeoutExpired :
177
178
self .logger .warning (
178
- f"{ process_description } process did not terminate gracefully, forcing stop."
179
- )
180
- process .kill ()
181
- process .wait ()
182
- self .logger .info (
183
- f"{ process_description } process killed forcefully."
179
+ f"{ process_description } process did not terminate within 10 seconds on attempt { attempt + 1 } ."
184
180
)
185
- except subprocess .TimeoutExpired :
181
+ attempt += 1
182
+ time .sleep (5 )
183
+ if process .poll () is None :
186
184
self .logger .warning (
187
- f"{ process_description } process did not terminate gracefully , forcing stop ."
185
+ f"{ process_description } process did not terminate, forcing shutdown ."
188
186
)
189
187
process .kill ()
190
188
process .wait ()
191
189
self .logger .info (
192
- f"{ process_description } process killed forcefully."
190
+ f"{ process_description } process forcefully terminated ."
193
191
)
194
-
195
192
if self .subprocess_loggers .get (process_name ):
196
193
self .subprocess_loggers [process_name ].stop_logging_stdout ()
197
194
self .subprocess_loggers [process_name ].stop_monitoring_stderr ()
198
195
del self .subprocess_loggers [process_name ]
199
-
196
+ self . logger . debug ( f"Stopped logging for { process_description } " )
200
197
self .process_names .pop (process_name , None )
201
198
process_info = self .processes .pop (process .pid , None )
202
199
if process_info :
203
200
self .logger .debug (
204
- f"Removed process { process_name } with PID { process .pid } from tracking."
201
+ f"Removed { process_description } with PID { process .pid } from tracking."
205
202
)
203
+ self .logger .info (f"{ process_description } shutdown completed." )
206
204
else :
207
205
self .logger .warning (
208
- f"{ process_description } process not found or already stopped."
206
+ f"{ process_description } was not found or has already been stopped."
209
207
)
210
208
except Exception as e :
211
209
self .logger .error (
212
- f"Error stopping subprocess for { process_description } : { e } "
210
+ f"Error occurred while stopping { process_description } : { e } "
213
211
)
214
212
215
213
def shutdown (self , signum = None , frame = None , exit_code = 0 ):
216
- """Handle shutdown process by stopping all running processes and cleaning up."""
217
214
self .shutting_down = True
218
215
self .logger .info ("Shutdown signal received. Cleaning up..." )
219
216
processes_to_stop = [
@@ -226,15 +223,26 @@ def shutdown(self, signum=None, frame=None, exit_code=0):
226
223
"pgAdmin" ,
227
224
"pgAgent" ,
228
225
]
229
- for process_name in processes_to_stop :
230
- self .stop_process (process_name )
226
+
227
+ with ThreadPoolExecutor () as executor :
228
+ futures = {
229
+ executor .submit (self .stop_process , process_name ): process_name
230
+ for process_name in processes_to_stop
231
+ }
232
+
233
+ for future in as_completed (futures ):
234
+ process_name = futures [future ]
235
+ try :
236
+ future .result ()
237
+ self .logger .info (f"{ process_name } has been stopped successfully." )
238
+ except Exception as e :
239
+ self .logger .error (f"Error stopping { process_name } : { e } " )
240
+ time .sleep (5 )
231
241
self .unmount_all ()
232
242
self .logger .info ("Shutdown complete." )
233
243
sys .exit (exit_code )
234
244
235
245
def unmount_all (self ):
236
- """Unmount all mount points in a specified directory."""
237
- RCLONEDIR = "/path/to/rclone/mounts"
238
246
for mount_point in os .listdir (RCLONEDIR ):
239
247
full_path = os .path .join (RCLONEDIR , mount_point )
240
248
if os .path .ismount (full_path ):
0 commit comments