Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cicd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ jobs:
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
AIRFLOW_HOME: ${{ github.workspace }}
CONFIG_ROOT_DIR: ${{ github.workspace }}/dags
PYTHONPATH: ${{ github.workspace }}:${{ github.workspace }}/examples:$PYTHONPATH
PYTHONPATH: ${{ github.workspace }}:${{ github.workspace }}/dev/dags:$PYTHONPATH
AUTO_CONVERT_TO_AF3: true

- name: Upload coverage to Github
Expand Down
22 changes: 16 additions & 6 deletions dagfactory/dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
from __future__ import annotations

import ast

# pylint: disable=ungrouped-imports
import inspect
import os
import re
Expand All @@ -14,8 +12,16 @@
from functools import partial, reduce
from typing import Any, Callable, Dict, List, Tuple, Union

from airflow import DAG, configuration
from airflow.models import BaseOperator, Variable
from airflow import configuration

try:
from airflow.sdk.bases.operator import BaseOperator
from airflow.sdk.definitions.dag import DAG
from airflow.sdk.definitions.variable import Variable
except ImportError:
from airflow.models import BaseOperator, Variable
from airflow.models.dag import DAG

from airflow.utils.module_loading import import_string
from airflow.version import version as AIRFLOW_VERSION
from packaging import version
Expand Down Expand Up @@ -1069,8 +1075,12 @@ def adjust_general_task_params(task_params: dict(str, Any)):
if utils.check_dict_key(task_params, "variables_as_arguments"):
variables: List[Dict[str, str]] = task_params.get("variables_as_arguments")
for variable in variables:
if Variable.get(variable["variable"], default_var=None) is not None:
task_params[variable["attribute"]] = Variable.get(variable["variable"], default_var=None)
default_argument_name = "default"
if INSTALLED_AIRFLOW_VERSION.major < AIRFLOW3_MAJOR_VERSION:
default_argument_name = "default_var"
variable_value = Variable.get(variable["variable"], **{default_argument_name: None})
if variable_value is not None:
task_params[variable["attribute"]] = variable_value
del task_params["variables_as_arguments"]

if version.parse(AIRFLOW_VERSION) >= version.parse("2.4.0"):
Expand Down
6 changes: 5 additions & 1 deletion dagfactory/dagfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@

import yaml
from airflow.configuration import conf as airflow_conf
from airflow.models import DAG

try:
from airflow.sdk.definitions.dag import DAG
except ImportError:
from airflow.models import DAG
from airflow.version import version as AIRFLOW_VERSION
from packaging import version

Expand Down
6 changes: 5 additions & 1 deletion dagfactory/listeners/runtime_event.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from __future__ import annotations

from airflow.listeners import hookimpl
from airflow.models.dag import DAG

try:
from airflow.sdk.definitions.dag import DAG
except ImportError:
from airflow.models import DAG
from airflow.models.dagrun import DagRun

from dagfactory import telemetry
Expand Down
36 changes: 31 additions & 5 deletions scripts/test/pre-install-airflow.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#!/bin/bash

set -v
set -x
set -e

AIRFLOW_VERSION="$1"
PYTHON_VERSION="$2"

Expand All @@ -13,13 +17,35 @@ fi

echo "${VIRTUAL_ENV}"

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.0/constraints-$PYTHON_VERSION.txt"
curl -sSL $CONSTRAINT_URL -o /tmp/constraint.txt
if [ "$AIRFLOW_VERSION" = "3.0" ] ; then
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.2/constraints-$PYTHON_VERSION.txt"
else
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.0/constraints-$PYTHON_VERSION.txt"
fi;

curl -sSL "$CONSTRAINT_URL" -o /tmp/constraint.txt
# Workaround to remove PyYAML constraint that will work on both Linux and MacOS
sed '/PyYAML==/d' /tmp/constraint.txt > /tmp/constraint.txt.tmp
mv /tmp/constraint.txt.tmp /tmp/constraint.txt
# Install Airflow with constraints
uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt

uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt
pip install uv
uv pip install pip --upgrade


if [ "$AIRFLOW_VERSION" = "3.0" ]; then
uv pip install "apache-airflow>=3.0.2" --constraint /tmp/constraint.txt
else
uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt
uv pip install apache-airflow-providers-cncf-kubernetes --constraint /tmp/constraint.txt
fi;

rm /tmp/constraint.txt

actual_version=$(airflow version | cut -d. -f1,2)

if [ "$actual_version" = $AIRFLOW_VERSION ]; then
Comment thread
pankajastro marked this conversation as resolved.
echo "Version is as expected: $AIRFLOW_VERSION"
else
echo "Version does not match. Expected: $AIRFLOW_VERSION, but got: $actual_version"
exit 1
fi
3 changes: 2 additions & 1 deletion tests/test_dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
import pytest

try:
from airflow.sdk.definitions import DAG
from airflow.sdk.definitions.dag import DAG
except ImportError:
from airflow.models import DAG

import yaml
from airflow.providers.common.sql.sensors.sql import SqlSensor
from airflow.providers.http.sensors.http import HttpSensor
Expand Down
10 changes: 7 additions & 3 deletions tests/test_dagbuilder_httpoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@

import pendulum
import pytest
from airflow import DAG

try:
from airflow.sdk.definitions.dag import DAG # noqa: F401
except ImportError:
from airflow.models.dag import DAG # noqa: F401

from dagfactory.dagbuilder import DagBuilder
from dagfactory.exceptions import DagFactoryException
Expand Down Expand Up @@ -47,14 +51,14 @@
"concurrency": 1,
"max_active_runs": 1,
"dagrun_timeout_sec": 600,
"schedule_interval": "0 1 * * *",
get_schedule_key(): "0 1 * * *",
}

# Basic DAG config for tests
DAG_CONFIG = {
"default_args": {"owner": "custom_owner"},
"description": "this is an example dag",
"schedule_interval": "0 3 * * *",
get_schedule_key(): "0 3 * * *",
}


Expand Down
10 changes: 4 additions & 6 deletions tests/test_dagfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
from airflow.version import version as AIRFLOW_VERSION

try:
from airflow.sdk.definitions.variable import Variable
from airflow.sdk.definitions.variable import Variable # noqa: F401
except ImportError:
from airflow.models.variable import Variable
from airflow.models.variable import Variable # noqa: F401

from packaging import version

from tests.utils import get_bash_operator_path, get_schedule_key
Expand Down Expand Up @@ -350,10 +351,7 @@ def test_kubernetes_pod_operator_dag_lt_2_7():
def test_variables_as_arguments_dag(monkeypatch):
monkeypatch.setenv("AUTO_CONVERT_TO_AF3", "true")
override_command = "value_from_variable"
if version.parse(AIRFLOW_VERSION) >= version.parse("1.10.10"):
os.environ["AIRFLOW_VAR_VAR1"] = override_command
else:
Variable.set("var1", override_command)
os.environ["AIRFLOW_VAR_VAR1"] = override_command
td = dagfactory.DagFactory(DAG_FACTORY_VARIABLES_AS_ARGUMENTS)
td.generate_dags(globals())
tasks = globals()["example_dag"].tasks
Expand Down
6 changes: 5 additions & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
import yaml
from airflow.configuration import secrets_backend_list
from airflow.exceptions import AirflowSkipException
from airflow.models.dag import DAG

try:
from airflow.sdk.definitions.dag import DAG
except ImportError:
from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.secrets.local_filesystem import LocalFilesystemBackend
Expand Down