Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cosmos/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from .local import DbtDocsAzureStorageLocalOperator as DbtDocsAzureStorageOperator
from .local import DbtDocsLocalOperator as DbtDocsOperator
from .local import DbtDocsS3LocalOperator as DbtDocsS3Operator
from .local import DbtFreshnessS3LocalOperator as DbtFreshnessS3Operator
from .local import DbtDocsGCSLocalOperator as DbtDocsGCSOperator
from .local import DbtLSLocalOperator as DbtLSOperator
from .local import DbtRunLocalOperator as DbtRunOperator
Expand All @@ -20,6 +21,7 @@
"DbtDepsOperator",
"DbtDocsOperator",
"DbtDocsS3Operator",
"DbtFreshnessS3Operator",
"DbtDocsAzureStorageOperator",
"DbtDocsGCSOperator",
]
74 changes: 73 additions & 1 deletion cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,13 +538,85 @@ class DbtDocsLocalOperator(DbtLocalBaseOperator):

ui_color = "#8194E0"

required_files = ["index.html", "manifest.json", "graph.gpickle", "catalog.json"]
required_files = ["index.html", "manifest.json", "graph.gpickle", "catalog.json", "run_results.json"]

def __init__(self, **kwargs: Any) -> None:
    """Build the operator, then point it at the ``dbt docs generate`` command."""
    # All keyword arguments are handed to the parent operator untouched.
    super().__init__(**kwargs)
    docs_command = ["docs", "generate"]
    self.base_cmd = docs_command


class DbtFreshnessLocalOperator(DbtLocalBaseOperator):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comment: what do you think about renaming DbtFreshnessLocalOperator to DbtSourceFreshnessLocalOperator?

"""
Executes `dbt source freshness` command.
Use the `callback` parameter to specify a callback function to run after the command completes.
"""

ui_color = "#8194E0"

def __init__(self, **kwargs: Any) -> None:
    """Build the operator, then point it at the ``dbt source freshness`` command."""
    # All keyword arguments are handed to the parent operator untouched.
    super().__init__(**kwargs)
    freshness_command = ["source", "freshness"]
    self.base_cmd = freshness_command


class DbtFreshnessS3LocalOperator(DbtFreshnessLocalOperator):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar here, I wonder if DbtSourceFreshnessS3LocalOperator would be more representative.

"""
Executes `dbt source freshness` command and uploads the result to S3 storage. Returns the S3 path to the generated documentation.

:param connection_id: S3's Airflow connection ID
:param bucket_name: S3's bucket name
:param folder_dir: This can be used to specify under which directory the generated ``sources.json`` should be
uploaded.
"""

ui_color = "#FF9900"

def __init__(
    self,
    connection_id: str,
    bucket_name: str,
    folder_dir: str | None = None,
    **kwargs: Any,
) -> None:
    """
    Initialize the operator and install the S3 upload as its callback.

    :param connection_id: Airflow connection ID handed to ``S3Hook`` when uploading
    :param bucket_name: name of the S3 bucket that receives ``sources.json``
    :param folder_dir: optional key prefix; when empty or ``None`` the file is stored as ``sources.json``
    :param kwargs: remaining keyword arguments, forwarded unchanged to the parent operator
    """
    self.connection_id = connection_id
    self.bucket_name = bucket_name
    self.folder_dir = folder_dir

    super().__init__(**kwargs)

    # Assigned after the parent constructor so it overrides whatever callback
    # the parent set up; this operator always uploads to S3.
    self.callback = self.upload_to_s3

def upload_to_s3(self, project_dir: str) -> None:
    """
    Upload the ``sources.json`` artifact from the dbt target directory to S3.

    The object key is ``<folder_dir>/sources.json`` when ``folder_dir`` is set,
    otherwise ``sources.json``. An existing object with the same key is replaced.

    :param project_dir: dbt project directory; the file is read from
        ``<project_dir>/target/sources.json``
    """
    logger.info(
        'Attempting to upload generated docs to S3 using S3Hook("%s")',
        self.connection_id,
    )

    # Imported inside the method, so the Amazon provider package is only
    # imported when an upload actually happens.
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    target_dir = f"{project_dir}/target"

    hook = S3Hook(
        self.connection_id,
        extra_args={
            # sources.json is a JSON document, so it must not be served as HTML.
            "ContentType": "application/json",
        },
    )

    # Resolve the key before logging so the message reports the real
    # destination, including any folder_dir prefix.
    key = f"{self.folder_dir}/sources.json" if self.folder_dir else "sources.json"

    logger.info("Uploading %s to %s", "sources.json", f"s3://{self.bucket_name}/{key}")

    hook.load_file(
        filename=f"{target_dir}/sources.json",
        bucket_name=self.bucket_name,
        key=key,
        replace=True,
    )


class DbtDocsCloudLocalOperator(DbtDocsLocalOperator, ABC):
"""
Abstract class for operators that upload the generated documentation to cloud storage.
Expand Down
9 changes: 9 additions & 0 deletions dev/dags/dbt_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from cosmos.operators import (
DbtDocsAzureStorageOperator,
DbtDocsS3Operator,
DbtFreshnessS3Operator,
DbtDocsGCSOperator,
)
from cosmos.profiles import PostgresUserPasswordProfileMapping
Expand Down Expand Up @@ -83,6 +84,14 @@ def which_upload():
bucket_name="cosmos-docs",
)

# Example task: run `dbt source freshness` for the jaffle_shop project and
# upload the resulting sources.json to the "cosmos-docs" bucket via S3_CONN_ID.
generate_dbt_freshness_aws = DbtFreshnessS3Operator(
task_id="generate_dbt_freshness_aws",
project_dir=DBT_ROOT_PATH / "jaffle_shop",
profile_config=profile_config,
connection_id=S3_CONN_ID,
bucket_name="cosmos-docs",
)

generate_dbt_docs_azure = DbtDocsAzureStorageOperator(
task_id="generate_dbt_docs_azure",
project_dir=DBT_ROOT_PATH / "jaffle_shop",
Expand Down
10 changes: 10 additions & 0 deletions docs/configuration/generating-docs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Many users choose to generate and serve these docs on a static website. This is
Cosmos offers several pre-built ways of generating and uploading dbt docs and a fallback option to run custom code after the docs are generated:

- :class:`~cosmos.operators.DbtDocsS3Operator`: generates and uploads docs to an S3 bucket.
- :class:`~cosmos.operators.DbtFreshnessS3Operator`: generates and uploads the `sources.json <https://docs.getdbt.com/reference/artifacts/sources-json>`_ artifact to an S3 bucket.
- :class:`~cosmos.operators.DbtDocsAzureStorageOperator`: generates and uploads docs to an Azure Blob Storage.
- :class:`~cosmos.operators.DbtDocsGCSOperator`: generates and uploads docs to a GCS bucket.
- :class:`~cosmos.operators.DbtDocsOperator`: generates docs and runs a custom callback.
Expand Down Expand Up @@ -41,6 +42,15 @@ You can use the :class:`~cosmos.operators.DbtDocsS3Operator` to generate and upl
bucket_name="test_bucket",
)

generate_dbt_freshness_aws = DbtFreshnessS3Operator(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be worth to add a comment on why someone would like to send the source freshness files to S3, and perhaps a link to your blog post: https://parakeet.solutions/ingest-cosmos-dbt-into-datahub/, so people can have more

task_id="generate_dbt_freshness_aws",
project_dir="path/to/jaffle_shop",
profile_config=profile_config,
# docs-specific arguments
connection_id="test_aws",
bucket_name="test_bucket",
)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realised we're adding the code manually in this doc code.

WDYT if we added a reference to the example DAG, similar to what we did in:

# [START local_example]

.. literalinclude:: ../../dev/dags/basic_cosmos_dag.py
:language: python
:start-after: [START local_example]
:end-before: [END local_example]

This way, the documentation will be up-to-date if the operator changes without additional effort.

Upload to Azure Blob Storage
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down