diff --git a/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py b/airflow/providers/amazon/aws/example_dags/example_emr.py similarity index 71% rename from airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py rename to airflow/providers/amazon/aws/example_dags/example_emr.py index d18237ecb0723..786b4a0ad4f71 100644 --- a/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py +++ b/airflow/providers/amazon/aws/example_dags/example_emr.py @@ -23,13 +23,15 @@ from airflow.providers.amazon.aws.operators.emr import ( EmrAddStepsOperator, EmrCreateJobFlowOperator, + EmrModifyClusterOperator, EmrTerminateJobFlowOperator, ) -from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor +from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor, EmrStepSensor JOB_FLOW_ROLE = os.getenv('EMR_JOB_FLOW_ROLE', 'EMR_EC2_DefaultRole') SERVICE_ROLE = os.getenv('EMR_SERVICE_ROLE', 'EMR_DefaultRole') +# [START howto_operator_emr_steps_config] SPARK_STEPS = [ { 'Name': 'calculate_pi', @@ -58,48 +60,66 @@ 'KeepJobFlowAliveWhenNoSteps': False, 'TerminationProtected': False, }, + 'Steps': SPARK_STEPS, 'JobFlowRole': JOB_FLOW_ROLE, 'ServiceRole': SERVICE_ROLE, } - +# [END howto_operator_emr_steps_config] with DAG( - dag_id='example_emr_job_flow_manual_steps', + dag_id='example_emr', schedule_interval=None, start_date=datetime(2021, 1, 1), tags=['example'], catchup=False, ) as dag: - - cluster_creator = EmrCreateJobFlowOperator( + # [START howto_operator_emr_create_job_flow] + job_flow_creator = EmrCreateJobFlowOperator( task_id='create_job_flow', job_flow_overrides=JOB_FLOW_OVERRIDES, ) + # [END howto_operator_emr_create_job_flow] + + # [START howto_sensor_emr_job_flow] + job_sensor = EmrJobFlowSensor( + task_id='check_job_flow', + job_flow_id=job_flow_creator.output, + ) + # [END howto_sensor_emr_job_flow] + + # [START howto_operator_emr_modify_cluster] + cluster_modifier = 
EmrModifyClusterOperator( + task_id='modify_cluster', cluster_id=job_flow_creator.output, step_concurrency_level=1 + ) + # [END howto_operator_emr_modify_cluster] # [START howto_operator_emr_add_steps] step_adder = EmrAddStepsOperator( task_id='add_steps', - job_flow_id=cluster_creator.output, + job_flow_id=job_flow_creator.output, steps=SPARK_STEPS, ) # [END howto_operator_emr_add_steps] - # [START howto_sensor_emr_step_sensor] + # [START howto_sensor_emr_step] step_checker = EmrStepSensor( task_id='watch_step', - job_flow_id=cluster_creator.output, + job_flow_id=job_flow_creator.output, step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}", ) - # [END howto_sensor_emr_step_sensor] + # [END howto_sensor_emr_step] # [START howto_operator_emr_terminate_job_flow] cluster_remover = EmrTerminateJobFlowOperator( task_id='remove_cluster', - job_flow_id=cluster_creator.output, + job_flow_id=job_flow_creator.output, ) # [END howto_operator_emr_terminate_job_flow] chain( + job_flow_creator, + job_sensor, + cluster_modifier, step_adder, step_checker, cluster_remover, diff --git a/airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py b/airflow/providers/amazon/aws/example_dags/example_emr_eks.py similarity index 82% rename from airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py rename to airflow/providers/amazon/aws/example_dags/example_emr_eks.py index e8932630d9d45..467db6aadba7c 100644 --- a/airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py +++ b/airflow/providers/amazon/aws/example_dags/example_emr_eks.py @@ -19,7 +19,9 @@ from datetime import datetime from airflow import DAG +from airflow.models.baseoperator import chain from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator +from airflow.providers.amazon.aws.sensors.emr import EmrContainerSensor VIRTUAL_CLUSTER_ID = os.getenv("VIRTUAL_CLUSTER_ID", "test-cluster") JOB_ROLE_ARN = os.getenv("JOB_ROLE_ARN", 
"arn:aws:iam::012345678912:role/emr_eks_default_role") @@ -51,18 +53,13 @@ # [END howto_operator_emr_eks_config] with DAG( - dag_id='example_emr_eks_job', + dag_id='example_emr_eks', schedule_interval=None, start_date=datetime(2021, 1, 1), tags=['example'], catchup=False, ) as dag: - - # An example of how to get the cluster id and arn from an Airflow connection - # VIRTUAL_CLUSTER_ID = '{{ conn.emr_eks.extra_dejson["virtual_cluster_id"] }}' - # JOB_ROLE_ARN = '{{ conn.emr_eks.extra_dejson["job_role_arn"] }}' - - # [START howto_operator_emr_eks_job] + # [START howto_operator_emr_container] job_starter = EmrContainerOperator( task_id="start_job", virtual_cluster_id=VIRTUAL_CLUSTER_ID, @@ -71,5 +68,14 @@ job_driver=JOB_DRIVER_ARG, configuration_overrides=CONFIGURATION_OVERRIDES_ARG, name="pi.py", + wait_for_completion=False, ) - # [END howto_operator_emr_eks_job] + # [END howto_operator_emr_container] + + # [START howto_sensor_emr_container] + job_waiter = EmrContainerSensor( + task_id="job_waiter", virtual_cluster_id=VIRTUAL_CLUSTER_ID, job_id=str(job_starter.output) + ) + # [END howto_sensor_emr_container] + + chain(job_starter, job_waiter) diff --git a/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py b/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py deleted file mode 100644 index ab92a3cb21a38..0000000000000 --- a/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py +++ /dev/null @@ -1,84 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -import os -from datetime import datetime - -from airflow import DAG -from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator -from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor - -JOB_FLOW_ROLE = os.getenv('EMR_JOB_FLOW_ROLE', 'EMR_EC2_DefaultRole') -SERVICE_ROLE = os.getenv('EMR_SERVICE_ROLE', 'EMR_DefaultRole') - -# [START howto_operator_emr_automatic_steps_config] -SPARK_STEPS = [ - { - 'Name': 'calculate_pi', - 'ActionOnFailure': 'CONTINUE', - 'HadoopJarStep': { - 'Jar': 'command-runner.jar', - 'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'], - }, - } -] - -JOB_FLOW_OVERRIDES = { - 'Name': 'PiCalc', - 'ReleaseLabel': 'emr-5.29.0', - 'Applications': [{'Name': 'Spark'}], - 'Instances': { - 'InstanceGroups': [ - { - 'Name': 'Primary node', - 'Market': 'ON_DEMAND', - 'InstanceRole': 'MASTER', - 'InstanceType': 'm5.xlarge', - 'InstanceCount': 1, - }, - ], - 'KeepJobFlowAliveWhenNoSteps': False, - 'TerminationProtected': False, - }, - 'Steps': SPARK_STEPS, - 'JobFlowRole': JOB_FLOW_ROLE, - 'ServiceRole': SERVICE_ROLE, -} -# [END howto_operator_emr_automatic_steps_config] - - -with DAG( - dag_id='example_emr_job_flow_automatic_steps', - schedule_interval=None, - start_date=datetime(2021, 1, 1), - tags=['example'], - catchup=False, -) as dag: - - # [START howto_operator_emr_create_job_flow] - job_flow_creator = EmrCreateJobFlowOperator( - task_id='create_job_flow', - job_flow_overrides=JOB_FLOW_OVERRIDES, - ) - # [END howto_operator_emr_create_job_flow] - - # [START howto_sensor_emr_job_flow_sensor] - 
job_sensor = EmrJobFlowSensor( - task_id='check_job_flow', - job_flow_id=job_flow_creator.output, - ) - # [END howto_sensor_emr_job_flow_sensor] diff --git a/airflow/providers/amazon/aws/operators/emr.py b/airflow/providers/amazon/aws/operators/emr.py index a1f3fa753d817..510c77184fa5d 100644 --- a/airflow/providers/amazon/aws/operators/emr.py +++ b/airflow/providers/amazon/aws/operators/emr.py @@ -122,6 +122,10 @@ class EmrContainerOperator(BaseOperator): """ An operator that submits jobs to EMR on EKS virtual clusters. + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:EmrContainerOperator` + :param name: The name of the job run. :param virtual_cluster_id: The EMR on EKS virtual cluster ID :param execution_role_arn: The IAM role ARN associated with the job run. @@ -133,6 +137,7 @@ class EmrContainerOperator(BaseOperator): Use this if you want to specify a unique ID to prevent two jobs from getting started. If no token is provided, a UUIDv4 token will be generated for you. :param aws_conn_id: The Airflow connection used for AWS credentials. + :param wait_for_completion: Whether or not to wait in the operator for the job to complete. :param poll_interval: Time (in seconds) to wait between two consecutive calls to check query status on EMR :param max_tries: Maximum number of times to wait for the job run to finish. Defaults to None, which will poll until the job is *not* in a pending, submitted, or running state. 
@@ -160,6 +165,7 @@ def __init__( configuration_overrides: Optional[dict] = None, client_request_token: Optional[str] = None, aws_conn_id: str = "aws_default", + wait_for_completion: bool = True, poll_interval: int = 30, max_tries: Optional[int] = None, tags: Optional[dict] = None, @@ -174,6 +180,7 @@ def __init__( self.configuration_overrides = configuration_overrides or {} self.aws_conn_id = aws_conn_id self.client_request_token = client_request_token or str(uuid4()) + self.wait_for_completion = wait_for_completion self.poll_interval = poll_interval self.max_tries = max_tries self.tags = tags @@ -198,19 +205,20 @@ def execute(self, context: 'Context') -> Optional[str]: self.client_request_token, self.tags, ) - query_status = self.hook.poll_query_status(self.job_id, self.max_tries, self.poll_interval) - - if query_status in EmrContainerHook.FAILURE_STATES: - error_message = self.hook.get_job_failure_reason(self.job_id) - raise AirflowException( - f"EMR Containers job failed. Final state is {query_status}. " - f"query_execution_id is {self.job_id}. Error: {error_message}" - ) - elif not query_status or query_status in EmrContainerHook.INTERMEDIATE_STATES: - raise AirflowException( - f"Final state of EMR Containers job is {query_status}. " - f"Max tries of poll status exceeded, query_execution_id is {self.job_id}." - ) + if self.wait_for_completion: + query_status = self.hook.poll_query_status(self.job_id, self.max_tries, self.poll_interval) + + if query_status in EmrContainerHook.FAILURE_STATES: + error_message = self.hook.get_job_failure_reason(self.job_id) + raise AirflowException( + f"EMR Containers job failed. Final state is {query_status}. " + f"query_execution_id is {self.job_id}. Error: {error_message}" + ) + elif not query_status or query_status in EmrContainerHook.INTERMEDIATE_STATES: + raise AirflowException( + f"Final state of EMR Containers job is {query_status}. " + f"Max tries of poll status exceeded, query_execution_id is {self.job_id}." 
+ ) return self.job_id diff --git a/airflow/providers/amazon/aws/sensors/athena.py b/airflow/providers/amazon/aws/sensors/athena.py index 927f512143362..1186f8cdef23b 100644 --- a/airflow/providers/amazon/aws/sensors/athena.py +++ b/airflow/providers/amazon/aws/sensors/athena.py @@ -37,7 +37,7 @@ class AthenaSensor(BaseSensorOperator): If the query fails, the task will fail. .. seealso:: - For more information on how to use this operator, take a look at the guide: + For more information on how to use this sensor, take a look at the guide: :ref:`howto/sensor:AthenaSensor` diff --git a/airflow/providers/amazon/aws/sensors/cloud_formation.py b/airflow/providers/amazon/aws/sensors/cloud_formation.py index fb01bdc7f6827..972dbace376e8 100644 --- a/airflow/providers/amazon/aws/sensors/cloud_formation.py +++ b/airflow/providers/amazon/aws/sensors/cloud_formation.py @@ -36,7 +36,7 @@ class CloudFormationCreateStackSensor(BaseSensorOperator): Waits for a stack to be created successfully on AWS CloudFormation. .. seealso:: - For more information on how to use this operator, take a look at the guide: + For more information on how to use this sensor, take a look at the guide: :ref:`howto/sensor:CloudFormationCreateStackSensor` @@ -74,7 +74,7 @@ class CloudFormationDeleteStackSensor(BaseSensorOperator): Waits for a stack to be deleted successfully on AWS CloudFormation. .. 
seealso:: - For more information on how to use this operator, take a look at the guide: + For more information on how to use this sensor, take a look at the guide: :ref:`howto/sensor:CloudFormationDeleteStackSensor` :param stack_name: The name of the stack to wait for (templated) diff --git a/airflow/providers/amazon/aws/sensors/dms.py b/airflow/providers/amazon/aws/sensors/dms.py index 26e6b7148fdcd..0437ee4d98850 100644 --- a/airflow/providers/amazon/aws/sensors/dms.py +++ b/airflow/providers/amazon/aws/sensors/dms.py @@ -91,7 +91,7 @@ class DmsTaskCompletedSensor(DmsTaskBaseSensor): Pokes DMS task until it is completed. .. seealso:: - For more information on how to use this operator, take a look at the guide: + For more information on how to use this sensor, take a look at the guide: :ref:`howto/sensor:DmsTaskCompletedSensor` :param replication_task_arn: AWS DMS replication task ARN diff --git a/airflow/providers/amazon/aws/sensors/eks.py b/airflow/providers/amazon/aws/sensors/eks.py index 92ed55da4d31e..76b2703e891d1 100644 --- a/airflow/providers/amazon/aws/sensors/eks.py +++ b/airflow/providers/amazon/aws/sensors/eks.py @@ -121,7 +121,7 @@ class EksFargateProfileStateSensor(BaseSensorOperator): Check the state of an AWS Fargate profile until it reaches the target state or another terminal state. .. seealso:: - For more information on how to use this operator, take a look at the guide: + For more information on how to use this sensor, take a look at the guide: :ref:`howto/sensor:EksFargateProfileStateSensor` :param cluster_name: The name of the Cluster which the AWS Fargate profile is attached to. 
(templated) diff --git a/airflow/providers/amazon/aws/sensors/glacier.py b/airflow/providers/amazon/aws/sensors/glacier.py index e92f5a4326b3a..8e8b74c58c17b 100644 --- a/airflow/providers/amazon/aws/sensors/glacier.py +++ b/airflow/providers/amazon/aws/sensors/glacier.py @@ -38,7 +38,7 @@ class GlacierJobOperationSensor(BaseSensorOperator): Glacier sensor for checking job state. This operator runs only in reschedule mode. .. seealso:: - For more information on how to use this operator, take a look at the guide: + For more information on how to use this sensor, take a look at the guide: :ref:`howto/sensor:GlacierJobOperationSensor` :param aws_conn_id: The reference to the AWS connection details diff --git a/airflow/providers/amazon/aws/sensors/rds.py b/airflow/providers/amazon/aws/sensors/rds.py index 3c24c82fbf940..54ee50875e90a 100644 --- a/airflow/providers/amazon/aws/sensors/rds.py +++ b/airflow/providers/amazon/aws/sensors/rds.py @@ -70,7 +70,7 @@ class RdsSnapshotExistenceSensor(RdsBaseSensor): Waits for RDS snapshot with a specific status. .. seealso:: - For more information on how to use this operator, take a look at the guide: + For more information on how to use this sensor, take a look at the guide: :ref:`howto/sensor:RdsSnapshotExistenceSensor` :param db_type: Type of the DB - either "instance" or "cluster" @@ -112,7 +112,7 @@ class RdsExportTaskExistenceSensor(RdsBaseSensor): Waits for RDS export task with a specific status. .. seealso:: - For more information on how to use this operator, take a look at the guide: + For more information on how to use this sensor, take a look at the guide: :ref:`howto/sensor:RdsExportTaskExistenceSensor` :param export_task_identifier: A unique identifier for the snapshot export task. 
diff --git a/docs/apache-airflow-providers-amazon/operators/emr.rst b/docs/apache-airflow-providers-amazon/operators/emr.rst index a62b503740dcf..6bb16bd494639 100644 --- a/docs/apache-airflow-providers-amazon/operators/emr.rst +++ b/docs/apache-airflow-providers-amazon/operators/emr.rst @@ -53,10 +53,10 @@ JobFlow configuration To create a job flow on EMR, you need to specify the configuration for the EMR cluster: -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr.py :language: python - :start-after: [START howto_operator_emr_automatic_steps_config] - :end-before: [END howto_operator_emr_automatic_steps_config] + :start-after: [START howto_operator_emr_steps_config] + :end-before: [END howto_operator_emr_steps_config] Here we create an EMR single-node Cluster *PiCalc*. It only has a single step *calculate_pi* which calculates the value of ``Pi`` using Spark. The config ``'KeepJobFlowAliveWhenNoSteps': False`` @@ -76,7 +76,7 @@ Create the Job Flow In the following code we are creating a new job flow using the configuration as explained above. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr.py :language: python :dedent: 4 :start-after: [START howto_operator_emr_create_job_flow] @@ -90,7 +90,7 @@ Add Steps to an EMR job flow To add steps to an existing EMR Job flow you can use :class:`~airflow.providers.amazon.aws.operators.emr.EmrAddStepsOperator`. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py +.. 
exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr.py :language: python :dedent: 4 :start-after: [START howto_operator_emr_add_steps] @@ -104,7 +104,7 @@ Terminate an EMR job flow To terminate an EMR Job Flow you can use :class:`~airflow.providers.amazon.aws.operators.emr.EmrTerminateJobFlowOperator`. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr.py :language: python :dedent: 4 :start-after: [START howto_operator_emr_terminate_job_flow] @@ -118,17 +118,15 @@ Modify Amazon EMR container To modify an existing EMR container you can use :class:`~airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor`. +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_emr_modify_cluster] + :end-before: [END howto_operator_emr_modify_cluster] + Sensors ------- -.. _howto/sensor:EmrContainerSensor: - -Wait on an Amazon EMR container state -===================================== - -To monitor the state of an EMR container you can use -:class:`~airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor`. - .. _howto/sensor:EmrJobFlowSensor: Wait on an Amazon EMR job flow state @@ -137,11 +135,11 @@ Wait on an Amazon EMR job flow state To monitor the state of an EMR job flow you can use :class:`~airflow.providers.amazon.aws.sensors.emr.EmrJobFlowSensor`. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr.py :language: python :dedent: 4 - :start-after: [START howto_sensor_emr_job_flow_sensor] - :end-before: [END howto_sensor_emr_job_flow_sensor] + :start-after: [START howto_sensor_emr_job_flow] + :end-before: [END howto_sensor_emr_job_flow] .. 
_howto/sensor:EmrStepSensor: @@ -151,11 +149,11 @@ Wait on an Amazon EMR step state To monitor the state of a step running an existing EMR Job flow you can use :class:`~airflow.providers.amazon.aws.sensors.emr.EmrStepSensor`. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr.py :language: python :dedent: 4 - :start-after: [START howto_sensor_emr_step_sensor] - :end-before: [END howto_sensor_emr_step_sensor] + :start-after: [START howto_sensor_emr_step] + :end-before: [END howto_sensor_emr_step] Reference --------- diff --git a/docs/apache-airflow-providers-amazon/operators/emr_eks.rst b/docs/apache-airflow-providers-amazon/operators/emr_eks.rst index b84fbf5ee5dd5..3e9e58fbcacde 100644 --- a/docs/apache-airflow-providers-amazon/operators/emr_eks.rst +++ b/docs/apache-airflow-providers-amazon/operators/emr_eks.rst @@ -23,9 +23,6 @@ Amazon EMR on Amazon EKS provides a deployment option for Amazon EMR that allows you to run open-source big data frameworks on Amazon EKS. -Airflow provides the :class:`~airflow.providers.amazon.aws.operators.emr.EmrContainerOperator` -to submit Apache Spark jobs to your EMR on EKS virtual cluster. - Prerequisite Tasks ------------------ @@ -34,18 +31,18 @@ Prerequisite Tasks Operators --------- -.. _howto/operator:EMRContainersOperators: +.. _howto/operator:EmrContainerOperator: -Run a Spark job on EMR on EKS -============================= +Submit a job to an Amazon EMR virtual cluster +============================================= .. note:: This example assumes that you already have an EMR on EKS virtual cluster configured. See the `EMR on EKS Getting Started guide `__ for more information. -The ``EMRContainerOperator`` will submit a new job to an Amazon EMR on Amazon EKS virtual cluster and wait for -the job to complete. The example job below calculates the mathematical constant ``Pi``. 
In a +The ``EmrContainerOperator`` will submit a new job to an Amazon EMR on Amazon EKS virtual cluster. +The example job below calculates the mathematical constant ``Pi``. In a production job, you would usually refer to a Spark script on Amazon Simple Storage Service (S3). To create a job for Amazon EMR on Amazon EKS, you need to specify your virtual cluster ID, the release of Amazon EMR you @@ -59,28 +56,42 @@ and ``monitoringConfiguration`` to send logs to the ``/aws/emr-eks-spark`` log g Refer to the `EMR on EKS guide `__ for more details on job configuration. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_eks.py :language: python :start-after: [START howto_operator_emr_eks_config] :end-before: [END howto_operator_emr_eks_config] - We pass the ``virtual_cluster_id`` and ``execution_role_arn`` values as operator parameters, but you can store them in a connection or provide them in the DAG. Your AWS region should be defined either in the ``aws_default`` connection as ``{"region_name": "us-east-1"}`` or a custom connection name -that gets passed to the operator with the ``aws_conn_id`` parameter. +that gets passed to the operator with the ``aws_conn_id`` parameter. The operator returns the Job ID of the job run. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_eks_job.py +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_eks.py :language: python :dedent: 4 - :start-after: [START howto_operator_emr_eks_job] - :end-before: [END howto_operator_emr_eks_job] + :start-after: [START howto_operator_emr_container] + :end-before: [END howto_operator_emr_container] + +Sensors +------- + +.. _howto/sensor:EmrContainerSensor: -With the ``EmrContainerOperator``, it will wait until the successful completion of the job or raise -an ``AirflowException`` if there is an error.
The operator returns the Job ID of the job run. +Wait on an Amazon EMR virtual cluster job +========================================= + +To wait for an Amazon EMR virtual cluster job to reach a terminal state, you can use +:class:`~airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor`. + +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_eks.py + :language: python + :dedent: 4 + :start-after: [START howto_sensor_emr_container] + :end-before: [END howto_sensor_emr_container] Reference --------- +* `AWS boto3 library documentation for EMR Containers `__ * `Amazon EMR on EKS Job runs `__ * `EMR on EKS Best Practices `__ diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py index 1c1bedf7fcf3b..d0b9e1c060b99 100644 --- a/tests/always/test_project_structure.py +++ b/tests/always/test_project_structure.py @@ -433,9 +433,6 @@ class TestAmazonProviderProjectStructure(ExampleCoverageTest): } MISSING_EXAMPLES_FOR_CLASSES = { - # EMR legitimately missing, needs development - 'airflow.providers.amazon.aws.operators.emr.EmrModifyClusterOperator', - 'airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor', # S3 Exasol transfer difficult to test, see: https://github.com/apache/airflow/issues/22632 'airflow.providers.amazon.aws.transfers.exasol_to_s3.ExasolToS3Operator', # Glue Catalog sensor difficult to test