diff --git a/airflow/providers/apache/beam/operators/beam.py b/airflow/providers/apache/beam/operators/beam.py index dbf6053ead940..082a9a7f6e79a 100644 --- a/airflow/providers/apache/beam/operators/beam.py +++ b/airflow/providers/apache/beam/operators/beam.py @@ -319,6 +319,7 @@ def execute(self, context: 'Context'): location=self.dataflow_config.location, job_id=self.dataflow_job_id, multiple_jobs=False, + project_id=self.dataflow_config.project_id, ) return {"dataflow_job_id": self.dataflow_job_id} else: @@ -600,6 +601,7 @@ def execute(self, context: 'Context'): location=self.dataflow_config.location, job_id=self.dataflow_job_id, multiple_jobs=False, + project_id=self.dataflow_config.project_id, ) return {"dataflow_job_id": self.dataflow_job_id} else: diff --git a/tests/providers/apache/beam/operators/test_beam.py b/tests/providers/apache/beam/operators/test_beam.py index 74c4d9cf5e64c..6fd83d72689f4 100644 --- a/tests/providers/apache/beam/operators/test_beam.py +++ b/tests/providers/apache/beam/operators/test_beam.py @@ -149,6 +149,7 @@ def test_exec_dataflow_runner(self, gcs_hook, dataflow_hook_mock, beam_hook_mock job_name=job_name, location='us-central1', multiple_jobs=False, + project_id=dataflow_config.project_id, ) dataflow_hook_mock.return_value.provide_authorized_gcloud.assert_called_once_with() @@ -415,6 +416,7 @@ def test_exec_dataflow_runner(self, gcs_hook, dataflow_hook_mock, beam_hook_mock job_name=job_name, location='us-central1', multiple_jobs=False, + project_id=dataflow_config.project_id, ) dataflow_hook_mock.return_value.provide_authorized_gcloud.assert_called_once_with()