Skip to content

[Feature] Refactor and add support for schedule conditions in DAG configuration:#320

Merged
tatiana merged 25 commits into
astronomer:mainfrom
ErickSeo:feat/enable_schedule_dataset_condition
Jan 10, 2025
Merged

[Feature] Refactor and add support for schedule conditions in DAG configuration:#320
tatiana merged 25 commits into
astronomer:mainfrom
ErickSeo:feat/enable_schedule_dataset_condition

Conversation

@ErickSeo

@ErickSeo ErickSeo commented Dec 16, 2024

Copy link
Copy Markdown
Contributor

Description

This feature introduces a enhancement to DAG scheduling in Airflow, enabling support for dynamic schedules based on dataset conditions. By leveraging dataset filters and logical conditions, users can now create more flexible and precise scheduling rules tailored to their workflows.

Key Features:

  • Condition-Based Scheduling: Allows defining schedules using logical conditions between datasets (e.g., ('dataset_1' & 'dataset_2') | 'dataset_3'), enabling workflows to trigger dynamically based on dataset availability.

  • Dynamic Dataset Processing: Introduced the process_file_with_datasets function to evaluate and process dataset URIs from external files, supporting both simple and condition-based schedules.

  • Improved Dataset Evaluation: Developed the evaluate_condition_with_datasets function to transform dataset URIs into valid variable names and evaluate logical conditions securely.

Workflow Example:
Given the following condition:

example_custom_config_condition_dataset_consumer_dag:
  description: "Example DAG consumer custom config condition datasets"
  schedule:
    file: $CONFIG_ROOT_DIR/datasets/example_config_datasets.yml
    datasets:  "((dataset_custom_1 & dataset_custom_2) | dataset_custom_3)"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
example_without_custom_config_condition_dataset_consumer_dag:
  description: "Example DAG consumer custom config condition datasets"
  schedule:
    datasets: "((s3://bucket-cjmm/raw/dataset_custom_1 & s3://bucket-cjmm/raw/dataset_custom_2) | s3://bucket-cjmm/raw/dataset_custom_3)"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
example_without_custom_config_condition_dataset_consumer_dag:
  description: "Example DAG consumer custom config condition datasets"
  schedule:
    datasets: 
      !or  
        - !and  
          - "s3://bucket-cjmm/raw/dataset_custom_1"  
          - "s3://bucket-cjmm/raw/dataset_custom_2"
        - "s3://bucket-cjmm/raw/dataset_custom_3"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"

The system evaluates the datasets, ensuring valid references, and schedules the DAG dynamically when the condition resolves to True.

Example Use Case:
Consider a data pipeline that processes files only when multiple interdependent datasets are updated. With this feature, users can create dynamic DAG schedules that automatically adjust based on dataset availability and conditions, optimizing resource allocation and execution timing.

Images:
Captura de tela 2024-12-16 181059
Captura de tela 2024-12-16 181103
Captura de tela 2024-12-16 181131

- Added support for schedules defined by conditions, enabling dynamic scheduling based on dataset filters and conditions.
- Introduced `configure_schedule` function to streamline DAG schedule setup based on Airflow version and parameters.
- Created `process_file_with_datasets` function to handle dataset processing and conditional evaluation from files.
- Implemented `evaluate_condition_with_datasets` to evaluate schedule conditions while ensuring valid variable names for dataset URIs.
- Replaced repetitive code with reusable functions for better modularity and maintainability.
- Enhanced code readability by adding detailed docstrings for all functions, following a standard format.
- Improved safety by avoiding reliance on `globals()` in `evaluate_condition_with_datasets`.
- remove self from unit test
- Implemented logic to handle schedules with both file and datasets attributes.
- Added support for evaluating conditions with datasets for Airflow version 2.9 and above.
- Cleaned up schedule dictionary by removing processed keys.
- Added logic to handle schedules with both file and datasets attributes.
- Implemented support for evaluating conditions with datasets for Airflow version 2.9 and above.
- Cleaned up schedule dictionary by removing processed keys after use.
@codecov-commenter

codecov-commenter commented Dec 19, 2024

Copy link
Copy Markdown

Codecov Report

Attention: Patch coverage is 97.43590% with 3 lines in your changes missing coverage. Please review.

Project coverage is 94.01%. Comparing base (48e5575) to head (fb33b67).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
dagfactory/utils.py 88.00% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #320      +/-   ##
==========================================
+ Coverage   93.62%   94.01%   +0.39%     
==========================================
  Files          10       11       +1     
  Lines         784      886     +102     
==========================================
+ Hits          734      833      +99     
- Misses         50       53       +3     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@tatiana tatiana added this to the DAG Factory 0.22.0 milestone Dec 30, 2024
@tatiana tatiana self-assigned this Dec 30, 2024
Comment thread dagfactory/dagbuilder.py Outdated

@tatiana tatiana left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ErickSeo, this feature is super exciting. It significantly improves how DAG factory handles datasets. I'm sorry for the delay in reviewing it. I've left some comments in-line and am happy to discuss alternatives.

If we're able to reach a consensus on the implementation, we can aim to release this feature in DAG Factory 0.22, planned to be released on 10 January 2025.

Comment thread dagfactory/__init__.py Outdated
Comment thread dagfactory/dagbuilder.py Outdated
Comment thread dagfactory/dagbuilder.py
Comment thread dagfactory/dagbuilder.py
Comment thread dev/dags/datasets/example_dag_datasets.yml Outdated
Comment thread dev/dags/datasets/example_dag_datasets.yml Outdated
@pankajkoti pankajkoti mentioned this pull request Jan 10, 2025

@tatiana tatiana left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very exciting feature, @ErickSeo ! Thank you very much for adding support to a more powerful way of scheduling DAGs in DAG Factory.

Thank you for addressing all the feedback. I'm sure there will be use-cases we did not think about - but we can always iterate over them.

@tatiana tatiana merged commit 4f2a57f into astronomer:main Jan 10, 2025
@pankajkoti

Copy link
Copy Markdown
Contributor

Yes, indeed thanks a lot for the contribution and quickly addressing the review comments, much helpful @ErickSeo 🚀

tatiana pushed a commit that referenced this pull request Jan 10, 2025
### Added

- Propagate provided dag_display_name to built dag by @pankajkoti in
#326
- Add incipient documentation tooling by @tatiana in #328
- Support loading `default_args` from shared `defaults.yml` by
@pankajastro in #330
- Add security policy by @tatiana in #339
- Add Robust Support for Callbacks at Task and TaskGroup Level by
@@jroach-astronomer in #322
- Support `ExternalTaskSensor` `execution_date_fn` and `execution_delta`
by @tatiana in #354
- Refactor and add support for schedule conditions in DAG configuration
by @ErickSeo in #320

### Fixed

- Handle gracefully exceptions during telemetry collection by @tatiana
in #335
- Adjust `markdownlint` configuration to enforce 4-space indentation for
proper `mkdocs` rendering by @pankajkoti in #345

### Docs

- Create initial documentation index by @tatiana in #325
- Use absolute URLs for failing links in docs/index.md by @pankajkoti in
#331
- Add quick start docs by @pankajastro in #324
- Add docs comparing Python and YAML-based DAGs by @tatiana in #327
- Add docs about project contributors and their roles by @tatiana in
#341
- Add documentation to support developers by @tatiana in #343
- Add docs for configuring workflows, environment variables and defaults
by @pankajkoti in #338
- Add code of conduct for contributors and DAG factory community by
@tatiana in #340
- Document Dynamic Task Mapping feature by @pankajkoti in #344
- Fix warning message 404 in code_of_conduct docs by @pankajastro in
#346
- Update theme for documentation by @pankajastro in #348
- Fix markdownlint errors and some rendering improvements by
@pankajastro in #356
- Reword content in documentation by @yanmastin-astro in #336

### Others

- Improve integration tests scripts by @tatiana in #316
- Add Markdown pre-commit checks by @tatiana in #329
- Remove Airflow <> 2.0.0 check by @pankajastro in #334
- Reduce telemetry timeout from 5 to 1 second by @tatiana in #337
- Add GH action job to deploy docs by @pankajastro in #342
- Enable Depandabot to scan outdated Github Actions dependencies by
@tatiana in #347
- Improve docs deploy job by @pankajastro in #352
- Unify how we build dagfactory by @tatiana in #353
- Fix running make docker run when previous versions were run locally by
@tatiana in #362
- Install `jq` in `dev` container by @pankajastro in #363
- Dependabot GitHub actions version upgrades in #349, #350, #351


Closes: #306
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants