2 changes: 1 addition & 1 deletion airflow_dbt_python/hooks/__init__.py
@@ -1 +1 @@
-"""Hooks module provides DbtHooks and DbtRemoteHooks."""
+"""Hooks module provides DbtHooks and DbtFSHooks."""
Owner
praise: Fantastic work in this commit.

32 changes: 16 additions & 16 deletions airflow_dbt_python/hooks/dbt.py
@@ -33,12 +33,12 @@
 if TYPE_CHECKING:
     from dbt.contracts.results import RunResult

-    from airflow_dbt_python.hooks.remote import DbtRemoteHook
+    from airflow_dbt_python.hooks.fs import DbtFSHook
     from airflow_dbt_python.hooks.target import DbtConnectionHook
     from airflow_dbt_python.utils.configs import BaseConfig
     from airflow_dbt_python.utils.url import URLLike

-DbtRemoteHooksDict = Dict[Tuple[str, Optional[str]], DbtRemoteHook]
+DbtFSHooksDict = Dict[Tuple[str, Optional[str]], DbtFSHook]


 class DbtTaskResult(NamedTuple):
@@ -103,16 +103,16 @@ def get_dbt_target_hook(conn_id: str) -> DbtConnectionHook:
         return DbtConnectionHook.get_db_conn_hook(conn_id)

     @staticmethod
-    def get_remote(scheme: str, conn_id: Optional[str]) -> DbtRemoteHook:
-        """Get a remote to interact with dbt files.
+    def get_fs_hook(scheme: str, conn_id: Optional[str]) -> DbtFSHook:
+        """Get a fs_hook to interact with dbt files.

-        RemoteHooks are defined by the scheme we are looking for and an optional
+        FSHooks are defined by the scheme we are looking for and an optional
         connection id if we are looking to interface with any Airflow hook that
         uses a connection.
         """
-        from .remote import get_remote
+        from .fs import get_fs_hook

-        return get_remote(scheme, conn_id)
+        return get_fs_hook(scheme, conn_id)

     def download_dbt_profiles(
         self,
@@ -121,13 +121,13 @@ def download_dbt_profiles(
     ) -> Path:
         """Pull a dbt profiles.yml file from a given profiles_dir.

-        This operation is delegated to a DbtRemoteHook. An optional connection id is
+        This operation is delegated to a DbtFSHook. An optional connection id is
         supported for remotes that require it.
         """
         scheme = urlparse(str(profiles_dir)).scheme
-        remote = self.get_remote(scheme, self.profiles_conn_id)
+        fs_hook = self.get_fs_hook(scheme, self.profiles_conn_id)

-        return remote.download_dbt_profiles(profiles_dir, destination)
+        return fs_hook.download_dbt_profiles(profiles_dir, destination)

     def download_dbt_project(
         self,
@@ -136,13 +136,13 @@ def download_dbt_project(
     ) -> Path:
         """Pull a dbt project from a given project_dir.

-        This operation is delegated to a DbtRemoteHook. An optional connection id is
+        This operation is delegated to a DbtFSHook. An optional connection id is
         supported for remotes that require it.
         """
         scheme = urlparse(str(project_dir)).scheme
-        remote = self.get_remote(scheme, self.project_conn_id)
+        fs_hook = self.get_fs_hook(scheme, self.project_conn_id)

-        return remote.download_dbt_project(project_dir, destination)
+        return fs_hook.download_dbt_project(project_dir, destination)

     def upload_dbt_project(
         self,
@@ -153,13 +153,13 @@ def upload_dbt_project(
     ) -> None:
         """Push a dbt project from a given project_dir.

-        This operation is delegated to a DbtRemoteHook. An optional connection id is
+        This operation is delegated to a DbtFSHook. An optional connection id is
         supported for remotes that require it.
         """
         scheme = urlparse(str(destination)).scheme
-        remote = self.get_remote(scheme, self.project_conn_id)
+        fs_hook = self.get_fs_hook(scheme, self.project_conn_id)

-        return remote.upload_dbt_project(
+        return fs_hook.upload_dbt_project(
             project_dir, destination, replace=replace, delete_before=delete_before
         )
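As a reviewer aid, a minimal sketch of how the renamed delegation is expected to be called; the constructor keyword, bucket URL, and destination path are illustrative assumptions, not part of this PR:

```python
from pathlib import Path

from airflow_dbt_python.hooks.dbt import DbtHook

# Hypothetical call site: DbtHook parses the URL scheme ("s3"), resolves a
# DbtS3FSHook via get_fs_hook, and delegates the download to it.
hook = DbtHook(profiles_conn_id="aws_default")  # assumed constructor kwarg
profiles_path = hook.download_dbt_profiles(
    "s3://my-bucket/dbt/",  # remote profiles_dir; scheme selects the FS hook
    Path("/tmp/dbt"),       # local destination directory
)
```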
@@ -1,6 +1,6 @@
-"""The DbtRemoteHook interface includes methods for downloading and uploading files.
+"""The DbtFSHook interface includes methods for downloading and uploading files.

-Internally, DbtRemoteHooks can use Airflow hooks to execute the actual operations.
+Internally, DbtFSHooks can use Airflow hooks to execute the actual operations.

 Currently, only AWS S3 and the local filesystem are supported as remotes.
 """
@@ -17,7 +17,7 @@
 StrPath = str


-class DbtRemoteHook(ABC, LoggingMixin):
+class DbtFSHook(ABC, LoggingMixin):
     """Represents a dbt project storing any dbt files.

     A concrete backend class should implement the push and pull methods to fetch one
@@ -141,33 +141,33 @@ def upload_dbt_project(


 @cache
-def get_remote(scheme: str, conn_id: Optional[str] = None) -> DbtRemoteHook:
-    """Get a DbtRemoteHook as long as the scheme is supported.
+def get_fs_hook(scheme: str, conn_id: Optional[str] = None) -> DbtFSHook:
+    """Get a DbtFSHook as long as the scheme is supported.

     In the future we should make our hooks discoverable and package ourselves as a
     proper Airflow providers package.
     """
     if scheme == "s3":
-        from .s3 import DbtS3RemoteHook
+        from .s3 import DbtS3FSHook

-        remote_cls: Type[DbtRemoteHook] = DbtS3RemoteHook
+        fs_hook_cls: Type[DbtFSHook] = DbtS3FSHook
     elif scheme == "gs":
-        from .gcs import DbtGCSRemoteHook
+        from .gcs import DbtGCSFSHook

-        remote_cls = DbtGCSRemoteHook
+        fs_hook_cls = DbtGCSFSHook
     elif scheme in ("https", "git", "git+ssh", "ssh", "http"):
-        from .git import DbtGitRemoteHook
+        from .git import DbtGitFSHook

-        remote_cls = DbtGitRemoteHook
+        fs_hook_cls = DbtGitFSHook
     elif scheme == "":
-        from .localfs import DbtLocalFsRemoteHook
+        from .local import DbtLocalFsHook

-        remote_cls = DbtLocalFsRemoteHook
+        fs_hook_cls = DbtLocalFsHook
     else:
         raise NotImplementedError(f"Backend {scheme} is not supported")

     if conn_id is not None:
-        remote = remote_cls(conn_id)
+        fs_hook = fs_hook_cls(conn_id)
     else:
-        remote = remote_cls()
-    return remote
+        fs_hook = fs_hook_cls()
+    return fs_hook
@@ -9,16 +9,16 @@
 from airflow.providers.google.cloud.hooks.gcs import GCSHook, _parse_gcs_url
 from google.cloud.storage import Blob

-from airflow_dbt_python.hooks.remote import DbtRemoteHook
+from airflow_dbt_python.hooks.fs import DbtFSHook
 from airflow_dbt_python.utils.url import URL, URLLike


-class DbtGCSRemoteHook(GCSHook, DbtRemoteHook):
+class DbtGCSFSHook(GCSHook, DbtFSHook):
     """A dbt remote implementation for GCS.

-    This concrete remote class implements the DbtRemote interface by using GCS as a
+    This concrete remote class implements the DbtFs interface by using GCS as a
     storage for uploading and downloading dbt files to and from.
-    The DbtGCSRemoteHook subclasses Airflow's GCSHook to interact with GCS.
+    The DbtGCSFSHook subclasses Airflow's GCSHook to interact with GCS.
     A connection id may be passed to set the connection to use with GCS.
     """

@@ -128,7 +128,7 @@ def _download(
             destination: A destination URL where to download the objects to. The
                 existing sub-directory hierarchy in GCS will be preserved.
             replace: Indicates whether to replace existing files when downloading.
-                This flag is kept here to comply with the DbtRemote interface but its
+                This flag is kept here to comply with the DbtFs interface but its
                 ignored as files downloaded from GCS always overwrite local files.
             delete_before: Delete destination directory before download.
         """
@@ -1,4 +1,4 @@
-"""A concrete DbtRemoteHook for git repositories with dulwich."""
+"""A concrete DbtFSHook for git repositories with dulwich."""

 import datetime as dt
 from typing import Callable, Optional, Tuple, Union
@@ -11,7 +11,7 @@
 from dulwich.protocol import ZERO_SHA
 from dulwich.repo import Repo

-from airflow_dbt_python.hooks.remote import DbtRemoteHook
+from airflow_dbt_python.hooks.fs import DbtFSHook
 from airflow_dbt_python.utils.url import URL

 GitClients = Union[HttpGitClient, SSHGitClient, TCPGitClient]
@@ -22,13 +22,13 @@ def no_filter(_: URL) -> bool:
     return True


-class DbtGitRemoteHook(SSHHook, DbtRemoteHook):
+class DbtGitFSHook(SSHHook, DbtFSHook):
     """A dbt remote implementation for git repositories.

-    This concrete remote class implements the DbtRemote interface by using any git
+    This concrete remote class implements the DbtFs interface by using any git
     repository to upload and download dbt files to and from.

-    The DbtGitRemoteHook subclasses Airflow's SSHHook to interact with to utilize its
+    The DbtGitFSHook subclasses Airflow's SSHHook to interact with to utilize its
     defined methods to operate with SSH connections. However, SSH connections are not
     the only ones supported for interacting with git repositories: HTTP (http:// or
     https://) and plain TCP (git://) may be used.
@@ -12,14 +12,14 @@

 from airflow.hooks.filesystem import FSHook

-from airflow_dbt_python.hooks.remote import DbtRemoteHook
+from airflow_dbt_python.hooks.fs import DbtFSHook
 from airflow_dbt_python.utils.url import URL


-class DbtLocalFsRemoteHook(FSHook, DbtRemoteHook):
-    """A concrete dbt remote for a local filesystem.
+class DbtLocalFsHook(FSHook, DbtFSHook):
+    """A concrete dbt hook for a local filesystem.

-    This remote is intended to be used when running Airflow with a LocalExecutor, and
+    This hook is intended to be used when running Airflow with a LocalExecutor, and
     it relies on shutil from the standard library to do all the file manipulation. For
     these reasons, running multiple concurrent tasks with this remote may lead to race
     conditions if attempting to push files to the remote.
@@ -28,7 +28,7 @@ class DbtLocalFsRemoteHook(FSHook, DbtRemoteHook):
     conn_name_attr = "fs_conn_id"
     default_conn_name = "fs_default"
     conn_type = "filesystem"
-    hook_name = "dbt Local Filesystem RemoteHook"
+    hook_name = "dbt Local Filesystem FSHook"

     def __init__(
         self,
@@ -6,16 +6,16 @@

 from airflow.providers.amazon.aws.hooks.s3 import S3Hook

-from airflow_dbt_python.hooks.remote import DbtRemoteHook
+from airflow_dbt_python.hooks.fs import DbtFSHook
 from airflow_dbt_python.utils.url import URL, URLLike


-class DbtS3RemoteHook(S3Hook, DbtRemoteHook):
+class DbtS3FSHook(S3Hook, DbtFSHook):
     """A dbt remote implementation for S3.

-    This concrete remote class implements the DbtRemote interface by using S3 as a
+    This concrete remote class implements the DbtFs interface by using S3 as a
     storage for uploading and downloading dbt files to and from.
-    The DbtS3RemoteHook subclasses Airflow's S3Hook to interact with S3. A connection id
+    The DbtS3FSHook subclasses Airflow's S3Hook to interact with S3. A connection id
     may be passed to set the connection to use with S3.
     """

@@ -126,7 +126,7 @@ def _download(
             destination: A destination URL where to download the objects to. The
                 existing sub-directory hierarchy in S3 will be preserved.
             replace: Indicates whether to replace existing files when downloading.
-                This flag is kept here to comply with the DbtRemote interface but its
+                This flag is kept here to comply with the DbtFs interface but its
                 ignored as files downloaded from S3 always overwrite local files.
             delete_before: Delete destination directory before download.
         """
6 changes: 3 additions & 3 deletions docs/getting_started.rst
@@ -229,7 +229,7 @@ When Airflow is installed is running on a multi- machine or cloud installation,

 For these deployments we must rely on a *dbt* remote to download and, eventually, upload all required *dbt* files. The remote *dbt* URL may be used in place of a local ``project_dir`` or ``profiles_dir`` to have *airflow-dbt-python* download the *dbt* files in the remote into a temporary directory for execution.

-Interactions with storages are supported by subclasses of ``DbtRemoteHook``. Read the documentation :ref:`dbt_remote_hooks` to learn more about these hooks.
+Interactions with storages are supported by subclasses of ``DbtFSHook``. Read the documentation :ref:`dbt_remote_hooks` to learn more about these hooks.

 As an example, let's upload our *dbt* project to an AWS S3 bucket. The files may end up structured in the bucket as:

@@ -285,7 +285,7 @@ Then, we can alter the previous example DAG to set ``project_dir`` and ``profile
         profile="my-project",
     )

-*airflow-dbt-python* uses the URL scheme (in this example, ``"s3"``) to figure out the type of remote, and the corresponding ``DbtRemoteHook`` to download all required files. An exception would be raised if the scheme does not point to a supported remote.
+*airflow-dbt-python* uses the URL scheme (in this example, ``"s3"``) to figure out the type of remote, and the corresponding ``DbtFSHook`` to download all required files. An exception would be raised if the scheme does not point to a supported remote.

 *airflow-dbt-python* takes care of adjusting any path-like arguments so that they are pointing to files in a local temporary directory once all the *dbt* files are download from the remote storage.

@@ -320,7 +320,7 @@ The DAG looks the same as the AWS S3 example, except that now we use the GitHub
         profile="my-project",
     )

-*airflow-dbt-python* can determine this URL requires a ``DbtGitRemoteHook`` by looking at the URL's scheme (``"git+ssh"``). As we are passing an SSH URL, ``DbtGitRemoteHook`` can utilize an Airflow `SSH Connection <https://airflow.apache.org/docs/apache-airflow-providers-ssh/stable/connections/ssh.html>`_ as it subclasses Airflow's ``SSHHook``. This connection type allows us to setup the necessary SSH keys to access GitHub. Of course, as this is a public repository, we could have just used an HTTP URL, but for private repositories an SSH key may be required.
+*airflow-dbt-python* can determine this URL requires a ``DbtGitFSHook`` by looking at the URL's scheme (``"git+ssh"``). As we are passing an SSH URL, ``DbtGitFSHook`` can utilize an Airflow `SSH Connection <https://airflow.apache.org/docs/apache-airflow-providers-ssh/stable/connections/ssh.html>`_ as it subclasses Airflow's ``SSHHook``. This connection type allows us to setup the necessary SSH keys to access GitHub. Of course, as this is a public repository, we could have just used an HTTP URL, but for private repositories an SSH key may be required.

 .. note::
     *airflow-dbt-python* can utilize Airflow Connections to fetch connection details for *dbt* remotes as well as for *dbt* targets (e.g. for your data warehouse). The ``project_conn_id`` and ``profiles_conn_id`` arguments that all *dbt* operators have refer to Airflow Connections to used to fetch *dbt* projects and *profiles.yml* respectively, whereas the ``target`` argument can point to an Airflow Connection used to setup *dbt* to access your data warehouse.
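To make the getting-started change concrete, a sketch of the kind of DAG task the docs excerpt describes; the operator import path and S3 URLs are assumptions based on the excerpt, not copied from this PR:

```python
from airflow_dbt_python.operators.dbt import DbtRunOperator

# The "s3" scheme in both URLs is what routes file transfers to DbtS3FSHook.
dbt_run = DbtRunOperator(
    task_id="dbt_run_daily",
    project_dir="s3://my-bucket/dbt/project/",    # hypothetical bucket layout
    profiles_dir="s3://my-bucket/dbt/profiles/",  # hypothetical bucket layout
    profile="my-project",
)
```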
8 changes: 4 additions & 4 deletions docs/how_does_it_work.dot
@@ -18,8 +18,8 @@ digraph HowDoesItWork {

         DbtHook;
         DbtOperator -> DbtHook [label="run_dbt_task"];
-        DbtRemoteHooks -> DbtHook [label="download"];
-        DbtHook -> DbtRemoteHooks [label="upload", labelfloat=true];
+        DbtFSHooks -> DbtHook [label="download"];
+        DbtHook -> DbtFSHooks [label="upload", labelfloat=true];
     }

     "dbt-core" [style=filled, fillcolor="#CBCBCB", color="#FF7557"];
@@ -34,7 +34,7 @@
     {rank=same; split; DbtOperator; }

     "Remote storage" [style=filled, fillcolor="#CBCBCB", color="#FF7557"];
-    DbtRemoteHooks -> "Remote storage" [headlabel="interacts", labeldistance=4.0];
-    {rank=same; "Remote storage"; DbtRemoteHooks; }
+    DbtFSHooks -> "Remote storage" [headlabel="interacts", labeldistance=4.0];
+    {rank=same; "Remote storage"; DbtFSHooks; }

 }
12 changes: 6 additions & 6 deletions docs/how_does_it_work.rst
@@ -8,7 +8,7 @@ How does it work?
 To achieve this goal *airflow-dbt-python* provides Airflow operators, hooks, and other utilities. Hooks in particular come in two flavors:

 * A ``DbtHook`` that abstracts all interaction with *dbt* internals.
-* Subclasses of ``DbtRemoteHook`` that expose an interface to interact with *dbt* remote storages where project files are located (like AWS S3 buckets or git repositories).
+* Subclasses of ``DbtFSHook`` that expose an interface to interact with *dbt* remote storages where project files are located (like AWS S3 buckets or git repositories).

 .. graphviz:: how_does_it_work.dot

@@ -53,14 +53,14 @@ In order to respect this constraint, *airflow-dbt-python* hooks run each *dbt* c

 This ensures *dbt* can work with any Airflow deployment, including most production deployments running `Remote Executors <https://airflow.apache.org/docs/apache-airflow/stable/executor/index.html#executor-types>`_ that do not guarantee any files will be shared between tasks, since each task may run in a completely different worker.

-.. _dbt_remote_hooks:
+.. _dbt_fs_hooks:

-*dbt* remote hooks
+*dbt* filesystem hooks
 ------------------

-*dbt* remote hooks implement a simple interface to communicate with *dbt* remotes. A *dbt* remote can be any external storage that contains a *dbt* project and potentially also a *profiles.yml* file for example: an AWS S3 bucket, a Google Cloud Storage or a GitHub repository. See the reference for a list of which remotes are currently supported.
+*dbt* fs hooks implement a simple interface to communicate with *dbt* remotes. A *dbt* fs can be any external storage that contains a *dbt* project and potentially also a *profiles.yml* file for example: an AWS S3 bucket, a Google Cloud Storage or a GitHub repository. See the reference for a list of which remotes are currently supported.

-Implementing the ``DbtRemoteHook`` interface
+Implementing the ``DbtFSHook`` interface
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

-Supporting a new remote to store *dbt* files requires implementing the ``DbtRemoteHook`` interface. There are only two methods in the interface: ``DbtRemoteHook.download`` and ``DbtRemoteHook.upload``.
+Supporting a new fs to store *dbt* files requires implementing the ``DbtFSHook`` interface. There are only two methods in the interface: ``DbtFSHook.download`` and ``DbtFSHook.upload``.
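Since this doc change tells readers to implement ``DbtFSHook.download`` and ``DbtFSHook.upload``, a minimal sketch of a custom subclass may help reviewers; the method signatures here are assumptions inferred from the ``_download`` docstrings in this diff and may not match the actual abstract methods:

```python
from airflow_dbt_python.hooks.fs import DbtFSHook
from airflow_dbt_python.utils.url import URL


class DbtDummyFSHook(DbtFSHook):  # hypothetical example class
    """A dbt fs hook backed by an imaginary storage service."""

    def download(
        self, source: URL, destination: URL,
        replace: bool = False, delete_before: bool = False,
    ) -> None:
        # Fetch every file under `source` into `destination`, preserving
        # the sub-directory hierarchy.
        raise NotImplementedError

    def upload(
        self, source: URL, destination: URL,
        replace: bool = False, delete_before: bool = False,
    ) -> None:
        # Push every file under `source` to `destination` in the storage.
        raise NotImplementedError
```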
2 changes: 1 addition & 1 deletion docs/reference/hooks.rst
@@ -10,7 +10,7 @@ Core *dbt* hooks
 Supported *dbt* remote hooks
 ----------------------------

-The DbtRemoteHook interface
+The DbtFSHook interface
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^

 .. automodule:: airflow_dbt_python.hooks.remote
4 changes: 2 additions & 2 deletions tests/conftest.py
@@ -448,14 +448,14 @@ def mocked_gcs_client():
 @pytest.fixture
 def gcs_hook(gcp_conn_id):
     """Provide an GCS for testing."""
-    from airflow_dbt_python.hooks.remote.gcs import DbtGCSRemoteHook
+    from airflow_dbt_python.hooks.fs.gcs import DbtGCSFSHook

     with patch(
         "airflow.providers.google.cloud.hooks.gcs.GCSHook.get_credentials_and_project_id",
         lambda x: ({}, "test-project"),
     ):
         with patch("google.cloud.storage.Client", MockStorageClient):
-            yield DbtGCSRemoteHook()
+            yield DbtGCSFSHook()


 @pytest.fixture