Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
54602e8
chore: Update CI versions
tomasfarias Oct 27, 2025
cdfdddb
fix: Specify versions required for Airflow 3.1
tomasfarias Oct 28, 2025
0363fc9
fix: Update import paths according to Airflow versions
tomasfarias Oct 28, 2025
9d868ac
fix: Do not set connection value when empty string
tomasfarias Oct 28, 2025
a945e21
fix: Handle xcom push differently in Airflow>=3.1
tomasfarias Oct 28, 2025
769b4a4
fix(examples): Update import path for compatibility
tomasfarias Oct 28, 2025
9014645
fix: Update fixtures
tomasfarias Oct 28, 2025
03dbeeb
fix: Update tests to use new connections
tomasfarias Oct 28, 2025
effdac0
fix: Update DAG tests to deal with Airflow database
tomasfarias Oct 28, 2025
c6b9dca
fix: Update dependency versions
tomasfarias Oct 28, 2025
fc03d50
fix: Pin structlog version as Airflow>=3.1 crashes
tomasfarias Oct 29, 2025
d21bbc8
fix: Clean up session usage in tests
tomasfarias Oct 29, 2025
41389a0
fix: Update CI versions
tomasfarias Oct 29, 2025
ff59c96
fix: Drop python 3.9 support due to dulwich incompatibility
tomasfarias Oct 29, 2025
8f34a52
fix: Ignore type
tomasfarias Oct 29, 2025
1518a9e
fix: Adjust fixture scope
tomasfarias Oct 29, 2025
6193268
fix: Conditionally import FSHook
tomasfarias Oct 29, 2025
0b526a5
fix: Run git server in thread
tomasfarias Oct 29, 2025
f8ee9a7
fix: Remove unused import
tomasfarias Oct 29, 2025
236f463
fix: Ignore a bunch of types
tomasfarias Oct 29, 2025
ae9ff67
fix: Only sync dags in Airflow 3.1 or later
tomasfarias Oct 29, 2025
03a2fce
fix: Also use TI for xcom_push in Airflow 3.0
tomasfarias Oct 29, 2025
a22e9ab
fix: Attempt to use dag.test() for Airflow 3.0 too
tomasfarias Oct 29, 2025
1b5344f
fix: Also use DAG in taskflow test in Airflow 3.0
tomasfarias Oct 29, 2025
83cfdff
fix: Attempt to also sync DAG in Airflow 3.0
tomasfarias Oct 29, 2025
a578b2f
fix: Just run Airflow 3.0 tests like before
tomasfarias Oct 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,28 @@ jobs:
fail-fast: false
matrix:
python-version:
- '3.13'
- '3.12'
- '3.11'
- '3.10'
- '3.9'
airflow-version:
- '3.0'
- '2.10'
- '2.9'
- '3.1.1'
- '3.0.6'
- '2.10.4'
dbt-version:
- '1.10.9'
- '1.9.9'
- '1.10.13'
- '1.9.10'
exclude:
# Airflow added 3.13 support in >=3.1
- airflow-version: '3.0.6'
python-version: '3.13'

- airflow-version: '2.10.4'
python-version: '3.13'

# Dbt added 3.13 support in >=1.10
- dbt-version: '1.9.10'
python-version: '3.13'

runs-on: ubuntu-latest
steps:
Expand Down Expand Up @@ -83,8 +94,8 @@ jobs:

- name: Install Airflow & dbt
run: |
uv add "apache-airflow~=${{ matrix.airflow-version }}.0" \
"dbt-core~=${{ matrix.dbt-version }}"
uv add "apache-airflow==${{ matrix.airflow-version }}; python_version=='${{ matrix.python-version }}'" \
"dbt-core==${{ matrix.dbt-version }}"
uv sync --all-extras --dev
uv run airflow db migrate
uv run airflow connections create-default-connections
Expand All @@ -94,8 +105,7 @@ jobs:

- name: Static type checking with mypy
# We only run mypy on the latest supported versions of Airflow & dbt,
# so it is currently impossible to write conditions that depend on package versions.
if: matrix.python-version == '3.12' && matrix.airflow-version == '2.10' && matrix.dbt-version == '1.9.9'
if: matrix.python-version == '3.13' && matrix.airflow-version == '3.1.1' && matrix.dbt-version == '1.10.13'
run: uv run mypy .

- name: Code formatting with ruff
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/pypi_deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ jobs:
- uses: actions/[email protected]
- uses: actions/[email protected]
with:
python-version: '3.12'
python-version: '3.13'

- name: Install uv and set the python version
uses: astral-sh/setup-uv@v5
with:
version: 0.7.2
python-version: 3.12
python-version: 3.13

- name: Install airflow-dbt-python with uv
run: uv sync --no-dev
Expand Down
11 changes: 5 additions & 6 deletions airflow_dbt_python/hooks/fs/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ def update_refs(refs):
return new_refs

client.send_pack(
path,
path.encode("utf-8"),
update_refs,
generate_pack_data=repo.generate_pack_data,
generate_pack_data=repo.generate_pack_data, # type: ignore
)

def _download(
Expand Down Expand Up @@ -160,8 +160,7 @@ def _download(
path,
str(destination),
mkdir=not destination.exists(),
# NOTE: Dulwich expects branch to be bytes if defined.
branch=branch.encode("utf-8") if isinstance(branch, str) else branch,
branch=branch,
)

def get_git_client_path(self, url: URL) -> Tuple[GitClients, str, Optional[str]]:
Expand Down Expand Up @@ -191,7 +190,7 @@ def get_git_client_path(self, url: URL) -> Tuple[GitClients, str, Optional[str]]
host=url.hostname,
port=self.port,
username=url.username or self.username,
vendor=vendor,
vendor=vendor, # type: ignore
)
path = f"{url.netloc.split(':')[1]}/{str(url.path)}"

Expand All @@ -211,7 +210,7 @@ def get_git_client_path(self, url: URL) -> Tuple[GitClients, str, Optional[str]]
elif url.authentication.username:
base_url = f"{url.scheme}://{url.authentication.username}@{base_url}"

client = HttpGitClient(base_url, **auth_params)
client = HttpGitClient(base_url, **auth_params) # type: ignore

path = str(url.path)

Expand Down
7 changes: 6 additions & 1 deletion airflow_dbt_python/hooks/fs/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@
from pathlib import Path
from typing import Optional

from airflow.hooks.filesystem import FSHook
from airflow_dbt_python.utils.version import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
from airflow.providers.standard.hooks.filesystem import FSHook
else:
from airflow.hooks.filesystem import FSHook # type: ignore

from airflow_dbt_python.hooks.fs import DbtFSHook
from airflow_dbt_python.utils.url import URL
Expand Down
5 changes: 2 additions & 3 deletions airflow_dbt_python/hooks/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
Union,
)

from airflow.hooks.base import BaseHook
from airflow.models.connection import Connection
from airflow.providers.common.compat.sdk import BaseHook, Connection


def try_decode_base64(s: str) -> str:
Expand Down Expand Up @@ -260,7 +259,7 @@ def get_dbt_details_from_connection(self, conn: Connection) -> dict[str, Any]:
key = param
value = getattr(conn, key, None)

if value is None:
if value is None or value == "":
continue

dbt_details[key] = value
Expand Down
12 changes: 9 additions & 3 deletions airflow_dbt_python/operators/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@
from __future__ import annotations

import datetime as dt
import functools
import os
from abc import ABC, abstractmethod
from dataclasses import asdict, is_dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional, Union

from airflow.exceptions import AirflowException
from airflow.models.baseoperator import BaseOperator
from airflow.models.xcom import XCOM_RETURN_KEY
from airflow.providers.common.compat.sdk import BaseOperator

from airflow_dbt_python.utils.enums import LogFormat
from airflow_dbt_python.utils.version import AIRFLOW_V_3_0_PLUS

if TYPE_CHECKING:
from dbt.contracts.results import RunResult
Expand Down Expand Up @@ -344,10 +346,14 @@ def xcom_push_dbt_results(self, context, dbt_results: DbtTaskResult) -> None:
serializable_result = self.make_run_results_serializable(
dbt_results.run_results
)
self.xcom_push(context, key=XCOM_RETURN_KEY, value=serializable_result)
if AIRFLOW_V_3_0_PLUS:
xcom_push = context["ti"].xcom_push
else:
xcom_push = functools.partial(self.xcom_push, context) # type: ignore

xcom_push(key=XCOM_RETURN_KEY, value=serializable_result)
for key, artifact in dbt_results.artifacts.items():
self.xcom_push(context, key=key, value=artifact)
xcom_push(key=key, value=artifact)

def make_run_results_serializable(
self, result: Optional[RunResult]
Expand Down
2 changes: 1 addition & 1 deletion airflow_dbt_python/utils/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ def create_dbt_task(
and runtime_config
and not DBT_INSTALLED_GTE_1_10_7
):
manifest = parse_manifest(
manifest = parse_manifest( # type: ignore
runtime_config,
write_perf_info=write_perf_info,
write=False,
Expand Down
13 changes: 13 additions & 0 deletions airflow_dbt_python/utils/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,16 @@

DBT_INSTALLED_1_8 = DBT_1_8 < installed < DBT_1_9
DBT_INSTALLED_1_9 = DBT_1_9 < installed < DBT_2_0


def _get_base_airflow_version_tuple() -> tuple[int, int, int]:
    """Return the installed Airflow version as a ``(major, minor, micro)`` tuple.

    Imports are deferred to call time so that merely importing this module
    does not pull in Airflow or packaging.
    """
    from packaging.version import Version

    from airflow import __version__

    parsed = Version(__version__)
    return (parsed.major, parsed.minor, parsed.micro)


# Feature flags derived from the installed Airflow version; evaluated once at
# import time (this triggers the `airflow` import inside the helper).
AIRFLOW_V_3_0_PLUS = _get_base_airflow_version_tuple() >= (3, 0, 0)
AIRFLOW_V_3_1_PLUS = _get_base_airflow_version_tuple() >= (3, 1, 0)
# True only for the 3.0.x series (>=3.0 but not >=3.1).
AIRFLOW_V_3_0 = AIRFLOW_V_3_0_PLUS and not AIRFLOW_V_3_1_PLUS
5 changes: 4 additions & 1 deletion examples/airflow_connection_target_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import json

import pendulum
from airflow import DAG, settings
from airflow import settings
from airflow.models.connection import Connection

# from airflow.sdk import DAG in >=3.1
from airflow.providers.common.compat.sdk import DAG

from airflow_dbt_python.operators.dbt import DbtRunOperator

session = settings.Session() # type: ignore
Expand Down
4 changes: 3 additions & 1 deletion examples/basic_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import datetime as dt

import pendulum
from airflow import DAG

# from airflow.sdk import DAG in >=3.1
from airflow.providers.common.compat.sdk import DAG

from airflow_dbt_python.operators.dbt import DbtRunOperator

Expand Down
4 changes: 3 additions & 1 deletion examples/complete_dbt_workflow_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import datetime as dt

import pendulum
from airflow import DAG

# from airflow.sdk import DAG in >=3.1
from airflow.providers.common.compat.sdk import DAG

from airflow_dbt_python.operators.dbt import (
DbtRunOperator,
Expand Down
4 changes: 3 additions & 1 deletion examples/dbt_project_in_github_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import datetime as dt

import pendulum
from airflow import DAG

# from airflow.sdk import DAG in >=3.1
from airflow.providers.common.compat.sdk import DAG

from airflow_dbt_python.operators.dbt import (
DbtRunOperator,
Expand Down
4 changes: 3 additions & 1 deletion examples/dbt_project_in_s3_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import datetime as dt

import pendulum
from airflow import DAG

# from airflow.sdk import DAG in >=3.1
from airflow.providers.common.compat.sdk import DAG

from airflow_dbt_python.operators.dbt import DbtDocsGenerateOperator, DbtRunOperator

Expand Down
4 changes: 3 additions & 1 deletion examples/readme_example_dbt_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import datetime as dt

import pendulum
from airflow import DAG

# from airflow.sdk import DAG in >=3.1
from airflow.providers.common.compat.sdk import DAG

from airflow_dbt_python.operators.dbt import (
DbtRunOperator,
Expand Down
4 changes: 3 additions & 1 deletion examples/use_dbt_artifacts_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import datetime as dt

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

# from airflow.sdk import DAG in >=3.1
from airflow.providers.common.compat.sdk import DAG

from airflow_dbt_python.operators.dbt import DbtRunOperator


Expand Down
13 changes: 9 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@ description = "A collection of Airflow operators, hooks, and utilities to execut
authors = [{ name = "Tomás Farías Santana", email = "[email protected]" }]
license = "MIT"
readme = "README.md"
requires-python = ">=3.9,<3.13"
requires-python = ">=3.10,<3.14"
classifiers = [
"Development Status :: 5 - Production/Stable",

"Intended Audience :: Developers",

"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",

]

dependencies = [
"apache-airflow>=2.8",
"apache-airflow>=2.8,<4.0; python_version<'3.12'",
"apache-airflow>=3.0,<4.0; python_version>='3.12' and python_version<'3.13'",
"apache-airflow>=3.1,<4.0; python_version>='3.13'",
"contextlib-chdir==1.0.2;python_version<'3.11'",
"dbt-core>=1.8.0,<2.0.0",
]
Expand Down Expand Up @@ -48,7 +51,7 @@ gcs = [
]
git = [
"apache-airflow-providers-ssh>=3.0.0",
"dulwich>=0.21",
"dulwich>=0.24.3",
]
postgres = [
"dbt-postgres>=1.8.0,<2.0.0",
Expand Down Expand Up @@ -85,6 +88,8 @@ dev = [
"pytest-mock>=3.14.0",
"pytest-postgresql>=5",
"ruff>=0.0.254",
# Pinned as Airflow crashes: https://github.com/apache/airflow/issues/57426.
"structlog<=25.4",
"types-PyYAML>=6.0.7",
"types-freezegun>=1.1.6",
]
Expand Down
Loading
Loading