Skip to content

Commit f17ec0e

Browse files
committed
Update FAB to 4.1.1
The Flask Application Builder have been updated recently to support a number of newer dependencies. This PR is the attempt to migrate FAB to newer version. Fixes: apache#22397
1 parent 2936759 commit f17ec0e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+489
-325
lines changed

.github/workflows/ci.yml

+4
Original file line numberDiff line numberDiff line change
@@ -801,6 +801,8 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
801801
run: >
802802
breeze verify-provider-packages --use-airflow-version wheel --use-packages-from-dist
803803
--package-format wheel
804+
env:
805+
SKIP_CONSTRAINTS: "${{ needs.build-info.outputs.upgradeToNewerDependencies }}"
804806
- name: "Remove airflow package and replace providers with 2.2-compliant versions"
805807
run: |
806808
rm -vf dist/apache_airflow-*.whl \
@@ -878,6 +880,8 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
878880
run: >
879881
breeze verify-provider-packages --use-airflow-version sdist --use-packages-from-dist
880882
--package-format sdist
883+
env:
884+
SKIP_CONSTRAINTS: "${{ needs.build-info.outputs.upgradeToNewerDependencies }}"
881885
- name: "Fix ownership"
882886
run: breeze fix-ownership
883887
if: always()

Dockerfile.ci

+27-9
Original file line numberDiff line numberDiff line change
@@ -686,29 +686,47 @@ if [[ ${SKIP_ENVIRONMENT_INITIALIZATION=} != "true" ]]; then
686686
echo "${COLOR_BLUE}Uninstalling airflow and providers"
687687
echo
688688
uninstall_airflow_and_providers
689-
echo "${COLOR_BLUE}Install airflow from wheel package with extras: '${AIRFLOW_EXTRAS}' and constraints reference ${AIRFLOW_CONSTRAINTS_REFERENCE}.${COLOR_RESET}"
690-
echo
691-
install_airflow_from_wheel "${AIRFLOW_EXTRAS}" "${AIRFLOW_CONSTRAINTS_REFERENCE}"
689+
if [[ ${SKIP_CONSTRAINTS,,=} == "true" ]]; then
690+
echo "${COLOR_BLUE}Install airflow from wheel package with extras: '${AIRFLOW_EXTRAS}' with no constraints.${COLOR_RESET}"
691+
echo
692+
install_airflow_from_wheel "${AIRFLOW_EXTRAS}" "none"
693+
else
694+
echo "${COLOR_BLUE}Install airflow from wheel package with extras: '${AIRFLOW_EXTRAS}' and constraints reference ${AIRFLOW_CONSTRAINTS_REFERENCE}.${COLOR_RESET}"
695+
echo
696+
install_airflow_from_wheel "${AIRFLOW_EXTRAS}" "${AIRFLOW_CONSTRAINTS_REFERENCE}"
697+
fi
692698
uninstall_providers
693699
elif [[ ${USE_AIRFLOW_VERSION} == "sdist" ]]; then
694700
echo
695701
echo "${COLOR_BLUE}Uninstalling airflow and providers"
696702
echo
697703
uninstall_airflow_and_providers
698704
echo
699-
echo "${COLOR_BLUE}Install airflow from sdist package with extras: '${AIRFLOW_EXTRAS}' and constraints reference ${AIRFLOW_CONSTRAINTS_REFERENCE}.${COLOR_RESET}"
700-
echo
701-
install_airflow_from_sdist "${AIRFLOW_EXTRAS}" "${AIRFLOW_CONSTRAINTS_REFERENCE}"
705+
if [[ ${SKIP_CONSTRAINTS,,=} == "true" ]]; then
706+
echo "${COLOR_BLUE}Install airflow from sdist package with extras: '${AIRFLOW_EXTRAS}' with no constraints.${COLOR_RESET}"
707+
echo
708+
install_airflow_from_sdist "${AIRFLOW_EXTRAS}" "none"
709+
else
710+
echo "${COLOR_BLUE}Install airflow from sdist package with extras: '${AIRFLOW_EXTRAS}' and constraints reference ${AIRFLOW_CONSTRAINTS_REFERENCE}.${COLOR_RESET}"
711+
echo
712+
install_airflow_from_sdist "${AIRFLOW_EXTRAS}" "${AIRFLOW_CONSTRAINTS_REFERENCE}"
713+
fi
702714
uninstall_providers
703715
else
704716
echo
705717
echo "${COLOR_BLUE}Uninstalling airflow and providers"
706718
echo
707719
uninstall_airflow_and_providers
708720
echo
709-
echo "${COLOR_BLUE}Install released airflow from PyPI with extras: '${AIRFLOW_EXTRAS}' and constraints reference ${AIRFLOW_CONSTRAINTS_REFERENCE}.${COLOR_RESET}"
710-
echo
711-
install_released_airflow_version "${USE_AIRFLOW_VERSION}" "${AIRFLOW_CONSTRAINTS_REFERENCE}"
721+
if [[ ${SKIP_CONSTRAINTS,,=} == "true" ]]; then
722+
echo "${COLOR_BLUE}Install released airflow from PyPI with extras: '${AIRFLOW_EXTRAS}' with no constraints.${COLOR_RESET}"
723+
echo
724+
install_released_airflow_version "${USE_AIRFLOW_VERSION}" "none"
725+
else
726+
echo "${COLOR_BLUE}Install released airflow from PyPI with extras: '${AIRFLOW_EXTRAS}' and constraints reference ${AIRFLOW_CONSTRAINTS_REFERENCE}.${COLOR_RESET}"
727+
echo
728+
install_released_airflow_version "${USE_AIRFLOW_VERSION}" "${AIRFLOW_CONSTRAINTS_REFERENCE}"
729+
fi
712730
fi
713731
if [[ ${USE_PACKAGES_FROM_DIST=} == "true" ]]; then
714732
echo

airflow/api/auth/backend/basic_auth.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818
from functools import wraps
1919
from typing import Any, Callable, Optional, Tuple, TypeVar, Union, cast
2020

21-
from flask import Response, current_app, request
21+
from flask import Response, request
2222
from flask_appbuilder.const import AUTH_LDAP
2323
from flask_login import login_user
2424

25+
from airflow.utils.airflow_flask_app import get_airflow_app
2526
from airflow.www.fab_security.sqla.models import User
2627

2728
CLIENT_AUTH: Optional[Union[Tuple[str, str], Any]] = None
@@ -40,7 +41,7 @@ def auth_current_user() -> Optional[User]:
4041
if auth is None or not auth.username or not auth.password:
4142
return None
4243

43-
ab_security_manager = current_app.appbuilder.sm
44+
ab_security_manager = get_airflow_app().appbuilder.sm
4445
user = None
4546
if ab_security_manager.auth_type == AUTH_LDAP:
4647
user = ab_security_manager.auth_user_ldap(auth.username, auth.password)

airflow/api_connexion/endpoints/dag_endpoint.py

+5-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from typing import Collection, Optional
2020

2121
from connexion import NoContent
22-
from flask import current_app, g, request
22+
from flask import g, request
2323
from marshmallow import ValidationError
2424
from sqlalchemy.orm import Session
2525
from sqlalchemy.sql.expression import or_
@@ -38,6 +38,7 @@
3838
from airflow.exceptions import AirflowException, DagNotFound
3939
from airflow.models.dag import DagModel, DagTag
4040
from airflow.security import permissions
41+
from airflow.utils.airflow_flask_app import get_airflow_app
4142
from airflow.utils.session import NEW_SESSION, provide_session
4243

4344

@@ -56,7 +57,7 @@ def get_dag(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
5657
@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
5758
def get_dag_details(*, dag_id: str) -> APIResponse:
5859
"""Get details of DAG."""
59-
dag: DAG = current_app.dag_bag.get_dag(dag_id)
60+
dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)
6061
if not dag:
6162
raise NotFound("DAG not found", detail=f"The DAG with dag_id: {dag_id} was not found")
6263
return dag_detail_schema.dump(dag)
@@ -83,7 +84,7 @@ def get_dags(
8384
if dag_id_pattern:
8485
dags_query = dags_query.filter(DagModel.dag_id.ilike(f'%{dag_id_pattern}%'))
8586

86-
readable_dags = current_app.appbuilder.sm.get_accessible_dag_ids(g.user)
87+
readable_dags = get_airflow_app().appbuilder.sm.get_accessible_dag_ids(g.user)
8788

8889
dags_query = dags_query.filter(DagModel.dag_id.in_(readable_dags))
8990
if tags:
@@ -143,7 +144,7 @@ def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pat
143144
if dag_id_pattern == '~':
144145
dag_id_pattern = '%'
145146
dags_query = dags_query.filter(DagModel.dag_id.ilike(f'%{dag_id_pattern}%'))
146-
editable_dags = current_app.appbuilder.sm.get_editable_dag_ids(g.user)
147+
editable_dags = get_airflow_app().appbuilder.sm.get_editable_dag_ids(g.user)
147148

148149
dags_query = dags_query.filter(DagModel.dag_id.in_(editable_dags))
149150
if tags:

airflow/api_connexion/endpoints/dag_run_endpoint.py

+14-12
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import pendulum
2121
from connexion import NoContent
22-
from flask import current_app, g, request
22+
from flask import g
2323
from marshmallow import ValidationError
2424
from sqlalchemy import or_
2525
from sqlalchemy.orm import Query, Session
@@ -30,6 +30,7 @@
3030
set_dag_run_state_to_success,
3131
)
3232
from airflow.api_connexion import security
33+
from airflow.api_connexion.endpoints.mapping_from_request import get_mapping_from_request
3334
from airflow.api_connexion.exceptions import AlreadyExists, BadRequest, NotFound
3435
from airflow.api_connexion.parameters import apply_sorting, check_limit, format_datetime, format_parameters
3536
from airflow.api_connexion.schemas.dag_run_schema import (
@@ -47,6 +48,7 @@
4748
from airflow.api_connexion.types import APIResponse
4849
from airflow.models import DagModel, DagRun
4950
from airflow.security import permissions
51+
from airflow.utils.airflow_flask_app import get_airflow_app
5052
from airflow.utils.session import NEW_SESSION, provide_session
5153
from airflow.utils.state import DagRunState
5254
from airflow.utils.types import DagRunType
@@ -167,7 +169,7 @@ def get_dag_runs(
167169

168170
# This endpoint allows specifying ~ as the dag_id to retrieve DAG Runs for all DAGs.
169171
if dag_id == "~":
170-
appbuilder = current_app.appbuilder
172+
appbuilder = get_airflow_app().appbuilder
171173
query = query.filter(DagRun.dag_id.in_(appbuilder.sm.get_readable_dag_ids(g.user)))
172174
else:
173175
query = query.filter(DagRun.dag_id == dag_id)
@@ -199,13 +201,13 @@ def get_dag_runs(
199201
@provide_session
200202
def get_dag_runs_batch(*, session: Session = NEW_SESSION) -> APIResponse:
201203
"""Get list of DAG Runs"""
202-
body = request.get_json()
204+
body = get_mapping_from_request()
203205
try:
204206
data = dagruns_batch_form_schema.load(body)
205207
except ValidationError as err:
206208
raise BadRequest(detail=str(err.messages))
207209

208-
appbuilder = current_app.appbuilder
210+
appbuilder = get_airflow_app().appbuilder
209211
readable_dag_ids = appbuilder.sm.get_readable_dag_ids(g.user)
210212
query = session.query(DagRun)
211213
if data.get("dag_ids"):
@@ -252,7 +254,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
252254
detail=f"DAG with dag_id: '{dag_id}' has import errors",
253255
)
254256
try:
255-
post_body = dagrun_schema.load(request.json, session=session)
257+
post_body = dagrun_schema.load(get_mapping_from_request(), session=session)
256258
except ValidationError as err:
257259
raise BadRequest(detail=str(err))
258260

@@ -268,7 +270,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
268270
)
269271
if not dagrun_instance:
270272
try:
271-
dag = current_app.dag_bag.get_dag(dag_id)
273+
dag = get_airflow_app().dag_bag.get_dag(dag_id)
272274
dag_run = dag.create_dagrun(
273275
run_type=DagRunType.MANUAL,
274276
run_id=run_id,
@@ -277,7 +279,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
277279
state=DagRunState.QUEUED,
278280
conf=post_body.get("conf"),
279281
external_trigger=True,
280-
dag_hash=current_app.dag_bag.dags_hash.get(dag_id),
282+
dag_hash=get_airflow_app().dag_bag.dags_hash.get(dag_id),
281283
)
282284
return dagrun_schema.dump(dag_run)
283285
except ValueError as ve:
@@ -310,12 +312,12 @@ def update_dag_run_state(*, dag_id: str, dag_run_id: str, session: Session = NEW
310312
error_message = f'Dag Run id {dag_run_id} not found in dag {dag_id}'
311313
raise NotFound(error_message)
312314
try:
313-
post_body = set_dagrun_state_form_schema.load(request.json)
315+
post_body = set_dagrun_state_form_schema.load(get_mapping_from_request())
314316
except ValidationError as err:
315317
raise BadRequest(detail=str(err))
316318

317319
state = post_body['state']
318-
dag = current_app.dag_bag.get_dag(dag_id)
320+
dag = get_airflow_app().dag_bag.get_dag(dag_id)
319321
if state == DagRunState.SUCCESS:
320322
set_dag_run_state_to_success(dag=dag, run_id=dag_run.run_id, commit=True)
321323
elif state == DagRunState.QUEUED:
@@ -339,15 +341,15 @@ def clear_dag_run(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSIO
339341
session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id).one_or_none()
340342
)
341343
if dag_run is None:
342-
error_message = f'Dag Run id {dag_run_id} not found in dag {dag_id}'
344+
error_message = f'Dag Run id {dag_run_id} not found in dag {dag_id}'
343345
raise NotFound(error_message)
344346
try:
345-
post_body = clear_dagrun_form_schema.load(request.json)
347+
post_body = clear_dagrun_form_schema.load(get_mapping_from_request())
346348
except ValidationError as err:
347349
raise BadRequest(detail=str(err))
348350

349351
dry_run = post_body.get('dry_run', False)
350-
dag = current_app.dag_bag.get_dag(dag_id)
352+
dag = get_airflow_app().dag_bag.get_dag(dag_id)
351353
start_date = dag_run.logical_date
352354
end_date = dag_run.logical_date
353355

airflow/api_connexion/endpoints/extra_link_endpoint.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
from flask import current_app
1918
from sqlalchemy.orm.session import Session
2019

2120
from airflow import DAG
@@ -25,6 +24,7 @@
2524
from airflow.exceptions import TaskNotFound
2625
from airflow.models.dagbag import DagBag
2726
from airflow.security import permissions
27+
from airflow.utils.airflow_flask_app import get_airflow_app
2828
from airflow.utils.session import NEW_SESSION, provide_session
2929

3030

@@ -46,7 +46,7 @@ def get_extra_links(
4646
"""Get extra links for task instance"""
4747
from airflow.models.taskinstance import TaskInstance
4848

49-
dagbag: DagBag = current_app.dag_bag
49+
dagbag: DagBag = get_airflow_app().dag_bag
5050
dag: DAG = dagbag.get_dag(dag_id)
5151
if not dag:
5252
raise NotFound("DAG not found", detail=f'DAG with ID = "{dag_id}" not found')

airflow/api_connexion/endpoints/log_endpoint.py

+6-5
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,9 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17-
1817
from typing import Any, Optional
1918

20-
from flask import Response, current_app, request
19+
from flask import Response, request
2120
from itsdangerous.exc import BadSignature
2221
from itsdangerous.url_safe import URLSafeSerializer
2322
from sqlalchemy.orm.session import Session
@@ -29,6 +28,7 @@
2928
from airflow.exceptions import TaskNotFound
3029
from airflow.models import TaskInstance
3130
from airflow.security import permissions
31+
from airflow.utils.airflow_flask_app import get_airflow_app
3232
from airflow.utils.log.log_reader import TaskLogReader
3333
from airflow.utils.session import NEW_SESSION, provide_session
3434

@@ -52,7 +52,7 @@ def get_log(
5252
session: Session = NEW_SESSION,
5353
) -> APIResponse:
5454
"""Get logs for specific task instance"""
55-
key = current_app.config["SECRET_KEY"]
55+
key = get_airflow_app().config["SECRET_KEY"]
5656
if not token:
5757
metadata = {}
5858
else:
@@ -87,7 +87,7 @@ def get_log(
8787
metadata['end_of_log'] = True
8888
raise NotFound(title="TaskInstance not found")
8989

90-
dag = current_app.dag_bag.get_dag(dag_id)
90+
dag = get_airflow_app().dag_bag.get_dag(dag_id)
9191
if dag:
9292
try:
9393
ti.task = dag.get_task(ti.task_id)
@@ -101,7 +101,8 @@ def get_log(
101101
if return_type == 'application/json' or return_type is None: # default
102102
logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
103103
logs = logs[0] if task_try_number is not None else logs
104-
token = URLSafeSerializer(key).dumps(metadata)
104+
# we must have token here, so we can safely ignore it
105+
token = URLSafeSerializer(key).dumps(metadata) # type: ignore[assignment]
105106
return logs_schema.dump(LogResponseObject(continuation_token=token, content=logs))
106107
# text/plain. Stream
107108
logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from typing import Any, Mapping, cast
19+
20+
21+
def get_mapping_from_request() -> Mapping[str, Any]:
22+
from flask import request
23+
24+
return cast(Mapping[str, Any], request.get_json())

0 commit comments

Comments
 (0)