diff --git a/airflow/providers/amazon/aws/hooks/emr.py b/airflow/providers/amazon/aws/hooks/emr.py index 143bdcdcc8913..2141b38ed8fc4 100644 --- a/airflow/providers/amazon/aws/hooks/emr.py +++ b/airflow/providers/amazon/aws/hooks/emr.py @@ -20,7 +20,7 @@ from botocore.exceptions import ClientError -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowNotFoundException from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook @@ -41,8 +41,8 @@ class EmrHook(AwsBaseHook): conn_type = 'emr' hook_name = 'Amazon Elastic MapReduce' - def __init__(self, emr_conn_id: Optional[str] = default_conn_name, *args, **kwargs) -> None: - self.emr_conn_id = emr_conn_id + def __init__(self, emr_conn_id: str = default_conn_name, *args, **kwargs) -> None: + self.emr_conn_id: str = emr_conn_id kwargs["client_type"] = "emr" super().__init__(*args, **kwargs) @@ -78,12 +78,11 @@ def create_job_flow(self, job_flow_overrides: Dict[str, Any]) -> Dict[str, Any]: run_job_flow method. Overrides for this config may be passed as the job_flow_overrides. """ - if not self.emr_conn_id: - raise AirflowException('emr_conn_id must be present to use create_job_flow') - - emr_conn = self.get_connection(self.emr_conn_id) - - config = emr_conn.extra_dejson.copy() + try: + emr_conn = self.get_connection(self.emr_conn_id) + config = emr_conn.extra_dejson.copy() + except AirflowNotFoundException: + config = {} config.update(job_flow_overrides) response = self.get_conn().run_job_flow(**config) diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py index 510c77184fa5d..67ae54af505f8 100644 --- a/airflow/providers/amazon/aws/operators/emr.py +++ b/airflow/providers/amazon/aws/operators/emr.py @@ -285,8 +285,13 @@ class EmrCreateJobFlowOperator(BaseOperator): For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:EmrCreateJobFlowOperator` - :param aws_conn_id: aws connection to uses - :param emr_conn_id: emr connection to use + :param aws_conn_id: The Airflow connection used for AWS credentials. + If this is None or empty then the default boto3 behaviour is used. If + running Airflow in a distributed manner and aws_conn_id is None or + empty, then default boto3 configuration would be used (and must be + maintained on each worker node) + :param emr_conn_id: emr connection to use for run_job_flow request body. + This will be overridden by the job_flow_overrides param :param job_flow_overrides: boto3 style arguments or reference to an arguments file (must be '.json') to override emr_connection extra. (templated) :param region_name: Region named passed to EmrHook