From d13baddba41c2bb46795be5951d0b6a8cbfa49c2 Mon Sep 17 00:00:00 2001 From: Andrey Anshin Date: Fri, 10 Jun 2022 14:43:47 +0400 Subject: [PATCH] misc: create new process group by `set_new_process_group` utility --- airflow/dag_processing/manager.py | 2 +- airflow/task/task_runner/standard_task_runner.py | 4 ++-- airflow/utils/process_utils.py | 5 +++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 38f5ffdbbf73a..3e18a42b5d894 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -217,7 +217,7 @@ def _run_processor_manager( # Make this process start as a new process group - that makes it easy # to kill all sub-process of this at the OS-level, rather than having # to iterate the child processes - os.setpgid(0, 0) + set_new_process_group() setproctitle("airflow scheduler -- DagFileProcessorManager") # Reload configurations and settings to avoid collision with parent process. diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py index 53f873ec1f7b0..7aaf05fba3bf4 100644 --- a/airflow/task/task_runner/standard_task_runner.py +++ b/airflow/task/task_runner/standard_task_runner.py @@ -25,7 +25,7 @@ from airflow.settings import CAN_FORK from airflow.task.task_runner.base_task_runner import BaseTaskRunner -from airflow.utils.process_utils import reap_process_group +from airflow.utils.process_utils import reap_process_group, set_new_process_group class StandardTaskRunner(BaseTaskRunner): @@ -53,7 +53,7 @@ def _start_by_fork(self): return psutil.Process(pid) else: # Start a new process group - os.setpgid(0, 0) + set_new_process_group() import signal signal.signal(signal.SIGINT, signal.SIG_DFL) diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py index 4ae1f4398005e..68d74ebe1e6c7 100644 --- a/airflow/utils/process_utils.py +++ b/airflow/utils/process_utils.py @@ -61,8 +61,9 @@ def reap_process_group( a SIGKILL will be send. :param process_group_id: process group id to kill. - The process that wants to create the group should run `os.setpgid(0, 0)` as the first - command it executes which will set group id = process_id. Effectively the process that is the + The process that wants to create the group should run + `airflow.utils.process_utils.set_new_process_group()` as the first command + it executes which will set group id = process_id. Effectively the process that is the "root" of the group has pid = gid and all other processes in the group have different pids but the same gid (equal the pid of the root process) :param logger: log handler