Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 163 additions & 23 deletions airflow/providers/slack/transfers/sql_to_slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, Iterable, Mapping, Sequence

from pandas import DataFrame
Expand All @@ -25,13 +26,59 @@
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator
from airflow.providers.common.sql.hooks.sql import DbApiHook
from airflow.providers.slack.hooks.slack import SlackHook
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.providers.slack.utils import parse_filename

if TYPE_CHECKING:
from airflow.utils.context import Context


class SqlToSlackOperator(BaseOperator):
class BaseSqlToSlackOperator(BaseOperator):
    """
    Shared SQL plumbing for the SQL to Slack transfer operators.

    Subclasses get a validated database hook and a helper that runs the configured
    query and hands back the result as a Pandas dataframe.

    :param sql: The SQL query to be executed
    :param sql_conn_id: reference to a specific DB-API Connection.
    :param sql_hook_params: Extra config params to be passed to the underlying hook.
        Should match the desired hook constructor params.
    :param parameters: The parameters to pass to the SQL query.
    """

    def __init__(
        self,
        *,
        sql: str,
        sql_conn_id: str,
        sql_hook_params: dict | None = None,
        parameters: Iterable | Mapping | None = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        # What to run ...
        self.sql = sql
        self.parameters = parameters
        # ... and where to run it.
        self.sql_conn_id = sql_conn_id
        self.sql_hook_params = sql_hook_params

    def _get_hook(self) -> DbApiHook:
        """
        Build the hook for ``sql_conn_id`` and make sure it can produce dataframes.

        :raises AirflowException: if the resolved hook has no callable ``get_pandas_df``.
        """
        self.log.debug("Get connection for %s", self.sql_conn_id)
        connection = BaseHook.get_connection(self.sql_conn_id)
        db_hook = connection.get_hook(hook_params=self.sql_hook_params)

        # A ``None`` default keeps hooks without the attribute on the error path below
        # instead of failing with AttributeError.
        if callable(getattr(db_hook, "get_pandas_df", None)):
            return db_hook

        raise AirflowException(
            "This hook is not supported. The hook class must have get_pandas_df method."
        )

    def _get_query_results(self) -> DataFrame:
        """Run ``self.sql`` with ``self.parameters`` and return the hook's dataframe."""
        db_hook = self._get_hook()

        self.log.info("Running SQL query: %s", self.sql)
        return db_hook.get_pandas_df(self.sql, parameters=self.parameters)


class SqlToSlackOperator(BaseSqlToSlackOperator):
"""
Executes an SQL statement in a given SQL connection and sends the results to Slack. The results of the
query are rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called
Expand Down Expand Up @@ -79,12 +126,10 @@ def __init__(
**kwargs,
) -> None:

super().__init__(**kwargs)
super().__init__(
sql=sql, sql_conn_id=sql_conn_id, sql_hook_params=sql_hook_params, parameters=parameters, **kwargs
)

self.sql_conn_id = sql_conn_id
self.sql_hook_params = sql_hook_params
self.sql = sql
self.parameters = parameters
self.slack_conn_id = slack_conn_id
self.slack_webhook_token = slack_webhook_token
self.slack_channel = slack_channel
Expand All @@ -97,23 +142,6 @@ def __init__(
"SqlToSlackOperator requires either a `slack_conn_id` or a `slack_webhook_token` argument"
)

# Resolve the hook for ``self.sql_conn_id`` and verify it exposes ``get_pandas_df``.
# Raises AirflowException when the hook has no callable ``get_pandas_df``.
def _get_hook(self) -> DbApiHook:
self.log.debug("Get connection for %s", self.sql_conn_id)
# Look the connection up by id, then build its hook with the user-supplied hook params.
conn = BaseHook.get_connection(self.sql_conn_id)
hook = conn.get_hook(hook_params=self.sql_hook_params)
# getattr with a None default sends hooks lacking the method to the error below
# rather than raising AttributeError here.
if not callable(getattr(hook, "get_pandas_df", None)):
raise AirflowException(
"This hook is not supported. The hook class must have get_pandas_df method."
)
return hook

# Run ``self.sql`` through the validated hook and return the resulting dataframe.
def _get_query_results(self) -> DataFrame:
# May raise AirflowException if the hook does not provide ``get_pandas_df``.
sql_hook = self._get_hook()

self.log.info("Running SQL query: %s", self.sql)
# ``self.parameters`` is forwarded as the query's bind parameters.
df = sql_hook.get_pandas_df(self.sql, parameters=self.parameters)
return df

def _render_and_send_slack_message(self, context, df) -> None:
# Put the dataframe into the context and render the JINJA template fields
context[self.results_df_name] = df
Expand Down Expand Up @@ -157,3 +185,115 @@ def execute(self, context: Context) -> None:
self._render_and_send_slack_message(context, df)

self.log.debug("Finished sending SQL data to Slack")


class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are our options if we want to avoid creating a new operator for files?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me just summarise what we have right now, what we use right now, and what Slack supports.

Send Message in Slack Channel supported by Airflow:

  1. Slack API chat.postMessage method via SlackHook.call method
  2. Slack Incoming Webhook via SlackHook.send_dict, SlackHook.send and SlackHook.send_text methods
  3. Slack Webhook based on Legacy Integration via SlackHook.send_dict, SlackHook.send and SlackHook.send_text methods

Full list of what could be used to send a message into a Slack channel

Send File in Slack Channel (or Workspace) supported by Airflow:

  1. Slack API files.upload method via SlackHook.send_file or SlackHook.call methods

Which parameters could be provided for different methods

Slack API Method chat.postMessage (Mainstream)

parameter required scope
token Yes headers
channel Yes dict payload
attachments At least one of attachments blocks text dict payload
blocks At least one of attachments blocks text dict payload
text At least one of attachments blocks text dict payload
as_user No dict payload
icon_emoji No dict payload
icon_url No dict payload
link_names No dict payload
metadata No dict payload
mrkdwn No dict payload
boolean No dict payload
parse No dict payload
reply_broadcast No dict payload
thread_ts No dict payload
unfurl_links No dict payload
unfurl_media No dict payload
username No dict payload

Slack API Method files.upload (Mainstream)

parameter required scope
token Yes headers
channels No dict payload
content No (if file provided) dict payload
file No (if content provided) multipart/form-data
filename No dict payload
filetype No dict payload
initial_comment No dict payload
thread_ts No dict payload
title No dict payload

Slack Incoming Webhook (Mainstream)

There is no information about the final list of parameters. Judging by the code of WebhookClient.send from slack_sdk, only these parameters are allowed (but I am not 100% sure).

parameter required scope
token Yes URL
attachments At least one of attachments blocks text dict payload
blocks At least one of attachments blocks text dict payload
text At least one of attachments blocks text dict payload
response_type No dict payload
replace_original No dict payload
delete_original No dict payload
unfurl_links No dict payload
unfurl_media No dict payload

Slack Webhook based on Legacy Integration (Legacy)

There is even less information than for Slack Incoming Webhook. According to my investigation, these parameters are supported:

parameter required scope
token Yes URL
channel No dict payload
attachments At least one of attachments blocks text dict payload
blocks At least one of attachments blocks text dict payload
text At least one of attachments blocks text dict payload
icon_emoji No dict payload
icon_url No dict payload
username No dict payload
unfurl_links No dict payload

And these additional parameters might be supported:

parameter required scope
response_type No dict payload
replace_original No dict payload
delete_original No dict payload
unfurl_media No dict payload

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow! Quick question then - I am preparing for the Providers release. Should I merge this one? (The code looks cool, but I guess I need a TL;DR on whether the current state of this PR is "releasable" if we merge it.) I guess so, but I wanted to get the feeling of the others involved in the discussion :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should add an operator just to send a file. I understand the challenge, but I think we should try to mitigate this in the operator itself.

If Slack offers 3 different approaches, then maybe we should have a base class and 3 operators, one per approach? Then each operator would be able to leverage the full capabilities of what Slack offers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we do not need to rush, especially if we do not have agreement on how to do it in a better way.

IMHO, sending the SQL response as a message could actually be done in three different ways without it turning into a pain 🤣
Most of the major parameters are present in all 3 API requests.

For files the situation is different: we can only use the Slack API, and internally its methods do not share the same parameters. The closest match is text from chat.postMessage and initial_comment from files.upload; even channel and channels work differently.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Taragolis Under the assumption that we do not want to add a new operator for this - what are our options?
I think we are having a hard time here since the capabilities are not clear at the hook level.
If Slack has 3 different APIs to send a message, maybe we should have 3 hooks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially, when I started refactoring the Slack provider, I also wanted to create an additional hook, but after investigating deeply I found that the Legacy webhook and the Slack API-based Incoming Webhook have almost the same interface.
Legacy supports additional features which are not available in the new one: changing icons, username and channels.

And it is impossible (just my personal findings) to determine which Webhook URL is used for a Legacy Incoming Webhook and which one for the new one - even the HTTP responses are almost the same. That is why I combined the usage in airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook, with warnings about parameters which are supported by Legacy only.

If we want to just send a message as text, then we might create some kind of generic interface, because all three methods support blocks and fallback messages.

Sending as a file (upload) is quite difficult because it is a different API method with different parameters, and it is only supported by the Slack API, not by Webhooks. Also, in the case of sending as a file we do not need to override render_template_fields to add a Jinja filter at runtime.

"""
Executes an SQL statement in a given SQL connection and sends the results to Slack API as file.

:param sql: The SQL query to be executed
:param sql_conn_id: reference to a specific DB-API Connection.
:param slack_conn_id: :ref:`Slack API Connection <howto/connection:slack>`.
:param slack_filename: Filename for display in slack.
Should contain a supported extension, as listed in ``SUPPORTED_FILE_FORMATS``.
It is also possible to set compression in extension:
``filename.csv.gzip``, ``filename.json.zip``, etc.
:param sql_hook_params: Extra config params to be passed to the underlying hook.
Should match the desired hook constructor params.
:param parameters: The parameters to pass to the SQL query.
:param slack_channels: Comma-separated list of channel names or IDs where the file will be shared.
If this parameter is omitted, the file will be sent to the workspace.
:param slack_initial_comment: The message text introducing the file in specified ``slack_channels``.
:param slack_title: Title of file.
:param df_kwargs: Keyword arguments forwarded to ``pandas.DataFrame.to_{format}()`` method.

Example:
.. code-block:: python

SqlToSlackApiFileOperator(
task_id="sql_to_slack",
sql="SELECT 1 a, 2 b, 3 c",
sql_conn_id="sql-connection",
slack_conn_id="slack-api-connection",
slack_filename="awesome.json.gz",
slack_channels="#random,#general",
slack_initial_comment="Awesome load to compressed multiline JSON.",
df_kwargs={
"orient": "records",
"lines": True,
},
)
"""

# Operator attributes that accept Jinja templates.
template_fields: Sequence[str] = (
    "sql",
    "slack_channels",
    "slack_filename",
    "slack_initial_comment",
    "slack_title",
)
# File extensions associated with templated values.
template_ext: Sequence[str] = (".sql", ".jinja", ".j2")
# Renderers are keyed by template field name. This operator has no ``slack_message``
# field (that belongs to SqlToSlackOperator), so only ``sql`` is listed.
template_fields_renderers = {"sql": "sql"}

# Formats ``execute`` can serialise to; each entry needs a matching
# ``DataFrame.to_{format}()`` branch there.
SUPPORTED_FILE_FORMATS: Sequence[str] = ("csv", "json", "html")

def __init__(
    self,
    *,
    sql: str,
    sql_conn_id: str,
    sql_hook_params: dict | None = None,
    parameters: Iterable | Mapping | None = None,
    slack_conn_id: str,
    slack_filename: str,
    slack_channels: str | Sequence[str] | None = None,
    slack_initial_comment: str | None = None,
    slack_title: str | None = None,
    df_kwargs: dict | None = None,
    **kwargs,
):
    """Store the Slack upload settings; SQL-related arguments go to the base class."""
    super().__init__(
        sql=sql,
        sql_conn_id=sql_conn_id,
        sql_hook_params=sql_hook_params,
        parameters=parameters,
        **kwargs,
    )

    # Where the file goes and how it is presented in Slack.
    self.slack_conn_id = slack_conn_id
    self.slack_channels = slack_channels
    self.slack_filename = slack_filename
    self.slack_title = slack_title
    self.slack_initial_comment = slack_initial_comment

    # Always keep a dict so it can be splatted into ``DataFrame.to_*`` later.
    self.df_kwargs = df_kwargs if df_kwargs else {}

def execute(self, context: Context) -> None:
    """
    Run the SQL query, write the result to a temporary file and upload it to Slack.

    The output format is taken from the extension of ``slack_filename``.

    :param context: Airflow task context; not read by this method.
    :raises AirflowException: if the parsed format has no serialisation branch below.
    """
    # Parse file format from filename
    output_file_format, _ = parse_filename(
        filename=self.slack_filename,
        supported_file_formats=self.SUPPORTED_FILE_FORMATS,
    )

    slack_hook = SlackHook(slack_conn_id=self.slack_conn_id)
    with NamedTemporaryFile(mode="w+", suffix=f"_{self.slack_filename}") as fp:
        # tempfile.NamedTemporaryFile is used only to create and remove the temporary file;
        # pandas opens the path itself in the mode appropriate for the file type.
        # Close only the underlying descriptor: calling ``fp.close()`` would delete the file
        # right away and mark the wrapper as closed, so the file pandas re-creates at the
        # same path would never be removed when the ``with`` block exits.
        fp.file.close()

        output_file_name = fp.name
        output_file_format = output_file_format.upper()
        df_result = self._get_query_results()
        if output_file_format == "CSV":
            df_result.to_csv(output_file_name, **self.df_kwargs)
        elif output_file_format == "JSON":
            df_result.to_json(output_file_name, **self.df_kwargs)
        elif output_file_format == "HTML":
            df_result.to_html(output_file_name, **self.df_kwargs)
        else:
            # Not expected to happen. Only possible if SUPPORTED_FILE_FORMATS
            # is extended without adding an implementation for the new format.
            raise AirflowException(f"Unexpected output file format: {output_file_format}")

        # Upload while the temporary file still exists (inside the ``with`` block).
        slack_hook.send_file(
            channels=self.slack_channels,
            file=output_file_name,
            filename=self.slack_filename,
            initial_comment=self.slack_initial_comment,
            title=self.slack_title,
        )
40 changes: 39 additions & 1 deletion airflow/providers/slack/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from __future__ import annotations

import warnings
from typing import Any
from typing import Any, Sequence

from airflow.utils.types import NOTSET

Expand Down Expand Up @@ -77,3 +77,41 @@ def getint(self, field, default: Any = NOTSET) -> Any:
if value != default:
value = int(value)
return value


def parse_filename(
    filename: str, supported_file_formats: Sequence[str], fallback: str | None = None
) -> tuple[str, str | None]:
    """
    Parse filetype and compression from given filename.

    The last extension is tried as the file format first (``name.csv``); failing that,
    the last two extensions are read as ``format.compression`` (``name.csv.gzip``).

    :param filename: filename to parse.
    :param supported_file_formats: list of supported file extensions.
    :param fallback: fallback to given file format.
    :returns: filetype and compression (if specified)
    :raises ValueError: if ``filename`` is empty, ``fallback`` is not a supported format,
        or no supported format can be parsed and no ``fallback`` was given.
    """
    if not filename:
        raise ValueError("Expected 'filename' parameter is missing.")
    if fallback and fallback not in supported_file_formats:
        raise ValueError(f"Invalid fallback value {fallback!r}, expected one of {supported_file_formats}.")

    # At most two trailing extensions matter: "<name>.<format>[.<compression>]".
    parts = filename.rsplit(".", 2)

    # ``len(parts) > 1`` keeps a bare name like "csv" from being taken as its own extension.
    if len(parts) > 1 and parts[-1] in supported_file_formats:
        return parts[-1], None
    if len(parts) == 3 and parts[-2] in supported_file_formats:
        # A trailing dot ("name.csv.") gives an empty compression part; report that as
        # "no compression" rather than an empty string.
        return parts[-2], parts[-1] or None

    # Nothing usable in the filename: use the fallback if there is one, otherwise explain why.
    if fallback:
        return fallback, None

    if len(parts) == 1:
        raise ValueError(f"No file extension specified in filename {filename!r}.")
    if len(parts) == 2:
        raise ValueError(f"Unsupported file format {parts[-1]!r}, expected one of {supported_file_formats}.")
    raise ValueError(
        f"Unsupported file format '{parts[-2]}.{parts[-1]}', "
        f"expected one of {supported_file_formats} with compression extension."
    )
Loading