
Conversation

@westonpace
Member

@westonpace commented Jan 18, 2023

This PR does two things. First, it requires that all "tasks" (for the AsyncTaskScheduler, not the executor) have a name. Second, it simplifies and cleans up the way that exec nodes report their tracing using a TracedNode helper class.
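
Roughly, the idea behind the helper is something like the following sketch (simplified and illustrative only; the names and the direct OpenTelemetry calls here are not the actual implementation):

#include <string>
#include <utility>

#include <opentelemetry/trace/provider.h>

namespace trace_api = opentelemetry::trace;

// Simplified sketch of a "traced node" helper: a node owns one of these and each
// InputReceived/Finish override just asks it for a scope.  The helper opens a
// span tagged with the node's label and ends it when the scope is destroyed.
class TracedNodeSketch {
 public:
  explicit TracedNodeSketch(std::string label) : label_(std::move(label)) {}

  // RAII guard: ends the span and pops it from the active context when destroyed.
  struct SpanGuard {
    opentelemetry::nostd::shared_ptr<trace_api::Span> span;
    trace_api::Scope scope;
    ~SpanGuard() { span->End(); }
  };

  SpanGuard TraceInputReceived(int64_t batch_length) const {
    auto tracer = trace_api::Provider::GetTracerProvider()->GetTracer("acero");
    auto span = tracer->StartSpan("InputReceived");
    span->SetAttribute("node.label", label_.c_str());
    span->SetAttribute("batch.length", batch_length);
    return SpanGuard{span, trace_api::Scope(span)};
  }

 private:
  std::string label_;
};

With something like this in place, a node's override collapses to a single line such as auto scope = traced_.TraceInputReceived(batch.length);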


@westonpace
Member Author

Example Plan Trace:

[screenshot of the example plan trace]

Note: I can already see something I find rather interesting in the above trace. The final pipeline calls ScalarAggregateNode::Finish, which finalizes the aggregates and then calls SinkNode::InputReceived. Admittedly, finalizing aggregates is not much work; however, all InputReceived should be doing is pushing the batch onto a queue, so I'm surprised how much longer InputReceived takes to run.

@westonpace requested a review from lidavidm January 18, 2023 07:41
@westonpace
Member Author

CC @mbrobbel please review if you have a chance

@westonpace
Member Author

Looks like I still have a CI issue. Converting to draft while I work that out.

@westonpace marked this pull request as draft January 18, 2023 07:59
Contributor

@joosthooz left a comment


Thanks a lot for this! I still have to try this branch myself, but I left some questions.

span, span_, "InputReceived",
{{"node.label", label()}, {"batch.length", batch.length}});

auto scope = TraceInputReceived(batch);
Contributor

If the sink applies backpressure (e.g. the dataset writer), it does not result in an event being created on the span, as is the case for the normal SinkNode here https://github.com/apache/arrow/pull/33738/files#diff-967cff6ef1964402635ac0769dece741ca0ed58bcefb8d669ecfe3ed8371998eR175
Is there a way to do this? For example, we could compare the value of backpressure_counter_ before and after calling Consume(), but then we wouldn't know whether backpressure was applied or released.
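
Something along these lines is what I had in mind (purely a sketch; backpressure_counter_ and Consume() here stand in for whatever the node actually exposes):

void InputReceived(ExecNode* input, ExecBatch batch) override {
  auto scope = TraceInputReceived(batch);
  int64_t counter_before = backpressure_counter_;
  Consume(std::move(batch));
  if (backpressure_counter_ != counter_before) {
    // We can tell the counter changed but not whether backpressure was
    // applied or released -- that is the limitation described above.
    EVENT(span_, "BackpressureChanged",
          {{"backpressure.counter", backpressure_counter_}});
  }
}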

@westonpace
Member Author

The backpressure is applied in the dataset writer (it has its own queue). However, we can add tracing events in the dataset writer. I'm kind of interested now to see what a dataset writer trace looks like. I will add something.

Contributor

Yes, I think that makes a lot of sense, especially because the dataset writer submits tasks to the IO executor that can run in parallel. All of that would otherwise be hidden behind a single ConsumingSinkNode span.

}

void InputReceived(ExecNode* input, ExecBatch batch) override {
EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
Contributor

Does this need a TraceInputReceived or NoteInputReceived?

@westonpace
Member Author

It gets it from MapNode. This will be more obvious in #15253 because subclasses will no longer implement InputReceived (they will use MapNode::InputReceived and instead just implement ProcessBatch).
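
Very roughly, the shape will be something like this (just a sketch of the idea, not the actual code from that PR):

void MapNode::InputReceived(ExecNode* input, ExecBatch batch) {
  auto scope = TraceInputReceived(batch);  // tracing handled once, in the base class
  ProcessBatch(std::move(batch));          // each subclass only implements this part
}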

}

void InputReceived(ExecNode* input, ExecBatch batch) override {
EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
Contributor

Does this need a TraceInputReceived?

}

void InputReceived(compute::ExecNode* input, compute::ExecBatch batch) override {
EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
Contributor

Does this need a TraceInputReceived?

MapNode::Finish(std::move(finish_st));
return;
}
auto scope = TraceFinish();
Contributor

Why do we do a TraceFinish here and not inside DatasetWriter::Finish? That's the only thing inside this trace, and it seems weird to only trace it if it is inside a TeeNode (e.g. it is missing here https://github.com/apache/arrow/pull/33738/files#diff-2caf4e9bd3f139e05e55dca80725d8a9c436f5ccf65c76a37cebfa6ee9b36a6aL418).

@joosthooz
Contributor

In the figure, why does WaitForFinish(SinkNode:) end earlier than the ScalarAggregate? Can we add a name (and maybe even an id number in case there are multiple) to the names so that we know which node a span refers to? Lastly, there is a SinkNode but it doesn't seem to perform any work

@westonpace
Member Author

In the figure, why does WaitForFinish(SinkNode:) end earlier than the ScalarAggregate?

The code looks roughly like:

// The numbers mark the order in which things happen.  The sink marks itself
// finished (5) before the aggregate (6) and the source (7), which is why the
// sink's WaitForFinish span ends first.
void SinkNode::ReceiveLastBatch(batch) {
  output_queue.Enqueue(batch);       // 4
  finished_.MarkFinished();          // 5
}

void AggregateNode::ReceiveLastBatch(batch) {
  Enqueue(batch);                    // 2
  aggregates = ComputeAggregates();  // 3
  output->ReceiveLastBatch(batch);   // runs 4 and 5 in the sink
  finished_.MarkFinished();          // 6
}

void SourceNode::ReceiveLastBatch(batch) {
  output->ReceiveLastBatch(batch);   // 1
  finished_.MarkFinished();          // 7
}

Can we add a name (and maybe even an id number in case there are multiple) to the names so that we know which node a span refers to?

All of the node-specific spans and events should have the node label as an attribute. I don't think it's displayed here. The node label defaults (I think) to NodeType:NodeCounter, but I'll check.

Lastly, there is a SinkNode but it doesn't seem to perform any work

There are two kinds of sinks. The SinkNode has an external queue. All it does is push batches into the queue. So no, it should not be doing any work. The ConsumingSinkNode assumes the batch is consumed as part of the plan (e.g. dataset write) and has no output but it does do work.

@westonpace
Member Author

Here's a trace from a dataset write. There are still things that could be cleaned up here. Pretty much all of the DatasetWriter:: traces are a mix of active CPU time and idle I/O time, and it isn't clear which is which.

[Jaeger UI screenshot of the dataset write trace]

@joosthooz
Contributor

Nice, I think the WriteAndCheckBackpressure span is important because that's where the backpressure is checked, and it also performs some work combining staged batches (in PopStagedBatch). Maybe that even deserves its own span, because it is not always called (only if enough rows have arrived in the Push function).
Shouldn't there also be a span created in the lambda that gets submitted to the IO executor in WriteNext? That's where the actual writing (and Parquet encoding & compression) is being performed.
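
For example, something along these lines (just a sketch with made-up names, not the real WriteNext code): the submitted task could capture the current span and start a child span around the actual write:

namespace trace_api = opentelemetry::trace;

auto tracer = trace_api::Provider::GetTracerProvider()->GetTracer("arrow.dataset");
auto parent = tracer->GetCurrentSpan();

io_executor->Spawn([tracer, parent, batch = std::move(batch)]() {
  trace_api::StartSpanOptions options;
  options.parent = parent->GetContext();  // keep the write under the span that submitted it
  auto span = tracer->StartSpan("WriteBatch", options);
  trace_api::Scope scope(span);
  // ... Parquet encoding, compression, and the actual write happen here ...
  span->End();
});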

@westonpace
Member Author

@joosthooz I slightly changed things so that the current task will be used as the parent instead of the scheduler. This makes it clearer that WriteAndCheckBackpressure is actually creating some of those following spans.

However, at this point, I think we are veering away from my original goal, which was "remove the dependence on the exec node finished future so I can do away with it, but don't break OT worse than it already was".

I think I'd like to merge this in as it is. Would you be interested in investigating better ways of handling spans in a future PR?

Shouldn't there also be a span created in the lambda that gets submitted to the IO executor in WriteNext? That's where the actual writing (and Parquet encoding & compression) is being performed.

I think I/O of any kind is generally interesting enough to always justify a span.

@westonpace marked this pull request as ready for review January 20, 2023 05:39
@joosthooz
Contributor

joosthooz commented Jan 23, 2023

Hi, I gave this branch a spin (this code reads in a partitioned CSV dataset and writes it to a partitioned Parquet dataset), and it seems that the nesting has become inconsistent:
[screenshot of the trace]
There are 2 ReadBatch spans under InitialTask. One of them has all of the FragmentsToBatches spans as its children (these were nested under the SourceNode before). The other keeps recursively nesting more ReadBatch spans. Each has a ProcessMorsel, which has the filter, project, and sink spans nested under each other. Then the dataset writer also keeps nesting WriteAndCheckBackpressure.
[screenshot of the nested spans]
Is there a way to go back to making most of these spans siblings again?
Do we want to change the organization of the spans in this PR from having 1 span for each node in the graph, each having a span for every chunk of data it processes (how it was before), to having a ProcessMorsel for each chunk of data, each having a span for each node it traverses through?
I think I can help in a follow-up PR, especially for the dataset writer.

@westonpace
Member Author

Is there a way to go back to making most of these spans siblings again?

Yes. The recursion is probably somewhat accurate, but I agree it makes it harder to read. Having them as siblings makes sense too; I will revert back to that arrangement.

Do we want to change the organization of the spans in this PR from having 1 span for each node in the graph, each having a span for every chunk of data it processes (how it was before), to having a ProcessMorsel for each chunk of data, each having a span for each node it traverses through?

Yes, I believe so. A more generic term than ProcessMorsel would be "pipeline" or "plan fragment". Acero is (implicitly) a "plan fragment" engine in that we have one task per batch per fragment. Thinking about it this way, it would be nice if we had something like the conceptual model:

  • ProcessMorsel
    • Filter
    • Project
    • Sink

But today it ends up being (because the last part of each node is to call the downstream node):

  • ProcessMorsel
    • Filter
      • Project
        • Sink

Yet another case where the conceptual/logical understanding is different from the physical understanding. Although, perhaps there is some merit to the physical understanding, as it mirrors reality more closely. Perhaps it depends on the goal of the reader. If someone is trying to improve the threading and execution of the plan itself, they might want the physical model. If someone is trying to focus on the performance of a single node, they might want the logical model. I'll leave that for a follow-up PR.
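
As a rough sketch of how the logical layout could be produced (illustrative OpenTelemetry calls, not anything in this PR): each node span would be started with the morsel span as an explicit parent instead of whatever span happens to be active when the node runs:

namespace trace_api = opentelemetry::trace;

auto tracer = trace_api::Provider::GetTracerProvider()->GetTracer("acero");
auto morsel_span = tracer->StartSpan("ProcessMorsel");

trace_api::StartSpanOptions options;
options.parent = morsel_span->GetContext();  // every node span parents to the morsel

auto filter_span = tracer->StartSpan("Filter", options);
// ... run the filter ...
filter_span->End();

auto project_span = tracer->StartSpan("Project", options);
// ... run the projection ...
project_span->End();

auto sink_span = tracer->StartSpan("Sink", options);
// ... push the batch into the sink ...
sink_span->End();

morsel_span->End();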

I think I can help in a follow-up PR, especially for the dataset writer.

Great. Mentally, when I think of the dataset writer, I think there are two parts. The first part should be the trailing part of the fragment/pipeline that feeds the writer. In this first part we partition the batch, select the appropriate file queues, and deposit the batches into the queues. There is then a separate dedicated thread task to write each batch to the writer.
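
In pseudocode, the mental model is roughly (just a sketch, not actual code):

// Part 1: the tail of the pipeline/fragment that feeds the writer
void OnBatchArrived(batch) {
  for (auto& [partition, slice] : PartitionBatch(batch)) {
    file_queues[partition].Push(slice);  // cheap: partition + enqueue only
  }
}

// Part 2: a separate dedicated task writes each queued batch to the file writer
void WriteQueuedBatch(partition, batch) {
  writers[partition].Write(batch);  // encoding and I/O happen here, off the compute thread
}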

@westonpace
Member Author

I'm going to merge this as-is to unblock the error handling cleanup. We can fine tune in future PRs.

@westonpace merged commit 589b5b2 into apache:master Jan 23, 2023
@lidavidm
Member

I was about to start reviewing it, sorry for the delay 😅 In any case, Joost already took a look here fortunately.

@westonpace
Member Author

I was about to start reviewing it, sorry for the delay 😅 In any case, Joost already took a look here fortunately.

No problem. I might be moving a little fast, but I think the tracing stuff is still pretty experimental at the moment. One thing I forgot to note is that I have upgraded OT from 1.4 to 1.8. I found that 1.4 could not connect directly to a collector for some reason; 1.8 seems to work out of the box. Also, I noticed that 1.8 now has a Jaeger exporter, which could potentially avoid the need to have a collector at all. I tried to enable it but quickly ran into trouble with the bundled build, which seems to be pretty custom.

@lidavidm
Member

The upgrade sounds fine. I think OT itself is also moving fast so that might explain the incompatibility.

As mentioned in the original OT PRs, there's a tension between whether Arrow counts as a library or an application to OT. Really we shouldn't be setting up any exporters at all, letting the application control it all, but that is inconvenient/impossible for Python users at the moment...

@ursabot

ursabot commented Jan 23, 2023

Benchmark runs are scheduled for baseline = b9d1162 and contender = 589b5b2. 589b5b2 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Failed] ec2-t3-xlarge-us-east-2
[Failed] test-mac-arm
[Failed] ursa-i9-9960x
[Failed] ursa-thinkcentre-m75q
Buildkite builds:
[Failed] 589b5b2b ec2-t3-xlarge-us-east-2
[Failed] 589b5b2b test-mac-arm
[Failed] 589b5b2b ursa-i9-9960x
[Failed] 589b5b2b ursa-thinkcentre-m75q
[Failed] b9d11627 ec2-t3-xlarge-us-east-2
[Failed] b9d11627 test-mac-arm
[Failed] b9d11627 ursa-i9-9960x
[Failed] b9d11627 ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java


Development

Successfully merging this pull request may close these issues.

[C++] Simplify tracing in exec plan

4 participants