Skip to content

Commit b9e4d65

Browse files
kaxilPaulKobow7536
authored andcommitted
Remove the ability to import executors from plugins (apache#43289)
Executors should no longer be registered or imported via Airflow's plugin mechanism -- these types of classes are just treated as plain python classes by Airflow, so there is no need to register them with Airflow.
1 parent deadeb9 commit b9e4d65

File tree

14 files changed

+7
-124
lines changed

14 files changed

+7
-124
lines changed

airflow/api_fastapi/core_api/openapi/v1-generated.yaml

-6
Original file line numberDiff line numberDiff line change
@@ -2212,11 +2212,6 @@ components:
22122212
type: string
22132213
type: array
22142214
title: Hooks
2215-
executors:
2216-
items:
2217-
type: string
2218-
type: array
2219-
title: Executors
22202215
macros:
22212216
items:
22222217
type: string
@@ -2274,7 +2269,6 @@ components:
22742269
required:
22752270
- name
22762271
- hooks
2277-
- executors
22782272
- macros
22792273
- flask_blueprints
22802274
- fastapi_apps

airflow/api_fastapi/core_api/serializers/plugins.py

-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ class PluginResponse(BaseModel):
6565

6666
name: str
6767
hooks: list[str]
68-
executors: list[str]
6968
macros: list[str]
7069
flask_blueprints: list[str]
7170
fastapi_apps: list[FastAPIAppResponse]

airflow/executors/executor_loader.py

-12
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import functools
2222
import logging
2323
import os
24-
from contextlib import suppress
2524
from typing import TYPE_CHECKING
2625

2726
from airflow.api_internal.internal_api_call import InternalApiConfig
@@ -284,17 +283,6 @@ def _import_and_validate(path: str) -> type[BaseExecutor]:
284283
cls.validate_database_executor_compatibility(executor)
285284
return executor
286285

287-
if executor_name.connector_source == ConnectorSource.PLUGIN:
288-
with suppress(ImportError, AttributeError):
289-
# Load plugins here for executors as at that time the plugins might not have been
290-
# initialized yet
291-
from airflow import plugins_manager
292-
293-
plugins_manager.integrate_executor_plugins()
294-
return (
295-
_import_and_validate(f"airflow.executors.{executor_name.module_path}"),
296-
ConnectorSource.PLUGIN,
297-
)
298286
return _import_and_validate(executor_name.module_path), executor_name.connector_source
299287

300288
@classmethod

airflow/plugins_manager.py

+1-32
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
# Plugin components to integrate as modules
6565
registered_hooks: list[BaseHook] | None = None
6666
macros_modules: list[Any] | None = None
67-
executors_modules: list[Any] | None = None
6867

6968
# Plugin components to integrate directly
7069
admin_views: list[Any] | None = None
@@ -88,7 +87,6 @@
8887
"""
8988
PLUGINS_ATTRIBUTES_TO_DUMP = {
9089
"hooks",
91-
"executors",
9290
"macros",
9391
"admin_views",
9492
"flask_blueprints",
@@ -154,7 +152,6 @@ class AirflowPlugin:
154152
name: str | None = None
155153
source: AirflowPluginSource | None = None
156154
hooks: list[Any] = []
157-
executors: list[Any] = []
158155
macros: list[Any] = []
159156
admin_views: list[Any] = []
160157
flask_blueprints: list[Any] = []
@@ -533,33 +530,6 @@ def initialize_hook_lineage_readers_plugins():
533530
hook_lineage_reader_classes.extend(plugin.hook_lineage_readers)
534531

535532

536-
def integrate_executor_plugins() -> None:
537-
"""Integrate executor plugins to the context."""
538-
global plugins
539-
global executors_modules
540-
541-
if executors_modules is not None:
542-
return
543-
544-
ensure_plugins_loaded()
545-
546-
if plugins is None:
547-
raise AirflowPluginException("Can't load plugins.")
548-
549-
log.debug("Integrate executor plugins")
550-
551-
executors_modules = []
552-
for plugin in plugins:
553-
if plugin.name is None:
554-
raise AirflowPluginException("Invalid plugin name")
555-
plugin_name: str = plugin.name
556-
557-
executors_module = make_module("airflow.executors." + plugin_name, plugin.executors)
558-
if executors_module:
559-
executors_modules.append(executors_module)
560-
sys.modules[executors_module.__name__] = executors_module
561-
562-
563533
def integrate_macros_plugins() -> None:
564534
"""Integrates macro plugins."""
565535
global plugins
@@ -615,7 +585,6 @@ def get_plugin_info(attrs_to_dump: Iterable[str] | None = None) -> list[dict[str
615585
:param attrs_to_dump: A list of plugin attributes to dump
616586
"""
617587
ensure_plugins_loaded()
618-
integrate_executor_plugins()
619588
integrate_macros_plugins()
620589
initialize_web_ui_plugins()
621590
initialize_fastapi_plugins()
@@ -629,7 +598,7 @@ def get_plugin_info(attrs_to_dump: Iterable[str] | None = None) -> list[dict[str
629598
for attr in attrs_to_dump:
630599
if attr in ("global_operator_extra_links", "operator_extra_links"):
631600
info[attr] = [f"<{qualname(d.__class__)} object>" for d in getattr(plugin, attr)]
632-
elif attr in ("macros", "timetables", "hooks", "executors", "priority_weight_strategies"):
601+
elif attr in ("macros", "timetables", "hooks", "priority_weight_strategies"):
633602
info[attr] = [qualname(d) for d in getattr(plugin, attr)]
634603
elif attr == "listeners":
635604
# listeners may be modules or class instances

airflow/ui/openapi-gen/requests/schemas.gen.ts

-8
Original file line numberDiff line numberDiff line change
@@ -1333,13 +1333,6 @@ export const $PluginResponse = {
13331333
type: "array",
13341334
title: "Hooks",
13351335
},
1336-
executors: {
1337-
items: {
1338-
type: "string",
1339-
},
1340-
type: "array",
1341-
title: "Executors",
1342-
},
13431336
macros: {
13441337
items: {
13451338
type: "string",
@@ -1419,7 +1412,6 @@ export const $PluginResponse = {
14191412
required: [
14201413
"name",
14211414
"hooks",
1422-
"executors",
14231415
"macros",
14241416
"flask_blueprints",
14251417
"fastapi_apps",

airflow/ui/openapi-gen/requests/types.gen.ts

-1
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,6 @@ export type PluginCollectionResponse = {
311311
export type PluginResponse = {
312312
name: string;
313313
hooks: Array<string>;
314-
executors: Array<string>;
315314
macros: Array<string>;
316315
flask_blueprints: Array<string>;
317316
fastapi_apps: Array<FastAPIAppResponse>;

airflow/www/views.py

-1
Original file line numberDiff line numberDiff line change
@@ -4286,7 +4286,6 @@ class PluginView(AirflowBaseView):
42864286
def list(self):
42874287
"""List loaded plugins."""
42884288
plugins_manager.ensure_plugins_loaded()
4289-
plugins_manager.integrate_executor_plugins()
42904289
plugins_manager.initialize_extra_operators_links_plugins()
42914290
plugins_manager.initialize_web_ui_plugins()
42924291
plugins_manager.initialize_fastapi_plugins()

newsfragments/43289.significant.rst

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Support for adding executors via Airflow Plugins is removed
2+
3+
Executors should no longer be registered or imported via Airflow's plugin mechanism -- these types of classes
4+
are just treated as plain Python classes by Airflow, so there is no need to register them with Airflow.

tests/api_connexion/endpoints/test_plugin_endpoint.py

-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,6 @@ def test_get_plugins_return_200(self):
145145
{
146146
"appbuilder_menu_items": [appbuilder_menu_items],
147147
"appbuilder_views": [{"view": qualname(MockView)}],
148-
"executors": [],
149148
"flask_blueprints": [
150149
f"<{qualname(bp.__class__)}: name={bp.name!r} import_name={bp.import_name!r}>"
151150
],

tests/api_connexion/schemas/test_plugin_schema.py

-3
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ def test_serialize(self):
8686
assert deserialized_plugin == {
8787
"appbuilder_menu_items": [appbuilder_menu_items],
8888
"appbuilder_views": [{"view": self.mock_plugin.appbuilder_views[0]["view"]}],
89-
"executors": [],
9089
"flask_blueprints": [str(bp)],
9190
"fastapi_apps": [
9291
{"app": app, "name": "App name", "url_prefix": "/some_prefix"},
@@ -113,7 +112,6 @@ def test_serialize(self):
113112
{
114113
"appbuilder_menu_items": [appbuilder_menu_items],
115114
"appbuilder_views": [{"view": self.mock_plugin.appbuilder_views[0]["view"]}],
116-
"executors": [],
117115
"flask_blueprints": [str(bp)],
118116
"fastapi_apps": [
119117
{"app": app, "name": "App name", "url_prefix": "/some_prefix"},
@@ -131,7 +129,6 @@ def test_serialize(self):
131129
{
132130
"appbuilder_menu_items": [appbuilder_menu_items],
133131
"appbuilder_views": [{"view": self.mock_plugin.appbuilder_views[0]["view"]}],
134-
"executors": [],
135132
"flask_blueprints": [str(bp)],
136133
"fastapi_apps": [
137134
{"app": app, "name": "App name", "url_prefix": "/some_prefix"},

tests/cli/commands/test_plugins_command.py

-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ def test_should_display_one_plugin(self):
6969
"admin_views": [],
7070
"macros": ["tests.plugins.test_plugin.plugin_macro"],
7171
"menu_links": [],
72-
"executors": ["tests.plugins.test_plugin.PluginExecutor"],
7372
"flask_blueprints": [
7473
"<flask.blueprints.Blueprint: name='test_plugin' import_name='tests.plugins.test_plugin'>"
7574
],

tests/executors/test_executor_loader.py

+2-48
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import pytest
2424

25-
from airflow import plugins_manager
2625
from airflow.exceptions import AirflowConfigException
2726
from airflow.executors import executor_loader
2827
from airflow.executors.executor_loader import ConnectorSource, ExecutorLoader, ExecutorName
@@ -34,9 +33,6 @@
3433

3534
pytestmark = pytest.mark.skip_if_database_isolation_mode
3635

37-
# Plugin Manager creates new modules, which is difficult to mock, so we use test isolation by a unique name.
38-
TEST_PLUGIN_NAME = "unique_plugin_name_to_avoid_collision_i_love_kitties"
39-
4036

4137
class FakeExecutor:
4238
is_single_threaded = False
@@ -46,11 +42,6 @@ class FakeSingleThreadedExecutor:
4642
is_single_threaded = True
4743

4844

49-
class FakePlugin(plugins_manager.AirflowPlugin):
50-
name = TEST_PLUGIN_NAME
51-
executors = [FakeExecutor]
52-
53-
5445
class TestExecutorLoader:
5546
def setup_method(self) -> None:
5647
from airflow.executors import executor_loader
@@ -89,17 +80,6 @@ def test_should_support_executor_from_core(self, executor_name):
8980
assert executor.name == ExecutorName(ExecutorLoader.executors[executor_name], alias=executor_name)
9081
assert executor.name.connector_source == ConnectorSource.CORE
9182

92-
@mock.patch("airflow.plugins_manager.plugins", [FakePlugin()])
93-
@mock.patch("airflow.plugins_manager.executors_modules", None)
94-
def test_should_support_plugins(self):
95-
with conf_vars({("core", "executor"): f"{TEST_PLUGIN_NAME}.FakeExecutor"}):
96-
executor = ExecutorLoader.get_default_executor()
97-
assert executor is not None
98-
assert "FakeExecutor" == executor.__class__.__name__
99-
assert executor.name is not None
100-
assert executor.name == ExecutorName(f"{TEST_PLUGIN_NAME}.FakeExecutor")
101-
assert executor.name.connector_source == ConnectorSource.PLUGIN
102-
10383
def test_should_support_custom_path(self):
10484
with conf_vars({("core", "executor"): "tests.executors.test_executor_loader.FakeExecutor"}):
10585
executor = ExecutorLoader.get_default_executor()
@@ -124,7 +104,7 @@ def test_should_support_custom_path(self):
124104
),
125105
# Core executors and custom module path executor and plugin
126106
(
127-
f"CeleryExecutor, LocalExecutor, tests.executors.test_executor_loader.FakeExecutor, {TEST_PLUGIN_NAME}.FakeExecutor",
107+
"CeleryExecutor, LocalExecutor, tests.executors.test_executor_loader.FakeExecutor",
128108
[
129109
ExecutorName(
130110
"airflow.providers.celery.executors.celery_executor.CeleryExecutor",
@@ -138,17 +118,12 @@ def test_should_support_custom_path(self):
138118
"tests.executors.test_executor_loader.FakeExecutor",
139119
None,
140120
),
141-
ExecutorName(
142-
f"{TEST_PLUGIN_NAME}.FakeExecutor",
143-
None,
144-
),
145121
],
146122
),
147123
# Core executors and custom module path executor and plugin with aliases
148124
(
149125
(
150-
"CeleryExecutor, LocalExecutor, fake_exec:tests.executors.test_executor_loader.FakeExecutor, "
151-
f"plugin_exec:{TEST_PLUGIN_NAME}.FakeExecutor"
126+
"CeleryExecutor, LocalExecutor, fake_exec:tests.executors.test_executor_loader.FakeExecutor"
152127
),
153128
[
154129
ExecutorName(
@@ -163,10 +138,6 @@ def test_should_support_custom_path(self):
163138
"tests.executors.test_executor_loader.FakeExecutor",
164139
"fake_exec",
165140
),
166-
ExecutorName(
167-
f"{TEST_PLUGIN_NAME}.FakeExecutor",
168-
"plugin_exec",
169-
),
170141
],
171142
),
172143
],
@@ -194,8 +165,6 @@ def test_init_executors(self):
194165
"CeleryExecutor, my.module.path, my.module.path",
195166
"CeleryExecutor, my_alias:my.module.path, my.module.path",
196167
"CeleryExecutor, my_alias:my.module.path, other_alias:my.module.path",
197-
f"CeleryExecutor, {TEST_PLUGIN_NAME}.FakeExecutor, {TEST_PLUGIN_NAME}.FakeExecutor",
198-
f"my_alias:{TEST_PLUGIN_NAME}.FakeExecutor, other_alias:{TEST_PLUGIN_NAME}.FakeExecutor",
199168
],
200169
)
201170
def test_get_hybrid_executors_from_config_duplicates_should_fail(self, executor_config):
@@ -239,21 +208,6 @@ def test_should_support_import_executor_from_core(self, executor_config, expecte
239208
assert expected_value == executor.__name__
240209
assert import_source == ConnectorSource.CORE
241210

242-
@mock.patch("airflow.plugins_manager.plugins", [FakePlugin()])
243-
@mock.patch("airflow.plugins_manager.executors_modules", None)
244-
@pytest.mark.parametrize(
245-
("executor_config"),
246-
[
247-
(f"{TEST_PLUGIN_NAME}.FakeExecutor"),
248-
(f"my_cool_alias:{TEST_PLUGIN_NAME}.FakeExecutor, CeleryExecutor"),
249-
],
250-
)
251-
def test_should_support_import_plugins(self, executor_config):
252-
with conf_vars({("core", "executor"): executor_config}):
253-
executor, import_source = ExecutorLoader.import_default_executor_cls()
254-
assert "FakeExecutor" == executor.__name__
255-
assert import_source == ConnectorSource.PLUGIN
256-
257211
@pytest.mark.parametrize(
258212
"executor_config",
259213
[

tests/plugins/test_plugin.py

-8
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
from flask import Blueprint
2222
from flask_appbuilder import BaseView as AppBuilderBaseView, expose
2323

24-
from airflow.executors.base_executor import BaseExecutor
25-
2624
# Importing base classes that we need to derive
2725
from airflow.hooks.base import BaseHook
2826

@@ -49,11 +47,6 @@ class PluginHook(BaseHook):
4947
pass
5048

5149

52-
# Will show up under airflow.executors.test_plugin.PluginExecutor
53-
class PluginExecutor(BaseExecutor):
54-
pass
55-
56-
5750
# Will show up under airflow.macros.test_plugin.plugin_macro
5851
def plugin_macro():
5952
pass
@@ -123,7 +116,6 @@ def get_weight(self, ti):
123116
class AirflowTestPlugin(AirflowPlugin):
124117
name = "test_plugin"
125118
hooks = [PluginHook]
126-
executors = [PluginExecutor]
127119
macros = [plugin_macro]
128120
flask_blueprints = [bp]
129121
fastapi_apps = [app_with_metadata]

tests_common/test_utils/mock_plugins.py

-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
"plugins",
2626
"registered_hooks",
2727
"macros_modules",
28-
"executors_modules",
2928
"admin_views",
3029
"flask_blueprints",
3130
"fastapi_apps",
@@ -44,7 +43,6 @@
4443
"plugins",
4544
"registered_hooks",
4645
"macros_modules",
47-
"executors_modules",
4846
"admin_views",
4947
"flask_blueprints",
5048
"menu_links",

0 commit comments

Comments
 (0)