diff --git a/airflow/providers/qubole/example_dags/__init__.py b/airflow/providers/qubole/example_dags/__init__.py
deleted file mode 100644
index 217e5db960782..0000000000000
--- a/airflow/providers/qubole/example_dags/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/docs/apache-airflow-providers-qubole/index.rst b/docs/apache-airflow-providers-qubole/index.rst
index 1e5b673d2cc80..81e48e78a09b7 100644
--- a/docs/apache-airflow-providers-qubole/index.rst
+++ b/docs/apache-airflow-providers-qubole/index.rst
@@ -38,7 +38,7 @@ Content
     :maxdepth: 1
     :caption: Resources

-    Example DAGs
+    Example DAGs
     PyPI Repository
     Installing from sources
diff --git a/docs/apache-airflow-providers-qubole/operators/index.rst b/docs/apache-airflow-providers-qubole/operators/index.rst
index f7b0aae616f9d..45a5ec2b6ffba 100644
--- a/docs/apache-airflow-providers-qubole/operators/index.rst
+++ b/docs/apache-airflow-providers-qubole/operators/index.rst
@@ -29,4 +29,4 @@ Qubole Operators
 .. note::
     You can learn how to use Google Cloud integrations by analyzing the
-    `source code `_ of the particular example DAGs.
+    `source code `_ of the particular example DAGs.
diff --git a/docs/apache-airflow-providers-qubole/operators/qubole.rst b/docs/apache-airflow-providers-qubole/operators/qubole.rst
index 77d86aa6b4aa8..162be4025ff83 100644
--- a/docs/apache-airflow-providers-qubole/operators/qubole.rst
+++ b/docs/apache-airflow-providers-qubole/operators/qubole.rst
@@ -43,7 +43,7 @@ Run Hive command

 To run query that shows all tables you can use

-.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
+.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_qubole_run_hive_query]
@@ -51,7 +51,7 @@ To run query that shows all tables you can use

 Also you can run script that locates in the bucket by passing path to query file

-.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
+.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_qubole_run_hive_script]
@@ -62,7 +62,7 @@ Run Hadoop command

 To run jar file in your Hadoop cluster use

-.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
+.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_qubole_run_hadoop_jar]
@@ -73,7 +73,7 @@ Run Pig command

 To run script script in *Pig Latin* in your Hadoop cluster use

-.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
+.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_qubole_run_pig_script]
@@ -84,7 +84,7 @@ Run Shell command

 To run Shell-script script use

-.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
+.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_qubole_run_shell_script]
@@ -95,7 +95,7 @@ Run Presto command

 To run query using Presto use

-.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
+.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_qubole_run_presto_query]
@@ -106,7 +106,7 @@ Run DB commands

 To run query as `DbTap `_ use

-.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
+.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_qubole_run_db_tap_query]
@@ -114,7 +114,7 @@ To run query as `DbTap
 > db_import >> spark_cmd >> join
-with DAG(
-    dag_id='example_qubole_sensor',
-    schedule_interval=None,
-    start_date=START_DATE,
-    tags=['example'],
-) as dag2:
-    dag2.doc_md = textwrap.dedent(
-        """
-        This is only an example DAG to highlight usage of QuboleSensor in various scenarios,
-        some of these tasks may or may not work based on your QDS account setup.
+    from tests.system.utils.watcher import watcher

-        Run a shell command from Qubole Analyze against your Airflow cluster with following to
-        trigger it manually `airflow dags trigger example_qubole_sensor`.
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()

-        *Note: Make sure that connection `qubole_default` is properly set before running
-        this example.*
-        """
-    )

-    # [START howto_sensor_qubole_run_file_sensor]
-    check_s3_file = QuboleFileSensor(
-        task_id='check_s3_file',
-        poke_interval=60,
-        timeout=600,
-        data={
-            "files": [
-                "s3://paid-qubole/HadoopAPIExamples/jars/hadoop-0.20.1-dev-streaming.jar",
-                "s3://paid-qubole/HadoopAPITests/data/{{ ds.split('-')[2] }}.tsv",
-            ]  # will check for availability of all the files in array
-        },
-    )
-    # [END howto_sensor_qubole_run_file_sensor]
-
-    # [START howto_sensor_qubole_run_partition_sensor]
-    check_hive_partition = QubolePartitionSensor(
-        task_id='check_hive_partition',
-        poke_interval=10,
-        timeout=60,
-        data={
-            "schema": "default",
-            "table": "my_partitioned_table",
-            "columns": [
-                {"column": "month", "values": ["{{ ds.split('-')[1] }}"]},
-                {"column": "day", "values": ["{{ ds.split('-')[2] }}", "{{ yesterday_ds.split('-')[2] }}"]},
-            ],  # will check for partitions like [month=12/day=12,month=12/day=13]
-        },
-    )
-    # [END howto_sensor_qubole_run_partition_sensor]
+from tests.system.utils import get_test_run  # noqa: E402

-    check_s3_file >> check_hive_partition
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/qubole/example_qubole_sensors.py b/tests/system/providers/qubole/example_qubole_sensors.py
new file mode 100644
index 0000000000000..2ee52276f56c1
--- /dev/null
+++ b/tests/system/providers/qubole/example_qubole_sensors.py
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import textwrap
+from datetime import datetime
+
+from airflow import DAG
+from airflow.providers.qubole.sensors.qubole import QuboleFileSensor, QubolePartitionSensor
+
+START_DATE = datetime(2021, 1, 1)
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_qubole_sensor"
+
+
+with DAG(
+    dag_id=DAG_ID,
+    schedule_interval=None,
+    start_date=START_DATE,
+    tags=['example'],
+) as dag:
+    dag.doc_md = textwrap.dedent(
+        """
+        This is only an example DAG to highlight usage of QuboleSensor in various scenarios,
+        some of these tasks may or may not work based on your QDS account setup.
+
+        Run a shell command from Qubole Analyze against your Airflow cluster with following to
+        trigger it manually `airflow dags trigger example_qubole_sensor`.
+
+        *Note: Make sure that connection `qubole_default` is properly set before running
+        this example.*
+        """
+    )
+
+    # [START howto_sensor_qubole_run_file_sensor]
+    check_s3_file = QuboleFileSensor(
+        task_id='check_s3_file',
+        poke_interval=60,
+        timeout=600,
+        data={
+            "files": [
+                "s3://paid-qubole/HadoopAPIExamples/jars/hadoop-0.20.1-dev-streaming.jar",
+                "s3://paid-qubole/HadoopAPITests/data/{{ ds.split('-')[2] }}.tsv",
+            ]  # will check for availability of all the files in array
+        },
+    )
+    # [END howto_sensor_qubole_run_file_sensor]
+
+    # [START howto_sensor_qubole_run_partition_sensor]
+    check_hive_partition = QubolePartitionSensor(
+        task_id='check_hive_partition',
+        poke_interval=10,
+        timeout=60,
+        data={
+            "schema": "default",
+            "table": "my_partitioned_table",
+            "columns": [
+                {"column": "month", "values": ["{{ ds.split('-')[1] }}"]},
+                {"column": "day", "values": ["{{ ds.split('-')[2] }}", "{{ yesterday_ds.split('-')[2] }}"]},
+            ],  # will check for partitions like [month=12/day=12,month=12/day=13]
+        },
+    )
+    # [END howto_sensor_qubole_run_partition_sensor]
+
+    check_s3_file >> check_hive_partition
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
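
Both migrated example files end with the same system-test boilerplate: every task in the DAG is wired into a watcher task, and the DAG is exposed to pytest through get_test_run. The following is only a minimal sketch of that pattern outside the diff, assuming the helpers under tests/system/utils behave as the comments above describe; the DAG id and the BashOperator task are illustrative placeholders, not part of this change.

import os
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")  # same environment variable the new files read
DAG_ID = "example_provider_system_test"  # hypothetical DAG id, for illustration only

with DAG(
    dag_id=DAG_ID,
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    tags=["example"],
) as dag:
    # Stand-in task; real system tests use provider operators and sensors as in the diff above.
    hello = BashOperator(task_id="hello", bash_command="echo hello")

    from tests.system.utils.watcher import watcher

    # Wiring every task into the watcher lets a failing teardown-style task
    # (one with a non-default trigger rule) still mark the whole run as failed.
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Exposes the DAG as a pytest-runnable test (see tests/system/README.md#run_via_pytest).
test_run = get_test_run(dag)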