Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 1 addition & 19 deletions airflow/providers/amazon/aws/operators/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
from functools import cached_property
from typing import TYPE_CHECKING, Any, Sequence

from airflow import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.athena import AthenaHook
from airflow.providers.amazon.aws.triggers.athena import AthenaTrigger

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down Expand Up @@ -71,7 +69,6 @@ def __init__(
sleep_time: int = 30,
max_polling_attempts: int | None = None,
log_query: bool = True,
deferrable: bool = False,
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
Expand All @@ -84,10 +81,9 @@ def __init__(
self.query_execution_context = query_execution_context or {}
self.result_configuration = result_configuration or {}
self.sleep_time = sleep_time
self.max_polling_attempts = max_polling_attempts or 999999
self.max_polling_attempts = max_polling_attempts
self.query_execution_id: str | None = None
self.log_query: bool = log_query
self.deferrable = deferrable

@cached_property
def hook(self) -> AthenaHook:
Expand All @@ -105,15 +101,6 @@ def execute(self, context: Context) -> str | None:
self.client_request_token,
self.workgroup,
)

if self.deferrable:
self.defer(
trigger=AthenaTrigger(
self.query_execution_id, self.sleep_time, self.max_polling_attempts, self.aws_conn_id
),
method_name="execute_complete",
)
# implicit else:
query_status = self.hook.poll_query_status(
self.query_execution_id,
max_polling_attempts=self.max_polling_attempts,
Expand All @@ -134,11 +121,6 @@ def execute(self, context: Context) -> str | None:

return self.query_execution_id

def execute_complete(self, context, event=None):
if event["status"] != "success":
raise AirflowException(f"Error while waiting for operation on cluster to complete: {event}")
return event["value"]

def on_kill(self) -> None:
"""Cancel the submitted athena query."""
if self.query_execution_id:
Expand Down
76 changes: 0 additions & 76 deletions airflow/providers/amazon/aws/triggers/athena.py

This file was deleted.

3 changes: 0 additions & 3 deletions airflow/providers/amazon/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,6 @@ hooks:
- airflow.providers.amazon.aws.hooks.appflow

triggers:
- integration-name: Amazon Athena
python-modules:
- airflow.providers.amazon.aws.triggers.athena
- integration-name: AWS Batch
python-modules:
- airflow.providers.amazon.aws.triggers.batch
Expand Down
12 changes: 0 additions & 12 deletions tests/providers/amazon/aws/operators/test_athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@

import pytest

from airflow.exceptions import TaskDeferred
from airflow.models import DAG, DagRun, TaskInstance
from airflow.providers.amazon.aws.hooks.athena import AthenaHook
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.triggers.athena import AthenaTrigger
from airflow.utils import timezone
from airflow.utils.timezone import datetime

Expand Down Expand Up @@ -160,13 +158,3 @@ def test_return_value(self, mock_conn, mock_run_query, mock_check_query_status):
ti.dag_run = dag_run

assert self.athena.execute(ti.get_template_context()) == ATHENA_QUERY_ID

@mock.patch.object(AthenaHook, "run_query", return_value=ATHENA_QUERY_ID)
def test_is_deferred(self, mock_run_query):
self.athena.deferrable = True

with pytest.raises(TaskDeferred) as deferred:
self.athena.execute(None)

assert isinstance(deferred.value.trigger, AthenaTrigger)
assert deferred.value.trigger.query_execution_id == ATHENA_QUERY_ID
53 changes: 0 additions & 53 deletions tests/providers/amazon/aws/triggers/test_athena.py

This file was deleted.