diff --git a/airflow/providers/slack/CHANGELOG.rst b/airflow/providers/slack/CHANGELOG.rst index 6bfb86c6fdce1..51534738090ae 100644 --- a/airflow/providers/slack/CHANGELOG.rst +++ b/airflow/providers/slack/CHANGELOG.rst @@ -24,6 +24,16 @@ Changelog --------- +Breaking changes +~~~~~~~~~~~~~~~~ + +* The hook class :class:`airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook` not inherit from + :class:`airflow.providers.http.hooks.http.HttpHook` anymore. In practice the only impact on + user-defined classes based on **SlackWebhookHook** and you use attributes from **HttpHook**. +* Drop support deprecated ``webhook_token`` parameter in + :ref:`Slack Incoming Webhook Connection ` Extra. + + 5.1.0 ..... diff --git a/airflow/providers/slack/hooks/slack.py b/airflow/providers/slack/hooks/slack.py index 5d280a6c5b5b3..60d6ce78a9b7a 100644 --- a/airflow/providers/slack/hooks/slack.py +++ b/airflow/providers/slack/hooks/slack.py @@ -49,7 +49,7 @@ class SlackHook(BaseHook): .. warning:: This hook intend to use `Slack API` connection - and might not work correctly with `Slack Webhook` and `HTTP` connections. + and might not work correctly with `Slack Incoming Webhook` and `HTTP` connections. Takes both Slack API token directly and connection that has Slack API token. If both are supplied, Slack API token will be used. Also exposes the rest of slack.WebClient args. @@ -74,8 +74,8 @@ class SlackHook(BaseHook): and receive a response from Slack. If not set than default WebClient value will use. :param base_url: A string representing the Slack API base URL. If not set than default WebClient BASE_URL will use (``https://www.slack.com/api/``). - :param proxy: Proxy to make the Slack Incoming Webhook call. - :param retry_handlers: List of handlers to customize retry logic in WebClient. + :param proxy: Proxy to make the Slack API call. + :param retry_handlers: List of handlers to customize retry logic in ``slack_sdk.WebClient``. :param token: (deprecated) Slack API Token. """ diff --git a/airflow/providers/slack/hooks/slack_webhook.py b/airflow/providers/slack/hooks/slack_webhook.py index af28f15c671d7..d6659da381ebf 100644 --- a/airflow/providers/slack/hooks/slack_webhook.py +++ b/airflow/providers/slack/hooks/slack_webhook.py @@ -19,144 +19,454 @@ import json import warnings +from functools import wraps +from typing import TYPE_CHECKING, Any, Callable +from urllib.parse import urlparse +from slack_sdk import WebhookClient + +from airflow.compat.functools import cached_property from airflow.exceptions import AirflowException -from airflow.providers.http.hooks.http import HttpHook +from airflow.hooks.base import BaseHook +from airflow.models import Connection +from airflow.providers.slack.utils import ConnectionExtraConfig, prefixed_extra_field +from airflow.utils.log.secrets_masker import mask_secret + +if TYPE_CHECKING: + from slack_sdk.http_retry import RetryHandler + +DEFAULT_SLACK_WEBHOOK_ENDPOINT = "https://hooks.slack.com/services" +LEGACY_INTEGRATION_PARAMS = ("channel", "username", "icon_emoji", "icon_url") + +def check_webhook_response(func: Callable) -> Callable: + """Function decorator that check WebhookResponse and raise an error if status code != 200.""" -class SlackWebhookHook(HttpHook): + @wraps(func) + def wrapper(*args, **kwargs) -> Callable: + resp = func(*args, **kwargs) + if resp.status_code != 200: + raise AirflowException( + f"Response body: {resp.body!r}, Status Code: {resp.status_code}. " + "See: https://api.slack.com/messaging/webhooks#handling_errors" + ) + return resp + + return wrapper + + +class SlackWebhookHook(BaseHook): """ - This hook allows you to post messages to Slack using incoming webhooks. - Takes both Slack webhook token directly and connection that has Slack webhook token. - If both supplied, http_conn_id will be used as base_url, - and webhook_token will be taken as endpoint, the relative path of the url. + This class provide a thin wrapper around the ``slack_sdk.WebhookClient``. + This hook allows you to post messages to Slack by using Incoming Webhooks. + + .. seealso:: + - :ref:`Slack Incoming Webhook connection ` + - https://api.slack.com/messaging/webhooks + - https://slack.dev/python-slack-sdk/webhook/index.html + + .. note:: + You cannot override the default channel (chosen by the user who installed your app), + username, or icon when you're using Incoming Webhooks to post messages. + Instead, these values will always inherit from the associated Slack App configuration + (`link `_). + It is possible to change this values only in `Legacy Slack Integration Incoming Webhook + `_. .. warning:: - This hook intend to use `Slack Webhook` connection + This hook intend to use `Slack Incoming Webhook` connection and might not work correctly with `Slack API` connection. - Each Slack webhook token can be pre-configured to use a specific channel, username and - icon. You can override these defaults in this hook. - - :param http_conn_id: connection that has Slack webhook token in the password field - :param webhook_token: Slack webhook token - :param message: The message you want to send on Slack - :param attachments: The attachments to send on Slack. Should be a list of - dictionaries representing Slack attachments. - :param blocks: The blocks to send on Slack. Should be a list of - dictionaries representing Slack blocks. - :param channel: The channel the message should be posted to - :param username: The username to post to slack with - :param icon_emoji: The emoji to use as icon for the user posting to Slack - :param icon_url: The icon image URL string to use in place of the default icon. - :param link_names: Whether or not to find and link channel and usernames in your - message - :param proxy: Proxy to use to make the Slack webhook call + Examples: + .. code-block:: python + + # Create hook + hook = SlackWebhookHook(slack_webhook_conn_id="slack_default") + + # Post message in Slack channel by JSON formatted message + # See: https://api.slack.com/messaging/webhooks#posting_with_webhooks + hook.send_dict({"text": "Hello world!"}) + + # Post simple message in Slack channel + hook.send_text("Hello world!") + + # Use ``slack_sdk.WebhookClient`` + hook.client.send(text="Hello world!") + + :param slack_webhook_conn_id: Slack Incoming Webhook connection id + that has Incoming Webhook token in the password field. + :param timeout: The maximum number of seconds the client will wait to connect + and receive a response from Slack. If not set than default WebhookClient value will use. + :param proxy: Proxy to make the Slack Incoming Webhook call. + :param retry_handlers: List of handlers to customize retry logic in ``slack_sdk.WebhookClient``. + :param webhook_token: (deprecated) Slack Incoming Webhook token. + Use instead Slack Incoming Webhook connection password field. """ - conn_name_attr = 'http_conn_id' + conn_name_attr = 'slack_webhook_conn_id' default_conn_name = 'slack_default' conn_type = 'slackwebhook' - hook_name = 'Slack Webhook' + hook_name = 'Slack Incoming Webhook' def __init__( self, - http_conn_id=None, - webhook_token=None, - message="", - attachments=None, - blocks=None, - channel=None, - username=None, - icon_emoji=None, - icon_url=None, - link_names=False, - proxy=None, - *args, + slack_webhook_conn_id: str | None = None, + webhook_token: str | None = None, + timeout: int | None = None, + proxy: str | None = None, + retry_handlers: list[RetryHandler] | None = None, **kwargs, ): - super().__init__(http_conn_id=http_conn_id, *args, **kwargs) - self.webhook_token = self._get_token(webhook_token, http_conn_id) - self.message = message - self.attachments = attachments - self.blocks = blocks - self.channel = channel - self.username = username - self.icon_emoji = icon_emoji - self.icon_url = icon_url - self.link_names = link_names - self.proxy = proxy + super().__init__() - def _get_token(self, token: str, http_conn_id: str | None) -> str: - """ - Given either a manually set token or a conn_id, return the webhook_token to use. + http_conn_id = kwargs.pop("http_conn_id", None) + if http_conn_id: + warnings.warn( + 'Parameter `http_conn_id` is deprecated. Please use `slack_webhook_conn_id` instead.', + DeprecationWarning, + stacklevel=2, + ) + if slack_webhook_conn_id: + raise AirflowException("You cannot provide both `slack_webhook_conn_id` and `http_conn_id`.") + slack_webhook_conn_id = http_conn_id - :param token: The manually provided token - :param http_conn_id: The conn_id provided - :return: webhook_token to use - :rtype: str - """ - if token: - return token - elif http_conn_id: - conn = self.get_connection(http_conn_id) + if not slack_webhook_conn_id and not webhook_token: + raise AirflowException("Either `slack_webhook_conn_id` or `webhook_token` should be provided.") + if webhook_token: + mask_secret(webhook_token) + warnings.warn( + "Provide `webhook_token` as hook argument deprecated by security reason and will be removed " + "in a future releases. Please specify it in `Slack Webhook` connection.", + DeprecationWarning, + stacklevel=2, + ) + if not slack_webhook_conn_id: + warnings.warn( + "You have not set parameter `slack_webhook_conn_id`. Currently `Slack Incoming Webhook` " + "connection id optional but in a future release it will mandatory.", + FutureWarning, + stacklevel=2, + ) - if getattr(conn, 'password', None): - return conn.password - else: - extra = conn.extra_dejson - web_token = extra.get('webhook_token', '') + self.slack_webhook_conn_id = slack_webhook_conn_id + self.timeout = timeout + self.proxy = proxy + self.retry_handlers = retry_handlers + self._webhook_token = webhook_token - if web_token: + # Compatibility with previous version of SlackWebhookHook + deprecated_class_attrs = [] + for deprecated_attr in ( + "message", + "attachments", + "blocks", + "channel", + "username", + "icon_emoji", + "icon_url", + "link_names", + ): + if deprecated_attr in kwargs: + deprecated_class_attrs.append(deprecated_attr) + setattr(self, deprecated_attr, kwargs.pop(deprecated_attr)) + if deprecated_attr == "message": + # Slack WebHook Post Request not expected `message` as field, + # so we also set "text" attribute which will check by SlackWebhookHook._resolve_argument + self.text = getattr(self, deprecated_attr) + elif deprecated_attr == "link_names": warnings.warn( - "'webhook_token' in 'extra' is deprecated. Please use 'password' field", - DeprecationWarning, + "`link_names` has no affect, if you want to mention user see: " + "https://api.slack.com/reference/surfaces/formatting#mentioning-users", + UserWarning, stacklevel=2, ) - return web_token + if deprecated_class_attrs: + warnings.warn( + f"Provide {','.join(repr(a) for a in deprecated_class_attrs)} as hook argument(s) " + f"is deprecated and will be removed in a future releases. " + f"Please specify attributes in `{self.__class__.__name__}.send` method instead.", + DeprecationWarning, + stacklevel=2, + ) + + self.extra_client_args = kwargs + + @cached_property + def client(self) -> WebhookClient: + """Get the underlying slack_sdk.webhook.WebhookClient (cached).""" + return WebhookClient(**self._get_conn_params()) + + def get_conn(self) -> WebhookClient: + """Get the underlying slack_sdk.webhook.WebhookClient (cached).""" + return self.client + + @cached_property + def webhook_token(self) -> str: + """Return Slack Webhook Token URL.""" + warnings.warn( + "`SlackHook.webhook_token` property deprecated and will be removed in a future releases.", + DeprecationWarning, + stacklevel=2, + ) + return self._get_conn_params()["url"] + + def _get_conn_params(self) -> dict[str, Any]: + """Fetch connection params as a dict and merge it with hook parameters.""" + default_schema, _, default_host = DEFAULT_SLACK_WEBHOOK_ENDPOINT.partition("://") + if self.slack_webhook_conn_id: + conn = self.get_connection(self.slack_webhook_conn_id) else: - raise AirflowException('Cannot get token: No valid Slack webhook token nor conn_id supplied') + # If slack_webhook_conn_id not specified, then use connection with default schema and host + conn = Connection( + conn_id=None, conn_type=self.conn_type, host=default_schema, password=default_host + ) + extra_config = ConnectionExtraConfig( + conn_type=self.conn_type, + conn_id=conn.conn_id, + extra=conn.extra_dejson, + ) + conn_params: dict[str, Any] = {"retry_handlers": self.retry_handlers} + + webhook_token = None + if self._webhook_token: + self.log.debug("Retrieving Slack Webhook Token from hook attribute.") + webhook_token = self._webhook_token + elif conn.conn_id: + if conn.password: + self.log.debug( + "Retrieving Slack Webhook Token from Connection ID %r password.", + self.slack_webhook_conn_id, + ) + webhook_token = conn.password + + webhook_token = webhook_token or "" + if not webhook_token and not conn.host: + raise AirflowException("Cannot get token: No valid Slack token nor valid Connection ID supplied.") + elif webhook_token and "://" in webhook_token: + self.log.debug("Retrieving Slack Webhook Token URL from webhook token.") + url = webhook_token + else: + self.log.debug("Constructing Slack Webhook Token URL.") + if conn.host and "://" in conn.host: + base_url = conn.host + else: + schema = conn.schema if conn.schema else default_schema + host = conn.host if conn.host else default_host + base_url = f"{schema}://{host}" + + base_url = base_url.rstrip("/") + if not webhook_token: + parsed_token = (urlparse(base_url).path or "").strip("/") + if base_url == DEFAULT_SLACK_WEBHOOK_ENDPOINT or not parsed_token: + # Raise an error in case of password not specified and + # 1. Result of constructing base_url equal https://hooks.slack.com/services + # 2. Empty url path, e.g. if base_url = https://hooks.slack.com + raise AirflowException( + "Cannot get token: No valid Slack token nor valid Connection ID supplied." + ) + mask_secret(parsed_token) + warnings.warn( + f"Found Slack Webhook Token URL in Connection {conn.conn_id!r} `host` " + "and `password` field is empty. This behaviour deprecated " + "and could expose you token in the UI and will be removed in a future releases.", + DeprecationWarning, + stacklevel=2, + ) + url = (base_url.rstrip("/") + "/" + webhook_token.lstrip("/")).rstrip("/") + + conn_params["url"] = url + # Merge Hook parameters with Connection config + conn_params.update( + { + "timeout": self.timeout or extra_config.getint("timeout", default=None), + "proxy": self.proxy or extra_config.get("proxy", default=None), + } + ) + # Add additional client args + conn_params.update(self.extra_client_args) + if "logger" not in conn_params: + conn_params["logger"] = self.log + + return {k: v for k, v in conn_params.items() if v is not None} + + def _resolve_argument(self, name: str, value): + """ + Resolve message parameters. + + .. note:: + This method exist for compatibility and merge instance class attributes with + method attributes and not be required when assign class attributes to message + would completely remove. + """ + if value is None and name in ( + "text", + "attachments", + "blocks", + "channel", + "username", + "icon_emoji", + "icon_url", + "link_names", + ): + return getattr(self, name, None) - def _build_slack_message(self) -> str: + return value + + @check_webhook_response + def send_dict(self, body: dict[str, Any] | str, *, headers: dict[str, str] | None = None): """ - Construct the Slack message. All relevant parameters are combined here to a valid - Slack json message. + Performs a Slack Incoming Webhook request with given JSON data block. - :return: Slack message to send - :rtype: str + :param body: JSON data structure, expected dict or JSON-string. + :param headers: Request headers for this request. """ - cmd = {} - - if self.channel: - cmd['channel'] = self.channel - if self.username: - cmd['username'] = self.username - if self.icon_emoji: - cmd['icon_emoji'] = self.icon_emoji - if self.icon_url: - cmd['icon_url'] = self.icon_url - if self.link_names: - cmd['link_names'] = 1 - if self.attachments: - cmd['attachments'] = self.attachments - if self.blocks: - cmd['blocks'] = self.blocks - - cmd['text'] = self.message - return json.dumps(cmd) + if isinstance(body, str): + try: + body = json.loads(body) + except json.JSONDecodeError as err: + raise AirflowException( + f"Body expected valid JSON string, got {body!r}. Original error:\n * {err}" + ) from None + + if not isinstance(body, dict): + raise TypeError(f"Body expected dictionary, got {type(body).__name__}.") + + if any(legacy_attr in body for legacy_attr in ("channel", "username", "icon_emoji", "icon_url")): + warnings.warn( + "You cannot override the default channel (chosen by the user who installed your app), " + "username, or icon when you're using Incoming Webhooks to post messages. " + "Instead, these values will always inherit from the associated Slack app configuration. " + "See: https://api.slack.com/messaging/webhooks#advanced_message_formatting. " + "It is possible to change this values only in Legacy Slack Integration Incoming Webhook: " + "https://api.slack.com/legacy/custom-integrations/messaging/webhooks#legacy-customizations", + UserWarning, + stacklevel=2, + ) + + return self.client.send_dict(body, headers=headers) + + def send( + self, + *, + text: str | None = None, + attachments: list[dict[str, Any]] | None = None, + blocks: list[dict[str, Any]] | None = None, + response_type: str | None = None, + replace_original: bool | None = None, + delete_original: bool | None = None, + unfurl_links: bool | None = None, + unfurl_media: bool | None = None, + headers: dict[str, str] | None = None, + **kwargs, + ): + """ + Performs a Slack Incoming Webhook request with given arguments. + + :param text: The text message + (even when having blocks, setting this as well is recommended as it works as fallback). + :param attachments: A collection of attachments. + :param blocks: A collection of Block Kit UI components. + :param response_type: The type of message (either 'in_channel' or 'ephemeral'). + :param replace_original: True if you use this option for response_url requests. + :param delete_original: True if you use this option for response_url requests. + :param unfurl_links: Option to indicate whether text url should unfurl. + :param unfurl_media: Option to indicate whether media url should unfurl. + :param headers: Request headers for this request. + """ + body = { + "text": self._resolve_argument("text", text), + "attachments": self._resolve_argument("attachments", attachments), + "blocks": self._resolve_argument("blocks", blocks), + "response_type": response_type, + "replace_original": replace_original, + "delete_original": delete_original, + "unfurl_links": unfurl_links, + "unfurl_media": unfurl_media, + # Legacy Integration Parameters + **{lip: self._resolve_argument(lip, kwargs.pop(lip, None)) for lip in LEGACY_INTEGRATION_PARAMS}, + } + if kwargs: + warnings.warn( + f"Found unexpected keyword-argument(s) {', '.join(repr(k) for k in kwargs)} " + "in `send` method. This argument(s) have no effect.", + UserWarning, + stacklevel=2, + ) + body = {k: v for k, v in body.items() if v is not None} + return self.send_dict(body=body, headers=headers) + + def send_text( + self, + text: str, + *, + unfurl_links: bool | None = None, + unfurl_media: bool | None = None, + headers: dict[str, str] | None = None, + ): + """ + Performs a Slack Incoming Webhook request with given text. + + :param text: The text message. + :param unfurl_links: Option to indicate whether text url should unfurl. + :param unfurl_media: Option to indicate whether media url should unfurl. + :param headers: Request headers for this request. + """ + return self.send(text=text, unfurl_links=unfurl_links, unfurl_media=unfurl_media, headers=headers) + + @classmethod + def get_connection_form_widgets(cls) -> dict[str, Any]: + """Returns dictionary of widgets to be added for the hook to handle extra values.""" + from flask_appbuilder.fieldwidgets import BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import IntegerField, StringField + + return { + prefixed_extra_field("timeout", cls.conn_type): IntegerField( + lazy_gettext("Timeout"), + widget=BS3TextFieldWidget(), + description="Optional. The maximum number of seconds the client will wait to connect " + "and receive a response from Slack Incoming Webhook.", + ), + prefixed_extra_field("proxy", cls.conn_type): StringField( + lazy_gettext('Proxy'), + widget=BS3TextFieldWidget(), + description="Optional. Proxy to make the Slack Incoming Webhook call.", + ), + } + + @classmethod + def get_ui_field_behaviour(cls) -> dict[str, Any]: + """Returns custom field behaviour.""" + return { + "hidden_fields": ["login", "port", "extra"], + "relabeling": { + "host": "Slack Webhook Endpoint", + "password": "Webhook Token", + }, + "placeholders": { + "schema": "https", + "host": "hooks.slack.com/services", + "password": "T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX", + prefixed_extra_field("timeout", cls.conn_type): "30", + prefixed_extra_field("proxy", cls.conn_type): "http://localhost:9000", + }, + } def execute(self) -> None: - """Remote Popen (actually execute the slack webhook call)""" - proxies = {} - if self.proxy: - # we only need https proxy for Slack, as the endpoint is https - proxies = {'https': self.proxy} - - slack_message = self._build_slack_message() - self.run( - endpoint=self.webhook_token, - data=slack_message, - headers={'Content-type': 'application/json'}, - extra_options={'proxies': proxies, 'check_response': True}, + """ + Remote Popen (actually execute the slack webhook call). + + .. note:: + This method exist for compatibility with previous version of operator + and expected that Slack Incoming Webhook message constructing from class attributes rather than + pass as method arguments. + """ + warnings.warn( + "`SlackWebhookHook.execute` method deprecated and will be removed in a future releases. " + "Please use `SlackWebhookHook.send` or `SlackWebhookHook.send_dict` or " + "`SlackWebhookHook.send_text` methods instead.", + DeprecationWarning, + stacklevel=2, ) + self.send() diff --git a/airflow/providers/slack/operators/slack_webhook.py b/airflow/providers/slack/operators/slack_webhook.py index ebd51980dc45a..6872772a69736 100644 --- a/airflow/providers/slack/operators/slack_webhook.py +++ b/airflow/providers/slack/operators/slack_webhook.py @@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Sequence +from airflow.compat.functools import cached_property from airflow.providers.http.operators.http import SimpleHttpOperator from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook @@ -81,6 +82,7 @@ def __init__( super().__init__(endpoint=webhook_token, **kwargs) self.http_conn_id = http_conn_id self.webhook_token = webhook_token + self.proxy = proxy self.message = message self.attachments = attachments self.blocks = blocks @@ -89,22 +91,27 @@ def __init__( self.icon_emoji = icon_emoji self.icon_url = icon_url self.link_names = link_names - self.proxy = proxy - self.hook: SlackWebhookHook | None = None + + @cached_property + def hook(self) -> SlackWebhookHook: + return SlackWebhookHook( + http_conn_id=self.http_conn_id, + webhook_token=self.webhook_token, + proxy=self.proxy, + ) def execute(self, context: Context) -> None: """Call the SlackWebhookHook to post the provided Slack message""" - self.hook = SlackWebhookHook( - self.http_conn_id, - self.webhook_token, - self.message, - self.attachments, - self.blocks, - self.channel, - self.username, - self.icon_emoji, - self.icon_url, - self.link_names, - self.proxy, + self.hook.send( + text=self.message, + attachments=self.attachments, + blocks=self.blocks, + # Parameters below use for compatibility with previous version of Operator and warn user if it set + # Legacy Integration Parameters + channel=self.channel, + username=self.username, + icon_emoji=self.icon_emoji, + icon_url=self.icon_url, + # Unused Parameters, if not None than warn user + link_names=self.link_names, ) - self.hook.execute() diff --git a/airflow/providers/slack/transfers/sql_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py index 97d06dc5653e4..2161899bf8f4e 100644 --- a/airflow/providers/slack/transfers/sql_to_slack.py +++ b/airflow/providers/slack/transfers/sql_to_slack.py @@ -129,14 +129,11 @@ def _render_and_send_slack_message(self, context, df) -> None: slack_hook = self._get_slack_hook() self.log.info('Sending slack message: %s', self.slack_message) - slack_hook.execute() + slack_hook.send(text=self.slack_message, channel=self.slack_channel) def _get_slack_hook(self) -> SlackWebhookHook: return SlackWebhookHook( - http_conn_id=self.slack_conn_id, - message=self.slack_message, - channel=self.slack_channel, - webhook_token=self.slack_webhook_token, + slack_webhook_conn_id=self.slack_conn_id, webhook_token=self.slack_webhook_token ) def render_template_fields(self, context, jinja_env=None) -> None: diff --git a/docs/apache-airflow-providers-slack/connections/slack-incoming-webhook.rst b/docs/apache-airflow-providers-slack/connections/slack-incoming-webhook.rst new file mode 100644 index 0000000000000..6017862721d88 --- /dev/null +++ b/docs/apache-airflow-providers-slack/connections/slack-incoming-webhook.rst @@ -0,0 +1,95 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +.. _howto/connection:slack-incoming-webhook: + +Slack Incoming Webhook Connection +================================= + +The Slack Incoming Webhook connection type enables +`Slack Incoming Webhooks `_ Integrations. + +Authenticating to Slack +----------------------- + +Authenticate to Slack using a `Incoming Webhook URL +`_. + +Default Connection IDs +---------------------- + +.. warning:: + + The :class:`airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook` and community provided operators + not intend to use any Slack Incoming Webhook Connection by default right now. + It might change in the future to ``slack_default``. + +Configuring the Connection +-------------------------- + +Schema + Optional. Http schema, if not specified than **https** is used. + +Slack Webhook Endpoint (Host) + Optional. Reference to slack webhook endpoint, if not specified than **hooks.slack.com/services** is used. + In case if endpoint contain schema, than value from field ``Schema`` ignores. + +Webhook Token (Password) + Specify the Slack Incoming Webhook URL. It might specified as full url like + **https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX** in this case values + from ``Slack Webhook Endpoint (Host)`` and ``Schema`` fields ignores. + Or it might specified as URL path like **T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX** in this case + Slack Incoming Webhook URL will build from this field, ``Schema`` and ``Slack Webhook Endpoint (Host)``. + +Extra + Specify the extra parameters (as json dictionary) that can be used in + `slack_sdk.WebhookClient `_. + All parameters are optional. + + * ``timeout``: The maximum number of seconds the client will wait to connect + and receive a response from Slack Incoming Webhook. + * ``proxy``: Proxy to make the Slack Incoming Webhook call. + +If you are configuring the connection via a URI, ensure that all components of the URI are URL-encoded. + +Examples +-------- + +**Snippet for create Connection as URI**: + .. code-block:: python + + from airflow.models.connection import Connection + + conn = Connection( + conn_id="slack_default", + conn_type="slackwebhook", + password="T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX", + extra={ + # Specify extra parameters here + "timeout": "42", + }, + ) + + # Generate Environment Variable Name + env_key = f"AIRFLOW_CONN_{conn.conn_id.upper()}" + print(f"{env_key}='{conn.get_uri()}'") + +**Set Slack API Connection as Environment Variable (URI)** + .. code-block:: bash + + export AIRFLOW_CONN_SLACK_DEFAULT='slackwebhook://:T00000000%2FB00000000%2FXXXXXXXXXXXXXXXXXXXXXXXX@/?timeout=42' diff --git a/docs/apache-airflow-providers-slack/connections/slack.rst b/docs/apache-airflow-providers-slack/connections/slack.rst index c07ea2191f854..d4953fac1c489 100644 --- a/docs/apache-airflow-providers-slack/connections/slack.rst +++ b/docs/apache-airflow-providers-slack/connections/slack.rst @@ -50,7 +50,7 @@ Extra (optional) * ``timeout``: The maximum number of seconds the client will wait to connect and receive a response from Slack API. * ``base_url``: A string representing the Slack API base URL. - * ``proxy``: Proxy to make the Slack Incoming Webhook call. + * ``proxy``: Proxy to make the Slack API call. If you are configuring the connection via a URI, ensure that all components of the URI are URL-encoded. diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index fa37534f5cb11..06ec96fd3535f 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1566,6 +1566,7 @@ WebClient webhdfs Webhook webhook +WebhookClient webhooks webpage webProperty diff --git a/tests/providers/slack/hooks/test_slack_webhook.py b/tests/providers/slack/hooks/test_slack_webhook.py index 8081e1d96e524..d56ae15101795 100644 --- a/tests/providers/slack/hooks/test_slack_webhook.py +++ b/tests/providers/slack/hooks/test_slack_webhook.py @@ -15,158 +15,555 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +# + from __future__ import annotations import json -import unittest +from typing import Any from unittest import mock -from requests.exceptions import MissingSchema - -from airflow.models import Connection -from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook -from airflow.utils import db - - -class TestSlackWebhookHook(unittest.TestCase): - - _config = { - 'http_conn_id': 'slack-webhook-default', - 'webhook_token': 'manual_token', - 'message': 'Awesome message to put on Slack', - 'attachments': [{'fallback': 'Required plain-text summary'}], - 'blocks': [{'type': 'section', 'text': {'type': 'mrkdwn', 'text': '*bold text*'}}], - 'channel': '#general', - 'username': 'SlackMcSlackFace', - 'icon_emoji': ':hankey:', - 'icon_url': 'https://airflow.apache.org/_images/pin_large.png', - 'link_names': True, - 'proxy': 'https://my-horrible-proxy.proxyist.com:8080', - } - expected_message_dict = { - 'channel': _config['channel'], - 'username': _config['username'], - 'icon_emoji': _config['icon_emoji'], - 'icon_url': _config['icon_url'], - 'link_names': 1, - 'attachments': _config['attachments'], - 'blocks': _config['blocks'], - 'text': _config['message'], - } - expected_message = json.dumps(expected_message_dict) - expected_url = 'https://hooks.slack.com/services/T000/B000/XXX' - expected_method = 'POST' - - def setUp(self): - db.merge_conn( - Connection( - conn_id='slack-webhook-default', - conn_type='slackwebhook', - extra='{"webhook_token": "your_token_here"}', - ) - ) - db.merge_conn( - Connection( - conn_id='slack-webhook-url', - conn_type='slackwebhook', - host='https://hooks.slack.com/services/T000/B000/XXX', - ) +import pytest +from slack_sdk.http_retry.builtin_handlers import ConnectionErrorRetryHandler, RateLimitErrorRetryHandler +from slack_sdk.webhook.webhook_response import WebhookResponse + +from airflow.exceptions import AirflowException +from airflow.models.connection import Connection +from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook, check_webhook_response + +TEST_TOKEN = "T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX" +TEST_WEBHOOK_URL = f"https://hooks.slack.com/services/{TEST_TOKEN}" +TEST_CUSTOM_SCHEMA = "http" +TEST_CUSTOM_ENDPOINT = "example.org/slack/webhooks" +TEST_CUSTOM_WEBHOOK_URL = f"{TEST_CUSTOM_SCHEMA}://{TEST_CUSTOM_ENDPOINT}/{TEST_TOKEN}" +TEST_CONN_ID = SlackWebhookHook.default_conn_name +CONN_TYPE = "slackwebhook" +TEST_CONN_ERROR_RETRY_HANDLER = ConnectionErrorRetryHandler(max_retry_count=42) +TEST_RATE_LIMIT_RETRY_HANDLER = RateLimitErrorRetryHandler() +MOCK_WEBHOOK_RESPONSE = WebhookResponse(url="foo://bar", status_code=200, body="ok", headers={}) + + +@pytest.fixture(scope="module", autouse=True) +def slack_webhook_connections(): + """Create tests connections.""" + connections = [ + Connection( + conn_id=SlackWebhookHook.default_conn_name, + conn_type=CONN_TYPE, + password=TEST_TOKEN, + ), + Connection( + conn_id="conn_full_url_connection", + conn_type=CONN_TYPE, + password=TEST_WEBHOOK_URL, + ), + Connection( + conn_id="conn_full_url_connection_with_host", + conn_type=CONN_TYPE, + host="http://example.org/hooks/", + password=TEST_WEBHOOK_URL, + ), + Connection( + conn_id="conn_host_with_schema", + conn_type=CONN_TYPE, + host="https://hooks.slack.com/services/", + password=f"/{TEST_TOKEN}", + ), + Connection( + conn_id="conn_host_without_schema", + conn_type=CONN_TYPE, + host="hooks.slack.com/services/", + password=f"/{TEST_TOKEN}", + ), + Connection( + conn_id="conn_parts", + conn_type=CONN_TYPE, + host="hooks.slack.com/services", + schema="https", + password=f"/{TEST_TOKEN}", + ), + Connection( + conn_id="conn_deprecated_extra", + conn_type=CONN_TYPE, + host="https://hooks.slack.com/services/", + extra={"webhook_token": TEST_TOKEN}, + ), + Connection(conn_id="conn_token_in_host_1", conn_type=CONN_TYPE, host=TEST_WEBHOOK_URL), + Connection( + conn_id="conn_token_in_host_2", + conn_type=CONN_TYPE, + schema="https", + host="hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX", + ), + Connection( + conn_id="conn_custom_endpoint_1", + conn_type=CONN_TYPE, + schema=TEST_CUSTOM_SCHEMA, + host=TEST_CUSTOM_ENDPOINT, + password=TEST_TOKEN, + ), + Connection( + conn_id="conn_custom_endpoint_2", + conn_type=CONN_TYPE, + host=f"{TEST_CUSTOM_SCHEMA}://{TEST_CUSTOM_ENDPOINT}", + password=TEST_TOKEN, + ), + Connection( + conn_id="conn_custom_endpoint_3", + conn_type=CONN_TYPE, + password=TEST_CUSTOM_WEBHOOK_URL, + ), + Connection( + conn_id="conn_empty", + conn_type=CONN_TYPE, + ), + Connection( + conn_id="conn_password_empty_1", + conn_type=CONN_TYPE, + host="https://hooks.slack.com/services/", + ), + Connection( + conn_id="conn_password_empty_2", + conn_type=CONN_TYPE, + schema="http", + host="some.netloc", + ), + ] + + conn_uris = {f"AIRFLOW_CONN_{c.conn_id.upper()}": c.get_uri() for c in connections} + + with mock.patch.dict("os.environ", values=conn_uris): + yield + + +class TestCheckWebhookResponseDecorator: + def test_ok_response(self): + """Test OK response.""" + + @check_webhook_response + def decorated(): + return MOCK_WEBHOOK_RESPONSE + + assert decorated() is MOCK_WEBHOOK_RESPONSE + + @pytest.mark.parametrize( + "status_code,body", + [ + (400, "invalid_payload"), + (403, "action_prohibited"), + (404, "channel_not_found"), + (410, "channel_is_archived"), + (500, "rollup_error"), + (418, "i_am_teapot"), + ], + ) + def test_error_response(self, status_code, body): + """Test error response.""" + test_response = WebhookResponse(url="foo://bar", status_code=status_code, body=body, headers={}) + + @check_webhook_response + def decorated(): + return test_response + + error_message = fr"Response body: '{body}', Status Code: {status_code}\." + with pytest.raises(AirflowException, match=error_message): + assert decorated() + + +class TestSlackWebhookHook: + def test_no_credentials(self): + """Test missing credentials.""" + error_message = r"Either `slack_webhook_conn_id` or `webhook_token` should be provided\." + with pytest.raises(AirflowException, match=error_message): + SlackWebhookHook(slack_webhook_conn_id=None, webhook_token=None) + + @mock.patch("airflow.providers.slack.hooks.slack_webhook.mask_secret") + def test_webhook_token(self, mock_mask_secret): + webhook_token = "test-value" + warning_message = ( + r"Provide `webhook_token` as hook argument deprecated by security reason and will be removed " + r"in a future releases. Please specify it in `Slack Webhook` connection\." ) - db.merge_conn( - Connection( - conn_id='slack-webhook-host', - conn_type='slackwebhook', - host='https://hooks.slack.com/services/T000/', + with pytest.warns(DeprecationWarning, match=warning_message): + SlackWebhookHook(webhook_token=webhook_token) + mock_mask_secret.assert_called_once_with(webhook_token) + + def test_conn_id(self): + """Different conn_id arguments and options.""" + hook = SlackWebhookHook(slack_webhook_conn_id=SlackWebhookHook.default_conn_name, http_conn_id=None) + assert hook.slack_webhook_conn_id == SlackWebhookHook.default_conn_name + assert not hasattr(hook, "http_conn_id") + + hook = SlackWebhookHook(slack_webhook_conn_id=None, http_conn_id=SlackWebhookHook.default_conn_name) + assert hook.slack_webhook_conn_id == SlackWebhookHook.default_conn_name + assert not hasattr(hook, "http_conn_id") + + error_message = "You cannot provide both `slack_webhook_conn_id` and `http_conn_id`." + with pytest.raises(AirflowException, match=error_message): + SlackWebhookHook( + slack_webhook_conn_id=SlackWebhookHook.default_conn_name, + http_conn_id=SlackWebhookHook.default_conn_name, ) + + @pytest.mark.parametrize( + "conn_id", + [ + TEST_CONN_ID, + "conn_full_url_connection", + "conn_full_url_connection_with_host", + "conn_host_with_schema", + "conn_host_without_schema", + "conn_parts", + "conn_token_in_host_1", + "conn_token_in_host_2", + ], + ) + def test_construct_webhook_url(self, conn_id): + """Test valid connections.""" + hook = SlackWebhookHook(slack_webhook_conn_id=conn_id) + conn_params = hook._get_conn_params() + assert "url" in conn_params + assert conn_params["url"] == TEST_WEBHOOK_URL + + @mock.patch("airflow.providers.slack.hooks.slack_webhook.mask_secret") + @pytest.mark.parametrize("conn_id", ["conn_token_in_host_1", "conn_token_in_host_2"]) + def test_construct_webhook_url_deprecated_full_url_in_host(self, mock_mask_secret, conn_id): + """Test deprecated option with full URL in host/schema and empty password.""" + hook = SlackWebhookHook(slack_webhook_conn_id=conn_id) + warning_message = ( + r"Found Slack Webhook Token URL in Connection .* `host` and `password` field is empty\." ) - db.merge_conn( - Connection( - conn_id='slack-webhook-with-password', - conn_type='slackwebhook', - password='your_token_here', - ) + with pytest.warns(DeprecationWarning, match=warning_message): + conn_params = hook._get_conn_params() + mock_mask_secret.assert_called_once_with(mock.ANY) + assert "url" in conn_params + assert conn_params["url"] == TEST_WEBHOOK_URL + + @pytest.mark.parametrize( + "conn_id", ["conn_custom_endpoint_1", "conn_custom_endpoint_2", "conn_custom_endpoint_3"] + ) + def test_construct_webhook_url_with_non_default_host(self, conn_id): + """Test valid connections with endpoint != https://hooks.slack.com/hooks.""" + hook = SlackWebhookHook(slack_webhook_conn_id=conn_id) + conn_params = hook._get_conn_params() + assert "url" in conn_params + assert conn_params["url"] == TEST_CUSTOM_WEBHOOK_URL + + @pytest.mark.parametrize( + "conn_id", + [ + "conn_empty", + "conn_password_empty_1", + "conn_password_empty_2", + ], + ) + def test_no_password_in_connection_field(self, conn_id): + """Test connection which missing password field in connection.""" + hook = SlackWebhookHook(slack_webhook_conn_id=conn_id) + error_message = r"Cannot get token\: No valid Slack token nor valid Connection ID supplied\." + with pytest.raises(AirflowException, match=error_message): + hook._get_conn_params() + + @pytest.mark.parametrize("conn_id", [None, "conn_empty"]) + @pytest.mark.parametrize("token", [TEST_TOKEN, TEST_WEBHOOK_URL, f"/{TEST_TOKEN}"]) + def test_empty_connection_field_with_token(self, conn_id, token): + """Test connections which is empty or not set and valid webhook_token specified.""" + hook = SlackWebhookHook(slack_webhook_conn_id="conn_empty", webhook_token=token) + conn_params = hook._get_conn_params() + assert "url" in conn_params + assert conn_params["url"] == TEST_WEBHOOK_URL + + @pytest.mark.parametrize( + "hook_config,conn_extra,expected", + [ + ( # Test Case: hook config + { + "timeout": 42, + "proxy": "https://hook-proxy:1234", + "retry_handlers": [TEST_CONN_ERROR_RETRY_HANDLER, TEST_RATE_LIMIT_RETRY_HANDLER], + }, + {}, + { + "timeout": 42, + "proxy": "https://hook-proxy:1234", + "retry_handlers": [TEST_CONN_ERROR_RETRY_HANDLER, TEST_RATE_LIMIT_RETRY_HANDLER], + }, + ), + ( # Test Case: connection config + {}, + { + "timeout": 9000, + "proxy": "https://conn-proxy:4321", + }, + { + "timeout": 9000, + "proxy": "https://conn-proxy:4321", + }, + ), + ( # Test Case: Connection from the UI + {}, + { + "extra__slackwebhook__timeout": 9000, + "extra__slackwebhook__proxy": "https://conn-proxy:4321", + }, + { + "timeout": 9000, + "proxy": "https://conn-proxy:4321", + }, + ), + ( # Test Case: Merge configs - hook args overwrite conn config + { + "timeout": 1, + "proxy": "https://hook-proxy:777", + }, + { + "timeout": 9000, + "proxy": "https://conn-proxy:4321", + }, + { + "timeout": 1, + "proxy": "https://hook-proxy:777", + }, + ), + ( # Test Case: Merge configs - resolve config + { + "timeout": 1, + }, + { + "timeout": 9000, + "proxy": "https://conn-proxy:4334", + }, + { + "timeout": 1, + "proxy": "https://conn-proxy:4334", + }, + ), + ( # Test Case: empty configs + {}, + {}, + {}, + ), + ( # Test Case: extra_client_args + {"foo": "bar"}, + {}, + {"foo": "bar"}, + ), + ( # Test Case: ignored not expected connection extra + {}, + {"spam": "egg"}, + {}, + ), + ], + ) + @mock.patch("airflow.providers.slack.hooks.slack_webhook.WebhookClient") + def test_client_configuration( + self, mock_webhook_client_cls, hook_config, conn_extra, expected: dict[str, Any] + ): + """Test read/parse/merge WebhookClient config from connection and hook arguments.""" + expected["url"] = TEST_WEBHOOK_URL + test_conn = Connection( + conn_id="test-slack-incoming-webhook-conn", + conn_type=CONN_TYPE, + password=TEST_WEBHOOK_URL, + extra=conn_extra, ) + test_conn_env = f"AIRFLOW_CONN_{test_conn.conn_id.upper()}" + mock_webhook_client = mock_webhook_client_cls.return_value - def test_get_token_manual_token(self): - # Given - manual_token = 'manual_token_here' - hook = SlackWebhookHook(webhook_token=manual_token) + with mock.patch.dict("os.environ", values={test_conn_env: test_conn.get_uri()}): + hook = SlackWebhookHook(slack_webhook_conn_id=test_conn.conn_id, **hook_config) + expected["logger"] = hook.log + conn_params = hook._get_conn_params() + assert conn_params == expected - # When - webhook_token = hook._get_token(manual_token, None) + client = hook.client + assert client == mock_webhook_client + assert hook.get_conn() == mock_webhook_client + assert hook.get_conn() is client # cached + mock_webhook_client_cls.assert_called_once_with(**expected) - # Then - assert webhook_token == manual_token + @pytest.mark.parametrize("headers", [None, {"User-Agent": "Airflow"}]) + @pytest.mark.parametrize( + "send_body", + [ + {"text": "Test Text"}, + {"text": "Fallback Text", "blocks": ["Dummy Block"]}, + {"text": "Fallback Text", "blocks": ["Dummy Block"], "unfurl_media": True, "unfurl_links": True}, + ], + ) + @mock.patch("airflow.providers.slack.hooks.slack_webhook.WebhookClient") + def test_hook_send_dict(self, mock_webhook_client_cls, send_body, headers): + """Test `SlackWebhookHook.send_dict` method.""" + mock_webhook_client = mock_webhook_client_cls.return_value + mock_webhook_client_send_dict = mock_webhook_client.send_dict + mock_webhook_client_send_dict.return_value = MOCK_WEBHOOK_RESPONSE - def test_get_token_conn_id(self): - # Given - conn_id = 'slack-webhook-default' - hook = SlackWebhookHook(http_conn_id=conn_id) - expected_webhook_token = 'your_token_here' + hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID) + # Test with regular dictionary + hook.send_dict(body=send_body, headers=headers) + mock_webhook_client_send_dict.assert_called_once_with(send_body, headers=headers) - # When - webhook_token = hook._get_token(None, conn_id) + # Test with JSON-string + mock_webhook_client_send_dict.reset_mock() + hook.send_dict(body=json.dumps(send_body), headers=headers) + mock_webhook_client_send_dict.assert_called_once_with(send_body, headers=headers) - # Then - assert webhook_token == expected_webhook_token + @pytest.mark.parametrize("send_body", [("text", "Test Text"), 42, "null", "42"]) + @mock.patch("airflow.providers.slack.hooks.slack_webhook.WebhookClient") + def test_hook_send_dict_invalid_type(self, mock_webhook_client_cls, send_body): + """Test invalid body type for `SlackWebhookHook.send_dict` method.""" + mock_webhook_client = mock_webhook_client_cls.return_value + mock_webhook_client_send_dict = mock_webhook_client.send_dict + mock_webhook_client_send_dict.return_value = MOCK_WEBHOOK_RESPONSE - def test_get_token_conn_id_password(self): - # Given - conn_id = 'slack-webhook-with-password' - hook = SlackWebhookHook(http_conn_id=conn_id) - expected_webhook_token = 'your_token_here' + hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID) + with pytest.raises(TypeError, match=r"Body expected dictionary, got .*\."): + hook.send_dict(body=send_body) + assert mock_webhook_client_send_dict.assert_not_called - # When - webhook_token = hook._get_token(None, conn_id) + @pytest.mark.parametrize("json_string", ["{'text': 'Single quotes'}", '{"text": "Missing }"']) + @mock.patch("airflow.providers.slack.hooks.slack_webhook.WebhookClient") + def test_hook_send_dict_invalid_json_string(self, mock_webhook_client_cls, json_string): + """Test invalid JSON-string passed to `SlackWebhookHook.send_dict` method.""" + mock_webhook_client = mock_webhook_client_cls.return_value + mock_webhook_client_send_dict = mock_webhook_client.send_dict + mock_webhook_client_send_dict.return_value = MOCK_WEBHOOK_RESPONSE - # Then - assert webhook_token == expected_webhook_token + hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID) + error_message = r"Body expected valid JSON string, got .*\. Original error:.*" + with pytest.raises(AirflowException, match=error_message): + hook.send_dict(body=json_string) + assert mock_webhook_client_send_dict.assert_not_called - def test_build_slack_message(self): - # Given - hook = SlackWebhookHook(**self._config) + @pytest.mark.parametrize( + "legacy_attr", + [ + "channel", + "username", + "icon_emoji", + "icon_url", + ], + ) + @mock.patch("airflow.providers.slack.hooks.slack_webhook.WebhookClient") + def test_hook_send_dict_legacy_slack_integration(self, mock_webhook_client_cls, legacy_attr): + """Test `SlackWebhookHook.send_dict` warn users about Legacy Slack Integrations.""" + mock_webhook_client = mock_webhook_client_cls.return_value + mock_webhook_client_send_dict = mock_webhook_client.send_dict + mock_webhook_client_send_dict.return_value = MOCK_WEBHOOK_RESPONSE - # When - message = hook._build_slack_message() + legacy_slack_integration_body = {legacy_attr: "test-value"} + hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID) + warning_message = ( + r"You cannot override the default channel \(chosen by the user who installed your app\), " + r"username, or icon when you're using Incoming Webhooks to post messages\. " + r"Instead, these values will always inherit from the associated Slack app configuration\. " + r"See: .*\. It is possible to change this values only in " + r"Legacy Slack Integration Incoming Webhook: .*" + ) + with pytest.warns(UserWarning, match=warning_message): + hook.send_dict(body=legacy_slack_integration_body) + mock_webhook_client_send_dict.assert_called_once_with(legacy_slack_integration_body, headers=None) - # Then - assert self.expected_message_dict == json.loads(message) + @pytest.mark.parametrize("headers", [None, {"User-Agent": "Airflow"}]) + @pytest.mark.parametrize( + "send_params", + [ + {"text": "Test Text"}, + {"text": "Fallback Text", "blocks": ["Dummy Block"]}, + {"text": "Fallback Text", "blocks": ["Dummy Block"], "unfurl_media": True, "unfurl_links": True}, + ], + ) + @mock.patch("airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook.send_dict") + def test_hook_send(self, mock_hook_send_dict, send_params, headers): + """Test `SlackWebhookHook.send` method.""" + hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID) + hook.send(**send_params, headers=headers) + mock_hook_send_dict.assert_called_once_with(body=send_params, headers=headers) - @mock.patch('requests.Session') - @mock.patch('requests.Request') - def test_url_generated_by_http_conn_id(self, mock_request, mock_session): - hook = SlackWebhookHook(http_conn_id='slack-webhook-url') - try: - hook.execute() - except MissingSchema: - pass - mock_request.assert_called_once_with( - self.expected_method, self.expected_url, headers=mock.ANY, data=mock.ANY + @pytest.mark.parametrize( + "deprecated_hook_attr", + [ + "message", + "attachments", + "blocks", + "channel", + "username", + "icon_emoji", + "icon_url", + ], + ) + @mock.patch("airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook.send_dict") + def test_hook_send_by_hook_attributes(self, mock_hook_send_dict, deprecated_hook_attr): + """Test `SlackWebhookHook.send` with parameters set in hook attributes.""" + send_params = {deprecated_hook_attr: "test-value"} + expected_body = {deprecated_hook_attr if deprecated_hook_attr != "message" else "text": "test-value"} + warning_message = ( + r"Provide .* as hook argument\(s\) is deprecated and will be removed in a future releases\. " + r"Please specify attributes in `SlackWebhookHook\.send` method instead\." ) - mock_request.reset_mock() + with pytest.warns(DeprecationWarning, match=warning_message): + hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID, **send_params) + assert getattr(hook, deprecated_hook_attr) == "test-value" + if deprecated_hook_attr == "message": + assert getattr(hook, "text") == "test-value" + # Test ``.send()`` method + hook.send() + mock_hook_send_dict.assert_called_once_with(body=expected_body, headers=None) - @mock.patch('requests.Session') - @mock.patch('requests.Request') - def test_url_generated_by_endpoint(self, mock_request, mock_session): - hook = SlackWebhookHook(webhook_token=self.expected_url) - try: + # Test deprecated ``.execute()`` method + mock_hook_send_dict.reset_mock() + warning_message = ( + "`SlackWebhookHook.execute` method deprecated and will be removed in a future releases. " + "Please use `SlackWebhookHook.send` or `SlackWebhookHook.send_dict` or " + "`SlackWebhookHook.send_text` methods instead." + ) + with pytest.warns(DeprecationWarning, match=warning_message): hook.execute() - except MissingSchema: - pass - mock_request.assert_called_once_with( - self.expected_method, self.expected_url, headers=mock.ANY, data=mock.ANY + mock_hook_send_dict.assert_called_once_with(body=expected_body, headers=None) + + @mock.patch("airflow.providers.slack.hooks.slack_webhook.WebhookClient") + def test_hook_ignored_attributes(self, mock_webhook_client_cls, recwarn): + """Test hook constructor warn users about ignored attributes.""" + mock_webhook_client = mock_webhook_client_cls.return_value + mock_webhook_client_send_dict = mock_webhook_client.send_dict + mock_webhook_client_send_dict.return_value = MOCK_WEBHOOK_RESPONSE + + hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID, link_names="test-value") + assert len(recwarn) == 2 + assert str(recwarn.pop(UserWarning).message).startswith( + "`link_names` has no affect, if you want to mention user see:" + ) + assert str(recwarn.pop(DeprecationWarning).message).startswith( + "Provide 'link_names' as hook argument(s) is deprecated and will be removed in a future releases." ) - mock_request.reset_mock() + hook.send() + mock_webhook_client_send_dict.assert_called_once_with({}, headers=None) - @mock.patch('requests.Session') - @mock.patch('requests.Request') - def test_url_generated_by_http_conn_id_and_endpoint(self, mock_request, mock_session): - hook = SlackWebhookHook(http_conn_id='slack-webhook-host', webhook_token='B000/XXX') - try: - hook.execute() - except MissingSchema: - pass - mock_request.assert_called_once_with( - self.expected_method, self.expected_url, headers=mock.ANY, data=mock.ANY + @mock.patch("airflow.providers.slack.hooks.slack_webhook.WebhookClient") + def test_hook_send_unexpected_arguments(self, mock_webhook_client_cls, recwarn): + """Test `SlackWebhookHook.send` unexpected attributes.""" + mock_webhook_client = mock_webhook_client_cls.return_value + mock_webhook_client_send_dict = mock_webhook_client.send_dict + mock_webhook_client_send_dict.return_value = MOCK_WEBHOOK_RESPONSE + + hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID) + warning_message = ( + r"Found unexpected keyword-argument\(s\) 'link_names', 'as_user' " + r"in `send` method\. This argument\(s\) have no effect\." + ) + with pytest.warns(UserWarning, match=warning_message): + hook.send(link_names="foo-bar", as_user="root", text="Awesome!") + + mock_webhook_client_send_dict.assert_called_once_with({"text": "Awesome!"}, headers=None) + + @pytest.mark.parametrize("headers", [None, {"User-Agent": "Airflow"}]) + @pytest.mark.parametrize("unfurl_links", [None, False, True]) + @pytest.mark.parametrize("unfurl_media", [None, False, True]) + @mock.patch("airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook.send") + def test_hook_send_text(self, mock_hook_send, headers, unfurl_links, unfurl_media): + """Test `SlackWebhookHook.send_text` method.""" + hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID) + hook.send_text("Test Text", headers=headers, unfurl_links=unfurl_links, unfurl_media=unfurl_media) + mock_hook_send.assert_called_once_with( + text="Test Text", headers=headers, unfurl_links=unfurl_links, unfurl_media=unfurl_media ) - mock_request.reset_mock() diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py b/tests/providers/slack/transfers/test_sql_to_slack.py index 5df4df40a446a..b0beed15578d8 100644 --- a/tests/providers/slack/transfers/test_sql_to_slack.py +++ b/tests/providers/slack/transfers/test_sql_to_slack.py @@ -21,7 +21,7 @@ import pandas as pd import pytest -from airflow import AirflowException +from airflow.exceptions import AirflowException from airflow.models import DAG, Connection from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator from airflow.utils import timezone @@ -64,14 +64,15 @@ def test_rendering_and_message_execution(self, mock_slack_hook_class): # Test that the Slack hook is instantiated with the right parameters mock_slack_hook_class.assert_called_once_with( - http_conn_id='slack_connection', - message=f'message: 2017-01-01, {test_df}', - channel='#test', + slack_webhook_conn_id='slack_connection', webhook_token=None, ) - # Test that the Slack hook's execute method gets run once - slack_webhook_hook.execute.assert_called_once() + # Test that the `SlackWebhookHook.send` method gets run once + slack_webhook_hook.send.assert_called_once_with( + text=f'message: 2017-01-01, {test_df}', + channel='#test', + ) @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') def test_rendering_and_message_execution_with_slack_hook(self, mock_slack_hook_class): @@ -98,14 +99,15 @@ def test_rendering_and_message_execution_with_slack_hook(self, mock_slack_hook_c # Test that the Slack hook is instantiated with the right parameters mock_slack_hook_class.assert_called_once_with( - http_conn_id='slack_connection', - message=f'message: 2017-01-01, {test_df}', - channel='#test', + slack_webhook_conn_id='slack_connection', webhook_token='test_token', ) - # Test that the Slack hook's execute method gets run once - slack_webhook_hook.execute.assert_called_once() + # Test that the `SlackWebhookHook.send` method gets run once + slack_webhook_hook.send.assert_called_once_with( + text=f'message: 2017-01-01, {test_df}', + channel='#test', + ) def test_non_existing_slack_parameters_provided_exception_thrown(self): operator_args = { @@ -141,14 +143,15 @@ def test_rendering_custom_df_name_message_execution(self, mock_slack_hook_class) # Test that the Slack hook is instantiated with the right parameters mock_slack_hook_class.assert_called_once_with( - http_conn_id='slack_connection', - message=f'message: 2017-01-01, {test_df}', - channel='#test', + slack_webhook_conn_id='slack_connection', webhook_token=None, ) - # Test that the Slack hook's execute method gets run once - slack_webhook_hook.execute.assert_called_once() + # Test that the `SlackWebhookHook.send` method gets run once + slack_webhook_hook.send.assert_called_once_with( + text=f'message: 2017-01-01, {test_df}', + channel='#test', + ) @mock.patch('airflow.providers.common.sql.operators.sql.BaseHook.get_connection') def test_hook_params_building(self, mock_get_conn): diff --git a/tests/providers/snowflake/transfers/test_snowflake_to_slack.py b/tests/providers/snowflake/transfers/test_snowflake_to_slack.py index 7dffa8a8f9ed9..bbc02abbde9c8 100644 --- a/tests/providers/snowflake/transfers/test_snowflake_to_slack.py +++ b/tests/providers/snowflake/transfers/test_snowflake_to_slack.py @@ -46,6 +46,7 @@ def _construct_operator(**kwargs): @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') def test_hooks_and_rendering(self, mock_slack_hook_class): + slack_webhook_hook = mock_slack_hook_class.return_value operator_args = { 'snowflake_conn_id': 'snowflake_connection', 'sql': "sql {{ ds }}", @@ -71,10 +72,14 @@ def test_hooks_and_rendering(self, mock_slack_hook_class): # Test that the Slack hook is instantiated with the right parameters mock_slack_hook_class.assert_called_once_with( - message='message: 2017-01-01, 1234', + slack_webhook_conn_id='slack_default', webhook_token='test_token', + ) + + # Test that the `SlackWebhookHook.send` method gets run once + slack_webhook_hook.send.assert_called_once_with( + text='message: 2017-01-01, 1234', channel=None, - http_conn_id='slack_default', ) def test_hook_params_building(self):