Add integration tests using InvocationMode.SUBPROCESS and validate output#2287
Conversation
✅ Deploy Preview for astronomer-cosmos ready!
To edit notification comments on pull requests, go to your Netlify project configuration. |
5ec35df to
3d192b7
Compare
0ca336d to
e6baa5c
Compare
fd7e90b to
58d3100
Compare
InvocationMode.SUBPROCESS and validate output
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #2287 +/- ##
==========================================
+ Coverage 97.99% 98.07% +0.08%
==========================================
Files 100 100
Lines 6440 6452 +12
==========================================
+ Hits 6311 6328 +17
+ Misses 129 124 -5 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR adds integration tests to validate that Cosmos 1.13 correctly outputs user-friendly logs (instead of JSON) when using ExecutionMode.WATCHER with InvocationMode.SUBPROCESS. The changes address a regression from Cosmos 1.12 where watcher mode always output JSON logs.
Changes:
- Added integration test for subprocess invocation mode with output validation
- Refactored watcher operator to set JSON logging only for subprocess mode
- Replaced
self.logwith cosmos logger in subprocess hook for consistent output capture - Removed duplicate unit tests that are now covered by integration tests
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/operators/test_watcher.py | Added subprocess integration test, removed duplicate unit tests, and added output validation |
| tests/dbt/test_executable.py | Added unit tests for dbt executable discovery functions |
| scripts/test/integration.sh | Created subprocess virtual environment for integration testing |
| cosmos/operators/watcher.py | Refactored to conditionally set JSON logging and log processor only for subprocess mode |
| cosmos/operators/local.py | Removed redundant output logging after subprocess execution |
| cosmos/operators/_watcher/base.py | Added JSON log parsing with user-friendly message extraction |
| cosmos/hooks/subprocess.py | Replaced Airflow's self.log with cosmos logger for consistent output |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
pankajkoti
left a comment
There was a problem hiding this comment.
Tested the change locally. Looks good. I am seeing duplicate log lines for the watcher subprocess execution. I will investigate this shortly and see can create another PR to remove the duplicate logs.
…mestamps (#2301) Changes: - Remove duplicate logging of dbt messages in store_dbt_resource_status_from_log(). The message was being logged twice: once in the else block and again at the end of the function. Now it's only logged once with the appropriate log level. - Format timestamps to match dbt runner format (HH:MM:SS) instead of full ISO format. This provides consistent log output between subprocess and dbt_runner modes. Example: "13:16:05 Running with dbt=1.10.11" - Add unit tests to verify: - Log messages are logged exactly once (no duplicates) - Timestamps are formatted correctly as HH:MM:SS - Extend integration tests (test_dbt_dag_with_watcher, test_dbt_dag_with_watcher_and_subprocess) to verify no duplicate log messages in both dbt_runner and subprocess modes related: #2287 related: #2241
Features * Support cross-referencing models across dbt projects using dbt-loom by @pankajkoti in #2271 * Support use of YAML selectors when using ``LoadMode.DBT_MANIFEST`` by @YourRoyalLinus in #2261 * Introduce ``ExecutionMode.WATCHER_KUBERNETES`` to use the watcher with ``KubernetesPodOperator`` by @tatiana in #2207 * Add support for StarRocks profile mapping by @kurkim0661 in #2256 * Allow pushing URIs as XComs for Cosmos tasks by @corsettigyg in #2275 * Support defining custom callbacks alongside the ``WATCHER_KUBERNETES`` callback by @johnhoran in #2307 Enhancements * Refactor: remove duplicate ``_construct_dest_file_path`` by @jx2lee in #2077 * Leverage Airflow ``::group::`` to group logs associated with DAG parsing by @tatiana in #2235 * Refactor ``DbtConsumerWatcherSensor`` for reusability by @tatiana in #2245 * Restore plain text output when using ``ExecutionMode.WATCHER`` by @tiovader in #2241 Bug Fixes * Fix running empty models or ephemeral nodes in ``ExecutionMode.WATCHER`` by @tatiana in #2279 * Improve watcher producer task priority in scheduling and the UI by @tatiana in #2237 * Fix typos and formatting issues in documentation by @pankajkoti in #2259 * Allow watcher producer retries without erroring by @tatiana in #2283 * Fix ``TestBehavior.AFTER_ALL`` is missing project_name information when loading project using manifest file by @tuantran0910 in #2242 * Fix duplicate log lines in watcher subprocess execution and format timestamps by @pankajkoti in #2301 Docs * Add Watcher Kubernetes documentation by @tatiana in #2303 * Document newly added telemetry metrics in the privacy notice by @pankajkoti in #2249 * Add compatibility policy document by @pankajastro in #2251 * Improve watcher documentation related to dbt threads by @tatiana in #2273 * Fix link in watcher execution mode documentation by @jedcunningham in #2277 * Update Apache Airflow minimum compatibility policy by @tatiana in #2285 * Clarify Cosmos runtime support until "End of Basic Support" by @jedcunningham in #2286 * Update watcher docs by @tatiana in #2298 * Update watcher kubernetes documentation by @tatiana in #2306 Others * Add Airflow 3 DAG versioning tests for Cosmos by @michal-mrazek in #2177 * Add dbt Core 1.11 to the test matrix by @tatiana in #2230 * Add integration tests using InvocationMode.SUBPROCESS and validate output by @tatiana in #2287 * Fix main branch failing tests by @tatiana in #2296 * Update pre-commit hooks to the latest versions by @jedcunningham in #2289 * Pre-commit autoupdates by @pre-commit in #2222, #2264, #2274 and #2290 * Dependabot updates by @dependabot in #2218, #2219, #2220, #2280 and #2284 * Add Scarf metrics to understand Cosmos feature usage patterns - Add telemetry tracking for dbt docs plugin usage by @pankajkoti in #2240 - Add DAG run telemetry metrics for load mode, invocation, and render_config parameters by @pankajkoti in #2223 - Collect profile metrics for DAG runs by @pankajastro in #2228 - Compress telemetry metadata to reduce serialized DAG size by @pankajkoti in #2252 - Skip storing telemetry metadata when emission is disabled by @pankajkoti in #2278 - Hide telemetry metadata parameters from the Airflow trigger UI by @pankajkoti in #2247 closes: astronomer/oss-integrations-private#317 --------- Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
Between Cosmos 1.11 and Cosmos 1.12, the Cosmos `ExecutionMode.WATCHER changed its logging behaviour to always output JSON, which is a much worse experience for end-users.
We had a contribution from @tiovader to fix this issue (#2241), and we will release it in Cosmos 1.13, planned for this week.
To avoid regressions in how Cosmos outputs user-friendly logs, this PR extends our integration tests to cover this behaviour and further improves the implementation.