diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 8bab15352577a..19df2a5f706c7 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -728,7 +728,7 @@ hashicorp google microsoft.azure google,oracle,sftp mysql amazon,presto,trino,vertica postgres amazon -presto google +presto google,slack salesforce tableau sftp ssh slack http diff --git a/airflow/providers/dependencies.json b/airflow/providers/dependencies.json index 79a58e5bc2ef1..dd58aac77d7d5 100644 --- a/airflow/providers/dependencies.json +++ b/airflow/providers/dependencies.json @@ -74,7 +74,8 @@ "amazon" ], "presto": [ - "google" + "google", + "slack" ], "salesforce": [ "tableau" diff --git a/airflow/providers/presto/provider.yaml b/airflow/providers/presto/provider.yaml index 66c493e3f6f7c..e07a77c94006d 100644 --- a/airflow/providers/presto/provider.yaml +++ b/airflow/providers/presto/provider.yaml @@ -53,6 +53,11 @@ transfers: how-to-guide: /docs/apache-airflow-providers-presto/operators/transfer/gcs_to_presto.rst python-module: airflow.providers.presto.transfers.gcs_to_presto + - source-integration-name: Presto + target-integration-name: Slack + how-to-guide: /docs/apache-airflow-providers-presto/operators/transfer/presto_to_slack.rst + python-module: airflow.providers.presto.transfers.presto_to_slack + hook-class-names: # deprecated - to be removed after providers add dependency on Airflow 2.2.0+ - airflow.providers.presto.hooks.presto.PrestoHook diff --git a/airflow/providers/presto/transfers/presto_to_slack.py b/airflow/providers/presto/transfers/presto_to_slack.py new file mode 100644 index 0000000000000..6dd0ecb3ab137 --- /dev/null +++ b/airflow/providers/presto/transfers/presto_to_slack.py @@ -0,0 +1,141 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union + +from pandas import DataFrame +from tabulate import tabulate + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.providers.presto.hooks.presto import PrestoHook +from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class PrestoToSlackOperator(BaseOperator): + """ + Executes a single SQL statement in Presto and sends the results to Slack. The results of the query are + rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{ + results_df }}'. The 'results_df' variable name can be changed by specifying a different + 'results_df_name' parameter. The Tabulate library is added to the JINJA environment as a filter to + allow the dataframe to be rendered nicely. For example, set 'slack_message' to {{ results_df | + tabulate(tablefmt="pretty", headers="keys") }} to send the results to Slack as an ascii rendered table. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:PrestoToSlackOperator` + + :param sql: The SQL statement to execute on Presto (templated) + :param slack_message: The templated Slack message to send with the data returned from Presto. 
+ You can use the default JINJA variable {{ results_df }} to access the pandas dataframe containing the + SQL results + :param presto_conn_id: The connection id of the Presto instance the SQL statement is run against + :param slack_conn_id: The connection id for Slack + :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df' + :param parameters: The parameters to pass to the SQL query + :param slack_token: The token to use to authenticate to Slack. If this is not provided, the + 'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id + :param slack_channel: The channel to send the message to. Overrides the default from the Slack connection. + """ + + template_fields: Sequence[str] = ('sql', 'slack_message', 'slack_channel') + template_ext: Sequence[str] = ('.sql', '.jinja', '.j2') + template_fields_renderers = {"sql": "sql", "slack_message": "jinja"} + times_rendered = 0 + + def __init__( + self, + *, + sql: str, + slack_message: str, + presto_conn_id: str = 'presto_default', + slack_conn_id: str = 'slack_default', + results_df_name: str = 'results_df', + parameters: Optional[Union[Iterable, Mapping]] = None, + slack_token: Optional[str] = None, + slack_channel: Optional[str] = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + + self.presto_conn_id = presto_conn_id + self.sql = sql + self.parameters = parameters + self.slack_conn_id = slack_conn_id + self.slack_token = slack_token + self.slack_message = slack_message + self.results_df_name = results_df_name + self.slack_channel = slack_channel + + def _get_query_results(self) -> DataFrame: + presto_hook = self._get_presto_hook() + + self.log.info('Running SQL query: %s', self.sql) + df = presto_hook.get_pandas_df(self.sql, parameters=self.parameters) + return df + + def _render_and_send_slack_message(self, context, df) -> None: + # Expose the dataframe under results_df_name, then re-render so slack_message can use it + context[self.results_df_name] = df + 
self.render_template_fields(context) + + slack_hook = self._get_slack_hook() + self.log.info('Sending slack message: %s', self.slack_message) + slack_hook.execute() + + def _get_presto_hook(self) -> PrestoHook: + return PrestoHook(presto_conn_id=self.presto_conn_id) + + def _get_slack_hook(self) -> SlackWebhookHook: + return SlackWebhookHook( + http_conn_id=self.slack_conn_id, + message=self.slack_message, + webhook_token=self.slack_token, + slack_channel=self.slack_channel, + ) + + def render_template_fields(self, context, jinja_env=None) -> None: + # If this is the first render of the template fields, exclude slack_message from rendering since + # the presto results haven't been retrieved yet. + if self.times_rendered == 0: + fields_to_render: Iterable[str] = filter(lambda x: x != 'slack_message', self.template_fields) + else: + fields_to_render = self.template_fields + + if not jinja_env: + jinja_env = self.get_template_env() + + # Register tabulate as a JINJA filter so templates can use {{ results_df | tabulate(...) }} + jinja_env.filters['tabulate'] = tabulate + + self._do_render_template_fields(self, fields_to_render, context, jinja_env, set()) + self.times_rendered += 1 + + def execute(self, context: 'Context') -> None: + if not self.sql.strip(): + raise AirflowException("Expected 'sql' parameter is missing.") + if not self.slack_message.strip(): + raise AirflowException("Expected 'slack_message' parameter is missing.") + + df = self._get_query_results() + + self._render_and_send_slack_message(context, df) + + self.log.debug('Finished sending Presto data to Slack') diff --git a/docs/apache-airflow-providers-presto/index.rst b/docs/apache-airflow-providers-presto/index.rst index 9fbd1b9347728..b87b42d5c8c15 100644 --- a/docs/apache-airflow-providers-presto/index.rst +++ b/docs/apache-airflow-providers-presto/index.rst @@ -28,6 +28,12 @@ Content PrestoTransferOperator types +.. toctree:: + :maxdepth: 1 + :caption: Guides + + PrestoToSlackOperator types + .. 
toctree:: :maxdepth: 1 :caption: References @@ -100,6 +106,7 @@ You can install such cross-provider dependencies when installing from PyPI. For Dependent package Extra ==================================================================================================== ========== `apache-airflow-providers-google `_ ``google`` +`apache-airflow-providers-slack `_ ``slack`` ==================================================================================================== ========== Downloading official packages diff --git a/docs/apache-airflow-providers-presto/operators/transfer/presto_to_slack.rst b/docs/apache-airflow-providers-presto/operators/transfer/presto_to_slack.rst new file mode 100644 index 0000000000000..5dded6bd0e9ac --- /dev/null +++ b/docs/apache-airflow-providers-presto/operators/transfer/presto_to_slack.rst @@ -0,0 +1,38 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/operator:PrestoToSlackOperator: + +PrestoToSlackOperator +======================== + +Use the :class:`~airflow.providers.presto.transfers.presto_to_slack.PrestoToSlackOperator` to post the results of a Presto query to a Slack +channel. 
+ +Using the Operator +^^^^^^^^^^^^^^^^^^ + +This operator will execute a custom query in Presto and publish a Slack message that can be formatted +and contain the resulting dataset (e.g. ASCII formatted dataframe). + +An example usage of the PrestoToSlackOperator is as follows: + +.. exampleinclude:: /../../tests/system/providers/presto/example_presto_to_slack.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_presto_to_slack] + :end-before: [END howto_operator_presto_to_slack] diff --git a/tests/providers/presto/transfers/test_presto_to_slack.py b/tests/providers/presto/transfers/test_presto_to_slack.py new file mode 100644 index 0000000000000..78aa2867ec375 --- /dev/null +++ b/tests/providers/presto/transfers/test_presto_to_slack.py @@ -0,0 +1,77 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from unittest import mock + +from airflow.models import DAG +from airflow.providers.presto.transfers.presto_to_slack import PrestoToSlackOperator +from airflow.utils import timezone +from tests.test_utils.db import clear_db_runs + +TEST_DAG_ID = 'presto_to_slack_unit_test' +DEFAULT_DATE = timezone.datetime(2022, 1, 1) + + +class TestPrestoToSlackOperator: + def setup_class(self): + clear_db_runs() + + def setup_method(self): + self.example_dag = DAG('unit_test_dag_presto_to_slack', start_date=DEFAULT_DATE) + + def teardown_method(self): + clear_db_runs() + + @staticmethod + def _construct_operator(**kwargs): + operator = PrestoToSlackOperator(task_id=TEST_DAG_ID, **kwargs) + return operator + + @mock.patch('airflow.providers.presto.transfers.presto_to_slack.PrestoHook') + @mock.patch('airflow.providers.presto.transfers.presto_to_slack.SlackWebhookHook') + def test_hooks_and_rendering(self, mock_slack_hook_class, mock_presto_hook_class): + operator_args = { + 'presto_conn_id': 'presto_connection', + 'slack_conn_id': 'slack_connection', + 'sql': "sql {{ ds }}", + 'results_df_name': 'xxxx', # custom variable name; slack_message refers to it as {{ xxxx }} + 'parameters': ['1', '2', '3'], + 'slack_message': 'message: {{ ds }}, {{ xxxx }}', + 'slack_token': 'test_token', + 'slack_channel': 'my_channel', + 'dag': self.example_dag, + } + presto_to_slack_operator = self._construct_operator(**operator_args) + presto_hook = mock_presto_hook_class.return_value + presto_hook.get_pandas_df.return_value = '1234' # plain string stands in for the dataframe + slack_webhook_hook = mock_slack_hook_class.return_value + presto_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + mock_presto_hook_class.assert_called_once_with( + presto_conn_id='presto_connection', + ) + + presto_hook.get_pandas_df.assert_called_once_with('sql 2022-01-01', parameters=['1', '2', '3']) + + mock_slack_hook_class.assert_called_once_with( + http_conn_id='slack_connection', + message='message: 2022-01-01, 1234', + webhook_token='test_token', + slack_channel='my_channel', 
+ ) + + slack_webhook_hook.execute.assert_called_once() diff --git a/tests/system/providers/presto/example_presto_to_slack.py b/tests/system/providers/presto/example_presto_to_slack.py new file mode 100644 index 0000000000000..91ab9c42e6f99 --- /dev/null +++ b/tests/system/providers/presto/example_presto_to_slack.py @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example DAG using PrestoToSlackOperator. 
+""" + +import os +from datetime import datetime + +from airflow import models +from airflow.providers.presto.transfers.presto_to_slack import PrestoToSlackOperator + +PRESTO_TABLE = os.environ.get("PRESTO_TABLE", "test_table") +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") # NOTE(review): not referenced elsewhere in this example - confirm it is needed +DAG_ID = "example_presto_to_slack" + +with models.DAG( + dag_id=DAG_ID, + schedule_interval='@once', # Override to match your needs + start_date=datetime(2022, 1, 1), + catchup=False, + tags=["example"], +) as dag: + # [START howto_operator_presto_to_slack] + PrestoToSlackOperator( + task_id="presto_to_slack", + sql=f"SELECT col FROM {PRESTO_TABLE}", + slack_channel="my_channel", + slack_message="message: {{ ds }}, {{ results_df }}", + ) + # [END howto_operator_presto_to_slack] + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)