Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 13 additions & 19 deletions airflow_dbt_python/hooks/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ def __init__(
profiles_conn_id: Optional[str] = None,
**kwargs,
):
self.remotes: DbtRemoteHooksDict = {}
self.dbt_conn_id = dbt_conn_id
self.project_conn_id = project_conn_id
self.profiles_conn_id = profiles_conn_id
Expand All @@ -150,12 +149,7 @@ def get_remote(self, scheme: str, conn_id: Optional[str]) -> DbtRemoteHook:
"""
from .remote import get_remote

try:
return self.remotes[(scheme, conn_id)]
except KeyError:
remote = get_remote(scheme, conn_id)
self.remotes[(scheme, conn_id)] = remote
return remote
return get_remote(scheme, conn_id)

def download_dbt_profiles(
self,
Expand All @@ -168,7 +162,7 @@ def download_dbt_profiles(
supported for remotes that require it.
"""
scheme = urlparse(str(profiles_dir)).scheme
remote = self.get_remote(scheme, self.project_conn_id)
remote = self.get_remote(scheme, self.profiles_conn_id)
Copy link
Owner

@tomasfarias tomasfarias Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

praise: Makes sense 👍, thanks for the bugfix.


return remote.download_dbt_profiles(profiles_dir, destination)

Expand Down Expand Up @@ -254,7 +248,7 @@ def run_dbt_task(
)
requires_profile = isinstance(task, (CleanTask, DepsTask))

self.setup_dbt_logging(task, config.debug)
self.setup_dbt_logging(config.debug)

if runtime_config is not None and not requires_profile:
# The deps command installs the dependencies, which means they
Expand Down Expand Up @@ -373,25 +367,24 @@ def prepare_directory(
project_dir,
tmp_dir,
)
new_project_dir = str(project_dir_path) + "/"

if (project_dir_path / "profiles.yml").exists():
# We may have downloaded the profiles.yml file together
# with the project.
return new_project_dir, new_project_dir

if profiles_dir is not None:
profiles_file_path = self.download_dbt_profiles(
profiles_dir,
tmp_dir,
)
new_profiles_dir = str(profiles_file_path.parent) + "/"
profiles_dir_path = profiles_file_path.parent
elif (project_dir_path / "profiles.yml").exists():
profiles_dir_path = project_dir_path
else:
new_profiles_dir = None
profiles_dir_path = None

return new_project_dir, new_profiles_dir
return (
str(project_dir_path) + os.sep,
str(profiles_dir_path) + os.sep if profiles_dir_path is not None else None,
Copy link
Owner

@tomasfarias tomasfarias Apr 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: Appending a path separator like this may come back to bite us (it has bitten me many times before). Nothing changes in this PR, since we were already doing the same thing before, so this is a good fix; I'm just making a note for the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember exactly. I think I tried to remove the trailing separator, but ran into an error where dbt mistook the path for a file rather than a directory. I'll check it out.

)

def setup_dbt_logging(self, task: BaseTask, debug: Optional[bool]):
def setup_dbt_logging(self, debug: Optional[bool]):
"""Setup dbt logging.

Starting with dbt v1, dbt initializes two loggers: default_file and
Expand All @@ -407,6 +400,7 @@ def setup_dbt_logging(self, task: BaseTask, debug: Optional[bool]):
configured_file = logging.getLogger("configured_file")
file_log = logging.getLogger("file_log")
stdout_log = logging.getLogger("stdout_log")
stdout_log.handlers.clear()
Copy link
Owner

@tomasfarias tomasfarias Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

praise: Not much to say, great work 👍

stdout_log.propagate = True

if not debug:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

from abc import ABC, abstractmethod
from functools import cache
from pathlib import Path
from typing import Optional, Type

Expand All @@ -32,7 +33,7 @@ class DbtRemoteHook(ABC, LoggingMixin):
"""

@abstractmethod
def download(
def _download(
Copy link
Owner

@tomasfarias tomasfarias Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Any reason for adding an underscore prefix to all these methods? I know some linters could report this as a private method when calling it, but beyond that nothing is really enforcing the privacy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The names of these methods started to overlap with methods in GCSHook.
So I had a choice: rename them or add a prefix. I chose the latter since they are not used publicly.

self,
source: URL,
destination: URL,
Expand All @@ -43,7 +44,7 @@ def download(
return NotImplemented

@abstractmethod
def upload(
def _upload(
self,
source: URL,
destination: URL,
Expand Down Expand Up @@ -71,7 +72,7 @@ def download_dbt_project(self, source: URLLike, destination: URLLike) -> Path:
if source_url.is_archive():
destination_url = destination_url / source_url.name

self.download(source_url, destination_url)
self._download(source_url, destination_url)

if destination_url.exists() and destination_url.is_archive():
destination_url.extract()
Expand Down Expand Up @@ -103,7 +104,7 @@ def download_dbt_profiles(self, source: URLLike, destination: URLLike) -> Path:
if destination_url.is_dir() or destination_url.name != "profiles.yml":
destination_url = destination_url / "profiles.yml"

self.download(source_url, destination_url)
self._download(source_url, destination_url)

return destination_url.path

Expand Down Expand Up @@ -133,12 +134,13 @@ def upload_dbt_project(
source_url.archive(zip_url)
source_url = zip_url

self.upload(source_url, destination_url, replace, delete_before)
self._upload(source_url, destination_url, replace, delete_before)

if destination_url.is_archive():
source_url.unlink()


@cache
def get_remote(scheme: str, conn_id: Optional[str] = None) -> DbtRemoteHook:
"""Get a DbtRemoteHook as long as the scheme is supported.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(
**kwargs,
)

def upload(
def _upload(
self,
source: URL,
destination: URL,
Expand Down Expand Up @@ -130,7 +130,7 @@ def update_refs(refs):
generate_pack_data=repo.generate_pack_data,
)

def download(
def _download(
self,
source: URL,
destination: URL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from __future__ import annotations

import shutil
import sys
from functools import partial
from pathlib import Path
from typing import Optional
Expand Down Expand Up @@ -53,7 +52,7 @@ def get_url(self, url: Optional[URL]) -> URL:

return URL(self.basepath) / url

def download(
def _download(
self,
source: URL,
destination: URL,
Expand All @@ -71,7 +70,7 @@ def download(
else:
self.copy_one(source, destination, replace)

def upload(
def _upload(
self,
source: URL,
destination: URL,
Expand Down Expand Up @@ -127,33 +126,6 @@ def copy(

copy_function = partial(self.copy_one, replace=replace)

if sys.version_info.major == 3 and sys.version_info.minor < 8:
py37_copytree(source, destination, replace)
else:
shutil.copytree( # type: ignore
source, destination, copy_function=copy_function, dirs_exist_ok=True
)


def py37_copytree(source: URL, destination: URL, replace: bool = True):
"""A (probably) poor attempt at replicating shutil.copytree for Python 3.7.

shutil.copytree is available in Python 3.7, however it doesn't have the
dirs_exist_ok parameter, and we really need that. If the destination path doesn't
exist, we can use shutil.copytree, however if it does then we need to copy files
one by one and make any subdirectories ourselves.
"""
if destination.exists():
for url in source:
if url.is_dir():
continue

target_url = destination / url.relative_to(source)
if target_url.exists() and not replace:
# shutil.copy replaces by default
continue

target_url.parent.mkdir(exist_ok=True, parents=True)
shutil.copy(url, target_url)
else:
shutil.copytree(source, destination)
shutil.copytree( # type: ignore
source, destination, copy_function=copy_function, dirs_exist_ok=True
)
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(self, *args, **kwargs):
"""Initialize a dbt remote for AWS S3."""
super().__init__(*args, **kwargs)

def upload(
def _upload(
self,
source: URL,
destination: URL,
Expand Down Expand Up @@ -110,7 +110,7 @@ def load_file_handle_replace_error(

return success

def download(
def _download(
self,
source: URL,
destination: URL,
Expand Down Expand Up @@ -188,7 +188,7 @@ def download_s3_object(

except IsADirectoryError:
# Uploading files manually via the AWS UI to S3 can cause files
# with empty names to appear. When we attemp to download it, we build
# with empty names to appear. When we attempt to download it, we build
# a relative path that is equal to the parent directory that already
# exists.
self.log.warning("A file with no name was found in S3 at %s", s3_object)
14 changes: 11 additions & 3 deletions airflow_dbt_python/utils/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,20 +140,28 @@ class BaseConfig:
default=None, repr=False
)

require_resource_names_without_spaces: bool = False

add_package: Optional[Package] = None
dry_run: bool = False
lock: bool = False
static: bool = False
upgrade: bool = False

require_model_names_without_spaces: bool = False
source_freshness_run_project_hooks: bool = False
exclude_resource_types: list[str] = dataclasses.field(
default_factory=list, repr=False
)

# legacy behaviors - https://github.com/dbt-labs/dbt-core/blob/main/docs/guides/behavior-change-flags.md
Copy link
Owner

@tomasfarias tomasfarias Apr 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

praise: neat 👍

require_batched_execution_for_custom_microbatch_strategy: bool = False
require_explicit_package_overrides_for_builtin_materializations: bool = True
require_resource_names_without_spaces: bool = False
source_freshness_run_project_hooks: bool = False
skip_nodes_if_on_run_start_fails: bool = False
state_modified_compare_more_unrendered_values: bool = False
state_modified_compare_vars: bool = False
require_yaml_configuration_for_mf_time_spines: bool = False
require_nested_cumulative_type_params: bool = False

def __post_init__(self):
"""Post initialization actions for a dbt configuration."""
self.vars = parse_yaml_args(self.vars)
Expand Down
6 changes: 3 additions & 3 deletions docs/reference/hooks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ The DbtRemoteHook interface
*dbt* git remote
^^^^^^^^^^^^^^^^

.. automodule:: airflow_dbt_python.hooks.git
.. automodule:: airflow_dbt_python.hooks.remote.git
:members:

*dbt* localfs remote
^^^^^^^^^^^^^^^^^^^^

.. automodule:: airflow_dbt_python.hooks.localfs
.. automodule:: airflow_dbt_python.hooks.remote.localfs
Copy link
Owner

@tomasfarias tomasfarias Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue(non-blocking): It is very contradictory to read remote.local, maybe we could rename this one to just fs (or filesystem if you prefer to be more verbose)? Could also be left as work for later (I can write an issue for that).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm in favor of renaming it to fs

:members:

*dbt* S3 remote
^^^^^^^^^^^^^^^^

.. automodule:: airflow_dbt_python.hooks.s3
.. automodule:: airflow_dbt_python.hooks.remote.s3
:members:
Loading