From 6f3aa924d42c529945c6ca9e50350cbbf731eb1c Mon Sep 17 00:00:00 2001 From: Lukasz Wyszomirski Date: Fri, 27 May 2022 08:45:16 +0000 Subject: [PATCH 1/2] Support impersonation service account parameter for Dataflow runner --- airflow/providers/apache/beam/operators/beam.py | 15 +++++++++++++++ setup.py | 2 +- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/airflow/providers/apache/beam/operators/beam.py b/airflow/providers/apache/beam/operators/beam.py index dbf6053ead940..3671bb22b01e1 100644 --- a/airflow/providers/apache/beam/operators/beam.py +++ b/airflow/providers/apache/beam/operators/beam.py @@ -51,6 +51,7 @@ class BeamDataflowMixin(metaclass=ABCMeta): dataflow_config: DataflowConfiguration gcp_conn_id: str delegate_to: Optional[str] + dataflow_support_impersonation: bool = True def _set_dataflow( self, @@ -91,6 +92,13 @@ def __get_dataflow_pipeline_options( pipeline_options[job_name_key] = job_name if self.dataflow_config.service_account: pipeline_options["serviceAccount"] = self.dataflow_config.service_account + if self.dataflow_support_impersonation and self.dataflow_config.impersonation_chain: + if isinstance(self.dataflow_config.impersonation_chain, list): + pipeline_options["impersonateServiceAccount"] = ",".join( + self.dataflow_config.impersonation_chain + ) + else: + pipeline_options["impersonateServiceAccount"] = self.dataflow_config.impersonation_chain pipeline_options["project"] = self.dataflow_config.project_id pipeline_options["region"] = self.dataflow_config.location pipeline_options.setdefault("labels", {}).update( @@ -549,6 +557,13 @@ def __init__( **kwargs, ) + if self.dataflow_config.impersonation_chain: + self.log.info( + "Impersonation chain parameter is not supported for Apache Beam GO SDK and will be skipped " + "in the execution" + ) + self.dataflow_support_impersonation = False + self.go_file = go_file self.should_init_go_module = False self.pipeline_options.setdefault("labels", {}).update( diff --git a/setup.py b/setup.py index e7d59516947cf..2859199941a05 100644 --- a/setup.py +++ b/setup.py @@ -205,7 +205,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version 'xmltodict<0.13.0', ] apache_beam = [ - 'apache-beam>=2.33.0', + 'apache-beam>=2.39.0', ] arangodb = ['python-arango>=7.3.2'] asana = ['asana>=0.10'] From df4eec679419a533e1488b30a465f7d2dd293e3c Mon Sep 17 00:00:00 2001 From: Lukasz Wyszomirski Date: Fri, 27 May 2022 11:35:22 +0000 Subject: [PATCH 2/2] Extension of the tests for apache beam related to DataflowRunner --- tests/providers/apache/beam/operators/test_beam.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/providers/apache/beam/operators/test_beam.py b/tests/providers/apache/beam/operators/test_beam.py index 74c4d9cf5e64c..ae3f4f07b9721 100644 --- a/tests/providers/apache/beam/operators/test_beam.py +++ b/tests/providers/apache/beam/operators/test_beam.py @@ -47,6 +47,7 @@ 'output': 'gs://test/output', 'labels': {'foo': 'bar', 'airflow-version': TEST_VERSION}, } +TEST_IMPERSONATION_ACCOUNT = "test@impersonation.com" class TestBeamRunPythonPipelineOperator(unittest.TestCase): @@ -104,7 +105,7 @@ def test_exec_dataflow_runner(self, gcs_hook, dataflow_hook_mock, beam_hook_mock """Test DataflowHook is created and the right args are passed to start_python_dataflow. """ - dataflow_config = DataflowConfiguration() + dataflow_config = DataflowConfiguration(impersonation_chain=TEST_IMPERSONATION_ACCOUNT) self.operator.runner = "DataflowRunner" self.operator.dataflow_config = dataflow_config gcs_provide_file = gcs_hook.return_value.provide_file @@ -126,6 +127,7 @@ def test_exec_dataflow_runner(self, gcs_hook, dataflow_hook_mock, beam_hook_mock 'output': 'gs://test/output', 'labels': {'foo': 'bar', 'airflow-version': TEST_VERSION}, 'region': 'us-central1', + 'impersonate_service_account': TEST_IMPERSONATION_ACCOUNT, } gcs_provide_file.assert_called_once_with(object_url=PY_FILE) persist_link_mock.assert_called_once_with( @@ -222,7 +224,7 @@ def test_exec_dataflow_runner(self, gcs_hook, dataflow_hook_mock, beam_hook_mock """Test DataflowHook is created and the right args are passed to start_java_dataflow. """ - dataflow_config = DataflowConfiguration() + dataflow_config = DataflowConfiguration(impersonation_chain="test@impersonation.com") self.operator.runner = "DataflowRunner" self.operator.dataflow_config = dataflow_config gcs_provide_file = gcs_hook.return_value.provide_file @@ -247,6 +249,7 @@ def test_exec_dataflow_runner(self, gcs_hook, dataflow_hook_mock, beam_hook_mock 'region': 'us-central1', 'labels': {'foo': 'bar', 'airflow-version': TEST_VERSION}, 'output': 'gs://test/output', + 'impersonateServiceAccount': TEST_IMPERSONATION_ACCOUNT, } persist_link_mock.assert_called_once_with( self.operator, @@ -373,7 +376,7 @@ def test_exec_dataflow_runner(self, gcs_hook, dataflow_hook_mock, beam_hook_mock """Test DataflowHook is created and the right args are passed to start_go_dataflow. """ - dataflow_config = DataflowConfiguration() + dataflow_config = DataflowConfiguration(impersonation_chain="test@impersonation.com") self.operator.runner = "DataflowRunner" self.operator.dataflow_config = dataflow_config gcs_provide_file = gcs_hook.return_value.provide_file