From 564e3da3ab94e1e040664d3e5f96d6b6675bc279 Mon Sep 17 00:00:00 2001 From: Chris Hronek Date: Tue, 7 Nov 2023 12:20:57 -0700 Subject: [PATCH 1/5] Add local operator that runs dbt source freshness command and uploads result to s3 --- cosmos/operators/local.py | 69 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 1c00f476c8..b0f0b368c4 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -544,6 +544,75 @@ def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self.base_cmd = ["docs", "generate"] +class DbtFreshnessLocalOperator(DbtLocalBaseOperator): + """ + Executes `dbt source freshness` command. + Use the `callback parameter to specify a callback function to run after the command completes. + """ + + ui_color = "#8194E0" + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self.base_cmd = ["source", "freshness"] + +class DbtFreshnessS3LocalOperator(DbtFreshnessLocalOperator): + """ + Executes `dbt source freshness` command and upload to S3 storage. Returns the S3 path to the generated documentation. + + :param aws_conn_id: S3's Airflow connection ID + :param bucket_name: S3's bucket name + :param folder_dir: This can be used to specify under which directory the generated DBT documentation should be + uploaded. + """ + + ui_color = "#FF9900" + + def __init__( + self, + aws_conn_id: str, + bucket_name: str, + folder_dir: str | None = None, + **kwargs: str, + ) -> None: + "Initializes the operator." + self.aws_conn_id = aws_conn_id + self.bucket_name = bucket_name + self.folder_dir = folder_dir + + super().__init__(**kwargs) + + # override the callback with our own + self.callback = self.upload_to_s3 + + def upload_to_s3(self, project_dir: str) -> None: + "Uploads the generated documentation to S3." + logger.info( + 'Attempting to upload generated docs to S3 using S3Hook("%s")', + self.aws_conn_id, + ) + + from airflow.providers.amazon.aws.hooks.s3 import S3Hook + + target_dir = f"{project_dir}/target" + + hook = S3Hook( + self.aws_conn_id, + extra_args={ + "ContentType": "text/html", + }, + ) + + logger.info("Uploading %s to %s", "sources.json", f"s3://{self.bucket_name}/sources.json") + + key = f"{self.folder_dir}/sources.json" if self.folder_dir else "sources.json" + + hook.load_file( + filename=f"{target_dir}/sources.json", + bucket_name=self.bucket_name, + key=key, + replace=True, + ) class DbtDocsCloudLocalOperator(DbtDocsLocalOperator, ABC): """ From c118fc1a00220cda1c60f71adee337dfe8585058 Mon Sep 17 00:00:00 2001 From: Chris Hronek Date: Tue, 7 Nov 2023 12:23:55 -0700 Subject: [PATCH 2/5] Add run_results.json to list of required files --- cosmos/operators/local.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index b0f0b368c4..c0bdbbc007 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -538,12 +538,13 @@ class DbtDocsLocalOperator(DbtLocalBaseOperator): ui_color = "#8194E0" - required_files = ["index.html", "manifest.json", "graph.gpickle", "catalog.json"] + required_files = ["index.html", "manifest.json", "graph.gpickle", "catalog.json", "run_results.json"] def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self.base_cmd = ["docs", "generate"] + class DbtFreshnessLocalOperator(DbtLocalBaseOperator): """ Executes `dbt source freshness` command. @@ -556,6 +557,7 @@ def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self.base_cmd = ["source", "freshness"] + class DbtFreshnessS3LocalOperator(DbtFreshnessLocalOperator): """ Executes `dbt source freshness` command and upload to S3 storage. Returns the S3 path to the generated documentation. @@ -569,11 +571,11 @@ class DbtFreshnessS3LocalOperator(DbtFreshnessLocalOperator): ui_color = "#FF9900" def __init__( - self, - aws_conn_id: str, - bucket_name: str, - folder_dir: str | None = None, - **kwargs: str, + self, + aws_conn_id: str, + bucket_name: str, + folder_dir: str | None = None, + **kwargs: str, ) -> None: "Initializes the operator." self.aws_conn_id = aws_conn_id @@ -614,6 +616,7 @@ def upload_to_s3(self, project_dir: str) -> None: replace=True, ) + class DbtDocsCloudLocalOperator(DbtDocsLocalOperator, ABC): """ Abstract class for operators that upload the generated documentation to cloud storage. From 35b630eb8a606a954faef99f13ad34ef933bae83 Mon Sep 17 00:00:00 2001 From: Chris Hronek Date: Tue, 7 Nov 2023 12:40:56 -0700 Subject: [PATCH 3/5] Make conn_id parameter match the DbtDocsS3Operator --- cosmos/operators/local.py | 8 ++++---- dev/dags/dbt_docs.py | 9 +++++++++ 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index c0bdbbc007..0b132134d4 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -572,13 +572,13 @@ class DbtFreshnessS3LocalOperator(DbtFreshnessLocalOperator): def __init__( self, - aws_conn_id: str, + connection_id: str, bucket_name: str, folder_dir: str | None = None, **kwargs: str, ) -> None: "Initializes the operator." - self.aws_conn_id = aws_conn_id + self.connection_id = connection_id self.bucket_name = bucket_name self.folder_dir = folder_dir @@ -591,7 +591,7 @@ def upload_to_s3(self, project_dir: str) -> None: "Uploads the generated documentation to S3." logger.info( 'Attempting to upload generated docs to S3 using S3Hook("%s")', - self.aws_conn_id, + self.connection_id, ) from airflow.providers.amazon.aws.hooks.s3 import S3Hook @@ -599,7 +599,7 @@ def upload_to_s3(self, project_dir: str) -> None: target_dir = f"{project_dir}/target" hook = S3Hook( - self.aws_conn_id, + self.connection_id, extra_args={ "ContentType": "text/html", }, diff --git a/dev/dags/dbt_docs.py b/dev/dags/dbt_docs.py index edf89bdab6..42a4eb39d3 100644 --- a/dev/dags/dbt_docs.py +++ b/dev/dags/dbt_docs.py @@ -20,6 +20,7 @@ from cosmos.operators import ( DbtDocsAzureStorageOperator, DbtDocsS3Operator, + DbtFreshnessS3Operator, DbtDocsGCSOperator, ) from cosmos.profiles import PostgresUserPasswordProfileMapping @@ -83,6 +84,14 @@ def which_upload(): bucket_name="cosmos-docs", ) + generate_dbt_freshness_aws = DbtFreshnessS3Operator( + task_id="generate_dbt_freshness_aws", + project_dir=DBT_ROOT_PATH / "jaffle_shop", + profile_config=profile_config, + connection_id=S3_CONN_ID, + bucket_name="cosmos-docs", + ) + generate_dbt_docs_azure = DbtDocsAzureStorageOperator( task_id="generate_dbt_docs_azure", project_dir=DBT_ROOT_PATH / "jaffle_shop", From 9a6b346ec7a65b381856de2e1484223612006299 Mon Sep 17 00:00:00 2001 From: Chris Hronek Date: Tue, 7 Nov 2023 12:51:48 -0700 Subject: [PATCH 4/5] Add DbtFreshnessS3Operator operator to dbt_docs dag for testing --- cosmos/operators/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cosmos/operators/__init__.py b/cosmos/operators/__init__.py index b7e36abff5..a220304d55 100644 --- a/cosmos/operators/__init__.py +++ b/cosmos/operators/__init__.py @@ -2,6 +2,7 @@ from .local import DbtDocsAzureStorageLocalOperator as DbtDocsAzureStorageOperator from .local import DbtDocsLocalOperator as DbtDocsOperator from .local import DbtDocsS3LocalOperator as DbtDocsS3Operator +from .local import DbtFreshnessS3LocalOperator as DbtFreshnessS3Operator from .local import DbtDocsGCSLocalOperator as DbtDocsGCSOperator from .local import DbtLSLocalOperator as DbtLSOperator from .local import DbtRunLocalOperator as DbtRunOperator @@ -20,6 +21,7 @@ "DbtDepsOperator", "DbtDocsOperator", "DbtDocsS3Operator", + "DbtFreshnessS3Operator", "DbtDocsAzureStorageOperator", "DbtDocsGCSOperator", ] From 6225db8570fbfd92a81e4c497b5b1f651ac0ae6e Mon Sep 17 00:00:00 2001 From: Chris Hronek Date: Tue, 7 Nov 2023 13:14:11 -0700 Subject: [PATCH 5/5] Add DbtFreshnessS3OPerator to the docs --- docs/configuration/generating-docs.rst | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/configuration/generating-docs.rst b/docs/configuration/generating-docs.rst index 88459fd14e..73743f65db 100644 --- a/docs/configuration/generating-docs.rst +++ b/docs/configuration/generating-docs.rst @@ -10,6 +10,7 @@ Many users choose to generate and serve these docs on a static website. This is Cosmos offers two pre-built ways of generating and uploading dbt docs and a fallback option to run custom code after the docs are generated: - :class:`~cosmos.operators.DbtDocsS3Operator`: generates and uploads docs to a S3 bucket. +- :class:`~cosmos.operators.DbtFreshnessS3Operator`: generates and uploads `sources.json `_ doc to an S3 bucket - :class:`~cosmos.operators.DbtDocsAzureStorageOperator`: generates and uploads docs to an Azure Blob Storage. - :class:`~cosmos.operators.DbtDocsGCSOperator`: generates and uploads docs to a GCS bucket. - :class:`~cosmos.operators.DbtDocsOperator`: generates docs and runs a custom callback. @@ -41,6 +42,15 @@ You can use the :class:`~cosmos.operators.DbtDocsS3Operator` to generate and upl bucket_name="test_bucket", ) + generate_dbt_freshness_aws = DbtFreshnessS3Operator( + task_id="generate_dbt_freshness_aws", + project_dir="path/to/jaffle_shop", + profile_config=profile_config, + # docs-specific arguments + connection_id="test_aws", + bucket_name="test_bucket", + ) + Upload to Azure Blob Storage ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~