24 changes: 20 additions & 4 deletions .github/workflows/ci.yaml
@@ -26,17 +26,33 @@ jobs:
       fail-fast: false
       matrix:
         python-version:
+          - '3.13'
           - '3.12'
           - '3.11'
           - '3.10'
           - '3.9'
         airflow-version:
+          - '3.1'
           - '3.0'
           - '2.10'
           - '2.9'
         dbt-version:
-          - '1.10.9'
-          - '1.9.9'
+          - '1.10.13'
+          - '1.9.10'
+        exclude:
+          # Airflow added Python 3.13 support in >=3.1
+          - airflow-version: '3.0'
+            python-version: '3.13'
+
+          - airflow-version: '2.10'
+            python-version: '3.13'
+
+          # Airflow dropped support for Python 3.9 in >=3.1
+          - airflow-version: '3.1'
+            python-version: '3.9'
+
+          # dbt added Python 3.13 support in >=1.10
+          - dbt-version: '1.9.10'
+            python-version: '3.13'
+
     runs-on: ubuntu-latest
     steps:
@@ -83,7 +99,7 @@ jobs:

       - name: Install Airflow & dbt
         run: |
-          uv add "apache-airflow~=${{ matrix.airflow-version }}.0" \
+          uv add "apache-airflow~=${{ matrix.airflow-version }}.0; python_version>='${{ matrix.python-version }}'" \
             "dbt-core~=${{ matrix.dbt-version }}"
           uv sync --all-extras --dev
           uv run airflow db migrate
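The exclude rules above keep unsupported interpreter/framework pairs out of the build. A quick sketch (not part of the PR; variable names are hypothetical) that mirrors the matrix logic, handy for sanity-checking which combinations actually run:

```python
from itertools import product

# Matrix axes as declared in ci.yaml.
python = ["3.13", "3.12", "3.11", "3.10", "3.9"]
airflow = ["3.1", "3.0", "2.10", "2.9"]
dbt = ["1.10.13", "1.9.10"]

# Excluded pairs, mirroring the comments in the workflow.
airflow_excludes = {("3.0", "3.13"), ("2.10", "3.13"), ("3.1", "3.9")}
dbt_excludes = {("1.9.10", "3.13")}

combos = [
    (py, af, db)
    for py, af, db in product(python, airflow, dbt)
    if (af, py) not in airflow_excludes and (db, py) not in dbt_excludes
]
print(len(combos))  # 32 of the 40 raw combinations survive the excludes
```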
4 changes: 2 additions & 2 deletions .github/workflows/pypi_deploy.yaml
@@ -10,13 +10,13 @@ jobs:
       - uses: actions/[email protected]
       - uses: actions/[email protected]
         with:
-          python-version: '3.12'
+          python-version: '3.13'

       - name: Install uv and set the python version
         uses: astral-sh/setup-uv@v5
         with:
           version: 0.7.2
-          python-version: 3.12
+          python-version: 3.13

       - name: Install airflow-dbt-python with uv
         run: uv sync --no-dev
7 changes: 6 additions & 1 deletion airflow_dbt_python/hooks/fs/local.py
@@ -10,7 +10,12 @@
 from pathlib import Path
 from typing import Optional

-from airflow.hooks.filesystem import FSHook
+from airflow_dbt_python.utils.version import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.providers.standard.hooks.filesystem import FSHook
+else:
+    from airflow.hooks.filesystem import FSHook

 from airflow_dbt_python.hooks.fs import DbtFSHook
 from airflow_dbt_python.utils.url import URL
8 changes: 7 additions & 1 deletion airflow_dbt_python/hooks/target.py
@@ -24,7 +24,13 @@
     Union,
 )

-from airflow.hooks.base import BaseHook
+from airflow_dbt_python.utils.version import AIRFLOW_V_3_1_PLUS
+
+if AIRFLOW_V_3_1_PLUS:
+    from airflow.sdk.bases.hook import BaseHook
+else:
+    from airflow.hooks.base import BaseHook

 from airflow.models.connection import Connection
7 changes: 6 additions & 1 deletion airflow_dbt_python/operators/dbt.py
@@ -10,10 +10,15 @@
 from typing import TYPE_CHECKING, Any, Dict, Optional, Union

 from airflow.exceptions import AirflowException
-from airflow.models.baseoperator import BaseOperator
 from airflow.models.xcom import XCOM_RETURN_KEY

 from airflow_dbt_python.utils.enums import LogFormat
+from airflow_dbt_python.utils.version import AIRFLOW_V_3_1_PLUS
+
+if AIRFLOW_V_3_1_PLUS:
+    from airflow.sdk import BaseOperator
+else:
+    from airflow.models import BaseOperator

 if TYPE_CHECKING:
     from dbt.contracts.results import RunResult
12 changes: 12 additions & 0 deletions airflow_dbt_python/utils/version.py
@@ -28,3 +28,15 @@

 DBT_INSTALLED_1_8 = DBT_1_8 < installed < DBT_1_9
 DBT_INSTALLED_1_9 = DBT_1_9 < installed < DBT_2_0
+
+
+def _get_base_airflow_version_tuple() -> tuple[int, int, int]:
+    """Return the installed Airflow version as a (major, minor, micro) tuple."""
+    from airflow import __version__
+    from packaging.version import Version
+
+    airflow_version = Version(__version__)
+    return airflow_version.major, airflow_version.minor, airflow_version.micro
+
+
+AIRFLOW_V_3_0_PLUS = _get_base_airflow_version_tuple() >= (3, 0, 0)
+AIRFLOW_V_3_1_PLUS = _get_base_airflow_version_tuple() >= (3, 1, 0)
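The helper compares integer tuples rather than raw version strings; a small sanity check (not part of the PR) of why that matters:

```python
from packaging.version import Version

# Lexicographic string comparison mis-orders multi-digit components,
# which the integer tuple (and packaging's Version) handle correctly.
assert "3.10" < "3.9"                        # wrong as strings
assert (3, 10, 0) > (3, 9, 0)                # right as tuples
assert Version("3.10.0") > Version("3.9.0")  # right as parsed versions
```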
8 changes: 7 additions & 1 deletion examples/airflow_connection_target_dag.py
@@ -4,10 +4,16 @@
 import json

 import pendulum
-from airflow import DAG, settings
+from airflow import settings
 from airflow.models.connection import Connection

 from airflow_dbt_python.operators.dbt import DbtRunOperator
+from airflow_dbt_python.utils.version import AIRFLOW_V_3_1_PLUS
+
+if AIRFLOW_V_3_1_PLUS:
+    from airflow.sdk import DAG
+else:
+    from airflow import DAG

 session = settings.Session()  # type: ignore
 existing = session.query(Connection).filter_by(conn_id="my_db_connection").first()
6 changes: 4 additions & 2 deletions pyproject.toml
@@ -5,7 +5,7 @@ description = "A collection of Airflow operators, hooks, and utilities to execut
 authors = [{ name = "Tomás Farías Santana", email = "[email protected]" }]
 license = "MIT"
 readme = "README.md"
-requires-python = ">=3.9,<3.13"
+requires-python = ">=3.9,<3.14"
 classifiers = [
     "Development Status :: 5 - Production/Stable",

@@ -18,7 +18,9 @@ classifiers = [
 ]

 dependencies = [
-    "apache-airflow>=2.8",
+    "apache-airflow>=2.8,<3.1;python_version=='3.9'",
+    "apache-airflow>=2.8;python_version>='3.10'",
+    "apache-airflow~=3.1.0 ; python_full_version == '3.12.*'",
     "contextlib-chdir==1.0.2;python_version<'3.11'",
     "dbt-core>=1.8.0,<2.0.0",
 ]
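The three apache-airflow entries rely on PEP 508 environment markers, which installers evaluate per target interpreter. A minimal sketch of that evaluation using packaging (which utils/version.py above already imports); the marker string is copied from the first entry:

```python
from packaging.markers import Marker

# Only requirements whose marker evaluates to True constrain the resolver.
py39_cap = Marker("python_version == '3.9'")
print(py39_cap.evaluate({"python_version": "3.9"}))   # True: the <3.1 cap applies
print(py39_cap.evaluate({"python_version": "3.12"}))  # False: the cap is ignored
```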
22 changes: 17 additions & 5 deletions tests/conftest.py
@@ -14,14 +14,10 @@

 import boto3
 import pytest
-from airflow import settings
-from airflow.models.connection import Connection
 from mockgcp.storage.client import MockClient as MockStorageClient
 from moto import mock_aws
 from pytest_postgresql.janitor import DatabaseJanitor

-from airflow_dbt_python.hooks.dbt import DbtHook
-
 if TYPE_CHECKING:
     from _pytest.fixtures import SubRequest
@@ -218,6 +214,9 @@ def airflow_conns(database):
     We create them by setting AIRFLOW_CONN_{CONN_ID} env variables. Only postgres
     connections are set for now as our testing database is postgres.
     """
+    from airflow import settings
+    from airflow.models.connection import Connection
+
     uris = (
         f"postgres://{database.user}:{database.password}@{database.host}:{database.port}/public?dbname={database.dbname}",
     )
@@ -276,6 +275,9 @@ def private_key() -> tuple[str, str]:
 @pytest.fixture
 def profile_conn_id(request: SubRequest) -> Generator[str, None, None]:
     """Create an Airflow connection by conn_id."""
+    from airflow import settings
+    from airflow.models.connection import Connection
+
     conn_id = request.param
     session = settings.Session()
     existing = session.query(Connection).filter_by(conn_id=conn_id).first()
@@ -430,6 +432,8 @@ def s3_bucket(mocked_s3_res, s3_hook):
 @pytest.fixture
 def gcp_conn_id():
     """Provide a GCS connection for testing."""
+    from airflow import settings
+    from airflow.models.connection import Connection
     from airflow.providers.google.cloud.hooks.gcs import GCSHook

     conn_id = GCSHook.default_conn_name
@@ -535,11 +539,19 @@ def packages_file(dbt_project_file):
 @pytest.fixture
 def hook():
     """Provide a DbtHook."""
+    from airflow_dbt_python.hooks.dbt import DbtHook
+
     return DbtHook()


 @pytest.fixture
-def pre_compile(hook, model_files, seed_files, dbt_project_file, profiles_file):
+def pre_compile(
+    hook,
+    model_files,
+    seed_files,
+    dbt_project_file,
+    profiles_file,
+):
     """Fixture to run a dbt compile task."""
     target_dir = dbt_project_file.parent / "target"
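Moving the Airflow imports from module scope into the fixture bodies defers them until a test actually requests the fixture, so collecting the suite no longer requires an importable Airflow. A minimal sketch of the pattern (hypothetical fixture name, mirroring calls conftest.py already makes):

```python
import pytest


@pytest.fixture
def airflow_session():
    # Importing lazily keeps conftest.py importable even when Airflow is
    # missing or too old; the skip happens per-test, not at collection time.
    pytest.importorskip("airflow", minversion="2.2")
    from airflow import settings

    return settings.Session()
```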
18 changes: 16 additions & 2 deletions tests/dags/test_dbt_dags.py
@@ -10,9 +10,15 @@
 from dbt.contracts.results import RunStatus, TestStatus

 airflow = pytest.importorskip("airflow", minversion="2.2")
+from airflow_dbt_python.utils.version import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk import DAG
+else:
+    from airflow import DAG

-from airflow import DAG, settings
+from airflow import __version__ as airflow_version
+from airflow import settings
 from airflow.models import DagBag, DagRun
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils.state import DagRunState, TaskInstanceState
@@ -25,6 +31,7 @@
     DbtSourceFreshnessOperator,
     DbtTestOperator,
 )
+from airflow_dbt_python.utils.version import AIRFLOW_V_3_1_PLUS

 DATA_INTERVAL_START = pendulum.datetime(2022, 1, 1, tz="UTC")
 DATA_INTERVAL_END = DATA_INTERVAL_START + dt.timedelta(hours=1)
@@ -39,7 +46,14 @@ def _create_dagrun(
     start_date: dt.datetime,
     run_type: DagRunType,
 ) -> DagRun:
-    if AIRFLOW_MAJOR_VERSION >= 3:
+    if AIRFLOW_V_3_1_PLUS:
+        return parent_dag.test(
+            logical_date=logical_date,
+            run_after=dt.datetime(1970, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc),
+        )
+    elif AIRFLOW_MAJOR_VERSION >= 3:
         from airflow.utils.types import DagRunTriggeredByType  # type: ignore

         return parent_dag.create_dagrun(  # type: ignore
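On Airflow >=3.1 the helper delegates to DAG.test(), which creates the DagRun and executes its tasks in-process, replacing the manual create_dagrun() path. A minimal standalone sketch of the same call, assuming an initialized Airflow 3.x environment (metadata DB migrated); the logical_date/run_after keywords are the ones used above:

```python
import datetime as dt

import pendulum
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG

# A throwaway DAG so the call is self-contained; DAG.test() returns the
# finished DagRun, so no separate scheduling loop is needed.
with DAG(dag_id="smoke_test", schedule=None) as dag:
    EmptyOperator(task_id="noop")

dagrun = dag.test(
    logical_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    run_after=dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc),
)
print(dagrun.state)
```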