Create XCom API #46042

Open
wants to merge 3 commits into
base: main

Conversation

shubhamraj-git
Contributor

@shubhamraj-git shubhamraj-git commented Jan 25, 2025

related: #45966

XComs are currently tied to task execution, so a running workflow cannot receive values from outside. This PR adds an XCom creation API that fills this gap by enabling dynamic, external updates to workflows.

Benefits:

  1. Enable Long-Running Workflow Updates:

Currently, XComs are tied to task execution, which limits flexibility for long-running workflows.
The API allows dynamic creation or updates of XCom values during execution, enabling workflows to adapt to external events or data without waiting for task completion.

  2. Improve Workflow Interactivity:

External systems or operators can inject real-time decisions or intermediate results directly into workflows.
Useful for human-in-the-loop workflows, event-driven processes, and external service integrations.

  3. Reduce Task Dependency Overhead:

Avoid placeholder tasks just to generate or update XComs.
Streamline DAGs by enabling direct updates without unnecessary task execution.


Steps to try out the feature:

  1. Add the following DAG.

This Airflow DAG demonstrates how to use XComs for passing data between tasks. The first task (wait_and_not_push) waits for 1 minute but does not push any XCom. The second task (pull_and_print) attempts to pull an XCom value with the key outbound_key1 from the first task, logs it if found, or warns if absent.

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
import time
import logging
import pendulum

# Default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'retries': 1
}

# Initialize the DAG
with DAG(
    dag_id="xcom_pull_example_dag",
    default_args=default_args,
    description="A DAG demonstrating XCom pull with key 'outbound_key1'",
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
    tags=["xcom", "example"],
) as dag:

    def wait_and_not_push_xcom(**kwargs):
        """
        Task to wait for 1 minute.
        """
        logging.info("Waiting for 1 minute...")
        time.sleep(60)  # Wait for 1 minute

    def pull_and_print_xcom(**kwargs):
        """
        Task to pull an XCom entry with key 'outbound_key1' and print its value.
        """
        ti = kwargs["ti"]
        xcom_value = ti.xcom_pull(task_ids="wait_and_not_push", key="outbound_key1")

        # Compare against None so falsy values (0, "", False) still count as found
        if xcom_value is not None:
            logging.info(f"Retrieved XCom value for key 'outbound_key1': {xcom_value}")
        else:
            logging.warning("No XCom value found for key 'outbound_key1'!")

    # First task: Wait for 1 minute
    wait_and_not_push = PythonOperator(
        task_id="wait_and_not_push",
        python_callable=wait_and_not_push_xcom
    )

    # Second task: Pull the XCom entry and print its value
    pull_and_print = PythonOperator(
        task_id="pull_and_print",
        python_callable=pull_and_print_xcom
    )

    # Define the task dependencies
    wait_and_not_push >> pull_and_print

  2. Trigger the run.
  3. As soon as the run starts, create an XCom entry through the API (for example, from the Swagger UI: http://localhost:29091/docs#/XCom/create_xcom_entry).

Now check the logs. You can see that the XCom pull was successful.
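
The API call in step 3 can also be scripted instead of going through Swagger. The following is a minimal sketch, not the definitive client. The `/public` prefix, the `/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries` path, and the body fields `key`, `value`, and `map_index` are assumptions that are not stated in this description, so check them against the Swagger page. Authentication is omitted. The helper names are hypothetical.

```python
import json
import urllib.request
from urllib.parse import quote

# Assumption: host and port come from the Swagger link above; the "/public"
# prefix is a guess and may differ in your deployment.
BASE_URL = "http://localhost:29091/public"


def build_xcom_create_request(dag_id, dag_run_id, task_id, key, value, map_index=-1):
    """Build (but do not send) a POST request that creates one XCom entry."""
    # Run IDs usually contain ':' and '+', so every path segment is URL-encoded.
    url = (
        f"{BASE_URL}/dags/{quote(dag_id, safe='')}"
        f"/dagRuns/{quote(dag_run_id, safe='')}"
        f"/taskInstances/{quote(task_id, safe='')}/xcomEntries"
    )
    # Assumed request body shape: key, value, and map_index.
    body = json.dumps({"key": key, "value": value, "map_index": map_index})
    return urllib.request.Request(
        url,
        data=body.encode("utf-8"),
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def send(request, timeout=10):
    """Send the request and return (status code, decoded JSON body)."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status, json.loads(response.read().decode("utf-8"))


# Example (requires a running API server, so it is left commented out):
# request = build_xcom_create_request(
#     "xcom_pull_example_dag", "<your dag_run_id>", "wait_and_not_push",
#     key="outbound_key1", value="hello from outside",
# )
# print(send(request))
```

The entry is created on `wait_and_not_push` because that is the task the downstream `xcom_pull` reads from; it has to exist before the 60-second sleep ends.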



@boring-cyborg boring-cyborg bot added the area:UI Related to UI/UX. For Frontend Developers. label Jan 25, 2025
@shubhamraj-git shubhamraj-git changed the title [DRAFT] Create XCom API Create XCom API Jan 26, 2025
      schema:
        $ref: '#/components/schemas/HTTPExceptionResponse'
  description: Bad Request
'404':
Member


The 404 response code for the POST API seems unconventional.

Collaborator


A 404 here seems OK, given that the dag_id may not exist.
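
For illustration only, the status-code question in this thread can be sketched with a framework-free model of a create-only endpoint. This is not the PR's implementation: the class and method names are hypothetical, and the 201 and 409 codes are assumptions (the conversation only confirms a 404 response and that creating an already existing XCom raises an error).

```python
from http import HTTPStatus


class InMemoryXComStore:
    """Toy model of a create-only XCom endpoint (hypothetical, for discussion)."""

    def __init__(self, known_task_instances):
        # Each task instance is identified by (dag_id, run_id, task_id).
        self._known = set(known_task_instances)
        self._entries = {}

    def create(self, dag_id, run_id, task_id, key, value):
        task_instance = (dag_id, run_id, task_id)
        # The parent resource named in the URL does not exist: 404 on a POST.
        if task_instance not in self._known:
            return HTTPStatus.NOT_FOUND
        # Create-only semantics: an existing key is an error, not an update.
        # 409 is an assumed choice of error code.
        if (task_instance, key) in self._entries:
            return HTTPStatus.CONFLICT
        self._entries[(task_instance, key)] = value
        return HTTPStatus.CREATED


store = InMemoryXComStore({("xcom_pull_example_dag", "run_1", "wait_and_not_push")})
first = store.create("xcom_pull_example_dag", "run_1", "wait_and_not_push", "outbound_key1", "v1")
second = store.create("xcom_pull_example_dag", "run_1", "wait_and_not_push", "outbound_key1", "v2")
missing = store.create("no_such_dag", "run_1", "wait_and_not_push", "outbound_key1", "v1")
```

In this model the 404 refers to the parent resource in the URL path rather than to the XCom being created, which is the usual reading of a 404 on a nested POST.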

@jscheffl
Contributor

Just adding comments as copy of the 1:1 conversation:
I doubt that pushing an XCom from an external system is a good way to signal data to an existing workflow. Technically it would work, but in my view this opens the door to "hacky integrations". XCom (as far as I have seen) was made for inter-task communication, not as an interface to external applications.
I don't know if there is a specific design pattern that needs to be considered. Just my thoughts.

I would understand if the API could be used to "patch" incorrect XCom values administratively, for specific scenarios where upstream tasks cannot be re-executed but their values are wrong. But the API in this PR only adds an XCom and raises an error if the value already exists.

So before merging, I'd propose a bit of discussion among the other maintainers on whether we want to open XCom up to external applications to signal a processing status.

If so, this integration pattern would require a bit of documentation as well.

@eladkal
Contributor

eladkal commented Jan 27, 2025

Just adding comments as copy of the 1:1 conversation:
I doubt that pushing an XCom from an external system is a good way to signal data to an existing workflow.

This is not the use case. The use case is incident recovery. When the on-call engineer fixes problems with a data pipeline, manual interventions are sometimes required.
