Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
5fdf852
addding slack warning
giovannicorsetti Apr 5, 2023
65e4adc
remove unused import
giovannicorsetti Apr 5, 2023
bd957da
add error handling for parsing function
giovannicorsetti Apr 5, 2023
b4122bd
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 5, 2023
14ff9b0
format
giovannicorsetti Apr 5, 2023
ec5cdf1
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 5, 2023
79c3308
handling case where tests are not defined
giovannicorsetti Apr 6, 2023
2456fa9
Merge remote-tracking branch 'origin/main'
giovannicorsetti Apr 6, 2023
0506c9d
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 6, 2023
7ae0dda
creating unified function for WARN and ERROR
giovannicorsetti Apr 6, 2023
ff4d2eb
Merge remote-tracking branch 'origin/main'
giovannicorsetti Apr 6, 2023
48dbef7
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 6, 2023
5f789d2
Slack message containing warnings
giovannicorsetti Apr 6, 2023
01499c2
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 6, 2023
9cea7d6
make line shorter
giovannicorsetti Apr 6, 2023
a7926a1
Creating on_warning_callback argument (TODO: document)
giovannicorsetti Apr 6, 2023
831f14b
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 6, 2023
cdeaef7
patch for render.py
giovannicorsetti Apr 6, 2023
f208954
Merge remote-tracking branch 'origin/main'
giovannicorsetti Apr 6, 2023
aa7492e
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 6, 2023
9f16455
improving robustness & adding local doc
giovannicorsetti Apr 7, 2023
e6d4c79
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 7, 2023
abc215e
adding warn documentation
giovannicorsetti Apr 7, 2023
c97c7bc
Merge branch 'main' into main
CorsettiS Apr 8, 2023
5014aff
slightly change documentation of functions
giovannicorsetti Apr 11, 2023
054f93d
Improve warn function performance & make it more readable
giovannicorsetti Apr 11, 2023
5a19af4
merge current master into it
giovannicorsetti Apr 11, 2023
0fcc04b
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 11, 2023
867f4fb
merge with version 0.6
giovannicorsetti Apr 13, 2023
00a3ee5
implementing changes for local version
giovannicorsetti Apr 13, 2023
3fd4d79
Merge remote-tracking branch 'origin/main'
giovannicorsetti Apr 13, 2023
7eafcdb
using black for formatting
giovannicorsetti Apr 13, 2023
0464d19
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 13, 2023
45b74ba
fix crash with on_warning_callback on docker & k8s and add note about…
giovannicorsetti Apr 13, 2023
5eba1fe
Merge remote-tracking branch 'origin/main'
giovannicorsetti Apr 13, 2023
6d86cd3
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 13, 2023
660c1f2
quick patch
giovannicorsetti Apr 13, 2023
daa780b
Merge remote-tracking branch 'origin/main'
giovannicorsetti Apr 13, 2023
73fdc5c
Merge branch 'main' into main
CorsettiS Apr 13, 2023
05db490
Merge branch 'main' into main
CorsettiS Apr 13, 2023
64c7ba6
solving new branch conflict
giovannicorsetti Apr 14, 2023
f018428
Merge remote-tracking branch 'origin/main'
giovannicorsetti Apr 14, 2023
7ae00fb
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 14, 2023
c78ea6d
solving new branch conflict patch_1
giovannicorsetti Apr 14, 2023
155adc9
Merge remote-tracking branch 'origin/main'
giovannicorsetti Apr 14, 2023
5c80ad1
Update warn_parsing.py
CorsettiS Apr 14, 2023
ccfb239
Merge branch 'main' into main
CorsettiS Apr 15, 2023
111f405
reset raw_customers.csv
giovannicorsetti Apr 17, 2023
2add5d8
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 17, 2023
a78d551
reset raw_orders.csv
giovannicorsetti Apr 17, 2023
8f05fee
Merge remote-tracking branch 'origin/main'
giovannicorsetti Apr 17, 2023
d862dd9
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 17, 2023
d665193
Update cosmos/providers/dbt/core/operators/local.py
CorsettiS Apr 17, 2023
1658fd1
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 17, 2023
f514982
format local.py
giovannicorsetti Apr 17, 2023
a450c55
format adapted_subprocesshook.py
giovannicorsetti Apr 17, 2023
837e5c8
format warn_parsing.py
giovannicorsetti Apr 17, 2023
ff864d1
format raw_orders.csv
giovannicorsetti Apr 17, 2023
e2c4bb4
add extra note consideration
giovannicorsetti Apr 18, 2023
23310d2
Merge branch 'main' into main
CorsettiS Apr 19, 2023
9912f34
add extra note consideration 2
giovannicorsetti Apr 18, 2023
b24e34c
add second note to documentation
giovannicorsetti Apr 19, 2023
545b875
Merge branch 'main' into main
CorsettiS Apr 25, 2023
a4fd279
Merge branch 'main' into main
CorsettiS Apr 27, 2023
3fb0aec
Merge branch 'main' into main
jlaneve Apr 28, 2023
6b7addd
Merge branch 'main' into main
CorsettiS Apr 28, 2023
7280ca8
renaming subprocesshook to fulloutputsubprocesshook + adding descript…
giovannicorsetti Apr 29, 2023
58268fd
Merge remote-tracking branch 'origin/main'
giovannicorsetti Apr 29, 2023
3efd844
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 29, 2023
c8c4deb
adding tests for warning functions
giovannicorsetti Apr 29, 2023
9d172eb
Merge remote-tracking branch 'origin/main'
giovannicorsetti Apr 29, 2023
0000b9e
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 29, 2023
329462a
improving consistency of tests
giovannicorsetti Apr 29, 2023
3774add
Merge remote-tracking branch 'origin/main'
giovannicorsetti Apr 29, 2023
2e62efe
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Apr 29, 2023
e5bfcfb
Merge branch 'main' into main
CorsettiS May 1, 2023
d61313b
renaming all SubprocessResult for FullOutputSubprocessResult
giovannicorsetti May 1, 2023
07ff0d3
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] May 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cosmos/providers/dbt/core/operators/docker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import logging
from typing import Sequence
from typing import Callable, Optional, Sequence

import yaml
from airflow.utils.context import Context
Expand Down Expand Up @@ -137,9 +137,13 @@ class DbtTestDockerOperator(DbtDockerBaseOperator):

ui_color = "#8194E0"

def __init__(self, **kwargs) -> None:
def __init__(
self, on_warning_callback: Optional[Callable] = None, **kwargs
) -> None:
super().__init__(**kwargs)
self.base_cmd = "test"
# as of now, on_warning_callback in docker executor does nothing
self.on_warning_callback = on_warning_callback
Comment thread
jlaneve marked this conversation as resolved.

def execute(self, context: Context):
return self.build_and_run_cmd(context=context)
Expand Down
8 changes: 6 additions & 2 deletions cosmos/providers/dbt/core/operators/kubernetes.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import logging
from typing import Sequence
from typing import Callable, Optional, Sequence

import yaml
from airflow.utils.context import Context
Expand Down Expand Up @@ -147,9 +147,13 @@ class DbtTestKubernetesOperator(DbtKubernetesBaseOperator):

ui_color = "#8194E0"

def __init__(self, **kwargs) -> None:
def __init__(
self, on_warning_callback: Optional[Callable] = None, **kwargs
) -> None:
super().__init__(**kwargs)
self.base_cmd = "test"
# as of now, on_warning_callback in kubernetes executor does nothing
self.on_warning_callback = on_warning_callback
Comment thread
jlaneve marked this conversation as resolved.

def execute(self, context: Context):
return self.build_and_run_cmd(context=context)
Expand Down
72 changes: 65 additions & 7 deletions cosmos/providers/dbt/core/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,29 @@
import shutil
import signal
import tempfile
from typing import Sequence
from collections import namedtuple
from typing import Callable, Optional, Sequence

import yaml
from airflow.compat.functools import cached_property
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.hooks.subprocess import SubprocessHook, SubprocessResult
from airflow.utils.context import Context

from cosmos.providers.dbt.core.operators.base import DbtBaseOperator
from cosmos.providers.dbt.core.utils.adapted_subprocesshook import (
FullOutputSubprocessHook,
)
from cosmos.providers.dbt.core.utils.warn_parsing import (
extract_log_issues,
parse_output,
)

logger = logging.getLogger(__name__)

FullOutputSubprocessResult = namedtuple(
"FullOutputSubprocessResult", ["exit_code", "output", "full_output"]
)


class DbtLocalBaseOperator(DbtBaseOperator):
"""
Expand All @@ -38,9 +49,9 @@ def __init__(
@cached_property
def subprocess_hook(self):
"""Returns hook for running the bash command."""
return SubprocessHook()
return FullOutputSubprocessHook()

def exception_handling(self, result: SubprocessResult):
def exception_handling(self, result: FullOutputSubprocessResult):
if self.skip_exit_code is not None and result.exit_code == self.skip_exit_code:
raise AirflowSkipException(
f"dbt command returned exit code {self.skip_exit_code}. Skipping."
Expand All @@ -54,7 +65,7 @@ def run_command(
self,
cmd: list[str],
env: dict[str, str],
) -> SubprocessResult:
) -> FullOutputSubprocessResult:
"""
Copies the dbt project to a temporary directory and runs the command.
"""
Expand Down Expand Up @@ -88,7 +99,7 @@ def run_command(

def build_and_run_cmd(
self, context: Context, cmd_flags: list[str] | None = None
) -> SubprocessResult:
) -> FullOutputSubprocessResult:
dbt_cmd, env = self.build_cmd(context=context, cmd_flags=cmd_flags)
return self.run_command(cmd=dbt_cmd, env=env)

Expand Down Expand Up @@ -189,16 +200,63 @@ def execute(self, context: Context):
class DbtTestLocalOperator(DbtLocalBaseOperator):
"""
Executes a dbt core test command.
:param on_warning_callback: A callback function called on warnings with additional Context variables "test_names"
and "test_results" of type `List`. Each index in "test_names" corresponds to the same index in "test_results".
"""

ui_color = "#8194E0"

def __init__(self, **kwargs) -> None:
def __init__(
self,
on_warning_callback: Optional[Callable] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.base_cmd = "test"
self.on_warning_callback = on_warning_callback

def _should_run_tests(
self,
result: FullOutputSubprocessResult,
no_tests_message: str = "Nothing to do",
) -> bool:
"""
Check if any tests are defined to run in the DAG. If tests are defined
and on_warning_callback is set, then function returns True.

:param result: The output from the build and run command.
"""

return self.on_warning_callback and no_tests_message not in result.output

def _handle_warnings(
self, result: FullOutputSubprocessResult, context: Context
) -> None:
"""
Handles warnings by extracting log issues, creating additional context, and calling the
on_warning_callback with the updated context.

:param result: The result object from the build and run command.
:param context: The original airflow context in which the build and run command was executed.
"""
test_names, test_results = extract_log_issues(result.full_output)

warning_context = dict(context)
warning_context["test_names"] = test_names
warning_context["test_results"] = test_results

self.on_warning_callback(warning_context)

def execute(self, context: Context):
result = self.build_and_run_cmd(context=context)

if not self._should_run_tests(result):
return result.output

warnings = parse_output(result, "WARN")
if warnings > 0:
self._handle_warnings(result, context)

return result.output


Expand Down
108 changes: 108 additions & 0 deletions cosmos/providers/dbt/core/utils/adapted_subprocesshook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# This hook has been refined from the Airflow SubprocessHook, offering an added functionality to the original.
# It presents an alternative option to return the complete command output, as opposed to solely the last line from
# stdout or stderr. This option proves to be highly beneficial for any text analysis that depends on the stdout or
# stderr output of a dbt command.
from __future__ import annotations

import contextlib
import os
import signal
from collections import namedtuple
from subprocess import PIPE, STDOUT, Popen
from tempfile import TemporaryDirectory, gettempdir

from airflow.hooks.base import BaseHook

FullOutputSubprocessResult = namedtuple(
"FullOutputSubprocessResult", ["exit_code", "output", "full_output"]
)


class FullOutputSubprocessHook(BaseHook):
"""Hook for running processes with the ``subprocess`` module."""

def __init__(self) -> None:
self.sub_process: Popen[bytes] | None = None
super().__init__()

def run_command(
self,
command: list[str],
env: dict[str, str] | None = None,
output_encoding: str = "utf-8",
cwd: str | None = None,
) -> FullOutputSubprocessResult:
"""
Execute the command.

If ``cwd`` is None, execute the command in a temporary directory which will be cleaned afterwards.
If ``env`` is not supplied, ``os.environ`` is passed

:param command: the command to run
:param env: Optional dict containing environment variables to be made available to the shell
environment in which ``command`` will be executed. If omitted, ``os.environ`` will be used.
Note, that in case you have Sentry configured, original variables from the environment
will also be passed to the subprocess with ``SUBPROCESS_`` prefix. See
:doc:`/administration-and-deployment/logging-monitoring/errors` for details.
:param output_encoding: encoding to use for decoding stdout
:param cwd: Working directory to run the command in.
If None (default), the command is run in a temporary directory.
:return: :class:`namedtuple` containing:
``exit_code``
``output``: the last line from stderr or stdout
``full_output``: all lines from stderr or stdout.
"""
self.log.info("Tmp dir root location: \n %s", gettempdir())
log_lines = []
with contextlib.ExitStack() as stack:
if cwd is None:
cwd = stack.enter_context(TemporaryDirectory(prefix="airflowtmp"))

def pre_exec():
# Restore default signal disposition and invoke setsid
for sig in ("SIGPIPE", "SIGXFZ", "SIGXFSZ"):
if hasattr(signal, sig):
signal.signal(getattr(signal, sig), signal.SIG_DFL)
os.setsid()

self.log.info("Running command: %s", command)

self.sub_process = Popen(
command,
stdout=PIPE,
stderr=STDOUT,
cwd=cwd,
env=env if env or env == {} else os.environ,
preexec_fn=pre_exec,
)

self.log.info("Output:")
line = ""

if self.sub_process is None:
raise RuntimeError("The subprocess should be created here and is None!")
if self.sub_process.stdout is not None:
for raw_line in iter(self.sub_process.stdout.readline, b""):
line = raw_line.decode(
output_encoding, errors="backslashreplace"
).rstrip()
# storing the warn & error lines to be used later
log_lines.append(line)
self.log.info("%s", line)

self.sub_process.wait()

self.log.info(
"Command exited with return code %s", self.sub_process.returncode
)
return_code: int = self.sub_process.returncode

return FullOutputSubprocessResult(
exit_code=return_code, output=line, full_output=log_lines
)

def send_sigterm(self):
"""Sends SIGTERM signal to ``self.sub_process`` if one exists."""
self.log.info("Sending SIGTERM signal to process group")
if self.sub_process and hasattr(self.sub_process, "pid"):
os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)
39 changes: 39 additions & 0 deletions cosmos/providers/dbt/core/utils/tests/test_warn_parsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from airflow.hooks.subprocess import SubprocessResult

from cosmos.providers.dbt.core.utils.warn_parsing import (
extract_log_issues,
parse_output,
)


def test_parse_output() -> None:
for warnings in range(0, 3):
output_str = f"Done. PASS=15 WARN={warnings} ERROR=0 SKIP=0 TOTAL=16"
keyword = "WARN"
result = SubprocessResult(exit_code=0, output=output_str)
num_warns = parse_output(result, keyword)
assert num_warns == warnings


def test_extract_log_issues() -> None:
log_list = [
"20:30:01 \x1b[33mRunning with dbt=1.3.0\x1b[0m",
"20:30:03 \x1b[33mFinished running 1 test in 10.31s.\x1b[0m",
"20:30:02 \x1b[33mWarning in test my_test (models/my_model.sql)\x1b[0m",
"20:30:02 \x1b[33mSome warning message\x1b[0m",
"20:30:03 \x1b[33mWarning in test my_second_test (models/my_model.sql)\x1b[0m",
"20:30:03 \x1b[33mA very different warning message\x1b[0m",
]
test_names, test_results = extract_log_issues(log_list)
assert "my_test" in test_names
assert "my_second_test" in test_names
assert "Some warning message" in test_results
assert "A very different warning message" in test_results

log_list_no_warning = [
"20:30:01 \x1b[33mRunning with dbt=1.3.0\x1b[0m",
"20:30:03 \x1b[33mFinished running 1 test in 10.31s.\x1b[0m",
]
test_names_no_warns, test_results_no_warns = extract_log_issues(log_list_no_warning)
assert test_names_no_warns == []
assert test_results_no_warns == []
71 changes: 71 additions & 0 deletions cosmos/providers/dbt/core/utils/warn_parsing.py
Comment thread
CorsettiS marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import logging
import re
from typing import List, Tuple

from airflow.hooks.subprocess import SubprocessResult


def parse_output(result: SubprocessResult, keyword: str) -> int:
"""
Parses the DBT test output message and returns the number of errors or warnings.

:param result: String containing the output to be parsed.
:param keyword: String representing the keyword to search for in the output (WARN, ERROR).
:return: An integer value associated with the keyword, or 0 if parsing fails.

Usage:
-----
output_str = "Done. PASS=15 WARN=1 ERROR=0 SKIP=0 TOTAL=16"
keyword = "WARN"
num_warns = parse_output(output_str, keyword)
print(num_warns)
# Output: 1
"""
output = result.output
try:
num = int(output.split(f"{keyword}=")[1].split()[0])
except ValueError:
logging.error(
f"Could not parse number of {keyword}s. Check your dbt/airflow version or if --quiet is not being used"
)
return num


def extract_log_issues(log_list: List[str]) -> Tuple[List[str], List[str]]:
"""
Extracts warning messages from the log list and returns them as a formatted string.

This function searches for warning messages in DBT test. It reverses the log list for performance
improvement. It extracts and formats the relevant information and appends it to a list of warnings.

:param log_list: List of strings, where each string is a log line from DBT test.
:return: two lists of strings, the first one containing the test names and the second one
containing the test results.
"""

def clean_line(line: str) -> str:
return line.replace("\x1b[33m", "").replace("\x1b[0m", "").strip()

test_names = []
test_results = []
pattern1 = re.compile(r"\d{2}:\d{2}:\d{2}\s+Warning in test ([\w_]+).*")
pattern2 = re.compile(r"\d{2}:\d{2}:\d{2}\s+(.*)")

for line_index, line in enumerate(reversed(log_list)):
cleaned_line = clean_line(line)

if "Finished running" in cleaned_line:
# No need to keep checking the log lines once all warnings are found
break

if "Warning in test" in cleaned_line:
test_name = pattern1.sub(r"\1", cleaned_line)
# test_result is on the next line by default
test_result = pattern2.sub(
r"\1", clean_line(log_list[-(line_index + 1) + 1])
)

test_names.append(test_name)
test_results.append(test_result)

return test_names, test_results
Comment thread
jlaneve marked this conversation as resolved.
Loading