Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions cpp/src/arrow/compute/exec/aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ class GroupByNode : public ExecNode {
outputs_[0]->InputReceived(this, out_data_.Slice(batch_size * n, batch_size));
}

Status OutputResult() {
Status DoOutputResult() {
// To simplify merging, ensure that the first grouper is nonempty
for (size_t i = 0; i < local_states_.size(); i++) {
if (local_states_[i].grouper) {
Expand All @@ -500,11 +500,28 @@ class GroupByNode : public ExecNode {

int64_t num_output_batches = bit_util::CeilDiv(out_data_.length, output_batch_size());
outputs_[0]->InputFinished(this, static_cast<int>(num_output_batches));
RETURN_NOT_OK(plan_->query_context()->StartTaskGroup(output_task_group_id_,
num_output_batches));
Status st =
plan_->query_context()->StartTaskGroup(output_task_group_id_, num_output_batches);
if (st.IsCancelled()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this if-else seems redundant, since OutputResult will MarkFinished even if we don't do this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is translating a failed status into a successful status (the test expects a plan to succeed after calling StopProducing).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The else condition propagates the failed status.

// This means the user has cancelled/aborted the plan. We will not send any batches
// and end immediately.
finished_.MarkFinished();
return Status::OK();
} else {
return st;
}
return Status::OK();
}

void OutputResult() {
// If something goes wrong outputting the result we need to make sure
// we still mark finished.
Status st = DoOutputResult();
if (!st.ok()) {
finished_.MarkFinished(st);
}
}

void InputReceived(ExecNode* input, ExecBatch batch) override {
EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
util::tracing::Span span;
Expand All @@ -521,7 +538,7 @@ class GroupByNode : public ExecNode {
if (ErrorIfNotOk(Consume(ExecSpan(batch)))) return;

if (input_counter_.Increment()) {
ErrorIfNotOk(OutputResult());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we don't want to propagate the error back anymore? (I suppose that's all going away anyways?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, at the moment, the only thing that handles this error is the sink node and all it will do is trigger an abort which will end up coming back here and marking this finished anyways.

That used to be needed because it was the only way to trigger an abort on a failure.

Now, with the async scheduler, any failed task will trigger an abort. Furthermore, since finished_ is a "task" it will trigger the abort in this path.

OutputResult();
}
}

Expand All @@ -542,7 +559,7 @@ class GroupByNode : public ExecNode {
DCHECK_EQ(input, inputs_[0]);

if (input_counter_.SetTotal(total_batches)) {
ErrorIfNotOk(OutputResult());
OutputResult();
}
}

Expand All @@ -551,7 +568,6 @@ class GroupByNode : public ExecNode {
{{"node.label", label()},
{"node.detail", ToString()},
{"node.kind", kind_name()}});

local_states_.resize(plan_->query_context()->max_concurrency());
return Status::OK();
}
Expand All @@ -570,7 +586,9 @@ class GroupByNode : public ExecNode {
EVENT(span_, "StopProducing");
DCHECK_EQ(output, outputs_[0]);

if (input_counter_.Cancel()) finished_.MarkFinished();
if (input_counter_.Cancel()) {
finished_.MarkFinished();
}
inputs_[0]->StopProducing(this);
}

Expand Down