Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
-->

This is a core airflow package that contains the functionality of Apache Airflow components:
scheduler, API server, dag file processor and triggerer.
scheduler, API server, Dag file processor and triggerer.
2 changes: 1 addition & 1 deletion airflow-core/docs/howto/listener-plugin.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Airflow has a feature that allows adding a listener for monitoring and tracking
the task state using Plugins.

This is a simple example listener plugin of Airflow that helps to track the task
state and collect useful metadata information about the task, Dag run and dag.
state and collect useful metadata information about the task, Dag run and Dag.

This is an example plugin for Airflow that shows how to create a listener plugin.
This plugin works by using SQLAlchemy's event mechanism. It watches
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
6e452af5ca150404e6bd7553fb6a9adfcd7081ca60dfd09efea0f9aae7460cc6
c106254b0db4f7be6c268e26a201ee5c4eb45f596e7435f232a5071a75c4b437
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2c62bb0602aab89a86375daeab7543ba
89ee5f65e7b3d25d04d9fe12f352c661
Binary file modified airflow-core/docs/img/diagram_basic_airflow_architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def generate_basic_airflow_diagram():
):
user = User("Airflow User")

dag_files = Custom("DAG files", MULTIPLE_FILES_IMAGE.as_posix())
dag_files = Custom("Dag files", MULTIPLE_FILES_IMAGE.as_posix())
user >> Edge(color="brown", style="solid", reverse=False, label="author\n\n") >> dag_files

with Cluster("Parsing, Scheduling & Executing"):
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
56006541287fe451c20e5fdd373d5456
2a7c3d3c13ed5ee81516357da8fb7e3d
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def generate_dag_processor_airflow_diagram():
operations_user = User("Operations User")
deployment_manager = User("Deployment Manager")

with Cluster("Security perimeter with no DAG code execution", graph_attr={"bgcolor": "lightgrey"}):
with Cluster("Security perimeter with no Dag code execution", graph_attr={"bgcolor": "lightgrey"}):
with Cluster("Scheduling\n\n"):
schedulers = Custom("Scheduler(s)", PYTHON_MULTIPROCESS_LOGO.as_posix())

Expand All @@ -78,15 +78,15 @@ def generate_dag_processor_airflow_diagram():

metadata_db = Custom("Metadata DB", DATABASE_IMAGE.as_posix())

dag_author = User("DAG Author")
dag_author = User("Dag Author")

with Cluster("Security perimeter with DAG code execution"):
with Cluster("Security perimeter with Dag code execution"):
with Cluster("Execution"):
workers = Custom("Worker(s)", PYTHON_MULTIPROCESS_LOGO.as_posix())
triggerer = Custom("Triggerer(s)", PYTHON_MULTIPROCESS_LOGO.as_posix())
with Cluster("Parsing"):
dag_processors = Custom("DAG\nProcessor(s)", PYTHON_MULTIPROCESS_LOGO.as_posix())
dag_files = Custom("DAG files", MULTIPLE_FILES_IMAGE.as_posix())
dag_files = Custom("Dag files", MULTIPLE_FILES_IMAGE.as_posix())

plugins_and_packages = Custom("Plugin folder\n& installed packages", PACKAGES_IMAGE.as_posix())

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
9715885d7403de716a7f5114c47f8027
8e5ec0bcedec30b8cf12ef73a2e3350b
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ def generate_distributed_airflow_diagram():
graph_attr=graph_attr,
edge_attr=edge_attr,
):
dag_author = User("DAG Author")
dag_author = User("Dag Author")
deployment_manager = User("Deployment Manager")

dag_files = Custom("DAG files", MULTIPLE_FILES_IMAGE.as_posix(), height="1.8")
dag_files = Custom("Dag files", MULTIPLE_FILES_IMAGE.as_posix(), height="1.8")
dag_author >> Edge(color="brown", style="solid", reverse=False, label="author\n\n") >> dag_files

with Cluster("Parsing, Scheduling & Executing"):
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
d1fa59cecc320e7cfc6f8483850b5dc6
87c4172c68252332274d33830a8e526b
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def generate_dag_processor_airflow_diagram():
)

deployment_manager_1 = User("Deployment\nManager\nTeam 1")
dag_author_1 = User("DAG Author\nTeam 1")
dag_author_1 = User("Dag Author\nTeam 1")

with Cluster("Team 1 Airflow Deployment", graph_attr={"bgcolor": "#AAAABB", "fontsize": "22"}):
with Cluster("No DB access"):
Expand All @@ -120,12 +120,12 @@ def generate_dag_processor_airflow_diagram():
triggerer_1 = Custom("Triggerer(s)", PYTHON_MULTIPROCESS_LOGO.as_posix())
with Cluster("Parsing"):
dag_processors_1 = Custom("DAG\nProcessor(s)", PYTHON_MULTIPROCESS_LOGO.as_posix())
dag_files_1 = Custom("DAG Bundles\nTeam 1", MULTIPLE_FILES_IMAGE.as_posix())
dag_files_1 = Custom("Dag Bundles\nTeam 1", MULTIPLE_FILES_IMAGE.as_posix())
plugins_and_packages_1 = Custom("Plugins\n& Packages\nTenant 1", PACKAGES_IMAGE.as_posix())
operations_user_1 = User("Operations User\nTeam 1")

deployment_manager_2 = User("Deployment\nManager\nTeam 2")
dag_author_2 = User("DAG Author\nTeam 2")
dag_author_2 = User("Dag Author\nTeam 2")

with Cluster("Team 2 Airflow Deployment", graph_attr={"fontsize": "22"}):
with Cluster("No DB access"):
Expand All @@ -134,7 +134,7 @@ def generate_dag_processor_airflow_diagram():
triggerer_2 = Custom("Triggerer(s)", PYTHON_MULTIPROCESS_LOGO.as_posix())
with Cluster("Parsing"):
dag_processors_2 = Custom("DAG\nProcessor(s)", PYTHON_MULTIPROCESS_LOGO.as_posix())
dag_files_2 = Custom("DAG Bundles\nTeam 2", MULTIPLE_FILES_IMAGE.as_posix())
dag_files_2 = Custom("Dag Bundles\nTeam 2", MULTIPLE_FILES_IMAGE.as_posix())
plugins_and_packages_2 = Custom("Plugins\n& Packages\nTeam 2", PACKAGES_IMAGE.as_posix())
operations_user_2 = User("Operations User\nTeam 2")

Expand Down
6 changes: 3 additions & 3 deletions airflow-core/docs/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``2b47dc6bc8df`` | ``d03e4a635aa3`` | ``3.0.0`` | add dag versioning. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``d03e4a635aa3`` | ``d8cd3297971e`` | ``3.0.0`` | Drop DAG pickling. |
| ``d03e4a635aa3`` | ``d8cd3297971e`` | ``3.0.0`` | Drop Dag pickling. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``d8cd3297971e`` | ``5f57a45b8433`` | ``3.0.0`` | Add last_heartbeat_at directly to TI. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Expand Down Expand Up @@ -154,8 +154,8 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``0bfc26bc256e`` | ``d0f1c55954fa`` | ``3.0.0`` | Rename DagModel schedule_interval to timetable_summary. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``d0f1c55954fa`` | ``044f740568ec`` | ``3.0.0`` | Remove SubDAGs: ``is_subdag`` & ``root_dag_id`` columns from |
| | | | DAG table. |
| ``d0f1c55954fa`` | ``044f740568ec`` | ``3.0.0`` | Remove SubDags: ``is_subdag`` & ``root_dag_id`` columns from |
| | | | Dag table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``044f740568ec`` | ``5f2621c13b39`` | ``3.0.0`` | Drop ab_user.id foreign key. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Expand Down
6 changes: 3 additions & 3 deletions airflow-core/src/airflow/api/common/delete_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Delete DAGs APIs."""
"""Delete Dags APIs."""

from __future__ import annotations

Expand All @@ -42,9 +42,9 @@
@provide_session
def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session = NEW_SESSION) -> int:
"""
Delete a DAG by a dag_id.
Delete a Dag by a dag_id.

:param dag_id: the dag_id of the DAG to delete
:param dag_id: the dag_id of the Dag to delete
:param keep_records_in_log: whether to keep records of the given dag_id
in the Log table in the backend database (for reasons like auditing).
The default value is True.
Expand Down
20 changes: 10 additions & 10 deletions airflow-core/src/airflow/api/common/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def set_state(

task_dags = {task[0].dag if isinstance(task, tuple) else task.dag for task in tasks}
if len(task_dags) > 1:
raise ValueError(f"Received tasks from multiple DAGs: {task_dags}")
raise ValueError(f"Received tasks from multiple Dags: {task_dags}")
dag = next(iter(task_dags))
if dag is None:
raise ValueError("Received tasks with no DAG")
Expand Down Expand Up @@ -143,7 +143,7 @@ def find_task_relatives(tasks, downstream, upstream):

@provide_session
def get_run_ids(dag: DAG, run_id: str, future: bool, past: bool, session: SASession = NEW_SESSION):
"""Return DAG executions' run_ids."""
"""Return Dag executions' run_ids."""
current_dagrun = dag.get_dagrun(run_id=run_id, session=session)
if current_dagrun.logical_date is None:
return [run_id]
Expand All @@ -162,7 +162,7 @@ def get_run_ids(dag: DAG, run_id: str, future: bool, past: bool, session: SASess
end_date = last_dagrun.logical_date if future else current_dagrun.logical_date
start_date = current_dagrun.logical_date if not past else first_dagrun.logical_date
if not dag.timetable.can_be_scheduled:
# If the DAG never schedules, need to look at existing DagRun if the user wants future or
# If the Dag never schedules, need to look at existing DagRun if the user wants future or
# past runs.
dag_runs = dag.get_dagruns_between(start_date=start_date, end_date=end_date, session=session)
run_ids = sorted({d.run_id for d in dag_runs})
Expand Down Expand Up @@ -205,9 +205,9 @@ def set_dag_run_state_to_success(

Set for a specific logical date and its task instances to success.

:param dag: the DAG of which to alter state
:param dag: the Dag of which to alter state
:param run_id: the run_id to start looking from
:param commit: commit DAG and tasks to be altered to the database
:param commit: commit Dag and tasks to be altered to the database
:param session: database session
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
Expand Down Expand Up @@ -266,9 +266,9 @@ def set_dag_run_state_to_failed(

Set for a specific logical date and its task instances to failed.

:param dag: the DAG of which to alter state
:param run_id: the DAG run_id to start looking from
:param commit: commit DAG and tasks to be altered to the database
:param dag: the Dag of which to alter state
:param run_id: the Dag run_id to start looking from
:param commit: commit Dag and tasks to be altered to the database
:param session: database session
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
Expand Down Expand Up @@ -356,9 +356,9 @@ def __set_dag_run_state_to_running_or_queued(
"""
Set the dag run for a specific logical date to running.

:param dag: the DAG of which to alter state
:param dag: the Dag of which to alter state
:param run_id: the id of the DagRun
:param commit: commit DAG and tasks to be altered to the database
:param commit: commit Dag and tasks to be altered to the database
:param session: database session
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
Expand Down
12 changes: 6 additions & 6 deletions airflow-core/src/airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Triggering DAG runs APIs."""
"""Triggering Dag runs APIs."""

from __future__ import annotations

Expand Down Expand Up @@ -53,10 +53,10 @@ def _trigger_dag(
session: Session = NEW_SESSION,
) -> DagRun | None:
"""
Triggers DAG run.
Triggers Dag run.

:param dag_id: DAG ID
:param dag_bag: DAG Bag model
:param dag_id: Dag ID
:param dag_bag: Dag Bag model
:param triggered_by: the entity which triggers the dag_run
:param triggering_user_name: the user name who triggers the dag_run
:param run_after: the datetime before which dag cannot run
Expand Down Expand Up @@ -138,9 +138,9 @@ def trigger_dag(
session: Session = NEW_SESSION,
) -> DagRun | None:
"""
Triggers execution of DAG specified by dag_id.
Triggers execution of Dag specified by dag_id.

:param dag_id: DAG ID
:param dag_id: Dag ID
:param triggered_by: the entity which triggers the dag_run
:param triggering_user_name: the user name who triggers the dag_run
:param run_after: the datetime before which dag won't run
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ def is_authorized_dag(

:param method: the method to perform
:param user: the user performing the action
:param access_entity: the kind of DAG information the authorization request is about.
If not provided, the authorization request is about the DAG itself
:param access_entity: the kind of Dag information the authorization request is about.
If not provided, the authorization request is about the Dag itself
:param details: optional details about the DAG
"""

Expand Down Expand Up @@ -347,9 +347,9 @@ def get_authorized_dag_ids(
session: Session = NEW_SESSION,
) -> set[str]:
"""
Get DAGs the user has access to.
Get Dags the user has access to.

By default, reads all the DAGs and check individually if the user has permissions to access the DAG.
By default, reads all the Dags and checks individually if the user has permissions to access the Dag.
Can lead to some poor performance. It is recommended to override this method in the auth manager
implementation to provide a more efficient implementation.

Expand All @@ -368,9 +368,9 @@ def filter_authorized_dag_ids(
method: ResourceMethod = "GET",
) -> set[str]:
"""
Filter DAGs the user has access to.
Filter Dags the user has access to.

:param dag_ids: the list of DAG ids
:param dag_ids: the list of Dag ids
:param user: the user
:param method: the method to filter on
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class AccessView(Enum):


class DagAccessEntity(Enum):
"""Enum of DAG entities the user tries to access."""
"""Enum of Dag entities the user tries to access."""

AUDIT_LOG = "AUDIT_LOG"
CODE = "CODE"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class SimpleAuthManagerRole(namedtuple("SimpleAuthManagerRole", "name order"), E
# VIEWER role gives all read-only permissions
VIEWER = "VIEWER", 0

# USER role gives viewer role permissions + access to DAGs
# USER role gives viewer role permissions + access to Dags
USER = "USER", 1

# OP role gives user role permissions + access to connections, config, pools, variables
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/api_fastapi/common/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@


def create_dag_bag() -> DBDagBag:
"""Create DagBag to retrieve DAGs from the database."""
"""Create DagBag to retrieve Dags from the database."""
return DBDagBag()


Expand Down
14 changes: 7 additions & 7 deletions airflow-core/src/airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def depends(cls, offset: NonNegativeInt = 0) -> OffsetFilter:


class _FavoriteFilter(BaseParam[bool]):
"""Filter DAGs by favorite status."""
"""Filter Dags by favorite status."""

def __init__(self, user_id: str, value: T | None = None, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
Expand Down Expand Up @@ -638,7 +638,7 @@ def depends_float(


class _HasAssetScheduleFilter(BaseParam[bool]):
"""Filter DAGs that have asset-based scheduling."""
"""Filter Dags that have asset-based scheduling."""

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
Expand All @@ -647,22 +647,22 @@ def to_orm(self, select: Select) -> Select:
asset_ref_subquery = sql_select(DagScheduleAssetReference.dag_id).distinct()

if self.value:
# Filter DAGs that have asset-based scheduling
# Filter Dags that have asset-based scheduling
return select.where(DagModel.dag_id.in_(asset_ref_subquery))

# Filter DAGs that do NOT have asset-based scheduling
# Filter Dags that do NOT have asset-based scheduling
return select.where(DagModel.dag_id.notin_(asset_ref_subquery))

@classmethod
def depends(
cls,
has_asset_schedule: bool | None = Query(None, description="Filter DAGs with asset-based scheduling"),
has_asset_schedule: bool | None = Query(None, description="Filter Dags with asset-based scheduling"),
) -> _HasAssetScheduleFilter:
return cls().set_value(has_asset_schedule)


class _AssetDependencyFilter(BaseParam[str]):
"""Filter DAGs by specific asset dependencies."""
"""Filter Dags by specific asset dependencies."""

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
Expand All @@ -681,7 +681,7 @@ def to_orm(self, select: Select) -> Select:
def depends(
cls,
asset_dependency: str | None = Query(
None, description="Filter DAGs by asset dependency (name or URI)"
None, description="Filter Dags by asset dependency (name or URI)"
),
) -> _AssetDependencyFilter:
return cls().set_value(asset_dependency)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@


class DagScheduleAssetReference(StrictBaseModel):
"""DAG schedule reference serializer for assets."""
"""Dag schedule reference serializer for assets."""

dag_id: str
created_at: datetime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@


class DagReportResponse(BaseModel):
"""DAG Report serializer for responses."""
"""Dag Report serializer for responses."""

file: str
duration: timedelta
Expand All @@ -34,7 +34,7 @@ class DagReportResponse(BaseModel):


class DagReportCollectionResponse(BaseModel):
"""DAG Report Collection serializer for responses."""
"""Dag Report Collection serializer for responses."""

dag_reports: list[DagReportResponse]
total_entries: int
Loading
Loading