Fix duplicate log lines in watcher subprocess execution and format timestamps #2301

Merged
pankajkoti merged 2 commits into main from fix/watcher-subprocess-duplicate-logs
Jan 29, 2026

@pankajkoti
Contributor

Changes:

  • Remove duplicate logging of dbt messages in store_dbt_resource_status_from_log(). The message was being logged twice: once in the else block and again at the end of the function. Now it's only logged once with the appropriate log level.

  • Format timestamps to match dbt runner format (HH:MM:SS) instead of full ISO format. This provides consistent log output between subprocess and dbt_runner modes. Example: "13:16:05 Running with dbt=1.10.11"

  • Add unit tests to verify:

    • Log messages are logged exactly once (no duplicates)
    • Timestamps are formatted correctly as HH:MM:SS
  • Extend integration tests (test_dbt_dag_with_watcher, test_dbt_dag_with_watcher_and_subprocess) to verify no duplicate log messages in both dbt_runner and subprocess modes
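
The timestamp change described above can be sketched as follows. This is a minimal illustration, not the code merged in this PR: the helper names `format_dbt_timestamp` and `format_log_line` are hypothetical, and the single-space separator is taken from the example in the description. It assumes the dbt event carries an ISO 8601 timestamp and falls back to the raw value when parsing fails.

```python
from datetime import datetime


def format_dbt_timestamp(iso_timestamp: str) -> str:
    """Return HH:MM:SS for an ISO 8601 timestamp (hypothetical helper).

    If the value cannot be parsed, it is returned unchanged so that a
    malformed timestamp never prevents the message from being logged.
    """
    try:
        # datetime.fromisoformat() on Python < 3.11 rejects a trailing "Z",
        # so it is normalised to an explicit UTC offset first.
        parsed = datetime.fromisoformat(iso_timestamp.replace("Z", "+00:00"))
    except (ValueError, AttributeError):
        return iso_timestamp
    return parsed.strftime("%H:%M:%S")


def format_log_line(iso_timestamp: str, message: str) -> str:
    """Prefix a dbt message with its short timestamp (hypothetical helper)."""
    return f"{format_dbt_timestamp(iso_timestamp)} {message}"


if __name__ == "__main__":
    print(format_log_line("2026-01-29T13:16:05.123456Z", "Running with dbt=1.10.11"))
```

Returning the input unchanged on a parse failure keeps the log line intact even if the timestamp format changes.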

related: #2287
related: #2241

@netlify

netlify Bot commented Jan 29, 2026

Deploy Preview for astronomer-cosmos canceled.

| Name | Link |
| --- | --- |
| 🔨 Latest commit | e6d35bf |
| 🔍 Latest deploy log | https://app.netlify.com/projects/astronomer-cosmos/deploys/697b8a522c9d600008a3d085 |

@pankajkoti force-pushed the fix/watcher-subprocess-duplicate-logs branch from 199fac6 to 7f474d3 on January 29, 2026 14:10
Comment thread cosmos/operators/_watcher/base.py
@tatiana (Collaborator) left a comment

Thanks a lot, @pankajkoti. It was probably my mistake during rebasing.

Looks great! I do not believe we need timestamp parity. From my perspective, this makes our codebase more complex without any clear benefit, because we can already see the Airflow timestamp when we're invoking from Airflow. If there was an easy way, I'd remove that timestamp from the dbtRunner implementation 😂

That said, I believe the duplicate issue is the biggest problem and I'm happy for things to be the same, if you prefer - we'll just need to add tests to cover the additional logic.

@tatiana tatiana added this to the Cosmos 1.13.0 milestone Jan 29, 2026
@pankajkoti
Contributor Author

Previously, we were seeing duplicate logs in the Watcher Subprocess execution:
[Screenshot 2026-01-29 at 7 27 19 PM]

Now, after the change in this PR, each message is logged only once, and it includes the timestamp, consistent with the dbt Runner log output.

Subprocess output:
[Screenshot 2026-01-29 at 7 27 07 PM]

dbt Runner output:
[Screenshot 2026-01-29 at 7 27 00 PM]

@codecov

codecov Bot commented Jan 29, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 98.07%. Comparing base (6fabe60) to head (e6d35bf).
⚠️ Report is 3 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2301   +/-   ##
=======================================
  Coverage   98.07%   98.07%           
=======================================
  Files         100      100           
  Lines        6452     6458    +6     
=======================================
+ Hits         6328     6334    +6     
  Misses        124      124           

Comment thread cosmos/operators/_watcher/base.py
@pankajkoti
Contributor Author

> Looks great! I do not believe we need timestamp parity. From my perspective, this makes our codebase more complex without any clear benefit, because we can already see the Airflow timestamp when we're invoking from Airflow. If there was an easy way, I'd remove that timestamp from the dbtRunner implementation 😂
>
> That said, I believe the duplicate issue is the biggest problem and I'm happy for things to be the same, if you prefer - we'll just need to add tests to cover the additional logic.

Yes, I am a bit torn on this one. I am slightly inclined to preserve the timestamp for the following reasons:

  1. Only watcher subprocess logs lack a timestamp. Subprocess runs in other execution modes still include it.
  2. This timestamp is the real timestamp from the dbt event. Airflow logs might be slightly delayed at times (maybe?), and log lines could be reordered depending on how flushing works(?).
  3. If users have a regex configured for filtering/processing logs forwarded to their log store, they could still benefit from the consistent log format, I believe.

Hence, I am thinking of preserving this for now. But if we feel it is a maintenance burden, I am happy for it to be removed in a subsequent PR.
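
To illustrate the third point, here is a hedged sketch of the kind of regex a log pipeline might apply to lines in the HH:MM:SS format. The pattern and the `split_log_line` name are hypothetical, not part of Cosmos or dbt. The sketch only shows that one pattern can handle output from both invocation modes when the prefix is consistent.

```python
import re
from typing import Optional, Tuple

# Hypothetical pattern: an HH:MM:SS prefix, whitespace, then the message.
LOG_LINE = re.compile(r"^(?P<time>\d{2}:\d{2}:\d{2})\s+(?P<message>.*)$")


def split_log_line(line: str) -> Tuple[Optional[str], str]:
    """Split a log line into (time, message); time is None without a prefix."""
    match = LOG_LINE.match(line)
    if match is None:
        return None, line
    return match.group("time"), match.group("message")


if __name__ == "__main__":
    print(split_log_line("13:16:05 Running with dbt=1.10.11"))
```

A line without the prefix returns `None` for the time, so a pipeline using one pattern for both modes would have to handle such lines separately.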

@pankajkoti pankajkoti marked this pull request as ready for review January 29, 2026 17:08
Copilot AI review requested due to automatic review settings January 29, 2026 17:08
Copilot AI (Contributor) left a comment

Pull request overview

This PR fixes duplicate log lines in watcher subprocess execution and standardizes timestamp formatting. The changes ensure that dbt log messages are logged exactly once (previously logged twice due to duplicate logic) and format timestamps in HH:MM:SS format to match dbt runner output instead of full ISO format.

Changes:

  • Removed duplicate logging in store_dbt_resource_status_from_log() function
  • Added timestamp formatting to convert ISO format to HH:MM:SS format
  • Added comprehensive unit and integration tests to verify single logging and timestamp formatting

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| cosmos/operators/_watcher/base.py | Removed duplicate log statement and added timestamp formatting logic with error handling |
| tests/operators/test_watcher.py | Added unit tests for single logging, timestamp formatting, and invalid timestamp handling; extended integration tests to verify no duplicates |
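
The "logged exactly once" check summarised above could look roughly like the sketch below. It is not the test added in tests/operators/test_watcher.py. `log_dbt_event` is a hypothetical stand-in for the logging step in store_dbt_resource_status_from_log(), and the event shape (an `info` object with `msg` and `level` keys) is an assumption based on dbt's structured JSON logs.

```python
import logging

# Assumed mapping from dbt level names to Python logging levels.
LEVELS = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warn": logging.WARNING,
    "warning": logging.WARNING,
    "error": logging.ERROR,
}


class ListHandler(logging.Handler):
    """Collect formatted messages in memory so a test can count them."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


def log_dbt_event(logger: logging.Logger, event: dict) -> None:
    """Hypothetical stand-in: log the dbt message once, at its own level."""
    info = event.get("info", {})
    level = LEVELS.get(str(info.get("level", "info")).lower(), logging.INFO)
    logger.log(level, info.get("msg", ""))


def captured_messages(event: dict) -> list:
    """Run log_dbt_event against an isolated logger and return what it emitted."""
    logger = logging.getLogger("watcher_sketch")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False  # keep the sketch from writing to the root logger
    handler = ListHandler()
    logger.addHandler(handler)
    try:
        log_dbt_event(logger, event)
    finally:
        logger.removeHandler(handler)
    return handler.messages


if __name__ == "__main__":
    event = {"info": {"msg": "Running with dbt=1.10.11", "level": "info"}}
    assert captured_messages(event).count("Running with dbt=1.10.11") == 1
```

Counting occurrences of the message, rather than only checking that it appears, is what catches a duplicate.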

@pankajkoti pankajkoti merged commit 1aa4485 into main Jan 29, 2026
90 checks passed
@pankajkoti pankajkoti deleted the fix/watcher-subprocess-duplicate-logs branch January 29, 2026 17:21
@pankajastro pankajastro mentioned this pull request Jan 29, 2026
tatiana added a commit that referenced this pull request Jan 30, 2026
Features

* Support cross-referencing models across dbt projects using dbt-loom by
@pankajkoti in #2271
* Support use of YAML selectors when using ``LoadMode.DBT_MANIFEST`` by
@YourRoyalLinus in #2261
* Introduce ``ExecutionMode.WATCHER_KUBERNETES`` to use the watcher with
``KubernetesPodOperator`` by @tatiana in #2207
* Add support for StarRocks profile mapping by @kurkim0661 in #2256
* Allow pushing URIs as XComs for Cosmos tasks by @corsettigyg in #2275
* Support defining custom callbacks alongside the ``WATCHER_KUBERNETES``
callback by @johnhoran in #2307

Enhancements

* Refactor: remove duplicate ``_construct_dest_file_path`` by @jx2lee in
#2077
* Leverage Airflow ``::group::`` to group logs associated with DAG
parsing by @tatiana in #2235
* Refactor ``DbtConsumerWatcherSensor`` for reusability by @tatiana in
#2245
* Restore plain text output when using ``ExecutionMode.WATCHER`` by
@tiovader in #2241

Bug Fixes

* Fix running empty models or ephemeral nodes in
``ExecutionMode.WATCHER`` by @tatiana in #2279
* Improve watcher producer task priority in scheduling and the UI by
@tatiana in #2237
* Fix typos and formatting issues in documentation by @pankajkoti in
#2259
* Allow watcher producer retries without erroring by @tatiana in #2283
* Fix ``TestBehavior.AFTER_ALL`` is missing project_name information
when loading project using manifest file by @tuantran0910 in #2242
* Fix duplicate log lines in watcher subprocess execution and format
timestamps by @pankajkoti in #2301

Docs

* Add Watcher Kubernetes documentation by @tatiana in #2303
* Document newly added telemetry metrics in the privacy notice by
@pankajkoti in #2249
* Add compatibility policy document by @pankajastro in #2251
* Improve watcher documentation related to dbt threads by @tatiana in
#2273
* Fix link in watcher execution mode documentation by @jedcunningham in
#2277
* Update Apache Airflow minimum compatibility policy by @tatiana in
#2285
* Clarify Cosmos runtime support until "End of Basic Support" by
@jedcunningham in #2286
* Update watcher docs by @tatiana in #2298
* Update watcher kubernetes documentation by @tatiana in #2306

Others

* Add Airflow 3 DAG versioning tests for Cosmos by @michal-mrazek in
#2177
* Add dbt Core 1.11 to the test matrix by @tatiana in #2230
* Add integration tests using InvocationMode.SUBPROCESS and validate
output by @tatiana in #2287
* Fix main branch failing tests by @tatiana in #2296
* Update pre-commit hooks to the latest versions by @jedcunningham in
#2289
* Pre-commit autoupdates by @pre-commit in #2222, #2264, #2274 and #2290
* Dependabot updates by @dependabot in #2218, #2219, #2220, #2280 and
#2284
* Add Scarf metrics to understand Cosmos feature usage patterns
  - Add telemetry tracking for dbt docs plugin usage by @pankajkoti in
    #2240
  - Add DAG run telemetry metrics for load mode, invocation, and
    render_config parameters by @pankajkoti in #2223
  - Collect profile metrics for DAG runs by @pankajastro in #2228
  - Compress telemetry metadata to reduce serialized DAG size by
    @pankajkoti in #2252
  - Skip storing telemetry metadata when emission is disabled by
    @pankajkoti in #2278
  - Hide telemetry metadata parameters from the Airflow trigger UI by
    @pankajkoti in #2247

closes:
astronomer/oss-integrations-private#317

---------

Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
3 participants