
Adding node_id to ExecutionPlanProperties #12186

Conversation

ameyc
Contributor

@ameyc ameyc commented Aug 27, 2024

Which issue does this PR close?

Closes #11364

Rationale for this change

Currently, ExecutionPlans don't have an identifier associated with them, making it hard to distinguish between nodes for
use cases such as snapshotting continuous pipelines, displaying node metrics in a UI, etc.

What changes are included in this PR?

Changes to:

  1. ExecutionPlanProperties to add a node_id Option<usize> field.
  2. ExecutionPlan to add a with_node_id() method that returns a copy of the ExecutionPlan with the assigned node ID.
  3. SessionState to add node_id annotation to finalized physical plans.
  4. Utils in physical-plan/src/node_id.rs to traverse ExecutionPlans and generate deterministic IDs for the whole tree.
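The shape of these additions can be sketched with simplified stand-ins. This is illustrative only: the traits below model just the node_id-relevant parts of ExecutionPlan/ExecutionPlanProperties, and the names DummyExec, with_children_and_id, and assign_node_ids are hypothetical, not the PR's actual API.

```rust
use std::sync::Arc;

// Simplified stand-in for ExecutionPlanProperties with the proposed field.
#[derive(Clone)]
pub struct PlanProperties {
    pub node_id: Option<usize>, // None until an ID is assigned
}

// Simplified stand-in for the ExecutionPlan trait.
pub trait ExecutionPlan {
    fn properties(&self) -> &PlanProperties;
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;
    // Rebuilds this node with the given children and node ID assigned
    // (a variation on the PR's proposed with_node_id()).
    fn with_children_and_id(
        &self,
        children: Vec<Arc<dyn ExecutionPlan>>,
        node_id: usize,
    ) -> Arc<dyn ExecutionPlan>;
}

// A trivial operator for demonstration purposes.
pub struct DummyExec {
    pub props: PlanProperties,
    pub children: Vec<Arc<dyn ExecutionPlan>>,
}

impl ExecutionPlan for DummyExec {
    fn properties(&self) -> &PlanProperties {
        &self.props
    }
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        self.children.clone()
    }
    fn with_children_and_id(
        &self,
        children: Vec<Arc<dyn ExecutionPlan>>,
        node_id: usize,
    ) -> Arc<dyn ExecutionPlan> {
        Arc::new(DummyExec {
            props: PlanProperties { node_id: Some(node_id) },
            children,
        })
    }
}

// Post-order traversal: children are numbered before their parent, so the
// numbering is deterministic for a given tree shape.
pub fn assign_node_ids(
    plan: &Arc<dyn ExecutionPlan>,
    next: &mut usize,
) -> Arc<dyn ExecutionPlan> {
    let children: Vec<_> = plan
        .children()
        .iter()
        .map(|c| assign_node_ids(c, next))
        .collect();
    let id = *next;
    *next += 1;
    plan.with_children_and_id(children, id)
}
```

Because the traversal is post-order, a leaf always receives a lower ID than its parent, which is what makes the numbering stable across runs for the same plan shape.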

Are these changes tested?

Added asserts to an existing test in datafusion-examples/src/planner_api.rs.

Are there any user-facing changes?

No

@github-actions github-actions bot added physical-expr Physical Expressions core Core DataFusion crate labels Aug 27, 2024
@ozankabak
Contributor

ozankabak commented Aug 27, 2024

We had similar challenges when using DataFusion in a context similar to yours (checkpointing etc.) I have consulted with my team on how we solved them, and discussed this general approach. I can share the following general thoughts/concerns:

  • In some use cases, one wants an ID for every node in the plan tree (e.g. for display/UI purposes). In others, what is actually necessary is an ID per stream (any kind of stateful work).
  • Having a default None value for IDs is potentially problematic (apparently we ran into bugs caused by this after initially trying such an approach).
  • One may want different types for the ID in different use cases (usize may not be appropriate for all).

Therefore, we think the right approach is to traverse the tree (for plans or streams, depending on your use case), generate the IDs as you see fit, and store them downstream in a map container that associates nodes with your IDs. I don't think doing this upstream inside ExecutionPlanProperties is the right solution. This is also somewhat evident when you look at what is inside: execution mode, ordering information, partitioning information, and equivalence information -- it is used as a cache to store derived properties of the plan object itself that emerge from the plan tree (not set externally).

Apologies for not being able to provide feedback and start the detailed discussion before you prepared a PR, but in my defense you were too fast :)

I think it would be great to add the traversal code and the map approach I mentioned as an example to upstream for future users who want to do something like this. This is an approach we are following for indexing work as well

@ameyc
Contributor Author

ameyc commented Aug 30, 2024

I think it would be great to add the traversal code and the map approach I mentioned as an example to upstream for future users who want to do something like this. This is an approach we are following for indexing work as well

Thanks for the feedback @ozankabak. Would it make sense to add this traversal code in the utils? It would be nice for this to be available in the core library itself.

@ozankabak
Contributor

Let's create a new example file, add the traversal logic and a simple demonstration of how to store the node <-> id association via a map in that file. I think it will be a fairly succinct yet guiding example for many others.

You may run into difficulty creating or inserting into a map with Arc<dyn ExecutionPlan> as a key. A good way to get around it is to put the plan inside a wrapper struct and use that as the key.

We wanted to make Arc<dyn ExecutionPlan> directly usable as keys for a while, but couldn't get around to doing it. It would be a great contribution if you'd like to work on it. It will make use cases like this smoother to code, and also help make some planning algorithms faster.
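One possible shape for the wrapper-struct workaround is to key the map on the Arc's pointer identity. This is a sketch under that assumption (pointer-identity equality/hashing); the trait, Dummy type, and PlanKey name are minimal stand-ins, not DataFusion code.

```rust
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

// Minimal stand-ins for demonstration.
pub trait ExecutionPlan {}
pub struct Dummy;
impl ExecutionPlan for Dummy {}

// Wrapper that makes an Arc<dyn ExecutionPlan> usable as a HashMap key.
pub struct PlanKey(pub Arc<dyn ExecutionPlan>);

impl PartialEq for PlanKey {
    fn eq(&self, other: &Self) -> bool {
        // Two keys are equal iff they wrap the same allocation.
        Arc::ptr_eq(&self.0, &other.0)
    }
}
impl Eq for PlanKey {}

impl Hash for PlanKey {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Hash the data half of the fat pointer, discarding the vtable.
        (Arc::as_ptr(&self.0) as *const () as usize).hash(state);
    }
}
```

With this, `HashMap<PlanKey, MyId>` associates arbitrary ID types with plan nodes without touching the ExecutionPlan trait itself.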

@ameyc
Contributor Author

ameyc commented Sep 11, 2024

Taking a second look at implementing this using a global hashmap keyed by Arc pointers: the solution is somewhat hacky and error-prone, and as developers trying to build on top of DataFusion, it makes our experience feel pretty brittle.

While we are interested in using the node_id for stateful checkpointing, it does make sense for this to be on an ExecutionPlan since it is a node on PhysicalPlan graph.

In some use cases, one wants an ID for every node the plan tree (e.g. for display/UI purposes). In others, what is actually necessary is an ID per stream (any kind of stateful work).

Streams can easily derive their ID as "{node_id}_{stream_id}".

Having a default None value for IDs is potentially problematic (apparently we ran into bugs caused by this after initially trying such an approach).

Presumably, any user interested in using the node_id would make their operators implement with_node_id.

One may want different types for the ID in different use cases (usize may not be appropriate for all).

For additional types of node IDs, I suppose one could maintain them in a HashMap<usize, T>, where the usize key is the node_id rather than a pointer to the Arc.
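Concretely, that side-table idea might look like the following. CheckpointTag is a hypothetical custom ID type; the point is that the plan keeps a plain usize node_id while richer identifiers live in a map keyed by it.

```rust
use std::collections::HashMap;

// Hypothetical use-case-specific identifier, associated with a node
// through its usize node_id rather than stored on the plan itself.
#[derive(Debug, Clone, PartialEq)]
pub struct CheckpointTag(pub String);

pub fn build_tag_table() -> HashMap<usize, CheckpointTag> {
    let mut tags = HashMap::new();
    tags.insert(0, CheckpointTag("source-0".to_string()));
    tags.insert(1, CheckpointTag("repartition-1".to_string()));
    tags
}
```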

@ozankabak
Contributor

ozankabak commented Sep 11, 2024

Taking a second look at implementing this using a global hashmap keyed by Arc pointers: the solution is somewhat hacky and error-prone, and as developers trying to build on top of DataFusion, it makes our experience feel pretty brittle.

Can you share a link to your implementation attempt? This is a little surprising; I'd like to go over it and understand what is going on.

@ameyc
Contributor Author

ameyc commented Sep 12, 2024

@ozankabak would love any pointers. This code is admittedly a quick draft. We pass in a hashmap and recursively traverse the tree.

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;

/// Hands out monotonically increasing node IDs.
pub struct NodeIdAnnotator {
    next_id: usize,
}

impl NodeIdAnnotator {
    pub fn new() -> Self {
        NodeIdAnnotator { next_id: 0 }
    }

    pub fn next_node_id(&mut self) -> usize {
        let node_id = self.next_id;
        self.next_id += 1;
        node_id
    }
}

/// Post-order traversal: children are numbered before their parent, so the
/// IDs are deterministic for a given tree shape. The map is keyed by the
/// Arc's address rather than by the plan itself.
pub fn annotate_node_id_for_execution_plan(
    plan: &Arc<dyn ExecutionPlan>,
    annotator: &mut NodeIdAnnotator,
    plan_map: &mut HashMap<usize, usize>,
) {
    for child in plan.children() {
        annotate_node_id_for_execution_plan(child, annotator, plan_map);
    }
    let node_id = annotator.next_node_id();
    let addr = Arc::as_ptr(plan) as *const () as usize;
    plan_map.insert(addr, node_id);
}

Then, when executing the plan, we need to actually run the annotation and set a global hash-map singleton:

    pub async fn print_stream(self) -> Result<()> {
        let plan = self.df.as_ref().clone().create_physical_plan().await?;
        let mut node_id_map: HashMap<usize, usize> = HashMap::new();
        let mut annotator = NodeIdAnnotator::new();
        annotate_node_id_for_execution_plan(&plan, &mut annotator, &mut node_id_map);
        for (key, value) in node_id_map.iter() {
            debug!("Node {}, Id {}", key, value);
        }
        let task_ctx = self.df.as_ref().clone().task_ctx();
        set_global_node_id_hash_map(&node_id_map);
        let mut stream: SendableRecordBatchStream = execute_stream(plan, Arc::new(task_ctx))?;

Now, when creating a Stream, we need to pass a reference to the ExecutionPlan it is tied to in order for it to figure out the channel_tag/checkpoint_tag it is supposed to use to coordinate checkpoints.

Btw, the annotation output is something like:

[2024-09-12T21:55:33Z DEBUG denormalized::datastream] Node 5783968320, Id 0
[2024-09-12T21:55:33Z DEBUG denormalized::datastream] Node 5783975296, Id 1
[2024-09-12T21:55:33Z DEBUG denormalized::datastream] Node 5783964912, Id 2
[2024-09-12T21:55:33Z DEBUG denormalized::datastream] Node 5783965296, Id 3

In contrast, this PR annotates the node_ids during the create_physical_plan phase, so you have the node_id at Stream creation time without needing a global lookup. The corresponding print_physical_plan() now gives you the node ID as well:

FilterExec: max@3 > 113, node_id=3
  StreamingWindowExec: mode=Single, gby=[...], aggr=[..], window_type=[..], node_id=2
    RepartitionExec: partitioning=Hash([sensor_name@2], 14), input_partitions=1, node_id=1
      DenormalizedStreamingTableExec: partition_sizes=1, projection=[..], infinite_source=true, node_id=0

@emgeee
Contributor

emgeee commented Sep 13, 2024

Just chiming in -- the implementation in this PR seems quite reasonable to me. While there are definitely ways to hack around the limitation of not having node IDs, those strategies would be quite vulnerable to upstream breaking changes, and given DataFusion's goal of being extensible, it makes sense for this to be a core feature of the library.

Successfully merging this pull request may close these issues.

Deterministic IDs for ExecutionPlan