diff --git a/airflow/providers/cncf/kubernetes/example_dags/__init__.py b/airflow/providers/cncf/kubernetes/example_dags/__init__.py
deleted file mode 100644
index 217e5db960782..0000000000000
--- a/airflow/providers/cncf/kubernetes/example_dags/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/docs/apache-airflow-providers-cncf-kubernetes/index.rst b/docs/apache-airflow-providers-cncf-kubernetes/index.rst
index 57ddb81ae6965..a5f6ec3c6d50d 100644
--- a/docs/apache-airflow-providers-cncf-kubernetes/index.rst
+++ b/docs/apache-airflow-providers-cncf-kubernetes/index.rst
@@ -39,7 +39,7 @@ Content
     :maxdepth: 1
     :caption: Resources
 
-    Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/cncf/kubernetes/example_dags>
+    Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/cncf/kubernetes>
     PyPI Repository <https://pypi.org/project/apache-airflow-providers-cncf-kubernetes/>
     Installing from sources <installing-providers-from-sources>
 
diff --git a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
index 92e067c51d6b4..1776ea295633b 100644
--- a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
+++ b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
@@ -89,7 +89,7 @@ Using this method will ensure correctness and type safety. While we have removed almost all
 Kubernetes convenience classes, we have kept the :class:`~airflow.kubernetes.secret.Secret` class to simplify the
 process of generating secret volumes/env variables.
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
+.. exampleinclude:: /../../tests/system/providers/cncf/kubernetes/example_kubernetes.py
     :language: python
     :start-after: [START howto_operator_k8s_cluster_resources]
     :end-before: [END howto_operator_k8s_cluster_resources]
@@ -122,7 +122,7 @@ Create the Secret using ``kubectl``:
 
 Then use it in your pod like so:
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
+.. exampleinclude:: /../../tests/system/providers/cncf/kubernetes/example_kubernetes.py
     :language: python
     :start-after: [START howto_operator_k8s_private_image]
     :end-before: [END howto_operator_k8s_private_image]
@@ -136,7 +136,7 @@ alongside the Pod. The Pod must write the XCom value into this location at the `
 
 See the following example on how this occurs:
 
-.. exampleinclude:: /../../airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
+.. exampleinclude:: /../../tests/system/providers/cncf/kubernetes/example_kubernetes.py
     :language: python
     :start-after: [START howto_operator_k8s_write_xcom]
     :end-before: [END howto_operator_k8s_write_xcom]
diff --git a/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py b/tests/system/providers/cncf/kubernetes/example_kubernetes.py
similarity index 91%
rename from airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
rename to tests/system/providers/cncf/kubernetes/example_kubernetes.py
index b65dae9f4e52a..d77d0889d6aac 100644
--- a/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
+++ b/tests/system/providers/cncf/kubernetes/example_kubernetes.py
@@ -19,6 +19,7 @@
 This is an example dag for using the KubernetesPodOperator.
 """
+import os
 from datetime import datetime
 
 from kubernetes.client import models as k8s
 
@@ -97,6 +98,8 @@
 
 # [END howto_operator_k8s_cluster_resources]
 
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_kubernetes_operator"
 
 with DAG(
     dag_id='example_kubernetes_operator',
@@ -158,6 +161,17 @@
         bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
         task_id="pod_task_xcom_result",
     )
-    # [END howto_operator_k8s_write_xcom]
 
     write_xcom >> pod_task_xcom_result
+    # [END howto_operator_k8s_write_xcom]
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes.py b/tests/system/providers/cncf/kubernetes/example_spark_kubernetes.py
similarity index 64%
rename from airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes.py
rename to tests/system/providers/cncf/kubernetes/example_spark_kubernetes.py
index d01d4b1328c68..20cba7a9f1f7e 100644
--- a/airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes.py
+++ b/tests/system/providers/cncf/kubernetes/example_spark_kubernetes.py
@@ -25,6 +25,7 @@
 https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
 """
 
+import os
 from datetime import datetime, timedelta
 
 # [START import_module]
@@ -40,27 +41,43 @@
 
 # [START instantiate_dag]
 
-dag = DAG(
-    'spark_pi',
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "spark_pi"
+
+with DAG(
+    DAG_ID,
     default_args={'max_active_runs': 1},
     description='submit spark-pi as sparkApplication on kubernetes',
     schedule_interval=timedelta(days=1),
     start_date=datetime(2021, 1, 1),
     catchup=False,
-)
+) as dag:
+    # [START SparkKubernetesOperator_DAG]
+    t1 = SparkKubernetesOperator(
+        task_id='spark_pi_submit',
+        namespace="default",
+        application_file="example_spark_kubernetes_spark_pi.yaml",
+        do_xcom_push=True,
+        dag=dag,
+    )
+
+    t2 = SparkKubernetesSensor(
+        task_id='spark_pi_monitor',
+        namespace="default",
+        application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
+        dag=dag,
+    )
+    t1 >> t2
+
+    # [END SparkKubernetesOperator_DAG]
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
-t1 = SparkKubernetesOperator(
-    task_id='spark_pi_submit',
-    namespace="default",
-    application_file="example_spark_kubernetes_spark_pi.yaml",
-    do_xcom_push=True,
-    dag=dag,
-)
+from tests.system.utils import get_test_run  # noqa: E402
 
-t2 = SparkKubernetesSensor(
-    task_id='spark_pi_monitor',
-    namespace="default",
-    application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
-    dag=dag,
-)
-t1 >> t2
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes_spark_pi.yaml b/tests/system/providers/cncf/kubernetes/example_spark_kubernetes_spark_pi.yaml
similarity index 100%
rename from airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes_spark_pi.yaml
rename to tests/system/providers/cncf/kubernetes/example_spark_kubernetes_spark_pi.yaml