diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py index 3dc72caa99866..a262cf1d4205d 100644 --- a/airflow/dag_processing/manager.py +++ b/airflow/dag_processing/manager.py @@ -49,7 +49,11 @@ from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.mixins import MultiprocessingStartMethodMixin from airflow.utils.net import get_hostname -from airflow.utils.process_utils import kill_child_processes_by_pids, reap_process_group +from airflow.utils.process_utils import ( + kill_child_processes_by_pids, + reap_process_group, + set_new_process_group, +) from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.sqlalchemy import prohibit_commit, skip_locked, with_row_locks @@ -471,8 +475,7 @@ def start(self): """ self.register_exit_signals() - # Start a new process group - os.setpgid(0, 0) + set_new_process_group() self.log.info("Processing files using up to %s processes at a time ", self._parallelism) self.log.info("Process each file at most once every %s seconds", self._file_process_interval) diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py index ca8fc2433e27f..1cbb8e8b6cb05 100644 --- a/airflow/utils/process_utils.py +++ b/airflow/utils/process_utils.py @@ -311,3 +311,18 @@ def check_if_pidfile_process_is_running(pid_file: str, process_name: str): except psutil.NoSuchProcess: # If process is dead remove the pidfile pid_lock_file.break_lock() + + +def set_new_process_group() -> None: + """ + Tries to set current process to a new process group + That makes it easy to kill all sub-process of this at the OS-level, + rather than having to iterate the child processes. + If current process spawn by system call ``exec()`` than keep current process group + """ + + if os.getpid() == os.getsid(0): + # If PID = SID than process a session leader, and it is not possible to change process group + return + + os.setpgid(0, 0) diff --git a/tests/utils/test_process_utils.py b/tests/utils/test_process_utils.py index ab76a051723ec..3154fa8805859 100644 --- a/tests/utils/test_process_utils.py +++ b/tests/utils/test_process_utils.py @@ -39,6 +39,7 @@ check_if_pidfile_process_is_running, execute_in_subprocess, execute_in_subprocess_with_kwargs, + set_new_process_group, ) @@ -220,3 +221,21 @@ def test_raise_error_if_process_is_running(self): f.flush() with pytest.raises(AirflowException, match="is already running under PID"): check_if_pidfile_process_is_running(f.name, process_name="test") + + +class TestSetNewProcessGroup(unittest.TestCase): + @mock.patch("os.setpgid") + def test_not_session_leader(self, mock_set_pid): + pid = os.getpid() + with mock.patch('os.getsid', autospec=True) as mock_get_sid: + mock_get_sid.return_value = pid + 1 + set_new_process_group() + assert mock_set_pid.call_count == 1 + + @mock.patch("os.setpgid") + def test_session_leader(self, mock_set_pid): + pid = os.getpid() + with mock.patch('os.getsid', autospec=True) as mock_get_sid: + mock_get_sid.return_value = pid + set_new_process_group() + assert mock_set_pid.call_count == 0