From 85d462ac8f2b62ad0329cbb988460074f3d3d930 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 9 Apr 2026 16:34:26 +0530 Subject: [PATCH] Fix mypy 1.20.0 type check failures Update mirrors-mypy from v1.19.1 to v1.20.0 in pre-commit config. Change type: ignore error codes from [attr-defined] to [union-attr] in cosmos/airflow/graph.py. mypy 1.20.0 now correctly infers the union type DAGNode | Any and reports union-attr instead of attr-defined for attribute access on union members. Co-Authored-By: Claude Opus 4.6 (1M context) --- .pre-commit-config.yaml | 2 +- cosmos/airflow/graph.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1c5a4d886a..3f5a23ae70 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -122,7 +122,7 @@ repos: pass_filenames: false files: ^docs/ - repo: https://github.com/pre-commit/mirrors-mypy - rev: "v1.19.1" + rev: "v1.20.0" hooks: - id: mypy name: mypy-python diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 73de6c5748..b2ad51f1f9 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -664,7 +664,7 @@ def _add_dbt_setup_async_task( else [task_or_taskgroup] ) for task in node_tasks: - task.producer_task_id = setup_airflow_task.task_id # type: ignore[attr-defined] + task.producer_task_id = setup_airflow_task.task_id # type: ignore[union-attr] if not task.upstream_list: setup_airflow_task >> task @@ -742,7 +742,7 @@ def _add_watcher_dependencies( else [task_or_taskgroup] ) for task in node_tasks: - task.producer_task_id = producer_airflow_task.task_id # type: ignore[attr-defined] + task.producer_task_id = producer_airflow_task.task_id # type: ignore[union-attr] # Make the producer task to be the parent of the root dbt nodes, without blocking them from sensing XCom # We only managed to do this in the case of DbtDag. @@ -761,7 +761,7 @@ def _add_watcher_dependencies( else: always_run_tasks = [task_or_taskgroup] for task in always_run_tasks: - task.trigger_rule = task_args.get("trigger_rule", "always") # type: ignore[attr-defined] + task.trigger_rule = task_args.get("trigger_rule", "always") # type: ignore[union-attr] def should_create_detached_nodes(render_config: RenderConfig) -> bool: