Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-attach logic - final fixes #4064

Merged
merged 2 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/python_airflow_operator.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ This class provides integration with Airflow and Armada
## armada.operators.armada module


### _class_ armada.operators.armada.ArmadaOperator(name, channel_args, armada_queue, job_request, job_set_prefix='', lookout_url_template=None, poll_interval=30, container_logs=None, k8s_token_retriever=None, deferrable=False, job_acknowledgement_timeout=300, dry_run=False, \*\*kwargs)
### _class_ armada.operators.armada.ArmadaOperator(name, channel_args, armada_queue, job_request, job_set_prefix='', lookout_url_template=None, poll_interval=30, container_logs=None, k8s_token_retriever=None, deferrable=False, job_acknowledgement_timeout=300, dry_run=False, reattach_policy=None, \*\*kwargs)
Bases: `BaseOperator`, `LoggingMixin`

An Airflow operator that manages Job submission to Armada.
Expand Down Expand Up @@ -60,6 +60,9 @@ and handles job cancellation if the Airflow task is killed.
* **dry_run** (*bool*) –


* **reattach_policy** (*Optional**[**str**] **| **Callable**[**[**JobState**, **str**]**, **bool**]*) –



#### execute(context)
Submits the job to Armada and polls for completion.
Expand Down Expand Up @@ -167,6 +170,8 @@ acknowledged by Armada.
:type job_acknowledgement_timeout: int
:param dry_run: Run Operator in dry-run mode - render Armada request and terminate.
:type dry_run: bool
:param reattach_policy: Operator reattach policy to use (defaults to: never)
:type reattach_policy: Optional[str] | Callable[[JobState, str], bool]
:param kwargs: Additional keyword arguments to pass to the BaseOperator.


Expand Down
36 changes: 23 additions & 13 deletions third_party/airflow/armada/operators/armada.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ class ArmadaOperator(BaseOperator, LoggingMixin):
:type job_acknowledgement_timeout: int
:param dry_run: Run Operator in dry-run mode - render Armada request and terminate.
:type dry_run: bool
:param reattach_policy: Operator reattach policy to use (defaults to: always)
:type reattach_policy: Optional[str]
:param reattach_policy: Operator reattach policy to use (defaults to: never)
:type reattach_policy: Optional[str] | Callable[[JobState, str], bool]
:param kwargs: Additional keyword arguments to pass to the BaseOperator.
"""

Expand Down Expand Up @@ -135,7 +135,7 @@ def __init__(
dry_run: bool = conf.getboolean(
"armada_operator", "default_dry_run", fallback=False
),
reattach_policy: Optional[str] = None,
reattach_policy: Optional[str] | Callable[[JobState, str], bool] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -154,14 +154,21 @@ def __init__(
self.dry_run = dry_run
self.job_context = None

configured_reattach_policy: str = resolve_parameter_value(
"reattach_policy", reattach_policy, kwargs, "never"
)
self.log.info(
f"Configured reattach policy to: '{configured_reattach_policy}',"
f" max retries: {self.retries}"
)
self.reattach_policy = policy(configured_reattach_policy)
if reattach_policy is callable(reattach_policy):
self.log.info(
f"Configured reattach policy with callable',"
f" max retries: {self.retries}"
)
self.reattach_policy = reattach_policy
else:
configured_reattach_policy: str = resolve_parameter_value(
"reattach_policy", reattach_policy, kwargs, "never"
)
self.log.info(
f"Configured reattach policy to: '{configured_reattach_policy}',"
f" max retries: {self.retries}"
)
self.reattach_policy = policy(configured_reattach_policy)

if self.container_logs and self.k8s_token_retriever is None:
self.log.warning(
Expand Down Expand Up @@ -342,8 +349,11 @@ def _try_reattach_to_running_job(
self, context: Context
) -> Optional[RunningJobContext]:
# On first try we intentionally do not re-attach.
self.log.info(context)
if context["ti"].try_number == 1:
new_run = (
context["ti"].max_tries - context["ti"].try_number + 1
== context["ti"].task.retries
)
if new_run:
return None

expected_job_uri = external_job_uri(context)
Expand Down
4 changes: 2 additions & 2 deletions third_party/airflow/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ build-backend = "setuptools.build_meta"

[project]
name = "armada_airflow"
version = "1.0.10"
version = "1.0.11"
description = "Armada Airflow Operator"
readme='README.md'
authors = [{name = "Armada-GROSS", email = "[email protected]"}]
license = { text = "Apache Software License" }
dependencies=[
'armada-client>=0.4.7',
'armada-client>=0.4.8',
'apache-airflow>=2.6.3',
'types-protobuf==4.24.0.1',
'kubernetes>=23.6.0',
Expand Down