
Add source freshness checks to upstream assets #138

Open
wants to merge 5 commits into master
Conversation

cnolanminich (Contributor) commented Jan 2, 2025

This PR was a learning experience for me. Sometimes we hear from prospects that they really like using dbt source freshness and would like to continue doing so with Dagster. This PR demonstrates how you would add a custom dbt command to Dagster, namely:

  • a multi asset check that automatically adds a dbt source freshness check for every source with a freshness config in the sources.yml manifest
  • a blocking asset check, so that if a source isn't fresh it will not allow the downstream dbt step in refresh_analytics_job to run -- example below
  • a single dbt source freshness invocation for the selected set of dbt sources, which then reports individual asset check statuses

[image: individual asset check statuses shown in the Dagster UI]

  • a demonstration of how you can add an automation condition to an asset check -- this one uses the allow_outdated_and_missing_parents_condition, which I confess to not fully understanding, but it works. To demo it, run the refresh_analytics_model_job from the launchpad with the asset checks de-selected, and the check will then run on its own.

Totally fine if we choose not to add this in, but it was fun to work through! One thing I still want to investigate is adding the proper metadata to the checks.
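
For readers following along, the core of the approach looks roughly like the sketch below. This is a minimal illustration, not the PR's actual code: the dbt resource wiring and the FRESHNESS_SOURCES list (which the PR derives from sources.yml) are assumptions, and all names are hypothetical.

```python
from dagster import (
    AssetCheckResult,
    AssetCheckSeverity,
    AssetCheckSpec,
    AssetKey,
    multi_asset_check,
)
from dagster_dbt import DbtCliResource

# Hypothetical stand-in: in the PR, these (schema, source) pairs come from
# sources.yml entries that define a freshness config.
FRESHNESS_SOURCES = [("CLEANED", "orders_cleaned"), ("CLEANED", "users_cleaned")]

@multi_asset_check(
    specs=[
        AssetCheckSpec(
            name="dbt_source_freshness",
            asset=AssetKey([schema, name]),
            blocking=True,  # a failing check blocks the downstream dbt step
            # the PR also attaches an automation condition here, via its
            # allow_outdated_and_missing_parents_condition helper
        )
        for schema, name in FRESHNESS_SOURCES
    ],
    required_resource_keys={"dbt"},
)
def source_freshness_checks(context):
    dbt: DbtCliResource = context.resources.dbt
    # One `dbt source freshness` invocation covers all selected sources;
    # the results are then fanned out into individual AssetCheckResults.
    invocation = dbt.cli(["source", "freshness"], raise_on_error=False)
    for event in invocation.stream_raw_events():
        raw = event.raw_event
        node_info = raw.get("data", {}).get("node_info")
        if not node_info:
            continue  # skip log lines that aren't per-source results
        level = raw["info"]["level"]
        yield AssetCheckResult(
            asset_key=AssetKey([
                node_info["node_relation"]["schema"].upper(),
                node_info["node_name"],
            ]),
            passed=level == "info",
            severity=(
                AssetCheckSeverity.ERROR
                if level == "error"
                else AssetCheckSeverity.WARN
            ),
        )
```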

github-actions bot commented Jan 2, 2025

Your pull request at commit 0b97ffceef1804d3586c786cde49fc01f7b26696 is automatically being deployed to Dagster Cloud.

Location           | Status      | Link          | Updated
-------------------|-------------|---------------|-------------------------------
data-eng-pipeline  |             | View in Cloud | Jan 02, 2025 at 08:41 PM (UTC)
basics             | Building... |               | Jan 02, 2025 at 08:37 PM (UTC)
hooli_bi           | Building... |               | Jan 02, 2025 at 08:37 PM (UTC)
batch_enrichment   | Building... |               | Jan 02, 2025 at 08:37 PM (UTC)
hooli_data_ingest  | Building... |               | Jan 02, 2025 at 08:37 PM (UTC)
snowflake_insights | Building... |               | Jan 02, 2025 at 08:37 PM (UTC)

slopp (Collaborator) commented Jan 2, 2025

This is awesome! What is the intuition behind using declarative automation for these checks?

I would assume we'd kind of want to just run the source freshness check on a regular basis? (I guess maybe this example in hooli is a bit contrived, I imagine it'd be more interesting to have freshness on external source assets we're observing instead of upstreams we're running?)
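
For reference, the "run it on a regular basis" alternative could look roughly like this; source_freshness_checks is the hypothetical check from the sketch in the description above, and the job and schedule names are made up:

```python
from dagster import AssetSelection, ScheduleDefinition, define_asset_job

# Hypothetical: execute only the freshness checks on a cron cadence,
# instead of tying them to declarative automation.
source_freshness_job = define_asset_job(
    name="source_freshness_job",
    selection=AssetSelection.checks(source_freshness_checks),
)

source_freshness_schedule = ScheduleDefinition(
    job=source_freshness_job,
    cron_schedule="0 * * * *",  # hourly, for example
)
```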

```python
log_freshness_result = dbt_event.raw_event['info']
context.log.info(f"Filtered LogFreshnessResult: {log_freshness_result}")
passed = log_freshness_result['level'] == 'info'
severity = AssetCheckSeverity.ERROR if log_freshness_result['level'] == 'error' else AssetCheckSeverity.WARN
```
Collaborator:

This is very cool and something I think we should make more of a default practice across the integration now that blocking asset checks are more generally available with DA

Contributor Author:

💯! I don't think they work with dbt tests on dbt assets, though? But I'll play around with this more, as I'd like to understand it better.

Collaborator:

IIUC dbt tests on dbt assets will pretty much always block other stuff in the same run, since they fail the dbt op... but idk about how they work with DA

```python
passed = log_freshness_result['level'] == 'info'
severity = AssetCheckSeverity.ERROR if log_freshness_result['level'] == 'error' else AssetCheckSeverity.WARN
yield AssetCheckResult(
    asset_key=AssetKey([
        dbt_event.raw_event['data']['node_info']['node_relation']['schema'].upper(),
        dbt_event.raw_event['data']['node_info']['node_name'],
    ]),
    passed=passed,
    severity=severity,
)
```
Collaborator:

One nit: there are a lot of asset-key-to-dbt-name transformations going on in this PR, and a nice refactor might be to pull those out into separate functions.

Contributor Author:

Will do. Another thing I didn't love about this was putting asset checks inside dbt_asset.py. Since I'm removing the automation condition, it would be straightforward to move this into a separate file (or at least the supporting functions).

Contributor Author:

On second look, none of these are repeated -- do you think it makes sense to have one-off functions for each of these use cases?

Collaborator:

Ultimately up to you. I think functions would help future-us remember why we're doing all this .upper/.lower stuff, lol, but I can also buy the case that you could figure it out reasonably well from the context.
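
A sketch of what such a helper could look like; the name is hypothetical, and the docstring's rationale for the .upper() call is an assumption based on the snippets above:

```python
from dagster import AssetKey

def asset_key_for_freshness_event(node_info: dict) -> AssetKey:
    """Map the node_info from a dbt source freshness event to the
    corresponding Dagster asset key.

    Centralizes the schema-uppercasing convention (presumably because the
    warehouse stores unquoted schema names uppercased, while dbt node
    names stay lowercase).
    """
    return AssetKey([
        node_info["node_relation"]["schema"].upper(),
        node_info["node_name"],
    ])
```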

cnolanminich (Contributor Author) commented:

> This is awesome! What is the intuition behind using declarative automation for these checks?
> I would assume we'd kind of want to just run the source freshness check on a regular basis? (I guess maybe this example in hooli is a bit contrived, I imagine it'd be more interesting to have freshness on external source assets we're observing instead of upstreams we're running?)

It's mostly a historical artifact. When I started, I imagined assessing freshness per dbt model (like: are all of `CLEANED/orders_cleaned`'s sources up to date?), implemented within the dbt_assets function body. But that wasn't going to work, so I abandoned it for this multi asset check approach.

Currently these will run when the Dagster assets are materialized. I agree it would be cool to set up an external source (maybe the S3 asset that we currently have a sensor for?) to show how you could use something like this plus an external asset to monitor state on that asset. I'll remove the automation condition from this PR, though.

slopp (Collaborator) commented Jan 2, 2025

> Currently these will run when the Dagster assets are materialized [...]

And just to sanity-check my own understanding: for these particular freshness checks, since we're always running orders and users and then the dbt downstreams, they should always pass, right?
