diff --git a/cosmos/operators/__init__.py b/cosmos/operators/__init__.py
index b7e36abff5..a220304d55 100644
--- a/cosmos/operators/__init__.py
+++ b/cosmos/operators/__init__.py
@@ -2,6 +2,7 @@
 from .local import DbtDocsAzureStorageLocalOperator as DbtDocsAzureStorageOperator
 from .local import DbtDocsLocalOperator as DbtDocsOperator
 from .local import DbtDocsS3LocalOperator as DbtDocsS3Operator
+from .local import DbtFreshnessS3LocalOperator as DbtFreshnessS3Operator
 from .local import DbtDocsGCSLocalOperator as DbtDocsGCSOperator
 from .local import DbtLSLocalOperator as DbtLSOperator
 from .local import DbtRunLocalOperator as DbtRunOperator
@@ -20,6 +21,7 @@
     "DbtDepsOperator",
     "DbtDocsOperator",
     "DbtDocsS3Operator",
+    "DbtFreshnessS3Operator",
     "DbtDocsAzureStorageOperator",
     "DbtDocsGCSOperator",
 ]
diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py
index 1c00f476c8..0b132134d4 100644
--- a/cosmos/operators/local.py
+++ b/cosmos/operators/local.py
@@ -538,13 +538,85 @@ class DbtDocsLocalOperator(DbtLocalBaseOperator):
 
     ui_color = "#8194E0"
 
-    required_files = ["index.html", "manifest.json", "graph.gpickle", "catalog.json"]
+    required_files = ["index.html", "manifest.json", "graph.gpickle", "catalog.json", "run_results.json"]
 
     def __init__(self, **kwargs: Any) -> None:
         super().__init__(**kwargs)
         self.base_cmd = ["docs", "generate"]
 
 
+class DbtFreshnessLocalOperator(DbtLocalBaseOperator):
+    """
+    Executes `dbt source freshness` command.
+    Use the `callback` parameter to specify a callback function to run after the command completes.
+    """
+
+    ui_color = "#8194E0"
+
+    def __init__(self, **kwargs: Any) -> None:
+        super().__init__(**kwargs)
+        self.base_cmd = ["source", "freshness"]
+
+
+class DbtFreshnessS3LocalOperator(DbtFreshnessLocalOperator):
+    """
+    Executes `dbt source freshness` command and uploads the generated `sources.json` to S3 storage.
+
+    :param connection_id: S3's Airflow connection ID
+    :param bucket_name: S3's bucket name
+    :param folder_dir: This can be used to specify under which directory the generated `sources.json` should be
+        uploaded.
+    """
+
+    ui_color = "#FF9900"
+
+    def __init__(
+        self,
+        connection_id: str,
+        bucket_name: str,
+        folder_dir: str | None = None,
+        **kwargs: Any,
+    ) -> None:
+        "Initializes the operator."
+        self.connection_id = connection_id
+        self.bucket_name = bucket_name
+        self.folder_dir = folder_dir
+
+        super().__init__(**kwargs)
+
+        # override the callback with our own
+        self.callback = self.upload_to_s3
+
+    def upload_to_s3(self, project_dir: str) -> None:
+        "Uploads the `sources.json` found under `<project_dir>/target` to S3."
+        logger.info(
+            'Attempting to upload sources.json to S3 using S3Hook("%s")',
+            self.connection_id,
+        )
+
+        from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+        target_dir = f"{project_dir}/target"
+
+        hook = S3Hook(
+            self.connection_id,
+            extra_args={
+                "ContentType": "application/json",  # the artifact is JSON, not an HTML page
+            },
+        )
+
+        key = f"{self.folder_dir}/sources.json" if self.folder_dir else "sources.json"
+
+        logger.info("Uploading %s to %s", "sources.json", f"s3://{self.bucket_name}/{key}")
+
+        hook.load_file(
+            filename=f"{target_dir}/sources.json",
+            bucket_name=self.bucket_name,
+            key=key,
+            replace=True,
+        )
+
+
 class DbtDocsCloudLocalOperator(DbtDocsLocalOperator, ABC):
     """
     Abstract class for operators that upload the generated documentation to cloud storage.
diff --git a/dev/dags/dbt_docs.py b/dev/dags/dbt_docs.py
index edf89bdab6..42a4eb39d3 100644
--- a/dev/dags/dbt_docs.py
+++ b/dev/dags/dbt_docs.py
@@ -20,6 +20,7 @@
 from cosmos.operators import (
     DbtDocsAzureStorageOperator,
     DbtDocsS3Operator,
+    DbtFreshnessS3Operator,
     DbtDocsGCSOperator,
 )
 from cosmos.profiles import PostgresUserPasswordProfileMapping
@@ -83,6 +84,14 @@ def which_upload():
         bucket_name="cosmos-docs",
     )
 
+    generate_dbt_freshness_aws = DbtFreshnessS3Operator(
+        task_id="generate_dbt_freshness_aws",
+        project_dir=DBT_ROOT_PATH / "jaffle_shop",
+        profile_config=profile_config,
+        connection_id=S3_CONN_ID,
+        bucket_name="cosmos-docs",
+    )
+
     generate_dbt_docs_azure = DbtDocsAzureStorageOperator(
         task_id="generate_dbt_docs_azure",
         project_dir=DBT_ROOT_PATH / "jaffle_shop",
diff --git a/docs/configuration/generating-docs.rst b/docs/configuration/generating-docs.rst
index 88459fd14e..73743f65db 100644
--- a/docs/configuration/generating-docs.rst
+++ b/docs/configuration/generating-docs.rst
@@ -10,6 +10,7 @@ Many users choose to generate and serve these docs on a static website. This is
 Cosmos offers two pre-built ways of generating and uploading dbt docs and a fallback option to run custom code after the docs are generated:
 
 - :class:`~cosmos.operators.DbtDocsS3Operator`: generates and uploads docs to a S3 bucket.
+- :class:`~cosmos.operators.DbtFreshnessS3Operator`: runs ``dbt source freshness`` and uploads the generated ``sources.json`` to an S3 bucket.
 - :class:`~cosmos.operators.DbtDocsAzureStorageOperator`: generates and uploads docs to an Azure Blob Storage.
 - :class:`~cosmos.operators.DbtDocsGCSOperator`: generates and uploads docs to a GCS bucket.
 - :class:`~cosmos.operators.DbtDocsOperator`: generates docs and runs a custom callback.
@@ -41,6 +42,15 @@ You can use the :class:`~cosmos.operators.DbtDocsS3Operator` to generate and upl
         bucket_name="test_bucket",
     )
 
+    generate_dbt_freshness_aws = DbtFreshnessS3Operator(
+        task_id="generate_dbt_freshness_aws",
+        project_dir="path/to/jaffle_shop",
+        profile_config=profile_config,
+        # S3-specific arguments
+        connection_id="test_aws",
+        bucket_name="test_bucket",
+    )
+
 Upload to Azure Blob Storage
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 