Skip to content
51 changes: 8 additions & 43 deletions cpp/src/arrow/compute/exec/aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
*ss << ']';
}

class ScalarAggregateNode : public ExecNode {
class ScalarAggregateNode : public ExecNode, public TracedNode<ScalarAggregateNode> {
public:
ScalarAggregateNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::shared_ptr<Schema> output_schema,
Expand Down Expand Up @@ -139,11 +139,6 @@ class ScalarAggregateNode : public ExecNode {
const char* kind_name() const override { return "ScalarAggregateNode"; }

Status DoConsume(const ExecSpan& batch, size_t thread_index) {
util::tracing::Span span;
START_COMPUTE_SPAN(span, "Consume",
{{"aggregate", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
for (size_t i = 0; i < kernels_.size(); ++i) {
util::tracing::Span span;
START_COMPUTE_SPAN(span, aggs_[i].function,
Expand All @@ -161,12 +156,7 @@ class ScalarAggregateNode : public ExecNode {
}

void InputReceived(ExecNode* input, ExecBatch batch) override {
EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
util::tracing::Span span;
START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"aggregate", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
auto scope = TraceInputReceived(batch);
DCHECK_EQ(input, inputs_[0]);

auto thread_index = plan_->query_context()->GetThreadIndex();
Expand All @@ -179,24 +169,19 @@ class ScalarAggregateNode : public ExecNode {
}

void ErrorReceived(ExecNode* input, Status error) override {
EVENT(span_, "ErrorReceived", {{"error", error.message()}});
DCHECK_EQ(input, inputs_[0]);
outputs_[0]->ErrorReceived(this, std::move(error));
}

void InputFinished(ExecNode* input, int total_batches) override {
EVENT(span_, "InputFinished", {{"batches.length", total_batches}});
DCHECK_EQ(input, inputs_[0]);
if (input_counter_.SetTotal(total_batches)) {
ErrorIfNotOk(Finish());
}
}

Status StartProducing() override {
START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(),
{{"node.label", label()},
{"node.detail", ToString()},
{"node.kind", kind_name()}});
NoteStartProducing(ToStringExtra());
// Scalar aggregates will only output a single batch
outputs_[0]->InputFinished(this, 1);
return Status::OK();
Expand All @@ -216,7 +201,6 @@ class ScalarAggregateNode : public ExecNode {
}

void StopProducing() override {
EVENT(span_, "StopProducing");
if (input_counter_.Cancel()) {
finished_.MarkFinished();
}
Expand All @@ -233,9 +217,7 @@ class ScalarAggregateNode : public ExecNode {

private:
Status Finish() {
util::tracing::Span span;
START_COMPUTE_SPAN(span, "Finish",
{{"aggregate", ToStringExtra()}, {"node.label", label()}});
auto scope = TraceFinish();
ExecBatch batch{{}, 1};
batch.values.resize(kernels_.size());

Expand Down Expand Up @@ -266,7 +248,7 @@ class ScalarAggregateNode : public ExecNode {
AtomicCounter input_counter_;
};

class GroupByNode : public ExecNode {
class GroupByNode : public ExecNode, public TracedNode<GroupByNode> {
public:
GroupByNode(ExecNode* input, std::shared_ptr<Schema> output_schema,
std::vector<int> key_field_ids, std::vector<int> agg_src_field_ids,
Expand Down Expand Up @@ -361,11 +343,6 @@ class GroupByNode : public ExecNode {
const char* kind_name() const override { return "GroupByNode"; }

Status Consume(ExecSpan batch) {
util::tracing::Span span;
START_COMPUTE_SPAN(span, "Consume",
{{"group_by", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
size_t thread_index = plan_->query_context()->GetThreadIndex();
if (thread_index >= local_states_.size()) {
return Status::IndexError("thread index ", thread_index, " is out of range [0, ",
Expand Down Expand Up @@ -514,6 +491,7 @@ class GroupByNode : public ExecNode {
}

void OutputResult() {
auto scope = TraceFinish();
// If something goes wrong outputting the result we need to make sure
// we still mark finished.
Status st = DoOutputResult();
Expand All @@ -523,12 +501,7 @@ class GroupByNode : public ExecNode {
}

void InputReceived(ExecNode* input, ExecBatch batch) override {
EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
util::tracing::Span span;
START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"group_by", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
auto scope = TraceInputReceived(batch);

// bail if StopProducing was called
if (finished_.is_finished()) return;
Expand All @@ -543,16 +516,12 @@ class GroupByNode : public ExecNode {
}

void ErrorReceived(ExecNode* input, Status error) override {
EVENT(span_, "ErrorReceived", {{"error", error.message()}});

DCHECK_EQ(input, inputs_[0]);

outputs_[0]->ErrorReceived(this, std::move(error));
}

void InputFinished(ExecNode* input, int total_batches) override {
EVENT(span_, "InputFinished", {{"batches.length", total_batches}});

// bail if StopProducing was called
if (finished_.is_finished()) return;

Expand All @@ -564,10 +533,7 @@ class GroupByNode : public ExecNode {
}

Status StartProducing() override {
START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(),
{{"node.label", label()},
{"node.detail", ToString()},
{"node.kind", kind_name()}});
NoteStartProducing(ToStringExtra());
local_states_.resize(plan_->query_context()->max_concurrency());
return Status::OK();
}
Expand All @@ -583,7 +549,6 @@ class GroupByNode : public ExecNode {
}

void StopProducing(ExecNode* output) override {
EVENT(span_, "StopProducing");
DCHECK_EQ(output, outputs_[0]);

if (input_counter_.Cancel()) {
Expand Down
21 changes: 11 additions & 10 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ struct ExecPlanImpl : public ExecPlan {
// If no source node schedules any tasks (e.g. they do all their word synchronously as
// part of StartProducing) then the plan may be finished before we return from this
// call.
auto scope = START_SCOPED_SPAN(span_, "ExecPlan", {{"plan", ToString()}});
Future<> scheduler_finished = util::AsyncTaskScheduler::Make(
[this](util::AsyncTaskScheduler* async_scheduler) {
QueryContext* ctx = query_context();
RETURN_NOT_OK(ctx->Init(ctx->max_concurrency(), async_scheduler));

START_COMPUTE_SPAN(span_, "ExecPlan", {{"plan", ToString()}});
#ifdef ARROW_WITH_OPENTELEMETRY
if (HasMetadata()) {
auto pairs = metadata().get()->sorted_pairs();
Expand All @@ -136,7 +136,11 @@ struct ExecPlanImpl : public ExecPlan {
RETURN_NOT_OK(n->Init());
}
for (auto& n : nodes_) {
async_scheduler->AddSimpleTask([&] { return n->finished(); });
std::string qualified_label = std::string(n->kind_name()) + ":" + n->label();
std::string wait_for_finish =
"ExecPlan::WaitForFinish(" + qualified_label + ")";
async_scheduler->AddSimpleTask([&] { return n->finished(); },
std::move(wait_for_finish));
}

ctx->scheduler()->RegisterEnd();
Expand All @@ -149,7 +153,9 @@ struct ExecPlanImpl : public ExecPlan {
RETURN_NOT_OK(ctx->scheduler()->StartScheduling(
0 /* thread_index */,
[ctx](std::function<Status(size_t)> fn) -> Status {
return ctx->ScheduleTask(std::move(fn));
// TODO(weston) add names to synchronous scheduler so we can use something
// better than sync-scheduler-task here
return ctx->ScheduleTask(std::move(fn), "sync-scheduler-task");
},
/*concurrent_tasks=*/2 * num_threads, sync_execution));

Expand All @@ -163,10 +169,7 @@ struct ExecPlanImpl : public ExecPlan {
++it) {
auto node = *it;

EVENT(span_, "StartProducing:" + node->label(),
{{"node.label", node->label()}, {"node.kind_name", node->kind_name()}});
st = node->StartProducing();
EVENT(span_, "StartProducing:" + node->label(), {{"status", st.ToString()}});
if (!st.ok()) {
// Stop nodes that successfully started, in reverse order
bool expected = false;
Expand Down Expand Up @@ -197,7 +200,7 @@ struct ExecPlanImpl : public ExecPlan {

void StopProducing() {
DCHECK(started_) << "stopped an ExecPlan which never started";
EVENT(span_, "StopProducing");
EVENT(span_, "ExecPlan::StopProducing");
bool expected = false;
if (stopped_.compare_exchange_strong(expected, true)) {
query_context()->scheduler()->Abort(
Expand All @@ -209,8 +212,6 @@ struct ExecPlanImpl : public ExecPlan {
void StopProducingImpl(It begin, It end) {
for (auto it = begin; it != end; ++it) {
auto node = *it;
EVENT(span_, "StopProducing:" + node->label(),
{{"node.label", node->label()}, {"node.kind_name", node->kind_name()}});
node->StopProducing();
}
}
Expand Down Expand Up @@ -459,7 +460,7 @@ std::string ExecNode::ToString(int indent) const {
return ss.str();
}

std::string ExecNode::ToStringExtra(int indent = 0) const { return ""; }
std::string ExecNode::ToStringExtra(int indent) const { return ""; }

bool ExecNode::ErrorIfNotOk(Status status) {
if (status.ok()) return false;
Expand Down
4 changes: 1 addition & 3 deletions cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ class ARROW_EXPORT ExecNode {
bool ErrorIfNotOk(Status status);

/// Provide extra info to include in the string representation.
virtual std::string ToStringExtra(int indent) const;
virtual std::string ToStringExtra(int indent = 0) const;

ExecPlan* plan_;
std::string label_;
Expand All @@ -304,8 +304,6 @@ class ARROW_EXPORT ExecNode {

// Future to sync finished
Future<> finished_ = Future<>::Make();

util::tracing::Span span_;
};

/// \brief An extensible registry for factories of ExecNodes
Expand Down
8 changes: 0 additions & 8 deletions cpp/src/arrow/compute/exec/filter_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,9 @@ class FilterNode : public MapNode {
}

void InputReceived(ExecNode* input, ExecBatch batch) override {
EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need a TraceInputReceived or NoteInputReceived?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It gets it from MapNode. This will be more obvious in #15253 because subclasses will no longer implement InputReceived (they will use MapNode::InputReceived and instead just implement ProcessBatch)

DCHECK_EQ(input, inputs_[0]);
auto func = [this](ExecBatch batch) {
util::tracing::Span span;
START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"filter", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
auto result = DoFilter(std::move(batch));
MARK_SPAN(span, result.status());
END_SPAN(span);
return result;
};
this->SubmitTask(std::move(func), std::move(batch));
Expand Down
18 changes: 3 additions & 15 deletions cpp/src/arrow/compute/exec/hash_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ bool HashJoinSchema::HasLargeBinary() const {
return false;
}

class HashJoinNode : public ExecNode {
class HashJoinNode : public ExecNode, public TracedNode<HashJoinNode> {
public:
HashJoinNode(ExecPlan* plan, NodeVector inputs, const HashJoinNodeOptions& join_options,
std::shared_ptr<Schema> output_schema,
Expand Down Expand Up @@ -873,6 +873,7 @@ class HashJoinNode : public ExecNode {
}

void InputReceived(ExecNode* input, ExecBatch batch) override {
auto scope = TraceInputReceived(batch);
ARROW_DCHECK(std::find(inputs_.begin(), inputs_.end(), input) != inputs_.end());
if (complete_.load()) {
return;
Expand All @@ -881,11 +882,6 @@ class HashJoinNode : public ExecNode {
size_t thread_index = plan_->query_context()->GetThreadIndex();
int side = (input == inputs_[0]) ? 0 : 1;

EVENT(span_, "InputReceived", {{"batch.length", batch.length}, {"side", side}});
util::tracing::Span span;
START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"batch.length", batch.length}});

Status status = side == 0 ? OnProbeSideBatch(thread_index, std::move(batch))
: OnBuildSideBatch(thread_index, std::move(batch));

Expand All @@ -908,7 +904,6 @@ class HashJoinNode : public ExecNode {
}

void ErrorReceived(ExecNode* input, Status error) override {
EVENT(span_, "ErrorReceived", {{"error", error.message()}});
DCHECK_EQ(input, inputs_[0]);
StopProducing();
outputs_[0]->ErrorReceived(this, std::move(error));
Expand All @@ -919,8 +914,6 @@ class HashJoinNode : public ExecNode {
size_t thread_index = plan_->query_context()->GetThreadIndex();
int side = (input == inputs_[0]) ? 0 : 1;

EVENT(span_, "InputFinished", {{"side", side}, {"batches.length", total_batches}});

if (batch_count_[side].SetTotal(total_batches)) {
Status status = side == 0 ? OnProbeSideFinished(thread_index)
: OnBuildSideFinished(thread_index);
Expand Down Expand Up @@ -987,11 +980,7 @@ class HashJoinNode : public ExecNode {
}

Status StartProducing() override {
START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(),
{{"node.label", label()},
{"node.detail", ToString()},
{"node.kind", kind_name()}});
END_SPAN_ON_FUTURE_COMPLETION(span_, finished_);
NoteStartProducing(ToStringExtra());
RETURN_NOT_OK(
pushdown_context_.StartProducing(plan_->query_context()->GetThreadIndex()));
return Status::OK();
Expand All @@ -1013,7 +1002,6 @@ class HashJoinNode : public ExecNode {
}

void StopProducing() override {
EVENT(span_, "StopProducing");
bool expected = false;
if (complete_.compare_exchange_strong(expected, true)) {
impl_->Abort([this]() { finished_.MarkFinished(); });
Expand Down
8 changes: 2 additions & 6 deletions cpp/src/arrow/compute/exec/map_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,19 @@ MapNode::MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs,

void MapNode::ErrorReceived(ExecNode* input, Status error) {
DCHECK_EQ(input, inputs_[0]);
EVENT(span_, "ErrorReceived", {{"error.message", error.message()}});
outputs_[0]->ErrorReceived(this, std::move(error));
}

void MapNode::InputFinished(ExecNode* input, int total_batches) {
DCHECK_EQ(input, inputs_[0]);
EVENT(span_, "InputFinished", {{"batches.length", total_batches}});
outputs_[0]->InputFinished(this, total_batches);
if (input_counter_.SetTotal(total_batches)) {
this->Finish();
}
}

Status MapNode::StartProducing() {
START_COMPUTE_SPAN(
span_, std::string(kind_name()) + ":" + label(),
{{"node.label", label()}, {"node.detail", ToString()}, {"node.kind", kind_name()}});
NoteStartProducing(ToStringExtra());
return Status::OK();
}

Expand All @@ -75,7 +71,6 @@ void MapNode::StopProducing(ExecNode* output) {
}

void MapNode::StopProducing() {
EVENT(span_, "StopProducing");
if (input_counter_.Cancel()) {
this->Finish();
}
Expand All @@ -84,6 +79,7 @@ void MapNode::StopProducing() {

void MapNode::SubmitTask(std::function<Result<ExecBatch>(ExecBatch)> map_fn,
ExecBatch batch) {
auto scope = TraceInputReceived(batch);
Status status;
// This will be true if the node is stopped early due to an error or manual
// cancellation
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/map_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
namespace arrow {
namespace compute {

class ARROW_EXPORT MapNode : public ExecNode {
class ARROW_EXPORT MapNode : public ExecNode, public TracedNode<MapNode> {
public:
MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::shared_ptr<Schema> output_schema);
Expand Down
Loading