Skip to content

Commit

Permalink
[AIRFLOW-2716] Replace async and await py3.7 keywords
Browse files Browse the repository at this point in the history
Closes #3578 from JacobHayes/py37-keywords
  • Loading branch information
JacobHayes authored and Fokko Driesprong committed Jul 29, 2018
1 parent 9b7525f commit fcd51f3
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 12 deletions.
4 changes: 4 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ assists users migrating to a new version.

## Airflow Master

### Replace DataProcHook.await calls to DataProcHook.wait

The method name was changed to be compatible with the Python 3.7 async/await keywords

### DAG level Access Control for new RBAC UI

Extend and enhance new Airflow RBAC UI to support DAG level ACL. Each dag now has two permissions(one for write, one for read) associated('can_dag_edit', 'can_dag_read').
Expand Down
12 changes: 11 additions & 1 deletion airflow/contrib/hooks/gcp_dataproc_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import uuid

from apiclient.discovery import build
from zope.deprecation import deprecation

from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down Expand Up @@ -224,7 +225,16 @@ def create_job_template(self, task_id, cluster_name, job_type, properties):
return _DataProcJobBuilder(self.project_id, task_id, cluster_name,
job_type, properties)

def await(self, operation):
def wait(self, operation):
"""Awaits for Google Cloud Dataproc Operation to complete."""
submitted = _DataProcOperation(self.get_conn(), operation)
submitted.wait_for_done()


setattr(
DataProcHook,
"await",
deprecation.deprecated(
DataProcHook.wait, "renamed to 'wait' for Python3.7 compatability"
),
)
2 changes: 1 addition & 1 deletion airflow/contrib/operators/dataproc_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,7 @@ def __init__(self,
)

def execute(self, context):
self.hook.await(self.start())
self.hook.wait(self.start())

def start(self, context):
raise AirflowException('plese start a workflow operation')
Expand Down
12 changes: 6 additions & 6 deletions airflow/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ def execute_async(self, key, command,

def sync(self):
self.log.debug("Inquiring about %s celery task(s)", len(self.tasks))
for key, async in list(self.tasks.items()):
for key, task in list(self.tasks.items()):
try:
state = async.state
state = task.state
if self.last_state[key] != state:
if state == celery_states.SUCCESS:
self.success(key)
Expand All @@ -106,16 +106,16 @@ def sync(self):
del self.tasks[key]
del self.last_state[key]
else:
self.log.info("Unexpected state: %s", async.state)
self.last_state[key] = async.state
self.log.info("Unexpected state: %s", task.state)
self.last_state[key] = task.state
except Exception as e:
self.log.error("Error syncing the celery executor, ignoring it:")
self.log.exception(e)

def end(self, synchronous=False):
if synchronous:
while any([
async.state not in celery_states.READY_STATES
for async in self.tasks.values()]):
task.state not in celery_states.READY_STATES
for task in self.tasks.values()]):
time.sleep(5)
self.sync()
8 changes: 4 additions & 4 deletions tests/contrib/operators/test_dataproc_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ def test_workflow(self):
with patch(HOOK) as MockHook:
hook = MockHook()
hook.get_conn.return_value = self.mock_conn
hook.await.return_value = None
hook.wait.return_value = None

dataproc_task = DataprocWorkflowTemplateInstantiateOperator(
task_id=TASK_ID,
Expand All @@ -586,7 +586,7 @@ def test_workflow(self):
self.mock_workflows.instantiate.assert_called_once_with(
name=template_name,
body=mock.ANY)
hook.await.assert_called_once_with(self.operation)
hook.wait.assert_called_once_with(self.operation)


class DataprocWorkflowTemplateInstantiateInlineOperatorTest(unittest.TestCase):
Expand Down Expand Up @@ -617,7 +617,7 @@ def test_iniline_workflow(self):
with patch(HOOK) as MockHook:
hook = MockHook()
hook.get_conn.return_value = self.mock_conn
hook.await.return_value = None
hook.wait.return_value = None

template = {
"placement": {
Expand Down Expand Up @@ -652,4 +652,4 @@ def test_iniline_workflow(self):
parent='projects/test-project-id/regions/test-region',
instanceId=mock.ANY,
body=template)
hook.await.assert_called_once_with(self.operation)
hook.wait.assert_called_once_with(self.operation)

0 comments on commit fcd51f3

Please sign in to comment.