Support real-time consumer updates when using ExecutionMode.WATCHER and InvocationMode.SUBPROCESS#2152

Merged
pankajastro merged 21 commits into main from ISSUE-2060 on Dec 3, 2025

@pankajastro pankajastro commented Nov 22, 2025

closes: #2060
This PR modifies ExecutionMode.WATCHER so that, when InvocationMode.SUBPROCESS is used, sensor tasks can still be updated in real time based on dbt status information from structured JSON logs, rather than parsing run_results.json after the full dbt pipeline is executed. The change enables real-time status tracking for subprocess-based dbt invocations.

Key Changes:

  • Added --log-format json flag to dbt build commands in watcher mode
  • Implemented real-time JSON log parsing in subprocess hook to extract and store node status to XCom
  • Refactored XCom utility functions to a new common module for better code organization
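
The log-parsing step listed above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: the function names `extract_node_status` and `record_statuses` are hypothetical, a plain dict stands in for XCom, and the JSON field names (`data.node_info.unique_id` and `data.node_info.node_status`) are an assumption about what dbt emits with `--log-format json`.

```python
from __future__ import annotations

import json
from typing import Iterable


def extract_node_status(line: str) -> tuple[str, str] | None:
    """Return (unique_id, status) when a log line reports a node status, else None."""
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        return None  # plain-text line, nothing to extract
    if not isinstance(event, dict):
        return None
    data = event.get("data")
    # Assumed layout: node details live under data.node_info
    node_info = data.get("node_info") if isinstance(data, dict) else None
    if not isinstance(node_info, dict):
        return None
    unique_id = node_info.get("unique_id")
    status = node_info.get("node_status")
    if not unique_id or not status:
        return None
    return unique_id, status


def record_statuses(lines: Iterable[str], store: dict[str, str]) -> None:
    """Keep the latest status per node; `store` is a stand-in for XCom."""
    for line in lines:
        parsed = extract_node_status(line)
        if parsed is not None:
            unique_id, status = parsed
            store[unique_id] = status
```

Because each line is handled as it arrives, a consumer reading from the store can see a node's status before the whole dbt command has finished, which is the behaviour the PR describes.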

The screenshot below shows the producer task still running while some nodes have already succeeded, using WATCHER mode with SUBPROCESS invocation.
[Screenshot: 2025-11-22 at 7:29:35 PM]


Copilot AI left a comment

Pull request overview

This PR modifies the WATCHER execution mode and SUBPROCESS invocation mode to consume dbt status information from structured JSON logs rather than parsing run_results.json after execution completes. The change enables real-time status tracking for subprocess-based dbt invocations.

Key Changes:

  • Added --log-format json flag to dbt build commands in watcher mode
  • Implemented real-time JSON log parsing in subprocess hook to extract and store node status to XCom
  • Refactored XCom utility functions to a new common module for better code organization

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.

Show a summary per file
| File | Description |
| --- | --- |
| cosmos/operators/watcher.py | Removed local safe_xcom_push (moved to utils), added log_format="json" configuration, switched from parsing run_results to direct XCom status lookup |
| cosmos/operators/local.py | Added **kwargs parameter support to pass context through subprocess/runner invocations |
| cosmos/operators/base.py | Added log_format parameter to DbtBuildMixin to support JSON log output configuration |
| cosmos/hooks/subprocess.py | Implemented _store_dbt_resource_status_from_log to parse JSON logs, changed subprocess to text mode, stores node status to XCom in real time |
| cosmos/dbt/runner.py | Added **kwargs parameter to support additional arguments |
| cosmos/_utils/common.py | New utility module containing safe_xcom_push and get_xcom_val functions |
| cosmos/_triggers/watcher.py | Updated _parse_node_status to handle new status key format for non-event mode |
| cosmos/__init__.py | Version bump to 1.12.0a3 |

Comments suppressed due to low confidence (1)

cosmos/hooks/subprocess.py:80

  • The docstring for run_command should be updated to document the new **kwargs parameter. Since kwargs is now used to pass the context (which contains the TaskInstance for XCom operations), this should be documented. Add:
:param kwargs: Additional keyword arguments. Can include 'context' for XCom operations.
    def run_command(
        self,
        command: list[str],
        env: dict[str, str] | None = None,
        output_encoding: str = "utf-8",
        cwd: str | None = None,
        **kwargs: Any,
    ) -> FullOutputSubprocessResult:
        """
        Execute the command.

        If ``cwd`` is None, execute the command in a temporary directory which will be cleaned afterwards.
        If ``env`` is not supplied, ``os.environ`` is passed

        :param command: the command to run
        :param env: Optional dict containing environment variables to be made available to the shell
            environment in which ``command`` will be executed.  If omitted, ``os.environ`` will be used.
            Note, that in case you have Sentry configured, original variables from the environment
            will also be passed to the subprocess with ``SUBPROCESS_`` prefix.
        :param output_encoding: encoding to use for decoding stdout
        :param cwd: Working directory to run the command in.
            If None (default), the command is run in a temporary directory.
        :return: :class:`namedtuple` containing:
                                    ``exit_code``
                                    ``output``: the last line from stderr or stdout
                                    ``full_output``: all lines from stderr or stdout.
        """

@pankajastro pankajastro marked this pull request as draft November 22, 2025 14:21
@pankajastro pankajastro changed the title from "Consume from dbt struct log for ExecutionMode.WATCHER and InvocationMode.SUBPROCESS" to "Consume dbt struct log for ExecutionMode.WATCHER and InvocationMode.SUBPROCESS" Nov 24, 2025
codecov Bot commented Nov 24, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 97.86%. Comparing base (4bd8e9e) to head (39d3609).
⚠️ Report is 1 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2152      +/-   ##
==========================================
+ Coverage   97.81%   97.86%   +0.04%     
==========================================
  Files          93       93              
  Lines        6005     6031      +26     
==========================================
+ Hits         5874     5902      +28     
+ Misses        131      129       -2     

@pankajastro pankajastro marked this pull request as ready for review November 24, 2025 18:23
@tatiana tatiana changed the title from "Consume dbt struct log for ExecutionMode.WATCHER and InvocationMode.SUBPROCESS" to "Support real-time consumer updates when using ExecutionMode.WATCHER and InvocationMode.SUBPROCESS" Nov 25, 2025
Comment thread cosmos/_utils/common.py Outdated
@tatiana tatiana left a comment

This looks very good, @pankajastro. Thanks a lot for the work.
I left minor feedback, and I'm happy for the changes to be merged.

@pankajastro pankajastro merged commit cb9080e into main Dec 3, 2025
84 checks passed
@pankajastro pankajastro deleted the ISSUE-2060 branch December 3, 2025 15:55
tatiana added a commit that referenced this pull request Dec 5, 2025
While PR #2152 introduced the ability to run
`ExecutionMode.WATCHER` with dbt installed in a separate virtualenv and
still get updates in "real time", it came with two downsides:

1. It spread the watcher logic across multiple areas of the Cosmos code
base (inside the `utils` folder, and also inside `hooks/subprocess`,
which should focus only on running subprocesses);
2. It changed the subprocess hook to always invoke the method
`_store_dbt_resource_status_from_log(line, **kwargs)` for each log line
made available in stdout, even when users are not using
`ExecutionMode.WATCHER`.

This PR refactors the implementation to keep watcher implementation
details inside the `operators/` folder. It also allows operators that
subclass the `AbstractDbtLocalBase` to define the attribute
`_process_log_line_callable`. Only classes that define this function
will parse the log lines with the desired logic. At the moment, only the
`DbtProducerWatcherOperator` implements the logic of uploading the
status to XCom.
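
A minimal sketch of the opt-in callback design described in this commit message, under stated assumptions: the class names `LineStreamingHook`, `BaseOperatorSketch` and `WatcherProducerSketch` are hypothetical and are not the Cosmos classes; only the attribute name `_process_log_line_callable` comes from the text above. The hook stays generic and only invokes a per-line callable when the operator provides one.

```python
from __future__ import annotations

import subprocess
import sys
from typing import Callable


class LineStreamingHook:
    """Hypothetical hook: runs a command and streams its output line by line."""

    def run_command(
        self,
        command: list[str],
        process_log_line: Callable[[str], None] | None = None,
    ) -> tuple[int, list[str]]:
        lines: list[str] = []
        with subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
        ) as proc:
            assert proc.stdout is not None
            for raw in proc.stdout:
                line = raw.rstrip("\n")
                lines.append(line)
                # Watcher-specific logic runs only if a callable was supplied
                if process_log_line is not None:
                    process_log_line(line)
        return proc.returncode, lines


class BaseOperatorSketch:
    """Hypothetical base operator: no per-line processing by default."""

    _process_log_line_callable: Callable[[str], None] | None = None

    def execute(self, command: list[str]) -> tuple[int, list[str]]:
        return LineStreamingHook().run_command(command, self._process_log_line_callable)


class WatcherProducerSketch(BaseOperatorSketch):
    """Hypothetical producer: opts in by setting the callable on the instance."""

    def __init__(self) -> None:
        self.seen: list[str] = []
        self._process_log_line_callable = self.seen.append
```

With this shape, operators that do not define the callable pay no parsing cost, and the hook carries no knowledge of watcher behaviour.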
@pankajkoti pankajkoti mentioned this pull request Dec 9, 2025
pankajkoti added a commit that referenced this pull request Dec 18, 2025
Breaking changes

* Introduced in the PR #2080. The following functions are expected to be
used internally only to Cosmos, so we hope these won't impact end-users,
but we are documenting the changes just in case:
- ``generate_task_or_group`` receives ``render_config`` instead of its
individual configurations, such as ``test_behavior``,
``source_rendering_behavior`` and ``enable_owner_inheritance``
- ``create_task_metadata`` receives ``render_config`` instead of its
individual configurations, such as ``test_behavior``,
``source_rendering_behavior`` and ``enable_owner_inheritance``
- ``create_task_metadata`` now expects the ``node_converters`` argument
* Drop Python 3.9 support by @pankajastro in #2118
* Drop Airflow 2.4 support by @pankajastro in #2161
* Drop Airflow 2.5 support by @pankajastro in #2165

Features

* Support applying ``node_converter`` at a task level instead of task
group level by @anyapriya in #1759
* Allow overriding ``DbtProducerWatcherOperator`` parameters via
``ExecutionConfig.setup_operator_args`` by @pankajastro in #2133
* Use deferrable sensors by default in ``ExecutionMode.WATCHER`` by
@pankajastro in #2084
* Support real-time consumer updates when using
``ExecutionMode.WATCHER`` and ``InvocationMode.SUBPROCESS`` by
@pankajastro in #2152
* Update telemetry to v3 format with query parameters by @pankajkoti in
#2192
* Add initial set of telemetry task listener metrics for Cosmos
operators by @pankajkoti in #2195

Enhancements

* Unify Airflow version handling into ``constants.py`` by @tatiana in
#2089
* Refactor ``airflow/graph.py`` to simplify the code base by @tatiana in
#2080
* Force watcher producer retries to zero by @pankajkoti in #2114
* Fail ``ExecutionMode.WATCHER`` consumer sensors immediately when the
producer fails using Airflow context by @pankajkoti in #2126
* ``ExecutionMode.WATCHER``: fetch producer status asynchronously from
the Airflow runtime so deferrable sensors fail immediately when the
producer task fails by @pankajkoti in #2144
* Refactor ``ExecutionMode.WATCHER`` ``InvocationMode.SUBPROCESS`` log
parser by @tatiana in #2183
* Replace map_index with is_mapped_task boolean in task telemetry
metrics by @pankajkoti in #2210
* Collect cosmos profile metrics in task telemetry metrics by
@pankajastro in #2198
* Remove unnecessary information from telemetry by @tatiana in #2211

Bug fixes

* Clarify ``ExecutionMode.WATCHER`` deferrable failure messaging by
@pankajkoti in #2124
* Remove empty test tasks when all tests are detached by @anyapriya in
#2010
* Fix forwarding ``DbtProducerWatcherOperator`` ``dbt build`` flags by
@michal-mrazek in #2127
* Add databricks oauth mock profile by @fjmacagno in #2164
* Register listeners in Airflow 3 plugin implementation by @pankajastro
in #2187
* Fix resolution of ``packages-install-path`` when it uses ``env_var``
by @tatiana in #2194
* Fix ``template_fields`` in ``DbtConsumerWatcherSensor`` to include
``DbtRunLocalOperator`` ``template_fields`` by @tiovader and @emanuel-luis
in #2209
* Emit asset events in ExecutionMode.AIRFLOW_ASYNC mode by @pankajastro
in #2184
* Remove dag_run_id from telemetry tests by @tatiana in #2213

Docs

* Document dataset-event limitation when using
``ExecutionMode.AIRFLOW_ASYNC`` by @varaprasadregani in #2143
* Expand ``ExecutionMode.KUBERNETES`` guidance by @tatiana in #2139
* Add docs for deferrable ``DbtConsumerWatcherSensor`` by @pankajastro
in #2115
* Fix reStructuredText formatting by @dnskr in #2132
* Add docs for ``setup_operator_args`` param by @pankajastro in #2136
* Remove experimental flag for ``ExecutionMode.AIRFLOW_ASYNC`` by
@pankajastro in #2153
* Clarify ``ExecutionMode.AIRFLOW_ASYNC`` dataset limits by @pankajkoti
in #2167
* Update PRIVACY_NOTICE.rst by @tatiana in #2212

Others

* Drop Python 3.9 support by @pankajastro in #2118
* Drop Airflow 2.4 support by @pankajastro in #2161
* Drop Airflow 2.5 support by @pankajastro in #2165
* Improve example DAG ``jaffle_shop_kubernetes.py`` by @tatiana in #2140
* Enable tests for Python 3.13 by @pankajastro in #2154
* Add Python 3.12 to CI integration tests matrix by @pankajastro in
#2168
* Retry flaky Telemetry success test to stabilise CI by @pankajkoti in
#2138
* Drop unused producer state xcom handling in ``ExecutionMode.WATCHER``
by @pankajkoti in #2145
* Remove unused Python3.9 uses from Github action CI by @pankajastro in
#2117
* Run pre-commit on ``ExecutionMode.WATCHER`` modules by @pankajkoti in
#2150
* Refactor: Use shared airflow version constant by @pankajkoti in #2157
* Pin ``pydantic<2.0`` for Airflow 2.6 compatibility by @pankajastro in
#2172
* Remove duplicate ``dbt-duckdb`` dependency by @pankajastro in #2170
* Add targeted ``type: ignore`` for untyped decorators to fix ``mypy``
errors by @pankajastro in #2174
* Replace Legacy typing Aliases with Built-in Types for Python 3.10+ by
@pankajastro in #2175
* Refactor to reuse ``load_method_from_module`` from
``_utils/importer.py`` by @pankajastro in #2176
* Remove try except block for cache import and unused python_version
variable by @pankajastro in #2186
* Unpin Airflow to satisfy GitHub Security tab requirements by
@pankajastro in #2171
* Update Python version for ``pyupgrade`` in ``pre-commit`` config by
@pankajastro in #2190
* Add cooldown config in ``dependabot`` config by @pankajastro in #2189
* Adjust pre-commit so Python 3.10 or higher can be used by @tatiana in
#2196
* Remove empty variables emission from telemetry metrics by @pankajkoti
in #2197
* Reformat documented comments for historical URL formats by @pankajkoti
in #2199
* Bump ``actions/checkout`` from ``5.0.0`` to ``5.0.1`` by @dependabot
in #2135
* Bump ``actions/checkout`` to ``6.0.0`` in GitHub workflows by
@dependabot in #2147
* Bump ``zizmorcore/zizmor-action`` from ``0.2.0`` to ``0.3.0`` by
@dependabot in #2156
* Bump ``actions/checkout`` from ``5.0.1`` to ``6.0.0`` by @dependabot
in #2155
* Bump ``actions/checkout`` from ``6.0.0`` to ``6.0.1`` by @dependabot
in #2178
* Bump ``codecov/codecov-action`` from ``5.5.1`` to ``5.5.2`` by
@dependabot in #2208
* pre-commit autoupdate by @pre-commit-ci[bot] in #2134, #2162, #2173,
#2191, #2202

closes:
astronomer/oss-integrations-private#275
tatiana pushed a commit that referenced this pull request Jan 27, 2026
This PR refactors dbt log processing in **WATCHER** mode to improve
the readability of task logs. It reverts the JSON log output introduced in
#2152 back to plain text, while still preserving real-time consumer
updates when using `SUBPROCESS` invocation mode.

Key changes include:

* Refactoring `FullOutputSubprocessHook` to ensure log outputs are
processed by the `process_log_line` callable when provided.
* Introducing a new `_process_json_log_line` function, which
consolidates the previous `_store_dbt_resource_status_from_log` logic
and adds logging behavior:
  * Properly handles both JSON and non-JSON log lines.
  * Extracts dbt logging info from structured JSON output, handling
    dynamic log levels if needed.
  * Extracts dbt node execution status to XCom, preserving the original
    `_store_dbt_resource_status_from_log` behavior.
* Updating `DbtProducerWatcherOperator` to conditionally set
`log_format="json"` and the log line processing callable only when
running in `SUBPROCESS` invocation mode, avoiding unintended side
effects in other execution paths.
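
The behaviour listed above for `_process_json_log_line` could look roughly like the sketch below. It is an illustration under assumptions, not the merged implementation: the function name `process_json_log_line`, the logger name, and the `statuses` dict (standing in for XCom) are hypothetical, and the JSON keys `info.level`, `info.msg` and `data.node_info` are assumed to match dbt's structured log output.

```python
from __future__ import annotations

import json
import logging

logger = logging.getLogger("dbt_watcher_sketch")


def process_json_log_line(line: str, statuses: dict[str, str]) -> None:
    """Re-emit a dbt log line as plain text and capture any node status it carries."""
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        logger.info(line)  # non-JSON output is passed through unchanged
        return
    if not isinstance(event, dict):
        logger.info(line)
        return

    info = event.get("info")
    if not isinstance(info, dict):
        info = {}
    # Map the level reported in the event ("debug", "info", "warn", "error")
    # to a logging level, falling back to INFO for anything unrecognised.
    level = getattr(logging, str(info.get("level", "info")).upper(), logging.INFO)
    if not isinstance(level, int):
        level = logging.INFO
    logger.log(level, info.get("msg", line))

    data = event.get("data")
    node_info = data.get("node_info") if isinstance(data, dict) else None
    if isinstance(node_info, dict):
        unique_id = node_info.get("unique_id")
        status = node_info.get("node_status")
        if unique_id and status:
            statuses[unique_id] = status  # stand-in for an XCom push
```

Logging only the message text keeps the task log readable while the structured payload is still used for status tracking.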


## SUBPROCESS
From what I've observed, it works fine and even preserves CLI colors in the
Airflow logs.

<img width="1707" height="828" alt="image"
src="https://github.com/user-attachments/assets/a083f394-1708-4391-9454-800f9523920b"
/>

```python
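# Imports added for completeness; DBT_PROJECT_PATH, profile_config and
# operator_args are assumed to be defined elsewhere in the example module.
from datetime import datetime

from cosmos import DbtDag, ExecutionConfig, ProjectConfig, RenderConfig
from cosmos.constants import ExecutionMode, InvocationMode

# [START example_watcher_subprocess]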
example_watcher_subprocess = DbtDag(
    # dbt/cosmos-specific parameters
    execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER, invocation_mode=InvocationMode.SUBPROCESS),
    project_config=ProjectConfig(DBT_PROJECT_PATH),
    profile_config=profile_config,
    render_config=RenderConfig(exclude=["raw_payments"]),
    operator_args=operator_args,
    # normal dag parameters
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="example_watcher_subprocess",
    default_args={"retries": 0},
)
# [END example_watcher_subprocess]
```

## DBT_RUNNER
Works as in the previous version.

<img width="1710" height="830" alt="image"
src="https://github.com/user-attachments/assets/f76129a2-f027-45f0-8ca2-8229350393e6"
/>

```python
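# Imports added for completeness; DBT_PROJECT_PATH, profile_config and
# operator_args are assumed to be defined elsewhere in the example module.
from datetime import datetime

from cosmos import DbtDag, ExecutionConfig, ProjectConfig, RenderConfig
from cosmos.constants import ExecutionMode, InvocationMode

# [START example_watcher]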
example_watcher = DbtDag(
    # dbt/cosmos-specific parameters
    execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER, invocation_mode=InvocationMode.DBT_RUNNER),
    project_config=ProjectConfig(DBT_PROJECT_PATH),
    profile_config=profile_config,
    render_config=RenderConfig(exclude=["raw_payments"]),
    operator_args=operator_args,
    # normal dag parameters
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="example_watcher",
    default_args={"retries": 0},
)
# [END example_watcher]
```

## Related Issue(s)

Closes #2225

Co-authored-by: Emanuel Luis <emanuel.filho@grupomateus.com>
Development

Successfully merging this pull request may close these issues.

[Enhancement] Consume from dbt struct log instead of callback in ExecutionMode.WATCHER

3 participants