Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions .github/workflows/basic-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@ on: # yamllint disable-line rule:truthy
description: "Whether to run only latest version checks (true/false)"
required: true
type: string
enable-aip-44:
description: "Whether to enable AIP-44 (true/false)"
required: true
type: string
env:
AIRFLOW_ENABLE_AIP_44: "${{ inputs.enable-aip-44 }}"
jobs:
run-breeze-tests:
timeout-minutes: 10
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ jobs:
skip-pre-commits: ${{needs.build-info.outputs.skip-pre-commits}}
canary-run: ${{needs.build-info.outputs.canary-run}}
latest-versions-only: ${{needs.build-info.outputs.latest-versions-only}}
enable-aip-44: "false"

build-ci-images:
name: >
Expand Down
9 changes: 4 additions & 5 deletions .github/workflows/run-unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ on: # yamllint disable-line rule:truthy
required: false
default: "false"
type: string
enable-aip-44:
description: "Whether to enable AIP-44 or not (true/false)"
database-isolation:
description: "Whether to enable database isolattion or not (true/false)"
required: false
default: "true"
default: "false"
type: string
force-lowest-dependencies:
description: "Whether to force lowest dependencies for the tests or not (true/false)"
Expand All @@ -129,8 +129,6 @@ jobs:
backend-version: "${{fromJSON(inputs.backend-versions)}}"
exclude: "${{fromJSON(inputs.excludes)}}"
env:
# yamllint disable rule:line-length
AIRFLOW_ENABLE_AIP_44: "${{ inputs.enable-aip-44 }}"
BACKEND: "${{ inputs.backend }}"
BACKEND_VERSION: "${{ matrix.backend-version }}"
DB_RESET: "true"
Expand All @@ -152,6 +150,7 @@ jobs:
PYTHON_MAJOR_MINOR_VERSION: "${{ matrix.python-version }}"
UPGRADE_BOTO: "${{ inputs.upgrade-boto }}"
AIRFLOW_MONITOR_DELAY_TIME_IN_SECONDS: "${{inputs.monitor-delay-time-in-seconds}}"
DATABASE_ISOLATION: "${{ inputs.database-isolation }}"
VERBOSE: "true"
steps:
- name: "Cleanup repo"
Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/special-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -171,18 +171,18 @@ jobs:
run-coverage: ${{ inputs.run-coverage }}
debug-resources: ${{ inputs.debug-resources }}

tests-in-progress-disabled:
name: "In progress disabled test"
tests-database-isolation:
name: "Database isolation test"
uses: ./.github/workflows/run-unit-tests.yml
permissions:
contents: read
packages: read
secrets: inherit
with:
runs-on-as-json-default: ${{ inputs.runs-on-as-json-default }}
enable-aip-44: "false"
test-name: "InProgressDisabled-Postgres"
test-scope: "All"
database-isolation: "true"
test-name: "DatabaseIsolation-Postgres"
test-scope: "DB"
backend: "postgres"
image-tag: ${{ inputs.image-tag }}
python-versions: "['${{ inputs.default-python-version }}']"
Expand Down
6 changes: 3 additions & 3 deletions airflow/api_internal/endpoints/rpc_api_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,20 +228,20 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
except Exception:
return log_and_build_error_response(message="Error deserializing parameters.", status=400)

log.info("Calling method %s\nparams: %s", method_name, params)
log.debug("Calling method %s\nparams: %s", method_name, params)
try:
# Session must be created there as it may be needed by serializer for lazy-loaded fields.
with create_session() as session:
output = handler(**params, session=session)
output_json = BaseSerialization.serialize(output, use_pydantic_models=True)
response = json.dumps(output_json) if output_json is not None else None
log.info("Sending response: %s", response)
log.debug("Sending response: %s", response)
return Response(response=response, headers={"Content-Type": "application/json"})
# In case of AirflowException or other selective known types, transport the exception class back to caller
except (KeyError, AttributeError, AirflowException) as e:
exception_json = BaseSerialization.serialize(e, use_pydantic_models=True)
response = json.dumps(exception_json)
log.info("Sending exception response: %s", response)
log.debug("Sending exception response: %s", response)
return Response(response=response, headers={"Content-Type": "application/json"})
except Exception:
return log_and_build_error_response(message=f"Error executing method '{method_name}'.", status=500)
6 changes: 1 addition & 5 deletions airflow/api_internal/internal_api_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.settings import _ENABLE_AIP_44, force_traceback_session_for_untrusted_components
from airflow.settings import force_traceback_session_for_untrusted_components
from airflow.typing_compat import ParamSpec
from airflow.utils.jwt_signer import JWTSigner

Expand All @@ -55,8 +55,6 @@ def set_use_database_access(component: str):
This mode is needed for "trusted" components like Scheduler, Webserver, Internal Api server
"""
InternalApiConfig._use_internal_api = False
if not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it. ")
logger.info(
"DB isolation mode. But this is a trusted component and DB connection is set. "
"Using database direct access when running %s.",
Expand All @@ -65,8 +63,6 @@ def set_use_database_access(component: str):

@staticmethod
def set_use_internal_api(component: str, allow_tests_to_use_db: bool = False):
if not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it. ")
internal_api_url = conf.get("core", "internal_api_url")
url_conf = urlparse(internal_api_url)
api_path = url_conf.path
Expand Down
51 changes: 23 additions & 28 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from airflow import settings
from airflow.cli.commands.legacy_commands import check_legacy_command
from airflow.configuration import conf
from airflow.settings import _ENABLE_AIP_44
from airflow.utils.cli import ColorMode
from airflow.utils.module_loading import import_string
from airflow.utils.state import DagRunState, JobState
Expand Down Expand Up @@ -2080,34 +2079,30 @@ class GroupCommand(NamedTuple):
func=lazy_load_command("airflow.cli.commands.standalone_command.standalone"),
args=(),
),
]

if _ENABLE_AIP_44:
core_commands.append(
ActionCommand(
name="internal-api",
help="Start a Airflow Internal API instance",
func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"),
args=(
ARG_INTERNAL_API_PORT,
ARG_INTERNAL_API_WORKERS,
ARG_INTERNAL_API_WORKERCLASS,
ARG_INTERNAL_API_WORKER_TIMEOUT,
ARG_INTERNAL_API_HOSTNAME,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_INTERNAL_API_ACCESS_LOGFILE,
ARG_INTERNAL_API_ERROR_LOGFILE,
ARG_INTERNAL_API_ACCESS_LOGFORMAT,
ARG_LOG_FILE,
ARG_SSL_CERT,
ARG_SSL_KEY,
ARG_DEBUG,
),
ActionCommand(
name="internal-api",
help="Start a Airflow Internal API instance",
func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"),
args=(
ARG_INTERNAL_API_PORT,
ARG_INTERNAL_API_WORKERS,
ARG_INTERNAL_API_WORKERCLASS,
ARG_INTERNAL_API_WORKER_TIMEOUT,
ARG_INTERNAL_API_HOSTNAME,
ARG_PID,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_INTERNAL_API_ACCESS_LOGFILE,
ARG_INTERNAL_API_ERROR_LOGFILE,
ARG_INTERNAL_API_ACCESS_LOGFORMAT,
ARG_LOG_FILE,
ARG_SSL_CERT,
ARG_SSL_KEY,
ARG_DEBUG,
),
)
),
]


def _remove_dag_id_opt(command: ActionCommand):
Expand Down
3 changes: 2 additions & 1 deletion airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1512,10 +1512,11 @@ def run(
data_interval=info.data_interval,
)
ti = TaskInstance(self, run_id=dr.run_id)
session.add(ti)
ti.dag_run = dr
session.add(dr)
session.flush()

session.commit()
ti.run(
mark_success=mark_success,
ignore_depends_on_past=ignore_depends_on_past,
Expand Down
5 changes: 2 additions & 3 deletions airflow/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
from airflow.models.taskinstance import _CURRENT_CONTEXT
from airflow.models.variable import Variable
from airflow.operators.branch import BranchMixIn
from airflow.settings import _ENABLE_AIP_44
from airflow.typing_compat import Literal
from airflow.utils import hashlib_wrapper
from airflow.utils.context import context_copy_partial, context_get_outlet_events, context_merge
Expand Down Expand Up @@ -552,8 +551,8 @@ def _execute_python_callable_in_subprocess(self, python_path: Path):
self._write_args(input_path)
self._write_string_args(string_args_path)

if self.use_airflow_context and (not is_pydantic_2_installed() or not _ENABLE_AIP_44):
error_msg = "`get_current_context()` needs to be used with Pydantic 2 and AIP-44 enabled."
if self.use_airflow_context and not is_pydantic_2_installed():
error_msg = "`get_current_context()` needs to be used with Pydantic 2."
raise AirflowException(error_msg)

jinja_context = {
Expand Down
16 changes: 3 additions & 13 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
from airflow.serialization.pydantic.tasklog import LogTemplatePydantic
from airflow.serialization.pydantic.trigger import TriggerPydantic
from airflow.settings import _ENABLE_AIP_44, DAGS_FOLDER, json
from airflow.settings import DAGS_FOLDER, json
from airflow.task.priority_strategy import (
PriorityWeightStrategy,
airflow_priority_weight_strategies,
Expand Down Expand Up @@ -627,11 +627,6 @@ def serialize(

:meta private:
"""
if use_pydantic_models and not _ENABLE_AIP_44:
raise RuntimeError(
"Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. "
"This parameter will be removed eventually when new serialization is used by AIP-44"
)
if cls._is_primitive(var):
# enum.IntEnum is an int instance, it causes json dumps error so we use its value.
if isinstance(var, enum.Enum):
Expand Down Expand Up @@ -758,7 +753,7 @@ def serialize(
obj = cls.serialize(v, strict=strict, use_pydantic_models=use_pydantic_models)
d[str(k)] = obj
return cls._encode(d, type_=DAT.TASK_CONTEXT)
elif use_pydantic_models and _ENABLE_AIP_44:
elif use_pydantic_models:

def _pydantic_model_dump(model_cls: type[BaseModel], var: Any) -> dict[str, Any]:
return model_cls.model_validate(var).model_dump(mode="json") # type: ignore[attr-defined]
Expand Down Expand Up @@ -790,11 +785,6 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
:meta private:
"""
# JSON primitives (except for dict) are not encoded.
if use_pydantic_models and not _ENABLE_AIP_44:
raise RuntimeError(
"Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. "
"This parameter will be removed eventually when new serialization is used by AIP-44"
)
if cls._is_primitive(encoded_var):
return encoded_var
elif isinstance(encoded_var, list):
Expand Down Expand Up @@ -886,7 +876,7 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
return SlaCallbackRequest.from_json(var)
elif type_ == DAT.TASK_INSTANCE_KEY:
return TaskInstanceKey(**var)
elif use_pydantic_models and _ENABLE_AIP_44:
elif use_pydantic_models:
return _type_to_class[type_][0].model_validate(var)
elif type_ == DAT.ARG_NOT_SET:
return NOTSET
Expand Down
61 changes: 36 additions & 25 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ def remove(*args, **kwargs):
AIRFLOW_SETTINGS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "settings.py")
AIRFLOW_UTILS_SESSION_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils", "session.py")
AIRFLOW_MODELS_BASEOPERATOR_PATH = os.path.join(AIRFLOW_PATH, "airflow", "models", "baseoperator.py")
AIRFLOW_MODELS_DAG_PATH = os.path.join(AIRFLOW_PATH, "airflow", "models", "dag.py")
AIRFLOW_DB_UTILS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils", "db.py")


class TracebackSessionForTests:
Expand Down Expand Up @@ -369,6 +371,9 @@ def is_called_from_test_code(self) -> tuple[bool, traceback.FrameSummary | None]
:return: True if the object was created from test code, False otherwise.
"""
self.traceback = traceback.extract_stack()
if any(filename.endswith("_pytest/fixtures.py") for filename, _, _, _ in self.traceback):
# This is a fixture call
return True, None
airflow_frames = [
tb
for tb in self.traceback
Expand All @@ -377,24 +382,30 @@ def is_called_from_test_code(self) -> tuple[bool, traceback.FrameSummary | None]
and not tb.filename == AIRFLOW_UTILS_SESSION_PATH
]
if any(
filename.endswith("conftest.py") or filename.endswith("tests/test_utils/db.py")
for filename, _, _, _ in airflow_frames
filename.endswith("conftest.py")
or filename.endswith("tests/test_utils/db.py")
or (filename.startswith(AIRFLOW_TESTS_PATH) and name in ("setup_method", "teardown_method"))
for filename, _, name, _ in airflow_frames
):
# This is a fixture call or testing utilities
return True, None
if (
len(airflow_frames) >= 2
and airflow_frames[-2].filename.startswith(AIRFLOW_TESTS_PATH)
and airflow_frames[-1].filename == AIRFLOW_MODELS_BASEOPERATOR_PATH
and airflow_frames[-1].name == "run"
):
# This is baseoperator run method that is called directly from the test code and this is
# usual pattern where we create a session in the test code to create dag_runs for tests.
# If `run` code will be run inside a real "airflow" code the stack trace would be longer
# and it would not be directly called from the test code. Also if subsequently any of the
# run_task() method called later from the task code will attempt to execute any DB
# method, the stack trace will be longer and we will catch it as "illegal" call.
return True, None
if len(airflow_frames) >= 2 and airflow_frames[-2].filename.startswith(AIRFLOW_TESTS_PATH):
# Let's look at what we are calling directly from the test code
current_filename, current_method_name = airflow_frames[-1].filename, airflow_frames[-1].name
if (current_filename, current_method_name) in (
(AIRFLOW_MODELS_BASEOPERATOR_PATH, "run"),
(AIRFLOW_MODELS_DAG_PATH, "create_dagrun"),
):
# This is baseoperator run method that is called directly from the test code and this is
# usual pattern where we create a session in the test code to create dag_runs for tests.
# If `run` code will be run inside a real "airflow" code the stack trace would be longer
# and it would not be directly called from the test code. Also if subsequently any of the
# run_task() method called later from the task code will attempt to execute any DB
# method, the stack trace will be longer and we will catch it as "illegal" call.
return True, None
if current_filename == AIRFLOW_DB_UTILS_PATH:
# This is a util method called directly from the test code
return True, None
for tb in airflow_frames[::-1]:
if tb.filename.startswith(AIRFLOW_PATH):
if tb.filename.startswith(AIRFLOW_TESTS_PATH):
Expand All @@ -406,6 +417,16 @@ def is_called_from_test_code(self) -> tuple[bool, traceback.FrameSummary | None]
# The traceback line will be always 3rd (two bottom ones are Airflow)
return False, self.traceback[-2]

def get_bind(
self,
mapper=None,
clause=None,
bind=None,
_sa_skip_events=None,
_sa_skip_for_implicit_returning=False,
):
pass


def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool:
"""Determine whether the database connection URI specifies a relative path."""
Expand Down Expand Up @@ -858,13 +879,3 @@ def is_usage_data_collection_enabled() -> bool:
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"

DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")

# AIP-44: internal_api (experimental)
# This feature is not complete yet, so we disable it by default.
_ENABLE_AIP_44: bool = os.environ.get("AIRFLOW_ENABLE_AIP_44", "false").lower() in {
"true",
"t",
"yes",
"y",
"1",
}
3 changes: 0 additions & 3 deletions airflow/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from airflow.exceptions import AirflowConfigException, RemovedInAirflow3Warning
from airflow.logging_config import configure_logging
from airflow.models import import_all_models
from airflow.settings import _ENABLE_AIP_44
from airflow.utils.json import AirflowJsonProvider
from airflow.www.extensions.init_appbuilder import init_appbuilder
from airflow.www.extensions.init_appbuilder_links import init_appbuilder_links
Expand Down Expand Up @@ -171,8 +170,6 @@ def create_app(config=None, testing=False):
init_error_handlers(flask_app)
init_api_connexion(flask_app)
if conf.getboolean("webserver", "run_internal_api", fallback=False):
if not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it.")
init_api_internal(flask_app)
init_api_auth_provider(flask_app)
init_api_error_handlers(flask_app) # needs to be after all api inits to let them add their path first
Expand Down
Loading