Skip to content

Refactor Watcher InvocationMode.SUBPROCESS log parser#2183

Merged
tatiana merged 5 commits into
mainfrom
refactor-watcher-log-processor
Dec 5, 2025
Merged

Refactor Watcher InvocationMode.SUBPROCESS log parser#2183
tatiana merged 5 commits into
mainfrom
refactor-watcher-log-processor

Conversation

@tatiana
Copy link
Copy Markdown
Collaborator

@tatiana tatiana commented Dec 4, 2025

While the PR #2152 introduced the capability of users running the ExecutionMode.WATCHER with dbt installed in a separate virtualenv, and still get updates in "real time", it came with two downsides:

  1. Spread the watcher logic into multiple areas of the Cosmos code base (inside the utils folder, and also inside the hooks/subprocess - which should be focused only on running suprocess);
  2. It changed the subprocess hook to always invoke the method _store_dbt_resource_status_from_log(line, **kwargs) for each log line made available in stdout, even when users are not using ExecutionMode.WATCHER .

This PR refactors the implementation to keep watcher implementation details inside the operators/ folder. It also allows operators that subclass the AbstractDbtLocalBase to define the attribute _process_log_line_callable. Only classes that define this function will parse the log lines with the desired logic. At the moment, only the DbtProducerWatcherOperator implements the logic of uploading the status to XCom.

@tatiana tatiana changed the title Refactor watcher log processor Refactor watcher InvocationMode.SUBPROCESS log parser Dec 4, 2025
@tatiana tatiana changed the title Refactor watcher InvocationMode.SUBPROCESS log parser Refactor Watcher InvocationMode.SUBPROCESS log parser Dec 4, 2025
@tatiana tatiana marked this pull request as ready for review December 5, 2025 09:49
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors the InvocationMode.SUBPROCESS log parser implementation for the Watcher execution mode to improve code organization and reduce unnecessary processing overhead. The main motivation is to consolidate watcher-specific logic into the operators folder and avoid parsing logs when not using the Watcher execution mode.

Key Changes:

  • Moved _store_dbt_resource_status_from_log from the subprocess hook to the watcher operator module
  • Introduced _process_log_line_callable attribute in AbstractDbtLocalBase to allow selective log line processing
  • Relocated watcher state utilities from _utils/watcher_state.py to operators/_watcher/state.py

Reviewed changes

Copilot reviewed 6 out of 7 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
cosmos/hooks/subprocess.py Removed log parsing method and added optional process_log_line parameter to delegate log processing to callers
cosmos/operators/local.py Added _process_log_line_callable attribute and wired it to the subprocess hook's run_command method
cosmos/operators/watcher.py Implemented _store_dbt_resource_status_from_log as a standalone function and set it as the callable for DbtProducerWatcherOperator
cosmos/operators/_watcher/init.py Created new module to expose watcher state utilities
cosmos/_triggers/watcher.py Updated import path for build_producer_state_fetcher
tests/hooks/test_subprocess.py Updated tests to call the relocated function and adjusted mock paths

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/local.py Outdated
Comment thread cosmos/operators/watcher.py Outdated
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@codecov
Copy link
Copy Markdown

codecov Bot commented Dec 5, 2025

Codecov Report

❌ Patch coverage is 96.15385% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 97.84%. Comparing base (e6ee9a2) to head (d215d6f).
⚠️ Report is 3 commits behind head on main.

Files with missing lines Patch % Lines
cosmos/hooks/subprocess.py 66.66% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2183      +/-   ##
==========================================
- Coverage   97.86%   97.84%   -0.02%     
==========================================
  Files          93       94       +1     
  Lines        6031     6036       +5     
==========================================
+ Hits         5902     5906       +4     
- Misses        129      130       +1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown
Contributor

@pankajastro pankajastro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. I have posted a minor comment.

Comment thread cosmos/operators/local.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@tatiana
Copy link
Copy Markdown
Collaborator Author

tatiana commented Dec 5, 2025

It seems the only coverage concern is a line that was already not covered before this PR, and was highlighted due to the file rename:
https://app.codecov.io/gh/astronomer/astronomer-cosmos/pull/2183/blob/cosmos/operators/_watcher/state.py?dropdown=coverage

Therefore, I'm going ahead and merging the change.

@tatiana tatiana merged commit b8f77b3 into main Dec 5, 2025
82 of 84 checks passed
@tatiana tatiana deleted the refactor-watcher-log-processor branch December 5, 2025 12:58
@pankajkoti pankajkoti mentioned this pull request Dec 9, 2025
pankajkoti added a commit that referenced this pull request Dec 18, 2025
Breaking changes

* Introduced in the PR #2080. The following functions are expected to be
used internally only to Cosmos, so we hope these won't impact end-users,
but we are documenting the changes just in case:
- ``generate_task_or_group`` receives ``render_config`` instead of its
individual configurations, such as ``test_behavior``,
``source_rendering_behavior`` and ``enable_owner_inheritance``
- ``create_task_metadata`` receives ``render_config`` instead of its
individual configurations, such as ``test_behavior``,
``source_rendering_behavior`` and ``enable_owner_inheritance``
- ``create_task_metadata`` now expects the ``node_converters`` argument
* Drop Python 3.9 support by @pankajastro in #2118
* Drop Airflow 2.4 support by @pankajastro in #2161
* Drop Airflow 2.5 support by @pankajastro in #2165

Features

* Support applying ``node_converter`` at a task level instead of task
group level by @anyapriya in #1759
* Allow overriding ``DbtProducerWatcherOperator`` parameters via
``ExecutionConfig.setup_operator_args`` by @pankajastro in #2133
* Use deferrable sensors by default in ``ExecutionMode.WATCHER`` by
@pankajastro in #2084
* Support real-time consumer updates when using
``ExecutionMode.WATCHER`` and ``InvocationMode.SUBPROCESS`` by
@pankajastro in #2152
* Update telemetry to v3 format with query parameters by @pankajkoti in
#2192
* Add initial set of telemetry task listener metrics for Cosmos
operators by @pankajkoti in #2195

Enhancements

* Unify Airflow version handling into ``constants.py`` by @tatiana in
#2089
* Refactor ``airflow/graph.py`` to simplify the code base by @tatiana in
#2080
* Force watcher producer retries to zero by @pankajkoti in #2114
* Fail ``ExecutionMode.WATCHER`` consumer sensors immediately when the
producer fails using Airflow context by @pankajkoti in #2126
* ``ExecutonMode.WATCHER``: fetch producer status asynchronously from
the Airflow runtime so deferrable sensors fail immediately when the
producer task fails by @pankajkoti in #2144
* Refactor ``ExecutionMode.WATCHER`` ``InvocationMode.SUBPROCESS`` log
parser by @tatiana in #2183
* Replace map_index with is_mapped_task boolean in task telemetry
metrics by @pankajkoti in #2210
* Collect cosmos profile metrics in task telemetry metrics by
@pankajastro in #2198
* Remove unnecessary information from telemetry by @tatiana in #2211

Bug fixes

* Clarify ``ExecutionMode.WATCHER`` deferrable failure messaging by
@pankajkoti in #2124
* Remove empty test tasks when all tests are detached by @anyapriya in
#2010
* Fix forwarding ``DbtProducerWatcherOperator`` ``dbt build`` flags by
@michal-mrazek in #2127
* Add databricks oauth mock profile by @fjmacagno in #2164
* Register listeners in Airflow 3 plugin implementation by @pankajastro
in #2187
* Fix resolution of ``packages-install-path`` when it uses ``env_var``
by @tatiana in #2194
* Fix ``template_fields`` in ``DbtConsumerWatcherSensor`` to include
``DbtRunLocalOperator`` template_fields`` by @tiovader and @emanuel-luis
in #2209
* Emit asset events in ExecutionMode.AIRFLOW_ASYNC mode by @pankajastro
in #2184
* Remove dag_run_id from telemetry tests by @tatiana in #2213

Docs

* Document dataset-event limitation when using
``ExecutionMode.AIRFLOW_ASYNC`` by @varaprasadregani in #2143
* Expand ``ExecutionMode.KUBERNETES`` guidance by @tatiana  in #2139
* Add docs for deferrable ``DbtConsumerWatcherSensor`` by @pankajastro
in #2115
* Fix reStructuredText formatting by @dnskr in #2132
* Add docs for ``setup_operator_args`` param by @pankajastro in #2136
* Remove experimental flag for ``ExecutionMode.AIRFLOW_ASYNC`` by
@pankajastro in #2153
* Clarify ``ExecutionMode.AIRFLOW_ASYNC`` dataset limits by @pankajkoti
in #2167
* Update PRIVACY_NOTICE.rst by @tatiana in #2212

Others

* Drop Python 3.9 support by @pankajastro in #2118
* Drop Airflow 2.4 support by @pankajastro in #2161
* Drop Airflow 2.5 support by @pankajastro in #2165
* Improve example DAG ``jaffle_shop_kubernetes.py`` by @tatiana in #2140
* Enable tests for Python 3.13 by @pankajastro in #2154
* Add Python 3.12 to CI integration tests matrix by @pankajastro in
#2168
* Retry flaky Telemetry success test to stabilise CI by @pankajkoti in
#2138
* Drop unused producer state xcom handling in ``ExecutionMode.WATCHER``
by @pankajkoti in #2145
* Remove unused Python3.9 uses from Github action CI by @pankajastro in
#2117
* Run pre-commit on ``ExecutionMode.WATCHER`` modules by @pankajkoti in
#2150
* Refactor: Use shared airflow version constant by @pankajkoti in #2157
* Pin ``pydantic<2.0`` for Airflow 2.6 compatibility by @pankajastro in
#2172
* Remove duplicate ``dbt-duckdb`` dependency by @pankajastro in #2170
* Add targeted ``type: ignore`` for untyped decorators to fix ``mypy``
errors by @pankajastro in #2174
* Replace Legacy typing Aliases with Built-in Types for Python 3.10+ by
@pankajastro in #2175
* Refactor to reuse ``load_method_from_module`` from
``_utils/importer.py`` by @pankajastro in #2176
* Remove try except block for cache import and unused python_version
variable by @pankajastro in #2186
* Unpin Airflow to satisfy GitHub Security tab requirements by
@pankajastro in #2171
* Update Python version for ``pyupgrade`` in ``pre-commit`` config by
@pankajastro in #2190
* Add cooldown config in ``dependabot`` config by @pankajastro in #2189
* Adjust pre-commit so Python 3.10 or higher can be used by @tatiana in
#2196
* Remove empty variables emission from telemetry metrics by @pankajkoti
in #2197
* Reformat documented comments for historical URL formats by @pankajkoti
in #2199
* Bump ``actions/checkout`` from ``5.0.0`` to ``5.0.1`` by @dependabot
in #2135
* Bump ``actions/checkout`` to ``6.0.0`` in GitHub workflows by
@dependabot in #2147
* Bump ``zizmorcore/zizmor-action`` from ``0.2.0`` to ``0.3.0`` by
@dependabot in #2156
* Bump ``actions/checkout`` from ``5.0.1`` to ``6.0.0`` by @dependabot
in #2155
* Bump ``actions/checkout`` from ``6.0.0`` to ``6.0.1`` by @dependabot
in #2178
* Bump ``codecov/codecov-action`` from ``5.5.1`` to ``5.5.2`` by
@dependabot in #2208
* pre-commit autoupdate by @pre-commit-ci[bot] in #2134, #2162, #2173,
#2191, #2202

closes:
astronomer/oss-integrations-private#275
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants