diff --git a/cosmos/providers/dbt/core/operators.py b/cosmos/providers/dbt/core/operators.py index ad8bda0d20..046a7761e9 100644 --- a/cosmos/providers/dbt/core/operators.py +++ b/cosmos/providers/dbt/core/operators.py @@ -1,6 +1,7 @@ from __future__ import annotations import os +import signal from typing import List, Sequence import yaml @@ -71,6 +72,9 @@ class DbtBaseOperator(BaseOperator): in ``skipped`` state (default: 99). If set to ``None``, any non-zero exit code will be treated as a failure. :type skip_exit_code: int + :param cancel_query_on_kill: If true, then cancel any running queries when the task's on_kill() is executed. + Otherwise, the query will keep running when the task is killed. + :type cancel_query_on_kill: bool :param dbt_executable_path: Path to dbt executable can be used with venv (i.e. /home/astro/.pyenv/versions/dbt_venv/bin/dbt) :type dbt_executable_path: str """ @@ -98,6 +102,7 @@ def __init__( append_env: bool = False, output_encoding: str = "utf-8", skip_exit_code: int = 99, + cancel_query_on_kill: bool = True, dbt_executable_path: str = "dbt", **kwargs, ) -> None: @@ -120,6 +125,7 @@ def __init__( self.append_env = append_env self.output_encoding = output_encoding self.skip_exit_code = skip_exit_code + self.cancel_query_on_kill = cancel_query_on_kill self.dbt_executable_path = dbt_executable_path super().__init__(**kwargs) @@ -260,7 +266,12 @@ def execute(self, context: Context): return result.output def on_kill(self) -> None: - self.subprocess_hook.send_sigterm() + if self.cancel_query_on_kill: + self.subprocess_hook.log.info("Sending SIGINT signal to process group") + if self.subprocess_hook.sub_process and hasattr(self.subprocess_hook.sub_process, "pid"): + os.killpg(os.getpgid(self.subprocess_hook.sub_process.pid), signal.SIGINT) + else: + self.subprocess_hook.send_sigterm() class DbtLSOperator(DbtBaseOperator):