Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.mixins import MultiprocessingStartMethodMixin
from airflow.utils.net import get_hostname
from airflow.utils.process_utils import kill_child_processes_by_pids, reap_process_group
from airflow.utils.process_utils import (
kill_child_processes_by_pids,
reap_process_group,
set_new_process_group,
)
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import prohibit_commit, skip_locked, with_row_locks

Expand Down Expand Up @@ -471,8 +475,7 @@ def start(self):
"""
self.register_exit_signals()

# Start a new process group
os.setpgid(0, 0)
set_new_process_group()

self.log.info("Processing files using up to %s processes at a time ", self._parallelism)
self.log.info("Process each file at most once every %s seconds", self._file_process_interval)
Expand Down
15 changes: 15 additions & 0 deletions airflow/utils/process_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,18 @@ def check_if_pidfile_process_is_running(pid_file: str, process_name: str):
except psutil.NoSuchProcess:
# If process is dead remove the pidfile
pid_lock_file.break_lock()


def set_new_process_group() -> None:
    """
    Try to move the current process into a new process group of its own.

    Having a dedicated process group makes it easy to kill all sub-processes
    of this process at the OS level, rather than having to iterate over the
    child processes one by one.

    If the current process is a session leader (for example, when it was
    spawned via the ``exec()`` system call), its process group cannot be
    changed, so the current process group is kept and nothing is done.
    """
    current_pid = os.getpid()
    session_id = os.getsid(0)

    # PID == SID means this process is a session leader, and a session leader
    # is not allowed to change its process group; only act when they differ.
    if current_pid != session_id:
        os.setpgid(0, 0)
19 changes: 19 additions & 0 deletions tests/utils/test_process_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
check_if_pidfile_process_is_running,
execute_in_subprocess,
execute_in_subprocess_with_kwargs,
set_new_process_group,
)


Expand Down Expand Up @@ -220,3 +221,21 @@ def test_raise_error_if_process_is_running(self):
f.flush()
with pytest.raises(AirflowException, match="is already running under PID"):
check_if_pidfile_process_is_running(f.name, process_name="test")


class TestSetNewProcessGroup(unittest.TestCase):
    """Check when ``set_new_process_group`` does and does not call ``os.setpgid``."""

    @mock.patch("os.setpgid")
    def test_not_session_leader(self, mock_setpgid):
        """A process whose SID differs from its PID is moved to its own process group."""
        pid = os.getpid()
        with mock.patch("os.getsid", autospec=True) as mock_getsid:
            # Any SID other than our PID means we are not the session leader.
            mock_getsid.return_value = pid + 1
            set_new_process_group()
        # Verify the exact arguments, not just the call count, so a wrong
        # target pid/pgid would be caught.
        mock_setpgid.assert_called_once_with(0, 0)

    @mock.patch("os.setpgid")
    def test_session_leader(self, mock_setpgid):
        """A session leader (SID == PID) must keep its current process group."""
        pid = os.getpid()
        with mock.patch("os.getsid", autospec=True) as mock_getsid:
            mock_getsid.return_value = pid
            set_new_process_group()
        mock_setpgid.assert_not_called()