diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py index a9d8325e5ba64..76f279b02e2e0 100644 --- a/airflow/api/common/mark_tasks.py +++ b/airflow/api/common/mark_tasks.py @@ -158,10 +158,6 @@ def set_state( qry_sub_dag = all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates) tis_altered += session.scalars(qry_sub_dag.with_for_update()).all() for task_instance in tis_altered: - # The try_number was decremented when setting to up_for_reschedule and deferred. - # Increment it back when changing the state again - if task_instance.state in (TaskInstanceState.DEFERRED, TaskInstanceState.UP_FOR_RESCHEDULE): - task_instance._try_number += 1 task_instance.set_state(state, session=session) session.flush() else: diff --git a/airflow/api_connexion/schemas/task_instance_schema.py b/airflow/api_connexion/schemas/task_instance_schema.py index 4777b8bd4c577..f4ea4bdddf72b 100644 --- a/airflow/api_connexion/schemas/task_instance_schema.py +++ b/airflow/api_connexion/schemas/task_instance_schema.py @@ -53,7 +53,7 @@ class Meta: end_date = auto_field() duration = auto_field() state = TaskInstanceStateField() - _try_number = auto_field(data_key="try_number") + try_number = auto_field() max_tries = auto_field() task_display_name = fields.String(attribute="task_display_name", dump_only=True) hostname = auto_field() diff --git a/airflow/example_dags/plugins/decreasing_priority_weight_strategy.py b/airflow/example_dags/plugins/decreasing_priority_weight_strategy.py index 44e6bad432a60..3335b7da4ee77 100644 --- a/airflow/example_dags/plugins/decreasing_priority_weight_strategy.py +++ b/airflow/example_dags/plugins/decreasing_priority_weight_strategy.py @@ -30,7 +30,7 @@ class DecreasingPriorityStrategy(PriorityWeightStrategy): """A priority weight strategy that decreases the priority weight with each attempt of the DAG task.""" def get_weight(self, ti: TaskInstance): - return max(3 - ti._try_number + 1, 1) + return max(3 - ti.try_number + 1, 1) class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin): diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index ace6a001311a1..4fd6b24bbfc2a 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -22,7 +22,7 @@ import attr import pendulum -from sqlalchemy import select, tuple_, update +from sqlalchemy import case, or_, select, tuple_, update from sqlalchemy.exc import OperationalError from sqlalchemy.orm.session import make_transient from tabulate import tabulate @@ -245,7 +245,16 @@ def _update_counters(self, ti_status: _DagRunTaskStatus, session: Session) -> No session.execute( update(TI) .where(filter_for_tis) - .values(state=TaskInstanceState.SCHEDULED) + .values( + state=TaskInstanceState.SCHEDULED, + try_number=case( + ( + or_(TI.state.is_(None), TI.state != TaskInstanceState.UP_FOR_RESCHEDULE), + TI.try_number + 1, + ), + else_=TI.try_number, + ), + ) .execution_options(synchronize_session=False) ) session.flush() @@ -425,6 +434,8 @@ def _task_instances_for_dag_run( try: for ti in dag_run.get_task_instances(session=session): if ti in schedulable_tis: + if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE: + ti.try_number += 1 ti.set_state(TaskInstanceState.SCHEDULED) if ti.state != TaskInstanceState.REMOVED: tasks_to_run[ti.key] = ti @@ -515,6 +526,7 @@ def _per_task_process(key, ti: TaskInstance, session): if key in ti_status.running: ti_status.running.pop(key) # Reset the failed task in backfill to scheduled state + ti.try_number += 1 ti.set_state(TaskInstanceState.SCHEDULED, session=session) if ti.dag_run not in ti_status.active_runs: ti_status.active_runs.add(ti.dag_run) @@ -552,6 +564,14 @@ def _per_task_process(key, ti: TaskInstance, session): else: self.log.debug("Sending %s to executor", ti) # Skip scheduled state, we are executing immediately + if ti.state in (TaskInstanceState.UP_FOR_RETRY, None): + # i am not sure why this is necessary. + # seemingly a quirk of backfill runner. + # it should be handled elsewhere i think. + # seems the leaf tasks are set SCHEDULED but others not. + # but i am not going to look too closely since we need + # to nuke the current backfill approach anyway. + ti.try_number += 1 ti.state = TaskInstanceState.QUEUED ti.queued_by_job_id = self.job.id ti.queued_dttm = timezone.utcnow() @@ -695,7 +715,9 @@ def _per_task_process(key, ti: TaskInstance, session): self.log.debug(e) perform_heartbeat( - job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True + job=self.job, + heartbeat_callback=self.heartbeat_callback, + only_if_necessary=True, ) # execute the tasks in the queue executor.heartbeat() @@ -725,6 +747,7 @@ def to_keep(key: TaskInstanceKey) -> bool: ti_status.to_run.update({ti.key: ti for ti in new_mapped_tis}) for new_ti in new_mapped_tis: + new_ti.try_number += 1 new_ti.set_state(TaskInstanceState.SCHEDULED, session=session) # Set state to failed for running TIs that are set up for retry if disable-retry flag is set @@ -930,7 +953,6 @@ def _execute(self, session: Session = NEW_SESSION) -> None: "combination. Please adjust backfill dates or wait for this DagRun to finish.", ) return - # picklin' pickle_id = None executor_class, _ = ExecutorLoader.import_default_executor_cls() diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 9db0de45a036d..c9a8424a831f0 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2948,6 +2948,8 @@ def add_logger_if_needed(ti: TaskInstance): session.expire_all() schedulable_tis, _ = dr.update_state(session=session) for s in schedulable_tis: + if s.state != TaskInstanceState.UP_FOR_RESCHEDULE: + s.try_number += 1 s.state = TaskInstanceState.SCHEDULED session.commit() # triggerer may mark tasks scheduled so we read from DB diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 117dd59e288ba..84076ee70c53e 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -45,7 +45,7 @@ from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.orm import declared_attr, joinedload, relationship, synonym, validates -from sqlalchemy.sql.expression import false, select, true +from sqlalchemy.sql.expression import case, false, select, true from airflow import settings from airflow.api_internal.internal_api_call import internal_api_call @@ -1545,7 +1545,8 @@ def schedule_tis( and not ti.task.on_success_callback and not ti.task.outlets ): - ti._try_number += 1 + if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE: + ti.try_number += 1 ti.defer_task( defer=TaskDeferred(trigger=ti.task.start_trigger, method_name=ti.task.next_method), session=session, @@ -1567,7 +1568,16 @@ def schedule_tis( TI.run_id == self.run_id, tuple_in_condition((TI.task_id, TI.map_index), schedulable_ti_ids_chunk), ) - .values(state=TaskInstanceState.SCHEDULED) + .values( + state=TaskInstanceState.SCHEDULED, + try_number=case( + ( + or_(TI.state.is_(None), TI.state != TaskInstanceState.UP_FOR_RESCHEDULE), + TI.try_number + 1, + ), + else_=TI.try_number, + ), + ) .execution_options(synchronize_session=False) ).rowcount diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 1a9d1e00362ef..3bc96ec01ca92 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -38,6 +38,7 @@ import jinja2 import lazy_object_proxy import pendulum +from deprecated import deprecated from jinja2 import TemplateAssertionError, UndefinedError from sqlalchemy import ( Column, @@ -281,14 +282,13 @@ def clear_task_instances( ti.refresh_from_task(task) if TYPE_CHECKING: assert ti.task - task_retries = task.retries - ti.max_tries = ti.try_number + task_retries - 1 + ti.max_tries = ti.try_number + task.retries else: # Ignore errors when updating max_tries if the DAG or # task are not found since database records could be # outdated. We make max_tries the maximum value of its # original max_tries or the last attempted try number. - ti.max_tries = max(ti.max_tries, ti.prev_attempted_tries) + ti.max_tries = max(ti.max_tries, ti.try_number) ti.state = None ti.external_executor_id = None ti.clear_next_method_args() @@ -539,7 +539,7 @@ def _refresh_from_db( task_instance.end_date = ti.end_date task_instance.duration = ti.duration task_instance.state = ti.state - task_instance.try_number = _get_private_try_number(task_instance=ti) + task_instance.try_number = ti.try_number task_instance.max_tries = ti.max_tries task_instance.hostname = ti.hostname task_instance.unixname = ti.unixname @@ -928,53 +928,6 @@ def _handle_failure( TaskInstance.save_to_db(failure_context["ti"], session) -def _get_try_number(*, task_instance: TaskInstance): - """ - Return the try number that a task number will be when it is actually run. - - If the TaskInstance is currently running, this will match the column in the - database, in all other cases this will be incremented. - - This is designed so that task logs end up in the right file. - - :param task_instance: the task instance - - :meta private: - """ - if task_instance.state == TaskInstanceState.RUNNING: - return task_instance._try_number - return task_instance._try_number + 1 - - -def _get_private_try_number(*, task_instance: TaskInstance | TaskInstancePydantic): - """ - Opposite of _get_try_number. - - Given the value returned by try_number, return the value of _try_number that - should produce the same result. - This is needed for setting _try_number on TaskInstance from the value on PydanticTaskInstance, which has no private attrs. - - :param task_instance: the task instance - - :meta private: - """ - if task_instance.state == TaskInstanceState.RUNNING: - return task_instance.try_number - return task_instance.try_number - 1 - - -def _set_try_number(*, task_instance: TaskInstance | TaskInstancePydantic, value: int) -> None: - """ - Set a task try number. - - :param task_instance: the task instance - :param value: the try number - - :meta private: - """ - task_instance._try_number = value # type: ignore[union-attr] - - def _refresh_from_task( *, task_instance: TaskInstance | TaskInstancePydantic, task: Operator, pool_override: str | None = None ) -> None: @@ -1164,13 +1117,10 @@ def _get_email_subject_content( 'Mark success: Link
' ) - # This function is called after changing the state from RUNNING, - # so we need to subtract 1 from self.try_number here. - current_try_number = task_instance.try_number - 1 additional_context: dict[str, Any] = { "exception": exception, "exception_html": exception_html, - "try_number": current_try_number, + "try_number": task_instance.try_number, "max_tries": task_instance.max_tries, } @@ -1343,7 +1293,7 @@ class TaskInstance(Base, LoggingMixin): end_date = Column(UtcDateTime) duration = Column(Float) state = Column(String(20)) - _try_number = Column("try_number", Integer, default=0) + try_number = Column(Integer, default=0) max_tries = Column(Integer, server_default=text("-1")) hostname = Column(String(1000)) unixname = Column(String(1000)) @@ -1508,6 +1458,26 @@ def __init__( def __hash__(self): return hash((self.task_id, self.dag_id, self.run_id, self.map_index)) + @property + @deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning) + def _try_number(self): + """ + Do not use. For semblance of backcompat. + + :meta private: + """ + return self.try_number + + @_try_number.setter + @deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning) + def _try_number(self, val): + """ + Do not use. For semblance of backcompat. + + :meta private: + """ + self.try_number = val + @property def stats_tags(self) -> dict[str, str]: """Returns task instance tags.""" @@ -1527,7 +1497,7 @@ def insert_mapping(run_id: str, task: Operator, map_index: int) -> dict[str, Any "dag_id": task.dag_id, "task_id": task.task_id, "run_id": run_id, - "_try_number": 0, + "try_number": 0, "hostname": "", "unixname": getuser(), "queue": task.queue, @@ -1549,53 +1519,22 @@ def init_on_load(self) -> None: """Initialize the attributes that aren't stored in the DB.""" self.test_mode = False # can be changed when calling 'run' - @hybrid_property - def try_number(self): - """ - Return the try number that a task number will be when it is actually run. - - If the TaskInstance is currently running, this will match the column in the - database, in all other cases this will be incremented. - - This is designed so that task logs end up in the right file. - """ - return _get_try_number(task_instance=self) - - @try_number.expression - def try_number(cls): - """ - Return the expression to be used by SQLAlchemy when filtering on try_number. - - This is required because the override in the get_try_number function causes - try_number values to be off by one when listing tasks in the UI. - - :meta private: - """ - return cls._try_number - - @try_number.setter - def try_number(self, value: int) -> None: - """ - Set a task try number. - - :param value: the try number - """ - _set_try_number(task_instance=self, value=value) - @property + @deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning) def prev_attempted_tries(self) -> int: """ - Calculate the number of previously attempted tries, defaulting to 0. + Calculate the total number of attempted tries, defaulting to 0. + + This used to be necessary because try_number did not always tell the truth. - Expose this for the Task Tries and Gantt graph views. - Using `try_number` throws off the counts for non-running tasks. - Also useful in error logging contexts to get the try number for the last try that was attempted. + :meta private: """ - return self._try_number + return self.try_number @property def next_try_number(self) -> int: - return self._try_number + 1 + # todo (dstandish): deprecate this property; we don't need a property that is just + 1 + return self.try_number + 1 @property def operator_name(self) -> str | None: @@ -2178,7 +2117,9 @@ def next_retry_datetime(self): # If the min_backoff calculation is below 1, it will be converted to 0 via int. Thus, # we must round up prior to converting to an int, otherwise a divide by zero error # will occur in the modded_hash calculation. - min_backoff = math.ceil(delay.total_seconds() * (2 ** (self.try_number - 2))) + # this probably gives unexpected results if a task instance has previously been cleared, + # because try_number can increase without bound + min_backoff = math.ceil(delay.total_seconds() * (2 ** (self.try_number - 1))) # In the case when delay.total_seconds() is 0, min_backoff will not be rounded up to 1. # To address this, we impose a lower bound of 1 on min_backoff. This effectively makes @@ -2372,7 +2313,6 @@ def _check_and_change_state_before_execution( cls.logger().info("Resuming after deferral") else: cls.logger().info("Starting attempt %s of %s", ti.try_number, ti.max_tries + 1) - ti._try_number += 1 if not test_mode: session.add(Log(TaskInstanceState.RUNNING.value, ti)) @@ -2791,9 +2731,6 @@ def defer_task(self, session: Session, defer: TaskDeferred) -> None: self.next_method = defer.method_name self.next_kwargs = defer.kwargs or {} - # Decrement try number so the next one is the same try - self._try_number -= 1 - # Calculate timeout too if it was passed if defer.timeout is not None: self.trigger_timeout = timezone.utcnow() + defer.timeout @@ -2910,7 +2847,7 @@ def _handle_reschedule( self.task_id, self.dag_id, self.run_id, - self._try_number, + self.try_number, actual_start_date, self.end_date, reschedule_exception.reschedule_date, @@ -2921,10 +2858,6 @@ def _handle_reschedule( # set state self.state = TaskInstanceState.UP_FOR_RESCHEDULE - # Decrement try_number so subsequent runs will use the same try number and write - # to same log file. - self._try_number -= 1 - self.clear_next_method_args() session.merge(self) @@ -3040,7 +2973,6 @@ def fetch_handle_failure_context( # e.g. we could make refresh_from_db return a TI and replace ti with that raise RuntimeError("Expected TaskInstance here. Further AIP-44 work required.") # We increase the try_number to fail the task if it fails to start after sometime - ti._try_number += 1 ti.state = State.UP_FOR_RETRY email_for_state = operator.attrgetter("email_on_retry") callbacks = task.on_retry_callback if task else None diff --git a/airflow/models/taskinstancekey.py b/airflow/models/taskinstancekey.py index 50906e47b0a31..b705ecbe8785d 100644 --- a/airflow/models/taskinstancekey.py +++ b/airflow/models/taskinstancekey.py @@ -37,6 +37,7 @@ def primary(self) -> tuple[str, str, str, int]: @property def reduced(self) -> TaskInstanceKey: """Remake the key by subtracting 1 from try number to match in memory information.""" + # todo (dstandish): remove this property return TaskInstanceKey( self.dag_id, self.task_id, self.run_id, max(1, self.try_number - 1), self.map_index ) diff --git a/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py index c5e7e3d6b46c3..ca7047bb1ca22 100644 --- a/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py +++ b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py @@ -527,7 +527,7 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task ti.queue, ti.command_as_list(), ti.executor_config, - ti.prev_attempted_tries, + ti.try_number, ) adopted_tis.append(ti) diff --git a/airflow/providers/dbt/cloud/CHANGELOG.rst b/airflow/providers/dbt/cloud/CHANGELOG.rst index cae2a00c3c056..bd5e64da00d1a 100644 --- a/airflow/providers/dbt/cloud/CHANGELOG.rst +++ b/airflow/providers/dbt/cloud/CHANGELOG.rst @@ -28,6 +28,11 @@ Changelog --------- +main +..... + +In Airflow 2.10.0, we fix the way try_number works, so that it no longer returns different values depending on task instance state. Importantly, after the task is done, it no longer shows current_try + 1. Thus in 3.8.1 we patch this provider to fix try_number references so they no longer adjust for the old, bad behavior. + 3.8.0 ..... diff --git a/airflow/providers/dbt/cloud/utils/openlineage.py b/airflow/providers/dbt/cloud/utils/openlineage.py index 5e4550b677697..ad50552b5146c 100644 --- a/airflow/providers/dbt/cloud/utils/openlineage.py +++ b/airflow/providers/dbt/cloud/utils/openlineage.py @@ -21,6 +21,8 @@ from contextlib import suppress from typing import TYPE_CHECKING +from airflow import __version__ as airflow_version + if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator @@ -28,6 +30,16 @@ from airflow.providers.openlineage.extractors.base import OperatorLineage +def _get_try_number(val): + # todo: remove when min airflow version >= 2.10.0 + from packaging.version import parse + + if parse(parse(airflow_version).base_version) < parse("2.10.0"): + return val.try_number - 1 + else: + return val.try_number + + def generate_openlineage_events_from_dbt_cloud_run( operator: DbtCloudRunJobOperator | DbtCloudJobRunSensor, task_instance: TaskInstance ) -> OperatorLineage: @@ -131,7 +143,7 @@ async def get_artifacts_for_steps(steps, artifacts): dag_id=task_instance.dag_id, task_id=operator.task_id, execution_date=task_instance.execution_date, - try_number=task_instance.try_number - 1, + try_number=_get_try_number(task_instance), ) parent_job = ParentRunMetadata( diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index cfc7b49e9edd7..82cc887553d65 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -108,7 +108,7 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance: .one_or_none() ) if isinstance(val, TaskInstance): - val._try_number = ti.try_number + val.try_number = ti.try_number return val else: raise AirflowException(f"Could not find TaskInstance for {ti}") diff --git a/airflow/providers/openlineage/CHANGELOG.rst b/airflow/providers/openlineage/CHANGELOG.rst index 4e1f6ff80dd8f..882c12139d1d3 100644 --- a/airflow/providers/openlineage/CHANGELOG.rst +++ b/airflow/providers/openlineage/CHANGELOG.rst @@ -26,6 +26,11 @@ Changelog --------- +main +..... + +In Airflow 2.10.0, we fix the way try_number works, so that it no longer returns different values depending on task instance state. Importantly, after the task is done, it no longer shows current_try + 1. Thus in 1.7.2 we patch this provider to fix try_number references so they no longer adjust for the old, bad behavior. + 1.7.1 ..... diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index 25ded6d7f4361..03c60059d6686 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -23,6 +23,7 @@ from openlineage.client.serde import Serde +from airflow import __version__ as airflow_version from airflow.listeners import hookimpl from airflow.providers.openlineage.extractors import ExtractorManager from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState @@ -45,6 +46,16 @@ _openlineage_listener: OpenLineageListener | None = None +def _get_try_number_success(val): + # todo: remove when min airflow version >= 2.10.0 + from packaging.version import parse + + if parse(parse(airflow_version).base_version) < parse("2.10.0"): + return val.try_number - 1 + else: + return val.try_number + + class OpenLineageListener: """OpenLineage listener sends events on task instance and dag run starts, completes and failures.""" @@ -165,7 +176,7 @@ def on_success(): dag_id=dag.dag_id, task_id=task.task_id, execution_date=task_instance.execution_date, - try_number=task_instance.try_number - 1, + try_number=_get_try_number_success(task_instance), ) event_type = RunState.COMPLETE.value.lower() operator_name = task.task_type.lower() diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py index 8e13279babb2d..5f49d0096e44b 100644 --- a/airflow/sensors/base.py +++ b/airflow/sensors/base.py @@ -241,11 +241,11 @@ def execute(self, context: Context) -> Any: started_at: datetime.datetime | float if self.reschedule: - # If reschedule, use the start date of the first try (first try can be either the very - # first execution of the task, or the first execution after the task was cleared.) ti = context["ti"] max_tries: int = ti.max_tries or 0 retries: int = self.retries or 0 + # If reschedule, use the start date of the first try (first try can be either the very + # first execution of the task, or the first execution after the task was cleared.) first_try_number = max_tries - retries + 1 start_date = _orig_start_date( dag_id=ti.dag_id, diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 2a1dfd25f6410..043f59d52e895 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -145,8 +145,7 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance | TaskInstancePydantic, sessio Will raise exception if no TI is found in the database. """ - from airflow.models.taskinstance import TaskInstance, _get_private_try_number - from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic + from airflow.models.taskinstance import TaskInstance if isinstance(ti, TaskInstance): return ti @@ -162,10 +161,7 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance | TaskInstancePydantic, sessio ) if not val: raise AirflowException(f"Could not find TaskInstance for {ti}") - if isinstance(ti, TaskInstancePydantic): - val.try_number = _get_private_try_number(task_instance=ti) - else: # TaskInstanceKey - val.try_number = ti.try_number + val.try_number = ti.try_number return val diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 513b453006fd1..4235ab597f9ea 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -147,7 +147,7 @@ def get_mapped_summary(parent_instance, task_instances): "start_date": group_start_date, "end_date": group_end_date, "mapped_states": mapped_states, - "try_number": get_try_count(parent_instance._try_number, parent_instance.state), + "try_number": get_try_count(parent_instance.try_number, parent_instance.state), "execution_date": parent_instance.execution_date, } diff --git a/airflow/www/views.py b/airflow/www/views.py index 682a70c8ea0fe..131b25f20be9f 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -314,7 +314,7 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun], session: Session) -> TaskInstance.task_id, TaskInstance.run_id, TaskInstance.state, - TaskInstance._try_number, + TaskInstance.try_number, func.min(TaskInstanceNote.content).label("note"), func.count(func.coalesce(TaskInstance.state, sqla.literal("no_status"))).label("state_count"), func.min(TaskInstance.queued_dttm).label("queued_dttm"), @@ -326,7 +326,7 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun], session: Session) -> TaskInstance.dag_id == dag.dag_id, TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]), ) - .group_by(TaskInstance.task_id, TaskInstance.run_id, TaskInstance.state, TaskInstance._try_number) + .group_by(TaskInstance.task_id, TaskInstance.run_id, TaskInstance.state, TaskInstance.try_number) .order_by(TaskInstance.task_id, TaskInstance.run_id) ) @@ -409,7 +409,7 @@ def set_overall_state(record): "queued_dttm": task_instance.queued_dttm, "start_date": task_instance.start_date, "end_date": task_instance.end_date, - "try_number": wwwutils.get_try_count(task_instance._try_number, task_instance.state), + "try_number": wwwutils.get_try_count(task_instance.try_number, task_instance.state), "note": task_instance.note, } for task_instance in grouped_tis[item.task_id] @@ -1687,7 +1687,7 @@ def log(self, session: Session = NEW_SESSION): num_logs = 0 if ti is not None: - num_logs = wwwutils.get_try_count(ti._try_number, ti.state) + num_logs = wwwutils.get_try_count(ti.try_number, ti.state) logs = [""] * num_logs root = request.args.get("root", "") return self.render_template( @@ -1788,7 +1788,7 @@ def task(self, session: Session = NEW_SESSION): warnings.simplefilter("ignore", RemovedInAirflow3Warning) all_ti_attrs = ( # fetching the value of _try_number to be shown under name try_number in UI - (name, getattr(ti, "_try_number" if name == "try_number" else name)) + (name, getattr(ti, name)) for name in dir(ti) if not name.startswith("_") and name not in ti_attrs_to_skip ) @@ -5196,7 +5196,7 @@ class TaskInstanceModelView(AirflowModelView): "pool", "queued_by_job_id", ] - + # todo: don't use prev_attempted_tries; just use try_number label_columns = {"dag_run.execution_date": "Logical Date", "prev_attempted_tries": "Try Number"} search_columns = [ diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index 775217e872b82..2d10cdac6fef4 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -62,6 +62,7 @@ def create_context(task) -> Context: task_instance = TaskInstance(task=task) task_instance.dag_run = dag_run task_instance.dag_id = dag.dag_id + task_instance.try_number = 1 task_instance.xcom_push = mock.Mock() # type: ignore return Context( dag=dag, diff --git a/newsfragments/39336.significant.rst b/newsfragments/39336.significant.rst new file mode 100644 index 0000000000000..750a1807881e4 --- /dev/null +++ b/newsfragments/39336.significant.rst @@ -0,0 +1,7 @@ +``try_number`` is no longer incremented during task execution + +Previously, the try number (``try_number``) was incremented at the beginning of task execution on the worker. This was problematic for many reasons. For one it meant that the try number was incremented when it was not supposed to, namely when resuming from reschedule or deferral. And it also resulted in the try number being "wrong" when the task had not yet started. The workarounds for these two issues caused a lot of confusion. + +Now, instead, the try number for a task run is determined at the time the task is scheduled, and does not change in flight, and it is never decremented. So after the task runs, the observed try number remains the same as it was when the task was running; only when there is a "new try" will the try number be incremented again. + +One consequence of this change is, if users were "manually" running tasks (e.g. by calling ``ti.run()`` directly, or command line ``airflow tasks run``), try number will no longer be incremented. Airflow assumes that tasks are always run after being scheduled by the scheduler, so we do not regard this as a breaking change. diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py index e63f2c3f7e93a..1e7c29cca20db 100644 --- a/tests/cli/commands/test_task_command.py +++ b/tests/cli/commands/test_task_command.py @@ -46,7 +46,7 @@ from airflow.operators.bash import BashOperator from airflow.utils import timezone from airflow.utils.session import create_session -from airflow.utils.state import State +from airflow.utils.state import State, TaskInstanceState from airflow.utils.types import DagRunType from tests.test_utils.config import conf_vars from tests.test_utils.db import clear_db_pools, clear_db_runs @@ -651,6 +651,12 @@ def test_parentdag_downstream_clear(self): task_command.task_clear(args) +def _set_state_and_try_num(ti, session): + ti.state = TaskInstanceState.QUEUED + ti.try_number += 1 + session.commit() + + class TestLogsfromTaskRunCommand: def setup_method(self) -> None: self.dag_id = "test_logging_dag" @@ -668,7 +674,7 @@ def setup_method(self) -> None: dag = DagBag().get_dag(self.dag_id) data_interval = dag.timetable.infer_manual_data_interval(run_after=self.execution_date) - dag.create_dagrun( + self.dr = dag.create_dagrun( run_id=self.run_id, execution_date=self.execution_date, data_interval=data_interval, @@ -676,6 +682,9 @@ def setup_method(self) -> None: state=State.RUNNING, run_type=DagRunType.MANUAL, ) + self.tis = self.dr.get_task_instances() + assert len(self.tis) == 1 + self.ti = self.tis[0] root = self.root_logger = logging.getLogger() self.root_handlers = root.handlers.copy() @@ -757,7 +766,7 @@ def test_external_executor_id_present_for_process_run_task(self, mock_local_job) @pytest.mark.parametrize( "is_k8s, is_container_exec", [("true", "true"), ("true", ""), ("", "true"), ("", "")] ) - def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s, is_container_exec): + def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s, is_container_exec, session): """ When running task --local as k8s executor pod, all logging should make it to stdout. Otherwise, all logging after "running TI" is redirected to logs (and the actual log @@ -770,6 +779,9 @@ def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s, is_containe """ import subprocess + ti = self.dr.get_task_instances(session=session)[0] + _set_state_and_try_num(ti, session) # so that try_number is correct + with mock.patch.dict( "os.environ", AIRFLOW_IS_K8S_EXECUTOR_POD=is_k8s, @@ -807,7 +819,9 @@ def test_logging_with_run_task_stdout_k8s_executor_pod(self, is_k8s, is_containe assert len(lines) == 1 @pytest.mark.skipif(not hasattr(os, "fork"), reason="Forking not available") - def test_logging_with_run_task(self): + def test_logging_with_run_task(self, session): + ti = self.dr.get_task_instances(session=session)[0] + _set_state_and_try_num(ti, session) with conf_vars({("core", "dags_folder"): self.dag_path}): task_command.task_run(self.parser.parse_args(self.task_args)) @@ -852,7 +866,10 @@ def test_run_task_with_pool(self): session.commit() @mock.patch("airflow.task.task_runner.standard_task_runner.CAN_FORK", False) - def test_logging_with_run_task_subprocess(self): + def test_logging_with_run_task_subprocess(self, session): + ti = self.dr.get_task_instances(session=session)[0] + _set_state_and_try_num(ti, session) + with conf_vars({("core", "dags_folder"): self.dag_path}): task_command.task_run(self.parser.parse_args(self.task_args)) @@ -874,14 +891,14 @@ def test_logging_with_run_task_subprocess(self): f"task_id={self.task_id}, run_id={self.run_id}, execution_date=20170101T000000" in logs ) - def test_log_file_template_with_run_task(self): + def test_log_file_template_with_run_task(self, session): """Verify that the taskinstance has the right context for log_filename_template""" with conf_vars({("core", "dags_folder"): self.dag_path}): # increment the try_number of the task to be run with create_session() as session: ti = session.query(TaskInstance).filter_by(run_id=self.run_id).first() - ti.try_number = 1 + ti.try_number = 2 log_file_path = os.path.join(os.path.dirname(self.ti_log_file_path), "attempt=2.log") diff --git a/tests/core/test_sentry.py b/tests/core/test_sentry.py index 10c88675ea265..de78912041d55 100644 --- a/tests/core/test_sentry.py +++ b/tests/core/test_sentry.py @@ -38,7 +38,7 @@ DAG_ID = "test_dag" TASK_ID = "test_task" OPERATOR = "PythonOperator" -TRY_NUMBER = 1 +TRY_NUMBER = 0 STATE = State.SUCCESS TEST_SCOPE = { "dag_id": DAG_ID, @@ -149,7 +149,7 @@ def test_add_tagging(self, sentry, task_instance): sentry.add_tagging(task_instance=task_instance) with configure_scope() as scope: for key, value in scope._tags.items(): - assert TEST_SCOPE[key] == value + assert value == TEST_SCOPE[key] @pytest.mark.db_test @time_machine.travel(CRUMB_DATE) diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py index 108ee9700383c..5b7bc36b4de56 100644 --- a/tests/jobs/test_backfill_job.py +++ b/tests/jobs/test_backfill_job.py @@ -202,7 +202,6 @@ def test_backfill_multi_dates(self): ) run_job(job=job, execute_callable=job_runner._execute) - expected_execution_order = [ ("runme_0", DEFAULT_DATE), ("runme_1", DEFAULT_DATE), @@ -217,11 +216,15 @@ def test_backfill_multi_dates(self): ("run_this_last", DEFAULT_DATE), ("run_this_last", end_date), ] - assert [ - ((dag.dag_id, task_id, f"backfill__{when.isoformat()}", 1, -1), (State.SUCCESS, None)) + actual = [(tuple(x), y) for x, y in executor.sorted_tasks] + expected = [ + ( + (dag.dag_id, task_id, f"backfill__{when.isoformat()}", 1, -1), + (State.SUCCESS, None), + ) for (task_id, when) in expected_execution_order - ] == executor.sorted_tasks - + ] + assert actual == expected session = settings.Session() drs = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.execution_date).all() @@ -907,10 +910,10 @@ def test_backfill_retry_always_failed_task(self, dag_maker): dr = dag_maker.create_dagrun(state=None) executor = MockExecutor(parallelism=16) - executor.mock_task_results[TaskInstanceKey(dag.dag_id, task1.task_id, dr.run_id, try_number=1)] = ( + executor.mock_task_results[TaskInstanceKey(dag.dag_id, task1.task_id, dr.run_id, try_number=0)] = ( State.UP_FOR_RETRY ) - executor.mock_task_fail(dag.dag_id, task1.task_id, dr.run_id, try_number=2) + executor.mock_task_fail(dag.dag_id, task1.task_id, dr.run_id, try_number=1) job = Job(executor=executor) job_runner = BackfillJobRunner( job=job, @@ -952,10 +955,14 @@ def test_backfill_ordered_concurrent_execute(self, dag_maker): runid1 = f"backfill__{(DEFAULT_DATE + datetime.timedelta(days=1)).isoformat()}" runid2 = f"backfill__{(DEFAULT_DATE + datetime.timedelta(days=2)).isoformat()}" - # test executor history keeps a list - history = executor.history - - assert [sorted(item[-1].key[1:3] for item in batch) for batch in history] == [ + actual = [] + for batch in executor.history: + this_batch = [] + for cmd, idx, queue, ti in batch: # noqa: B007 + key = ti.key + this_batch.append((key.task_id, key.run_id)) + actual.append(sorted(this_batch)) + assert actual == [ [ ("leave1", runid0), ("leave1", runid1), @@ -964,9 +971,21 @@ def test_backfill_ordered_concurrent_execute(self, dag_maker): ("leave2", runid1), ("leave2", runid2), ], - [("upstream_level_1", runid0), ("upstream_level_1", runid1), ("upstream_level_1", runid2)], - [("upstream_level_2", runid0), ("upstream_level_2", runid1), ("upstream_level_2", runid2)], - [("upstream_level_3", runid0), ("upstream_level_3", runid1), ("upstream_level_3", runid2)], + [ + ("upstream_level_1", runid0), + ("upstream_level_1", runid1), + ("upstream_level_1", runid2), + ], + [ + ("upstream_level_2", runid0), + ("upstream_level_2", runid1), + ("upstream_level_2", runid2), + ], + [ + ("upstream_level_3", runid0), + ("upstream_level_3", runid1), + ("upstream_level_3", runid2), + ], ] def test_backfill_pooled_tasks(self): @@ -1525,7 +1544,7 @@ def test_update_counters(self, dag_maker, session): # match what's in the in-memory ti_status.running map. This is the same # for skipped, failed and retry states. ti_status.running[ti.key] = ti # Task is queued and marked as running - ti._try_number += 1 # Try number is increased during ti.run() + ti.try_number += 1 ti.set_state(State.SUCCESS, session) # Task finishes with success state job_runner._update_counters(ti_status=ti_status, session=session) # Update counters assert len(ti_status.running) == 0 @@ -1538,7 +1557,7 @@ def test_update_counters(self, dag_maker, session): # Test for success when DB try_number is off from in-memory expectations ti_status.running[ti.key] = ti - ti._try_number += 2 + ti.try_number += 2 ti.set_state(State.SUCCESS, session) job_runner._update_counters(ti_status=ti_status, session=session) assert len(ti_status.running) == 0 @@ -1551,7 +1570,7 @@ def test_update_counters(self, dag_maker, session): # Test for skipped ti_status.running[ti.key] = ti - ti._try_number += 1 + ti.try_number += 1 ti.set_state(State.SKIPPED, session) job_runner._update_counters(ti_status=ti_status, session=session) assert len(ti_status.running) == 0 @@ -1564,7 +1583,7 @@ def test_update_counters(self, dag_maker, session): # Test for failed ti_status.running[ti.key] = ti - ti._try_number += 1 + ti.try_number += 1 ti.set_state(State.FAILED, session) job_runner._update_counters(ti_status=ti_status, session=session) assert len(ti_status.running) == 0 @@ -1577,7 +1596,7 @@ def test_update_counters(self, dag_maker, session): # Test for retry ti_status.running[ti.key] = ti - ti._try_number += 1 + ti.try_number += 1 ti.set_state(State.UP_FOR_RETRY, session) job_runner._update_counters(ti_status=ti_status, session=session) assert len(ti_status.running) == 0 @@ -1595,9 +1614,6 @@ def test_update_counters(self, dag_maker, session): # and DB representation of the task try_number the _same_, which is unlike # the above cases. But this is okay because the in-memory key is used. ti_status.running[ti.key] = ti # Task queued and marked as running - # Note: Both the increase and decrease are kept here for context - ti._try_number += 1 # Try number is increased during ti.run() - ti._try_number -= 1 # Task is being rescheduled, decrement try_number ti.set_state(State.UP_FOR_RESCHEDULE, session) # Task finishes with reschedule state job_runner._update_counters(ti_status=ti_status, session=session) assert len(ti_status.running) == 0 @@ -1610,10 +1626,6 @@ def test_update_counters(self, dag_maker, session): # test for none ti.set_state(State.NONE, session) - # Setting ti._try_number = 0 brings us to ti.try_number==1 - # so that the in-memory key access will work fine - ti._try_number = 0 - assert ti.try_number == 1 # see ti.try_number property in taskinstance module session.merge(ti) session.commit() ti_status.running[ti.key] = ti @@ -1955,20 +1967,20 @@ def on_change_state(key, state, info=None): ) assert ti_status.failed == set() assert ti_status.succeeded == { - TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=1, map_index=0), - TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=1, map_index=1), - TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=1, map_index=2), + TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=0, map_index=0), + TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=0, map_index=1), + TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=0, map_index=2), TaskInstanceKey( - dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=1, map_index=0 + dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=0, map_index=0 ), TaskInstanceKey( - dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=1, map_index=1 + dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=0, map_index=1 ), TaskInstanceKey( - dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=1, map_index=2 + dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=0, map_index=2 ), TaskInstanceKey( - dag_id=dr.dag_id, task_id="make_arg_lists", run_id="test", try_number=1, map_index=-1 + dag_id=dr.dag_id, task_id="make_arg_lists", run_id="test", try_number=0, map_index=-1 ), } @@ -2096,7 +2108,7 @@ def test_backfill_disable_retry(self, dag_maker, disable_retry, try_number, exce run_job(job=job, execute_callable=job_runner._execute) ti = dag_run.get_task_instance(task_id=task1.task_id) - assert ti._try_number == try_number + assert ti.try_number == try_number dag_run.refresh_from_db() diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index f122f122658b5..491e345649fe1 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3180,7 +3180,7 @@ def run_with_error(ti, ignore_ti_state=False): # executing task. run_with_error(ti, ignore_ti_state=True) assert ti.state == State.UP_FOR_RETRY - assert ti.try_number == 2 + assert ti.try_number == 1 with create_session() as session: ti.refresh_from_db(lock_for_update=True, session=session) @@ -3191,6 +3191,7 @@ def run_with_error(ti, ignore_ti_state=False): executor.do_update = True do_schedule() ti.refresh_from_db() + assert ti.try_number == 1 assert ti.state == State.SUCCESS def test_retry_handling_job(self): @@ -3214,8 +3215,6 @@ def test_retry_handling_job(self): .filter(TaskInstance.dag_id == dag.dag_id, TaskInstance.task_id == dag_task1.task_id) .first() ) - # make sure the counter has increased - assert ti.try_number == 2 assert ti.state == State.UP_FOR_RETRY def test_dag_get_active_runs(self, dag_maker): diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py index 9279c97d533af..df9699806fb4c 100644 --- a/tests/jobs/test_triggerer_job.py +++ b/tests/jobs/test_triggerer_job.py @@ -152,7 +152,7 @@ def __init__(self, password, **kwargs): # give it more time for the trigger event to write the log. time.sleep(0.5) - assert "test_dag/test_run/sensitive_arg_task/-1/1 (ID 1) starting" in caplog.text + assert "test_dag/test_run/sensitive_arg_task/-1/0 (ID 1) starting" in caplog.text assert "some_password" not in caplog.text diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py index bce9dc4668a1d..26e3dcc9e4de3 100644 --- a/tests/models/test_cleartasks.py +++ b/tests/models/test_cleartasks.py @@ -25,7 +25,7 @@ from airflow import settings from airflow.models.dag import DAG from airflow.models.serialized_dag import SerializedDagModel -from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances +from airflow.models.taskinstance import TaskInstance, TaskInstance as TI, clear_task_instances from airflow.models.taskreschedule import TaskReschedule from airflow.operators.empty import EmptyOperator from airflow.sensors.python import PythonSensor @@ -68,6 +68,13 @@ def test_clear_task_instances(self, dag_maker): ti1.run() with create_session() as session: + # do the incrementing of try_number ordinarily handled by scheduler + ti0.try_number += 1 + ti1.try_number += 1 + session.merge(ti0) + session.merge(ti1) + session.commit() + # we use order_by(task_id) here because for the test DAG structure of ours # this is equivalent to topological sort. It would not work in general case # but it works for our case because we specifically constructed test DAGS @@ -79,10 +86,10 @@ def test_clear_task_instances(self, dag_maker): ti1.refresh_from_db() # Next try to run will be try 2 assert ti0.state is None - assert ti0.try_number == 2 + assert ti0.try_number == 1 assert ti0.max_tries == 1 assert ti1.state is None - assert ti1.try_number == 2 + assert ti1.try_number == 1 assert ti1.max_tries == 3 def test_clear_task_instances_external_executor_id(self, dag_maker): @@ -279,6 +286,14 @@ def test_clear_task_instances_without_task(self, dag_maker): ti0.refresh_from_task(task0) ti1.refresh_from_task(task1) + with create_session() as session: + # do the incrementing of try_number ordinarily handled by scheduler + ti0.try_number += 1 + ti1.try_number += 1 + session.merge(ti0) + session.merge(ti1) + session.commit() + ti0.run() ti1.run() @@ -298,10 +313,9 @@ def test_clear_task_instances_without_task(self, dag_maker): # When no task is found, max_tries will be maximum of original max_tries or try_number. ti0.refresh_from_db() ti1.refresh_from_db() - # Next try to run will be try 2 - assert ti0.try_number == 2 + assert ti0.try_number == 1 assert ti0.max_tries == 1 - assert ti1.try_number == 2 + assert ti1.try_number == 1 assert ti1.max_tries == 2 def test_clear_task_instances_without_dag(self, dag_maker): @@ -323,6 +337,14 @@ def test_clear_task_instances_without_dag(self, dag_maker): ti0.refresh_from_task(task0) ti1.refresh_from_task(task1) + with create_session() as session: + # do the incrementing of try_number ordinarily handled by scheduler + ti0.try_number += 1 + ti1.try_number += 1 + session.merge(ti0) + session.merge(ti1) + session.commit() + ti0.run() ti1.run() @@ -337,10 +359,9 @@ def test_clear_task_instances_without_dag(self, dag_maker): # When no DAG is found, max_tries will be maximum of original max_tries or try_number. ti0.refresh_from_db() ti1.refresh_from_db() - # Next try to run will be try 2 - assert ti0.try_number == 2 + assert ti0.try_number == 1 assert ti0.max_tries == 1 - assert ti1.try_number == 2 + assert ti1.try_number == 1 assert ti1.max_tries == 2 def test_clear_task_instances_without_dag_param(self, dag_maker, session): @@ -365,6 +386,14 @@ def test_clear_task_instances_without_dag_param(self, dag_maker, session): ti0.refresh_from_task(task0) ti1.refresh_from_task(task1) + with create_session() as session: + # do the incrementing of try_number ordinarily handled by scheduler + ti0.try_number += 1 + ti1.try_number += 1 + session.merge(ti0) + session.merge(ti1) + session.commit() + ti0.run(session=session) ti1.run(session=session) @@ -377,10 +406,9 @@ def test_clear_task_instances_without_dag_param(self, dag_maker, session): ti0.refresh_from_db(session=session) ti1.refresh_from_db(session=session) - # Next try to run will be try 2 - assert ti0.try_number == 2 + assert ti0.try_number == 1 assert ti0.max_tries == 1 - assert ti1.try_number == 2 + assert ti1.try_number == 1 assert ti1.max_tries == 3 def test_clear_task_instances_in_multiple_dags(self, dag_maker, session): @@ -418,6 +446,14 @@ def test_clear_task_instances_in_multiple_dags(self, dag_maker, session): ti0.refresh_from_task(task0) ti1.refresh_from_task(task1) + with create_session() as session: + # do the incrementing of try_number ordinarily handled by scheduler + ti0.try_number += 1 + ti1.try_number += 1 + session.merge(ti0) + session.merge(ti1) + session.commit() + ti0.run(session=session) ti1.run(session=session) @@ -426,10 +462,9 @@ def test_clear_task_instances_in_multiple_dags(self, dag_maker, session): ti0.refresh_from_db(session=session) ti1.refresh_from_db(session=session) - # Next try to run will be try 2 - assert ti0.try_number == 2 + assert ti0.try_number == 1 assert ti0.max_tries == 1 - assert ti1.try_number == 2 + assert ti1.try_number == 1 assert ti1.max_tries == 3 def test_clear_task_instances_with_task_reschedule(self, dag_maker): @@ -451,6 +486,15 @@ def test_clear_task_instances_with_task_reschedule(self, dag_maker): ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id) ti0.refresh_from_task(task0) ti1.refresh_from_task(task1) + + with create_session() as session: + # do the incrementing of try_number ordinarily handled by scheduler + ti0.try_number += 1 + ti1.try_number += 1 + session.merge(ti0) + session.merge(ti1) + session.commit() + ti0.run() ti1.run() @@ -500,37 +544,35 @@ def test_dag_clear(self, dag_maker): ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id) ti0.refresh_from_task(task0) ti1.refresh_from_task(task1) - + session.get(TaskInstance, ti0.key.primary).try_number += 1 + session.commit() # Next try to run will be try 1 assert ti0.try_number == 1 ti0.run() - assert ti0.try_number == 2 + assert ti0.try_number == 1 dag.clear() ti0.refresh_from_db() - assert ti0.try_number == 2 + assert ti0.try_number == 1 assert ti0.state == State.NONE assert ti0.max_tries == 1 assert ti1.max_tries == 2 - ti1.try_number = 1 - session.merge(ti1) + session.get(TaskInstance, ti1.key.primary).try_number += 1 session.commit() - - # Next try will be 2 ti1.run() - assert ti1.try_number == 3 + assert ti1.try_number == 1 assert ti1.max_tries == 2 dag.clear() ti0.refresh_from_db() ti1.refresh_from_db() - # after clear dag, ti2 should show attempt 3 of 5 - assert ti1.max_tries == 4 - assert ti1.try_number == 3 - # after clear dag, ti1 should show attempt 2 of 2 - assert ti0.try_number == 2 + # after clear dag, we have 2 remaining tries + assert ti1.max_tries == 3 + assert ti1.try_number == 1 + # after clear dag, ti0 has no remaining tries + assert ti0.try_number == 1 assert ti0.max_tries == 1 def test_dags_clear(self): @@ -559,9 +601,11 @@ def test_dags_clear(self): # test clear all dags for i in range(num_of_dags): + session.get(TaskInstance, tis[i].key.primary).try_number += 1 + session.commit() tis[i].run() assert tis[i].state == State.SUCCESS - assert tis[i].try_number == 2 + assert tis[i].try_number == 1 assert tis[i].max_tries == 0 DAG.clear_dags(dags) @@ -569,14 +613,16 @@ def test_dags_clear(self): for i in range(num_of_dags): tis[i].refresh_from_db() assert tis[i].state == State.NONE - assert tis[i].try_number == 2 + assert tis[i].try_number == 1 assert tis[i].max_tries == 1 # test dry_run for i in range(num_of_dags): + session.get(TaskInstance, tis[i].key.primary).try_number += 1 + session.commit() tis[i].run() assert tis[i].state == State.SUCCESS - assert tis[i].try_number == 3 + assert tis[i].try_number == 2 assert tis[i].max_tries == 1 DAG.clear_dags(dags, dry_run=True) @@ -584,7 +630,7 @@ def test_dags_clear(self): for i in range(num_of_dags): tis[i].refresh_from_db() assert tis[i].state == State.SUCCESS - assert tis[i].try_number == 3 + assert tis[i].try_number == 2 assert tis[i].max_tries == 1 # test only_failed @@ -599,14 +645,14 @@ def test_dags_clear(self): ti.refresh_from_db() if ti is failed_dag: assert ti.state == State.NONE - assert ti.try_number == 3 + assert ti.try_number == 2 assert ti.max_tries == 2 else: assert ti.state == State.SUCCESS - assert ti.try_number == 3 + assert ti.try_number == 2 assert ti.max_tries == 1 - def test_operator_clear(self, dag_maker): + def test_operator_clear(self, dag_maker, session): with dag_maker( "test_operator_clear", start_date=DEFAULT_DATE, @@ -625,18 +671,27 @@ def test_operator_clear(self, dag_maker): ti1.task = op1 ti2.task = op2 + session.get(TaskInstance, ti2.key.primary).try_number += 1 + session.commit() ti2.run() # Dependency not met assert ti2.try_number == 1 assert ti2.max_tries == 1 op2.clear(upstream=True) + # max tries will be set to retries + curr try number == 1 + 1 == 2 + assert session.get(TaskInstance, ti2.key.primary).max_tries == 2 + + session.get(TaskInstance, ti1.key.primary).try_number += 1 + session.commit() ti1.run() + assert ti1.try_number == 1 + + session.get(TaskInstance, ti2.key.primary).try_number += 1 + session.commit() ti2.run(ignore_ti_state=True) - assert ti1.try_number == 2 # max_tries is 0 because there is no task instance in db for ti1 # so clear won't change the max_tries. assert ti1.max_tries == 0 assert ti2.try_number == 2 - # try_number (0) + retries(1) - assert ti2.max_tries == 1 + assert ti2.max_tries == 2 # max tries has not changed since it was updated when op2.clear called diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index aa6be108a1019..bb134d8728077 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -3257,7 +3257,13 @@ def test_dag_timetable_change_after_init(timetable): assert not dag._check_schedule_interval_matches_timetable() -@pytest.mark.parametrize("run_id, execution_date", [(None, datetime_tz(2020, 1, 1)), ("test-run-id", None)]) +@pytest.mark.parametrize( + "run_id, execution_date", + [ + (None, datetime_tz(2020, 1, 1)), + ("test-run-id", None), + ], +) def test_set_task_instance_state(run_id, execution_date, session, dag_maker): """Test that set_task_instance_state updates the TaskInstance state and clear downstream failed""" @@ -3321,7 +3327,9 @@ def get_ti_from_db(task): # dagrun should be set to QUEUED assert dagrun.get_state() == State.QUEUED - assert {t.key for t in altered} == {("test_set_task_instance_state", "task_1", dagrun.run_id, 1, -1)} + assert {tuple(t.key) for t in altered} == { + ("test_set_task_instance_state", "task_1", dagrun.run_id, 0, -1) + } def test_set_task_instance_state_mapped(dag_maker, session): @@ -3472,8 +3480,8 @@ def get_ti_from_db(task): assert dagrun.get_state() == State.QUEUED assert {t.key for t in altered} == { - ("test_set_task_group_state", "section_1.task_1", dagrun.run_id, 1, -1), - ("test_set_task_group_state", "section_1.task_3", dagrun.run_id, 1, -1), + ("test_set_task_group_state", "section_1.task_1", dagrun.run_id, 0, -1), + ("test_set_task_group_state", "section_1.task_3", dagrun.run_id, 0, -1), } diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 11d833a21c502..d5e72f96d8d5b 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -29,7 +29,7 @@ from traceback import format_exception from typing import cast from unittest import mock -from unittest.mock import MagicMock, call, mock_open, patch +from unittest.mock import call, mock_open, patch from uuid import uuid4 import pendulum @@ -66,8 +66,6 @@ TaskInstance, TaskInstance as TI, TaskInstanceNote, - _get_private_try_number, - _get_try_number, _run_finished_callback, ) from airflow.models.taskmap import TaskMap @@ -623,23 +621,33 @@ def run_with_error(ti): ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0] ti.task = task - assert ti.try_number == 1 + with create_session() as session: + session.get(TaskInstance, ti.key.primary).try_number += 1 + # first run -- up for retry run_with_error(ti) assert ti.state == State.UP_FOR_RETRY - assert ti.try_number == 2 + assert ti.try_number == 1 + + with create_session() as session: + session.get(TaskInstance, ti.key.primary).try_number += 1 # second run -- still up for retry because retry_delay hasn't expired time_machine.coordinates.shift(3) run_with_error(ti) assert ti.state == State.UP_FOR_RETRY + assert ti.try_number == 2 + + with create_session() as session: + session.get(TaskInstance, ti.key.primary).try_number += 1 # third run -- failed time_machine.coordinates.shift(datetime.datetime.resolution) run_with_error(ti) assert ti.state == State.FAILED + assert ti.try_number == 3 - def test_retry_handling(self, dag_maker): + def test_retry_handling(self, dag_maker, session): """ Test that task retries are handled properly """ @@ -663,36 +671,44 @@ def run_with_error(ti): ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0] ti.task = task - assert ti.try_number == 1 + assert ti.try_number == 0 + + session.get(TaskInstance, ti.key.primary).try_number += 1 + session.commit() # first run -- up for retry run_with_error(ti) assert ti.state == State.UP_FOR_RETRY - assert ti._try_number == 1 - assert ti.try_number == 2 + assert ti.try_number == 1 + + session.get(TaskInstance, ti.key.primary).try_number += 1 + session.commit() # second run -- fail run_with_error(ti) assert ti.state == State.FAILED - assert ti._try_number == 2 - assert ti.try_number == 3 + assert ti.try_number == 2 # Clear the TI state since you can't run a task with a FAILED state without # clearing it first dag.clear() + session.get(TaskInstance, ti.key.primary).try_number += 1 + session.commit() + # third run -- up for retry run_with_error(ti) assert ti.state == State.UP_FOR_RETRY - assert ti._try_number == 3 - assert ti.try_number == 4 + assert ti.try_number == 3 + + session.get(TaskInstance, ti.key.primary).try_number += 1 + session.commit() # fourth run -- fail run_with_error(ti) ti.refresh_from_db() assert ti.state == State.FAILED - assert ti._try_number == 4 - assert ti.try_number == 5 + assert ti.try_number == 4 assert RenderedTaskInstanceFields.get_templated_fields(ti) == expected_rendered_ti_fields def test_next_retry_datetime(self, dag_maker): @@ -783,8 +799,12 @@ def func(): ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0] ti.task = task - assert ti._try_number == 0 - assert ti.try_number == 1 + assert ti.try_number == 0 + + date1 = timezone.utcnow() + date2 = date1 + datetime.timedelta(minutes=1) + date3 = date2 + datetime.timedelta(minutes=1) + date4 = date3 + datetime.timedelta(minutes=1) def run_ti_and_assert( run_date, @@ -796,30 +816,29 @@ def run_ti_and_assert( expected_task_reschedule_count, ): with time_machine.travel(run_date, tick=False): + exc = None try: ti.run() - except AirflowException: + except AirflowException as e: + exc = e if not fail: raise + if exc and not fail: + raise RuntimeError("expected to fail") ti.refresh_from_db() assert ti.state == expected_state - assert ti._try_number == expected_try_number - assert ti.try_number == expected_try_number + 1 + assert ti.try_number == expected_try_number assert ti.start_date == expected_start_date assert ti.end_date == expected_end_date assert ti.duration == expected_duration assert len(task_reschedules_for_ti(ti)) == expected_task_reschedule_count - date1 = timezone.utcnow() - date2 = date1 + datetime.timedelta(minutes=1) - date3 = date2 + datetime.timedelta(minutes=1) - date4 = date3 + datetime.timedelta(minutes=1) - # Run with multiple reschedules. # During reschedule the try number remains the same, but each reschedule is recorded. # The start date is expected to remain the initial date, hence the duration increases. - # When finished the try number is incremented and there is no reschedule expected - # for this try. + # When there's a new try (task run following something other than a reschedule), then + # the scheduler will increment the try_number. We do that inline here since + # we're not using the scheduler. done, fail = False, False run_ti_and_assert(date1, date1, date1, 0, State.UP_FOR_RESCHEDULE, 0, 1) @@ -831,29 +850,37 @@ def run_ti_and_assert( run_ti_and_assert(date3, date1, date3, 120, State.UP_FOR_RESCHEDULE, 0, 3) done, fail = True, False - run_ti_and_assert(date4, date1, date4, 180, State.SUCCESS, 1, 0) + run_ti_and_assert(date4, date1, date4, 180, State.SUCCESS, 0, 3) # Clear the task instance. dag.clear() ti.refresh_from_db() assert ti.state == State.NONE - assert ti._try_number == 1 + assert ti.try_number == 0 + + # We will run it again with reschedules and a retry. + + # We increment the try number because that's what the scheduler would do + with create_session() as session: + session.get(TaskInstance, ti.key.primary).try_number += 1 - # Run again after clearing with reschedules and a retry. - # The retry increments the try number, and for that try no reschedule is expected. # After the retry the start date is reset, hence the duration is also reset. done, fail = False, False run_ti_and_assert(date1, date1, date1, 0, State.UP_FOR_RESCHEDULE, 1, 1) done, fail = False, True - run_ti_and_assert(date2, date1, date2, 60, State.UP_FOR_RETRY, 2, 0) + run_ti_and_assert(date2, date1, date2, 60, State.UP_FOR_RETRY, 1, 1) + + # scheduler would create a new try here + with create_session() as session: + session.get(TaskInstance, ti.key.primary).try_number += 1 done, fail = False, False run_ti_and_assert(date3, date3, date3, 0, State.UP_FOR_RESCHEDULE, 2, 1) done, fail = True, False - run_ti_and_assert(date4, date3, date4, 60, State.SUCCESS, 3, 0) + run_ti_and_assert(date4, date3, date4, 60, State.SUCCESS, 2, 1) def test_mapped_reschedule_handling(self, dag_maker, task_reschedules_for_ti): """ @@ -880,8 +907,7 @@ def func(): ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0] ti.task = task - assert ti._try_number == 0 - assert ti.try_number == 1 + assert ti.try_number == 0 def run_ti_and_assert( run_date, @@ -901,8 +927,7 @@ def run_ti_and_assert( raise ti.refresh_from_db() assert ti.state == expected_state - assert ti._try_number == expected_try_number - assert ti.try_number == expected_try_number + 1 + assert ti.try_number == expected_try_number assert ti.start_date == expected_start_date assert ti.end_date == expected_end_date assert ti.duration == expected_duration @@ -929,29 +954,36 @@ def run_ti_and_assert( run_ti_and_assert(date3, date1, date3, 120, State.UP_FOR_RESCHEDULE, 0, 3) done, fail = True, False - run_ti_and_assert(date4, date1, date4, 180, State.SUCCESS, 1, 0) + run_ti_and_assert(date4, date1, date4, 180, State.SUCCESS, 0, 3) # Clear the task instance. dag.clear() ti.refresh_from_db() assert ti.state == State.NONE - assert ti._try_number == 1 + assert ti.try_number == 0 # Run again after clearing with reschedules and a retry. - # The retry increments the try number, and for that try no reschedule is expected. + + # We increment the try number because that's what the scheduler would do + with create_session() as session: + session.get(TaskInstance, ti.key.primary).try_number += 1 + # After the retry the start date is reset, hence the duration is also reset. done, fail = False, False run_ti_and_assert(date1, date1, date1, 0, State.UP_FOR_RESCHEDULE, 1, 1) done, fail = False, True - run_ti_and_assert(date2, date1, date2, 60, State.UP_FOR_RETRY, 2, 0) + run_ti_and_assert(date2, date1, date2, 60, State.UP_FOR_RETRY, 1, 1) + + with create_session() as session: + session.get(TaskInstance, ti.key.primary).try_number += 1 done, fail = False, False run_ti_and_assert(date3, date3, date3, 0, State.UP_FOR_RESCHEDULE, 2, 1) done, fail = True, False - run_ti_and_assert(date4, date3, date4, 60, State.SUCCESS, 3, 0) + run_ti_and_assert(date4, date3, date4, 60, State.SUCCESS, 2, 1) @pytest.mark.usefixtures("test_pool") def test_mapped_task_reschedule_handling_clear_reschedules(self, dag_maker, task_reschedules_for_ti): @@ -978,8 +1010,7 @@ def func(): ).expand(poke_interval=[0]) ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0] ti.task = task - assert ti._try_number == 0 - assert ti.try_number == 1 + assert ti.try_number == 0 def run_ti_and_assert( run_date, @@ -999,8 +1030,7 @@ def run_ti_and_assert( raise ti.refresh_from_db() assert ti.state == expected_state - assert ti._try_number == expected_try_number - assert ti.try_number == expected_try_number + 1 + assert ti.try_number == expected_try_number assert ti.start_date == expected_start_date assert ti.end_date == expected_end_date assert ti.duration == expected_duration @@ -1015,7 +1045,7 @@ def run_ti_and_assert( dag.clear() ti.refresh_from_db() assert ti.state == State.NONE - assert ti._try_number == 0 + assert ti.try_number == 0 # Check that reschedules for ti have also been cleared. assert not task_reschedules_for_ti(ti) @@ -1045,8 +1075,7 @@ def func(): ) ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0] ti.task = task - assert ti._try_number == 0 - assert ti.try_number == 1 + assert ti.try_number == 0 def run_ti_and_assert( run_date, @@ -1065,8 +1094,7 @@ def run_ti_and_assert( raise ti.refresh_from_db() assert ti.state == expected_state - assert ti._try_number == expected_try_number - assert ti.try_number == expected_try_number + 1 + assert ti.try_number == expected_try_number assert ti.start_date == expected_start_date assert ti.end_date == expected_end_date assert ti.duration == expected_duration @@ -1081,7 +1109,7 @@ def run_ti_and_assert( dag.clear() ti.refresh_from_db() assert ti.state == State.NONE - assert ti._try_number == 0 + assert ti.try_number == 0 # Check that reschedules for ti have also been cleared. assert not task_reschedules_for_ti(ti) @@ -1803,12 +1831,12 @@ def test_check_and_change_state_before_execution(self, create_task_instance): serialized_dag = SerializedDagModel.get(ti.task.dag.dag_id).dag ti_from_deserialized_task = TI(task=serialized_dag.get_task(ti.task_id), run_id=ti.run_id) - assert ti_from_deserialized_task._try_number == 0 + assert ti_from_deserialized_task.try_number == 0 assert ti_from_deserialized_task.check_and_change_state_before_execution() # State should be running, and try_number column should be incremented assert ti_from_deserialized_task.external_executor_id == expected_external_executor_id assert ti_from_deserialized_task.state == State.RUNNING - assert ti_from_deserialized_task._try_number == 1 + assert ti_from_deserialized_task.try_number == 0 def test_check_and_change_state_before_execution_provided_id_overrides(self, create_task_instance): expected_external_executor_id = "banana" @@ -1822,14 +1850,14 @@ def test_check_and_change_state_before_execution_provided_id_overrides(self, cre serialized_dag = SerializedDagModel.get(ti.task.dag.dag_id).dag ti_from_deserialized_task = TI(task=serialized_dag.get_task(ti.task_id), run_id=ti.run_id) - assert ti_from_deserialized_task._try_number == 0 + assert ti_from_deserialized_task.try_number == 0 assert ti_from_deserialized_task.check_and_change_state_before_execution( external_executor_id=expected_external_executor_id ) # State should be running, and try_number column should be incremented assert ti_from_deserialized_task.external_executor_id == expected_external_executor_id assert ti_from_deserialized_task.state == State.RUNNING - assert ti_from_deserialized_task._try_number == 1 + assert ti_from_deserialized_task.try_number == 0 def test_check_and_change_state_before_execution_with_exec_id(self, create_task_instance): expected_external_executor_id = "minions" @@ -1840,14 +1868,14 @@ def test_check_and_change_state_before_execution_with_exec_id(self, create_task_ serialized_dag = SerializedDagModel.get(ti.task.dag.dag_id).dag ti_from_deserialized_task = TI(task=serialized_dag.get_task(ti.task_id), run_id=ti.run_id) - assert ti_from_deserialized_task._try_number == 0 + assert ti_from_deserialized_task.try_number == 0 assert ti_from_deserialized_task.check_and_change_state_before_execution( external_executor_id=expected_external_executor_id ) - # State should be running, and try_number column should be incremented + # State should be running, and try_number column should be unchanged assert ti_from_deserialized_task.external_executor_id == expected_external_executor_id assert ti_from_deserialized_task.state == State.RUNNING - assert ti_from_deserialized_task._try_number == 1 + assert ti_from_deserialized_task.try_number == 0 def test_check_and_change_state_before_execution_dep_not_met(self, create_task_instance): ti = create_task_instance(dag_id="test_check_and_change_state_before_execution") @@ -1895,12 +1923,14 @@ def test_try_number(self, create_task_instance): Test the try_number accessor behaves in various running states """ ti = create_task_instance(dag_id="test_check_and_change_state_before_execution") - assert 1 == ti.try_number + # TI starts at 0. It's only incremented by the scheduler. + assert ti.try_number == 0 ti.try_number = 2 + assert ti.try_number == 2 ti.state = State.RUNNING - assert 2 == ti.try_number + assert ti.try_number == 2 # unaffected by state ti.state = State.SUCCESS - assert 3 == ti.try_number + assert ti.try_number == 2 # unaffected by state def test_get_num_running_task_instances(self, create_task_instance): session = settings.Session() @@ -2047,7 +2077,7 @@ def test_email_alert(self, mock_send_email, dag_maker, use_native_obj): assert email == "to" assert "test_email_alert" in title assert "test_email_alert" in body - assert "Try 1" in body + assert "Try 0" in body @conf_vars( { @@ -2130,7 +2160,7 @@ def test_email_alert(x): (email, title, body), _ = mock_send_email.call_args assert email == "to" assert title == f"Airflow alert: " - assert body.startswith("Try 1") + assert body.startswith("Try 0") # try number only incremented by the scheduler assert "test_email_alert" in body tf = ( @@ -3018,7 +3048,7 @@ def test_handle_failure(self, create_dummy_dag, session=None): assert "task_instance" in context_arg_3 mock_on_retry_3.assert_not_called() - def test_handle_failure_updates_queued_task_try_number(self, dag_maker): + def test_handle_failure_updates_queued_task_updates_state(self, dag_maker): session = settings.Session() with dag_maker(): task = EmptyOperator(task_id="mytask", retries=1) @@ -3028,13 +3058,8 @@ def test_handle_failure_updates_queued_task_try_number(self, dag_maker): session.merge(ti) session.flush() assert ti.state == State.QUEUED - assert ti.try_number == 1 ti.handle_failure("test queued ti", test_mode=True) assert ti.state == State.UP_FOR_RETRY - # Assert that 'ti._try_number' is bumped from 0 to 1. This is the last/current try - assert ti._try_number == 1 - # Check 'ti.try_number' is bumped to 2. This is try_number for next run - assert ti.try_number == 2 @patch.object(Stats, "incr") def test_handle_failure_no_task(self, Stats_incr, dag_maker): @@ -3047,6 +3072,7 @@ def test_handle_failure_no_task(self, Stats_incr, dag_maker): task = EmptyOperator(task_id="mytask", retries=1) dr = dag_maker.create_dagrun() ti = TI(task=task, run_id=dr.run_id) + ti.try_number += 1 ti = session.merge(ti) ti.task = None ti.state = State.QUEUED @@ -3060,10 +3086,8 @@ def test_handle_failure_no_task(self, Stats_incr, dag_maker): ti.handle_failure("test queued ti", test_mode=False) assert ti.state == State.UP_FOR_RETRY - # Assert that 'ti._try_number' is bumped from 0 to 1. This is the last/current try - assert ti._try_number == 1 - # Check 'ti.try_number' is bumped to 2. This is try_number for next run - assert ti.try_number == 2 + # try_number remains at 1 + assert ti.try_number == 1 Stats_incr.assert_any_call("ti_failures", tags=expected_stats_tags) Stats_incr.assert_any_call("operator_failures_EmptyOperator", tags=expected_stats_tags) @@ -3413,7 +3437,7 @@ def test_refresh_from_db(self, create_task_instance): "end_date": run_date + datetime.timedelta(days=1, seconds=1, milliseconds=234), "duration": 1.234, "state": State.SUCCESS, - "_try_number": 1, + "try_number": 1, "max_tries": 1, "hostname": "some_unique_hostname", "unixname": "some_unique_unixname", @@ -3439,7 +3463,7 @@ def test_refresh_from_db(self, create_task_instance): "task_display_name": "Test Refresh from DB Task", } # Make sure we aren't missing any new value in our expected_values list. - expected_keys = {f"task_instance.{key.lstrip('_')}" for key in expected_values} + expected_keys = {f"task_instance.{key}" for key in expected_values} assert {str(c) for c in TI.__table__.columns} == expected_keys, ( "Please add all non-foreign values of TaskInstance to this list. " "This prevents refresh_from_db() from missing a field." @@ -4687,20 +4711,11 @@ def test__refresh_from_db_should_not_increment_try_number(dag_maker, session): BashOperator(task_id="hello", bash_command="hi") dag_maker.create_dagrun(state="success") ti = session.scalar(select(TaskInstance)) + session.get(TaskInstance, ti.key.primary).try_number += 1 + session.commit() assert ti.task_id == "hello" # just to confirm... assert ti.try_number == 1 # starts out as 1 ti.refresh_from_db() assert ti.try_number == 1 # stays 1 ti.refresh_from_db() assert ti.try_number == 1 # stays 1 - - -@pytest.mark.parametrize("state", list(TaskInstanceState)) -def test_get_private_try_number(state: str): - mock_ti = MagicMock() - mock_ti.state = state - private_try_number = 2 - mock_ti._try_number = private_try_number - mock_ti.try_number = _get_try_number(task_instance=mock_ti) - delattr(mock_ti, "_try_number") - assert _get_private_try_number(task_instance=mock_ti) == private_try_number diff --git a/tests/plugins/priority_weight_strategy.py b/tests/plugins/priority_weight_strategy.py index c56ae7364adb3..a20553635f844 100644 --- a/tests/plugins/priority_weight_strategy.py +++ b/tests/plugins/priority_weight_strategy.py @@ -44,7 +44,7 @@ class DecreasingPriorityStrategy(PriorityWeightStrategy): """A priority weight strategy that decreases the priority weight with each attempt.""" def get_weight(self, ti: TaskInstance): - return max(3 - ti._try_number + 1, 1) + return max(3 - ti.try_number + 1, 1) class TestPriorityWeightStrategyPlugin(AirflowPlugin): diff --git a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py index 524360dbac2ad..6c6bef5f7c7b1 100644 --- a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py +++ b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py @@ -1179,7 +1179,7 @@ def test_try_adopt_task_instances(self, mock_executor): orphaned_tasks[1].external_executor_id = "002" # Matches a running task_arn orphaned_tasks[2].external_executor_id = None # One orphaned task has no external_executor_id for task in orphaned_tasks: - task.prev_attempted_tries = 1 + task.try_number = 1 not_adopted_tasks = mock_executor.try_adopt_task_instances(orphaned_tasks) diff --git a/tests/providers/celery/executors/test_celery_executor.py b/tests/providers/celery/executors/test_celery_executor.py index 4c62a24f816b3..e0913054d1dae 100644 --- a/tests/providers/celery/executors/test_celery_executor.py +++ b/tests/providers/celery/executors/test_celery_executor.py @@ -200,8 +200,6 @@ def test_try_adopt_task_instances_none(self): def test_try_adopt_task_instances(self): start_date = timezone.utcnow() - timedelta(days=2) - try_number = 1 - with DAG("test_try_adopt_task_instances_none") as dag: task_1 = BaseOperator(task_id="task_1", start_date=start_date) task_2 = BaseOperator(task_id="task_2", start_date=start_date) @@ -221,8 +219,8 @@ def test_try_adopt_task_instances(self): not_adopted_tis = executor.try_adopt_task_instances(tis) - key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, None, try_number) - key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, None, try_number) + key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, None, 0) + key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, None, 0) assert executor.running == {key_1, key_2} assert executor.tasks == {key_1: AsyncResult("231"), key_2: AsyncResult("232")} diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index fe7c94146011d..224824edbc5e3 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -358,7 +358,7 @@ def test_labels(self, hook_mock, in_cluster): "dag_id": "dag", "kubernetes_pod_operator": "True", "task_id": "task", - "try_number": "1", + "try_number": "0", "airflow_version": mock.ANY, "run_id": "test", "airflow_kpo_in_cluster": str(in_cluster), @@ -374,7 +374,7 @@ def test_labels_mapped(self): "dag_id": "dag", "kubernetes_pod_operator": "True", "task_id": "task", - "try_number": "1", + "try_number": "0", "airflow_version": mock.ANY, "run_id": "test", "map_index": "10", @@ -884,7 +884,7 @@ def test_full_pod_spec(self, randomize_name, pod_spec): "dag_id": "dag", "kubernetes_pod_operator": "True", "task_id": "task", - "try_number": "1", + "try_number": "0", "airflow_version": mock.ANY, "airflow_kpo_in_cluster": str(k.hook.is_in_cluster), "run_id": "test", @@ -920,7 +920,7 @@ def test_full_pod_spec_kwargs(self, randomize_name, pod_spec): "dag_id": "dag", "kubernetes_pod_operator": "True", "task_id": "task", - "try_number": "1", + "try_number": "0", "airflow_version": mock.ANY, "airflow_kpo_in_cluster": str(k.hook.is_in_cluster), "run_id": "test", @@ -991,7 +991,7 @@ def test_pod_template_file(self, randomize_name, pod_template_file): "dag_id": "dag", "kubernetes_pod_operator": "True", "task_id": "task", - "try_number": "1", + "try_number": "0", "airflow_version": mock.ANY, "airflow_kpo_in_cluster": str(k.hook.is_in_cluster), "run_id": "test", @@ -1061,7 +1061,7 @@ def test_pod_template_file_kwargs_override(self, randomize_name, pod_template_fi "dag_id": "dag", "kubernetes_pod_operator": "True", "task_id": "task", - "try_number": "1", + "try_number": "0", "airflow_version": mock.ANY, "airflow_kpo_in_cluster": str(k.hook.is_in_cluster), "run_id": "test", @@ -1112,7 +1112,7 @@ def test_pod_template_dict(self, randomize_name): "dag_id": "dag", "kubernetes_pod_operator": "True", "task_id": "task", - "try_number": "1", + "try_number": "0", "airflow_version": mock.ANY, "airflow_kpo_in_cluster": str(k.hook.is_in_cluster), "run_id": "test", diff --git a/tests/providers/cncf/kubernetes/test_template_rendering.py b/tests/providers/cncf/kubernetes/test_template_rendering.py index 0627eb8d586f2..f3e61101eab8f 100644 --- a/tests/providers/cncf/kubernetes/test_template_rendering.py +++ b/tests/providers/cncf/kubernetes/test_template_rendering.py @@ -48,7 +48,7 @@ def test_render_k8s_pod_yaml(pod_mutation_hook, create_task_instance): "dag_id": "test_render_k8s_pod_yaml", "run_id": "test_run_id", "task_id": "op1", - "try_number": "1", + "try_number": "0", }, "labels": { "airflow-worker": "0", @@ -57,7 +57,7 @@ def test_render_k8s_pod_yaml(pod_mutation_hook, create_task_instance): "run_id": "test_run_id", "kubernetes_executor": "True", "task_id": "op1", - "try_number": "1", + "try_number": "0", }, "name": mock.ANY, "namespace": "default", diff --git a/tests/providers/openlineage/plugins/test_listener.py b/tests/providers/openlineage/plugins/test_listener.py index c37892c1f3dbe..fa651de1b22d9 100644 --- a/tests/providers/openlineage/plugins/test_listener.py +++ b/tests/providers/openlineage/plugins/test_listener.py @@ -297,12 +297,14 @@ def mock_task_id(dag_id, task_id, execution_date, try_number): listener.on_task_instance_success(None, task_instance, None) # This run_id will be different as we did NOT simulate increase of the try_number attribute, # which happens in Airflow. - listener.adapter.complete_task.assert_called_once_with( + calls = listener.adapter.complete_task.call_args_list + assert len(calls) == 1 + assert calls[0][1] == dict( end_time="2023-01-03T13:01:01", job_name="job_name", parent_job_name="dag_id", parent_run_id="dag_id.dag_run_run_id", - run_id="dag_id.task_id.execution_date.0", + run_id="dag_id.task_id.execution_date.1", task=listener.extractor_manager.extract_metadata(), ) @@ -310,12 +312,14 @@ def mock_task_id(dag_id, task_id, execution_date, try_number): listener.adapter.complete_task.reset_mock() task_instance.try_number += 1 listener.on_task_instance_success(None, task_instance, None) - listener.adapter.complete_task.assert_called_once_with( + calls = listener.adapter.complete_task.call_args_list + assert len(calls) == 1 + assert calls[0][1] == dict( end_time="2023-01-03T13:01:01", job_name="job_name", parent_job_name="dag_id", parent_run_id="dag_id.dag_run_run_id", - run_id="dag_id.task_id.execution_date.1", + run_id="dag_id.task_id.execution_date.2", task=listener.extractor_manager.extract_metadata(), ) @@ -334,24 +338,23 @@ def mock_task_id(dag_id, task_id, execution_date, try_number): listener, task_instance = _create_listener_and_task_instance() mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id - + expected_run_id_1 = "dag_id.task_id.execution_date.1" + expected_run_id_2 = "dag_id.task_id.execution_date.2" listener.on_task_instance_running(None, task_instance, None) - expected_run_id = listener.adapter.start_task.call_args.kwargs["run_id"] - assert expected_run_id == "dag_id.task_id.execution_date.1" + assert listener.adapter.start_task.call_args.kwargs["run_id"] == expected_run_id_1 listener.on_task_instance_failed(None, task_instance, None) - assert listener.adapter.fail_task.call_args.kwargs["run_id"] == expected_run_id + assert listener.adapter.fail_task.call_args.kwargs["run_id"] == expected_run_id_1 - # This run_id will be different as we did NOT simulate increase of the try_number attribute, - # which happens in Airflow. + # This run_id will not be different as we did NOT simulate increase of the try_number attribute, listener.on_task_instance_success(None, task_instance, None) - assert listener.adapter.complete_task.call_args.kwargs["run_id"] == "dag_id.task_id.execution_date.0" + assert listener.adapter.complete_task.call_args.kwargs["run_id"] == expected_run_id_1 # Now we simulate the increase of try_number, and the run_id should reflect that change. # This is how airflow works, and that's why we expect the run_id to remain constant across all methods. task_instance.try_number += 1 listener.on_task_instance_success(None, task_instance, None) - assert listener.adapter.complete_task.call_args.kwargs["run_id"] == expected_run_id + assert listener.adapter.complete_task.call_args.kwargs["run_id"] == expected_run_id_2 def test_running_task_correctly_calls_openlineage_adapter_run_id_method(): @@ -403,7 +406,7 @@ def test_successful_task_correctly_calls_openlineage_adapter_run_id_method(mock_ dag_id="dag_id", task_id="task_id", execution_date="execution_date", - try_number=0, + try_number=1, ) @@ -428,16 +431,16 @@ def fail_callable(**kwargs): _, task_instance = _create_test_dag_and_task(fail_callable, "failure") # try_number before execution - assert task_instance.try_number == 1 + assert task_instance.try_number == 0 with suppress(CustomError): task_instance.run() # try_number at the moment of function being called - assert captured_try_numbers["running"] == 1 - assert captured_try_numbers["failed"] == 1 + assert captured_try_numbers["running"] == 0 + assert captured_try_numbers["failed"] == 0 # try_number after task has been executed - assert task_instance.try_number == 2 + assert task_instance.try_number == 0 @mock.patch("airflow.models.taskinstance.get_listener_manager") @@ -457,15 +460,15 @@ def success_callable(**kwargs): _, task_instance = _create_test_dag_and_task(success_callable, "success") # try_number before execution - assert task_instance.try_number == 1 + assert task_instance.try_number == 0 task_instance.run() # try_number at the moment of function being called - assert captured_try_numbers["running"] == 1 - assert captured_try_numbers["success"] == 2 + assert captured_try_numbers["running"] == 0 + assert captured_try_numbers["success"] == 0 # try_number after task has been executed - assert task_instance.try_number == 2 + assert task_instance.try_number == 0 @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled") diff --git a/tests/providers/smtp/notifications/test_smtp.py b/tests/providers/smtp/notifications/test_smtp.py index f1a71e2b53c1b..b19cc4baa873b 100644 --- a/tests/providers/smtp/notifications/test_smtp.py +++ b/tests/providers/smtp/notifications/test_smtp.py @@ -129,7 +129,7 @@ def test_notifier_with_defaults(self, mock_smtphook_hook, create_task_instance): from_email=conf.get("smtp", "smtp_mail_from"), to="test_reciver@test.com", subject="DAG dag - Task op - Run ID test in State None", - html_content="""\n\n \n \n \n \n\n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n
Run ID:test
Try:1 of 1
Task State:None
Host:
Log Link:http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs
Mark Success Link:http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success
\n\n""", + html_content="""\n\n \n \n \n \n\n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n
Run ID:test
Try:0 of 1
Task State:None
Host:
Log Link:http://localhost:8080/dags/dag/grid?dag_run_id=test&task_id=op&map_index=-1&tab=logs
Mark Success Link:http://localhost:8080/confirm?task_id=op&dag_id=dag&dag_run_id=test&upstream=false&downstream=false&state=success
\n\n""", smtp_conn_id="smtp_default", files=None, cc=None, diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py index a41e6836838d5..dbf62883a50bb 100644 --- a/tests/sensors/test_base.py +++ b/tests/sensors/test_base.py @@ -297,7 +297,17 @@ def _get_tis(): date1 = timezone.utcnow() time_machine.move_to(date1, tick=False) + sensor_ti, dummy_ti = _get_tis() + assert dummy_ti.state == State.NONE + assert sensor_ti.state == State.NONE + + # ordinarily the scheduler does this + sensor_ti.state = State.SCHEDULED + sensor_ti.try_number += 1 # first TI run + session.commit() + self._run(sensor, session=session) + sensor_ti, dummy_ti = _get_tis() assert sensor_ti.state == State.UP_FOR_RESCHEDULE assert dummy_ti.state == State.NONE @@ -326,6 +336,9 @@ def _get_tis(): # first poke returns False and task is re-scheduled date1 = timezone.utcnow() time_machine.move_to(date1, tick=False) + sensor_ti, dummy_ti = _get_tis() + sensor_ti.try_number += 1 # second TI run + session.commit() self._run(sensor) sensor_ti, dummy_ti = _get_tis() assert sensor_ti.state == State.UP_FOR_RESCHEDULE @@ -359,6 +372,9 @@ def _get_tis(): # first poke returns False and task is re-scheduled date1 = timezone.utcnow() time_machine.move_to(date1, tick=False) + sensor_ti, dummy_ti = _get_tis() + sensor_ti.try_number += 1 # first TI run + session.commit() self._run(sensor) sensor_ti, dummy_ti = _get_tis() assert sensor_ti.state == State.UP_FOR_RESCHEDULE @@ -382,9 +398,12 @@ def _get_tis(): # Task is cleared sensor.clear() sensor_ti, dummy_ti = _get_tis() - assert sensor_ti.try_number == 2 + assert sensor_ti.try_number == 1 assert sensor_ti.max_tries == 2 + sensor_ti, dummy_ti = _get_tis() + sensor_ti.try_number += 1 # second TI run + session.commit() # third poke returns False and task is rescheduled again date3 = date1 + timedelta(seconds=sensor.poke_interval) * 2 + sensor.retry_delay time_machine.coordinates.shift(sensor.poke_interval + sensor.retry_delay.total_seconds()) @@ -687,9 +706,15 @@ def _get_sensor_ti(): tis = dr.get_task_instances(session=session) return next(x for x in tis if x.task_id == SENSOR_OP) + def _increment_try_number(): + sensor_ti = _get_sensor_ti() + sensor_ti.try_number += 1 + session.commit() + # first poke returns False and task is re-scheduled date1 = timezone.utcnow() time_machine.move_to(date1, tick=False) + _increment_try_number() # first TI run self._run(sensor) sensor_ti = _get_sensor_ti() assert sensor_ti.try_number == 1 @@ -701,12 +726,13 @@ def _get_sensor_ti(): with pytest.raises(RuntimeError): self._run(sensor) sensor_ti = _get_sensor_ti() - assert sensor_ti.try_number == 2 + assert sensor_ti.try_number == 1 assert sensor_ti.max_tries == 2 assert sensor_ti.state == State.UP_FOR_RETRY # third poke returns False and task is rescheduled again time_machine.coordinates.shift(sensor.retry_delay + timedelta(seconds=1)) + _increment_try_number() # second TI run self._run(sensor) sensor_ti = _get_sensor_ti() assert sensor_ti.try_number == 2 @@ -718,19 +744,22 @@ def _get_sensor_ti(): with pytest.raises(AirflowSensorTimeout): self._run(sensor) sensor_ti = _get_sensor_ti() - assert sensor_ti.try_number == 3 + assert sensor_ti.try_number == 2 assert sensor_ti.max_tries == 2 assert sensor_ti.state == State.FAILED # Clear the failed sensor sensor.clear() sensor_ti = _get_sensor_ti() - assert sensor_ti.try_number == 3 + # clearing does not change the try_number + assert sensor_ti.try_number == 2 + # but it does change the max_tries assert sensor_ti.max_tries == 4 assert sensor_ti.state is None time_machine.coordinates.shift(20) + _increment_try_number() # third TI run for _ in range(3): time_machine.coordinates.shift(sensor.poke_interval) self._run(sensor) @@ -744,7 +773,7 @@ def _get_sensor_ti(): with pytest.raises(AirflowSensorTimeout): self._run(sensor) sensor_ti = _get_sensor_ti() - assert sensor_ti.try_number == 4 + assert sensor_ti.try_number == 3 assert sensor_ti.max_tries == 4 assert sensor_ti.state == State.FAILED @@ -794,13 +823,16 @@ def _get_sensor_ti(): # first poke returns False and task is re-scheduled date1 = timezone.utcnow() time_machine.move_to(date1, tick=False) + sensor_ti = _get_sensor_ti() + sensor_ti.try_number += 1 # first TI run self._run(sensor) + sensor_ti = _get_sensor_ti() assert sensor_ti.try_number == 1 assert sensor_ti.max_tries == 2 assert sensor_ti.state == State.UP_FOR_RESCHEDULE - # second poke raises RuntimeError and task instance is re-scheduled again + # second poke raises reschedule exception and task instance is re-scheduled again time_machine.coordinates.shift(sensor.poke_interval) self._run(sensor) sensor_ti = _get_sensor_ti() @@ -821,19 +853,21 @@ def _get_sensor_ti(): with pytest.raises(AirflowSensorTimeout): self._run(sensor) sensor_ti = _get_sensor_ti() - assert sensor_ti.try_number == 2 + assert sensor_ti.try_number == 1 assert sensor_ti.max_tries == 2 assert sensor_ti.state == State.FAILED # Clear the failed sensor sensor.clear() sensor_ti = _get_sensor_ti() - assert sensor_ti.try_number == 2 + assert sensor_ti.try_number == 1 assert sensor_ti.max_tries == 3 assert sensor_ti.state == State.NONE time_machine.coordinates.shift(20) + sensor_ti.try_number += 1 # second TI run + session.commit() for _ in range(3): time_machine.coordinates.shift(sensor.poke_interval) self._run(sensor) @@ -847,7 +881,7 @@ def _get_sensor_ti(): with pytest.raises(AirflowSensorTimeout): self._run(sensor) sensor_ti = _get_sensor_ti() - assert sensor_ti.try_number == 3 + assert sensor_ti.try_number == 2 assert sensor_ti.max_tries == 3 assert sensor_ti.state == State.FAILED @@ -983,7 +1017,7 @@ class TestAsyncSensor: (False, AirflowException), ], ) - def test_fail_after_resuming_deffered_sensor(self, soft_fail, expected_exception): + def test_fail_after_resuming_deferred_sensor(self, soft_fail, expected_exception): async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor", soft_fail=soft_fail) ti = TaskInstance(task=async_sensor) ti.next_method = "execute_complete" diff --git a/tests/test_utils/mock_executor.py b/tests/test_utils/mock_executor.py index ba555fbcd9cb3..eaf5d32d8695a 100644 --- a/tests/test_utils/mock_executor.py +++ b/tests/test_utils/mock_executor.py @@ -71,7 +71,6 @@ def sort_by(item): sorted_queue = sorted(self.queued_tasks.items(), key=sort_by) for key, (_, _, _, ti) in sorted_queue[:open_slots]: self.queued_tasks.pop(key) - ti._try_number += 1 state = self.mock_task_results[key] ti.set_state(state, session=session) self.change_state(key, state) diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index 077270480042d..4f7eb9471c57f 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -122,7 +122,7 @@ def task_callable(ti): # We expect set_context generates a file locally. log_filename = file_handler.handler.baseFilename assert os.path.isfile(log_filename) - assert log_filename.endswith("1.log"), log_filename + assert log_filename.endswith("0.log"), log_filename ti.run(ignore_ti_state=True) @@ -161,7 +161,7 @@ def task_callable(ti): python_callable=task_callable, ) ti = TaskInstance(task=task, run_id=dagrun.run_id) - + ti.try_number += 1 logger = ti.log ti.log.disabled = False @@ -498,7 +498,7 @@ def test_set_context_trigger(self, create_dummy_dag, dag_maker, is_a_trigger, se t.task_instance = ti h = FileTaskHandler(base_log_folder=os.fspath(tmp_path)) h.set_context(ti) - expected = "dag_id=test_fth/run_id=test/task_id=dummy/attempt=1.log" + expected = "dag_id=test_fth/run_id=test/task_id=dummy/attempt=0.log" if is_a_trigger: expected += f".trigger.{job.id}.log" actual = h.handler.baseFilename diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py index 3d3248f1108b2..e32eb6654ba7a 100644 --- a/tests/www/views/test_views_log.py +++ b/tests/www/views/test_views_log.py @@ -185,7 +185,7 @@ def create_expected_log_file(log_path, tis): handler = FileTaskHandler(log_path) def create_expected_log_file(try_number): - ti.try_number = try_number - 1 + ti.try_number = 1 handler.set_context(ti) handler.emit(logging.makeLogRecord({"msg": "Log for testing."})) handler.flush() @@ -271,8 +271,9 @@ def test_get_logs_with_metadata_as_download_file(log_admin_client, create_expect in content_disposition ) assert 200 == response.status_code - assert "Log for testing." in response.data.decode("utf-8") - assert "localhost\n" in response.data.decode("utf-8") + content = response.data.decode("utf-8") + assert "Log for testing." in content + assert "localhost\n" in content DIFFERENT_LOG_FILENAME = "{{ ti.dag_id }}/{{ ti.run_id }}/{{ ti.task_id }}/{{ try_number }}.log" diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py index 2b893c40f671d..233542ea1e3a2 100644 --- a/tests/www/views/test_views_tasks.py +++ b/tests/www/views/test_views_tasks.py @@ -1089,7 +1089,7 @@ def test_task_instances(admin_client): "task_id": "also_run_this", "trigger_id": None, "trigger_timeout": None, - "try_number": 1, + "try_number": 0, "unixname": getuser(), "updated_at": DEFAULT_DATE.isoformat(), }, @@ -1124,7 +1124,7 @@ def test_task_instances(admin_client): "task_id": "run_after_loop", "trigger_id": None, "trigger_timeout": None, - "try_number": 1, + "try_number": 0, "unixname": getuser(), "updated_at": DEFAULT_DATE.isoformat(), }, @@ -1159,7 +1159,7 @@ def test_task_instances(admin_client): "task_id": "run_this_last", "trigger_id": None, "trigger_timeout": None, - "try_number": 1, + "try_number": 0, "unixname": getuser(), "updated_at": DEFAULT_DATE.isoformat(), }, @@ -1194,7 +1194,7 @@ def test_task_instances(admin_client): "task_id": "runme_0", "trigger_id": None, "trigger_timeout": None, - "try_number": 1, + "try_number": 0, "unixname": getuser(), "updated_at": DEFAULT_DATE.isoformat(), }, @@ -1229,7 +1229,7 @@ def test_task_instances(admin_client): "task_id": "runme_1", "trigger_id": None, "trigger_timeout": None, - "try_number": 1, + "try_number": 0, "unixname": getuser(), "updated_at": DEFAULT_DATE.isoformat(), }, @@ -1264,7 +1264,7 @@ def test_task_instances(admin_client): "task_id": "runme_2", "trigger_id": None, "trigger_timeout": None, - "try_number": 1, + "try_number": 0, "unixname": getuser(), "updated_at": DEFAULT_DATE.isoformat(), }, @@ -1299,7 +1299,7 @@ def test_task_instances(admin_client): "task_id": "this_will_skip", "trigger_id": None, "trigger_timeout": None, - "try_number": 1, + "try_number": 0, "unixname": getuser(), "updated_at": DEFAULT_DATE.isoformat(), },