Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 90 additions & 7 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -604,12 +604,13 @@ Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(
return DeclarationToBatchesAsync(std::move(declaration), exec_context).result();
}

Future<std::vector<ExecBatch>> DeclarationToExecBatchesAsync(Declaration declaration,
ExecContext* exec_context) {
Future<std::vector<ExecBatch>> DeclarationToExecBatchesAsync(
Declaration declaration, std::shared_ptr<Schema>* out_schema,
ExecContext* exec_context) {
AsyncGenerator<std::optional<ExecBatch>> sink_gen;
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan, ExecPlan::Make());
Declaration with_sink =
Declaration::Sequence({declaration, {"sink", SinkNodeOptions(&sink_gen)}});
Declaration with_sink = Declaration::Sequence(
{declaration, {"sink", SinkNodeOptions(&sink_gen, out_schema)}});
ARROW_RETURN_NOT_OK(with_sink.AddToPlan(exec_plan.get()));
ARROW_RETURN_NOT_OK(exec_plan->StartProducing(exec_context->executor()));
auto collected_fut = CollectAsyncGenerator(sink_gen);
Expand All @@ -622,9 +623,91 @@ Future<std::vector<ExecBatch>> DeclarationToExecBatchesAsync(Declaration declara
});
}

Result<std::vector<ExecBatch>> DeclarationToExecBatches(Declaration declaration,
ExecContext* exec_context) {
return DeclarationToExecBatchesAsync(std::move(declaration), exec_context).result();
/// Blocking counterpart of DeclarationToExecBatchesAsync.
///
/// Forwards all arguments to the asynchronous variant and returns the result
/// of the future it produces.
Result<std::vector<ExecBatch>> DeclarationToExecBatches(
    Declaration declaration, std::shared_ptr<Schema>* out_schema,
    ExecContext* exec_context) {
  Future<std::vector<ExecBatch>> batches_future = DeclarationToExecBatchesAsync(
      std::move(declaration), out_schema, exec_context);
  return batches_future.result();
}

namespace {
/// \brief Generator adaptor that turns a stream of ExecBatch into RecordBatch
///
/// Each call pulls the next item from `exec_batch_gen`. A present batch is
/// converted with `schema`; an empty optional is mapped to a null RecordBatch.
struct BatchConverter {
  Future<std::shared_ptr<RecordBatch>> operator()() {
    // Capture the schema by value instead of `this`, so the continuation does
    // not depend on this functor object still being alive (or at the same
    // address) when the future completes.
    return exec_batch_gen().Then(
        [batch_schema = schema](const std::optional<ExecBatch>& batch)
            -> Result<std::shared_ptr<RecordBatch>> {
          if (batch) {
            return batch->ToRecordBatch(batch_schema);
          } else {
            return nullptr;
          }
        });
  }

  /// Source of exec batches; written through a pointer by the sink node options.
  AsyncGenerator<std::optional<ExecBatch>> exec_batch_gen;
  /// Schema used to convert each ExecBatch into a RecordBatch.
  std::shared_ptr<Schema> schema;
};

/// Create a plan for `declaration` with a "sink" node appended, start it on
/// `executor`, and return a converter that yields the sink's output as
/// record batches.
///
/// \param declaration the declaration to run; a sink is appended to it
/// \param executor executor handed to ExecPlan::StartProducing
/// \param[out] out_plan receives the newly created plan; it is assigned as soon
///             as the plan is made, so it stays set if a later step fails
/// \return the converter, or the first error hit while building/starting the plan
Result<BatchConverter> DeclarationToRecordBatchGenerator(
    Declaration declaration, ::arrow::internal::Executor* executor,
    std::shared_ptr<ExecPlan>* out_plan) {
  ARROW_ASSIGN_OR_RAISE(*out_plan, ExecPlan::Make());
  ExecPlan* const plan = out_plan->get();

  // The sink options hold pointers to the converter's generator and schema
  // members so they can be filled in from the plan.
  BatchConverter result;
  Declaration plan_with_sink = Declaration::Sequence(
      {declaration,
       {"sink", SinkNodeOptions(&result.exec_batch_gen, &result.schema)}});

  ARROW_RETURN_NOT_OK(plan_with_sink.AddToPlan(plan));
  ARROW_RETURN_NOT_OK(plan->StartProducing(executor));
  return result;
}
} // namespace

/// \brief Run a declaration and expose its output as a RecordBatchReader
///
/// The plan is created and started inside the generator factory passed to
/// IterateSynchronously; the factory also records the plan and the output
/// schema so the returned reader can report the schema and manage the plan.
///
/// \param declaration the declaration to execute
/// \param use_threads forwarded to IterateSynchronously
/// \return a reader over the plan's output batches
Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration declaration,
                                                               bool use_threads) {
  std::shared_ptr<ExecPlan> plan;
  std::shared_ptr<Schema> schema;
  Iterator<std::shared_ptr<RecordBatch>> batch_itr =
      ::arrow::internal::IterateSynchronously<std::shared_ptr<RecordBatch>>(
          [&](::arrow::internal::Executor* executor)
              -> Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> {
            ARROW_ASSIGN_OR_RAISE(
                BatchConverter batch_converter,
                DeclarationToRecordBatchGenerator(declaration, executor, &plan));
            schema = batch_converter.schema;
            return batch_converter;
          },
          use_threads);

  struct PlanReader : RecordBatchReader {
    PlanReader(std::shared_ptr<ExecPlan> plan, std::shared_ptr<Schema> schema,
               Iterator<std::shared_ptr<RecordBatch>> iterator)
        : plan_(std::move(plan)),
          schema_(std::move(schema)),
          iterator_(std::move(iterator)) {}

    ~PlanReader() {
      // `plan_` is null when plan creation itself failed (the error is then
      // reported by the iterator), so there is nothing to wait for.
      if (plan_ != nullptr) {
        plan_->finished().Wait();
      }
    }

    std::shared_ptr<Schema> schema() const override { return schema_; }

    Status ReadNext(std::shared_ptr<RecordBatch>* record_batch) override {
      return iterator_.Next().Value(record_batch);
    }

    Status Close() override {
      // End plan and read from generator until finished
      if (plan_ != nullptr) {
        plan_->StopProducing();
      }
      std::shared_ptr<RecordBatch> batch;
      do {
        ARROW_RETURN_NOT_OK(ReadNext(&batch));
      } while (batch != nullptr);
      return Status::OK();
    }

    std::shared_ptr<ExecPlan> plan_;
    std::shared_ptr<Schema> schema_;
    Iterator<std::shared_ptr<RecordBatch>> iterator_;
  };

  return std::make_unique<PlanReader>(std::move(plan), std::move(schema),
                                      std::move(batch_itr));
}

namespace internal {
Expand Down
10 changes: 8 additions & 2 deletions cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -503,11 +503,13 @@ ARROW_EXPORT Future<std::shared_ptr<Table>> DeclarationToTableAsync(
///
/// \see DeclarationToTable for details
ARROW_EXPORT Result<std::vector<ExecBatch>> DeclarationToExecBatches(
Declaration declaration, ExecContext* exec_context = default_exec_context());
Declaration declaration, std::shared_ptr<Schema>* out_schema,
ExecContext* exec_context = default_exec_context());

/// \brief Asynchronous version of \see DeclarationToExecBatches
ARROW_EXPORT Future<std::vector<ExecBatch>> DeclarationToExecBatchesAsync(
Declaration declaration, ExecContext* exec_context = default_exec_context());
Declaration declaration, std::shared_ptr<Schema>* out_schema,
ExecContext* exec_context = default_exec_context());

/// \brief Utility method to run a declaration and collect the results into a vector
///
Expand All @@ -519,6 +521,10 @@ ARROW_EXPORT Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatc
ARROW_EXPORT Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
Declaration declaration, ExecContext* exec_context = default_exec_context());

/// \brief Utility method to run a declaration return results as a RecordBatchReader
Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration declaration,
bool use_threads);

/// \brief Wrap an ExecBatch generator in a RecordBatchReader.
///
/// The RecordBatchReader does not impose any ordering on emitted batches.
Expand Down
18 changes: 14 additions & 4 deletions cpp/src/arrow/compute/exec/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,11 @@ struct ARROW_EXPORT BackpressureOptions {
class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
public:
explicit SinkNodeOptions(std::function<Future<std::optional<ExecBatch>>()>* generator,
std::shared_ptr<Schema>* schema = NULLPTR,
BackpressureOptions backpressure = {},
BackpressureMonitor** backpressure_monitor = NULLPTR)
: generator(generator),
schema(schema),
backpressure(std::move(backpressure)),
backpressure_monitor(backpressure_monitor) {}

Expand All @@ -175,6 +177,11 @@ class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
/// data from the plan. If this function is not called frequently enough then the sink
/// node will start to accumulate data and may apply backpressure.
std::function<Future<std::optional<ExecBatch>>()>* generator;

/// \brief A pointer to a schema
///
/// This will be set when the plan is created
std::shared_ptr<Schema>* schema;
/// \brief Options to control when to apply backpressure
///
/// This is optional, the default is to never apply backpressure. If the plan is not
Expand Down Expand Up @@ -246,8 +253,9 @@ class ARROW_EXPORT OrderBySinkNodeOptions : public SinkNodeOptions {
public:
explicit OrderBySinkNodeOptions(
SortOptions sort_options,
std::function<Future<std::optional<ExecBatch>>()>* generator)
: SinkNodeOptions(generator), sort_options(std::move(sort_options)) {}
std::function<Future<std::optional<ExecBatch>>()>* generator,
std::shared_ptr<Schema>* schema = NULLPTR)
: SinkNodeOptions(generator, schema), sort_options(std::move(sort_options)) {}

SortOptions sort_options;
};
Expand Down Expand Up @@ -436,8 +444,10 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions {
public:
explicit SelectKSinkNodeOptions(
SelectKOptions select_k_options,
std::function<Future<std::optional<ExecBatch>>()>* generator)
: SinkNodeOptions(generator), select_k_options(std::move(select_k_options)) {}
std::function<Future<std::optional<ExecBatch>>()>* generator,
std::shared_ptr<Schema>* schema = NULLPTR)
: SinkNodeOptions(generator, schema),
select_k_options(std::move(select_k_options)) {}

/// SelectK options
SelectKOptions select_k_options;
Expand Down
15 changes: 8 additions & 7 deletions cpp/src/arrow/compute/exec/plan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,14 @@ TEST(ExecPlanExecution, SinkNodeBackpressure) {
BackpressureMonitor* backpressure_monitor;
BackpressureOptions backpressure_options(resume_if_below_bytes, pause_if_above_bytes);
std::shared_ptr<Schema> schema_ = schema({field("data", uint32())});
ARROW_EXPECT_OK(compute::Declaration::Sequence(
{
{"source", SourceNodeOptions(schema_, batch_producer)},
{"sink", SinkNodeOptions{&sink_gen, backpressure_options,
&backpressure_monitor}},
})
.AddToPlan(plan.get()));
ARROW_EXPECT_OK(
compute::Declaration::Sequence(
{
{"source", SourceNodeOptions(schema_, batch_producer)},
{"sink", SinkNodeOptions{&sink_gen, /*schema=*/nullptr,
backpressure_options, &backpressure_monitor}},
})
.AddToPlan(plan.get()));
ASSERT_TRUE(backpressure_monitor);
ARROW_EXPECT_OK(plan->StartProducing(GetCpuThreadPool()));

Expand Down
18 changes: 12 additions & 6 deletions cpp/src/arrow/compute/exec/sink_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class SinkNode : public ExecNode {
public:
SinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
AsyncGenerator<std::optional<ExecBatch>>* generator,
BackpressureOptions backpressure,
std::shared_ptr<Schema>* schema, BackpressureOptions backpressure,
BackpressureMonitor** backpressure_monitor_out)
: ExecNode(plan, std::move(inputs), {"collected"}, {},
/*num_outputs=*/0),
Expand All @@ -101,6 +101,9 @@ class SinkNode : public ExecNode {
if (backpressure_monitor_out) {
*backpressure_monitor_out = &backpressure_queue_;
}
if (schema) {
*schema = inputs_[0]->output_schema();
}
auto node_destroyed_capture = node_destroyed_;
*generator = [this, node_destroyed_capture]() -> Future<std::optional<ExecBatch>> {
if (*node_destroyed_capture) {
Expand All @@ -125,7 +128,7 @@ class SinkNode : public ExecNode {
const auto& sink_options = checked_cast<const SinkNodeOptions&>(options);
RETURN_NOT_OK(ValidateOptions(sink_options));
return plan->EmplaceNode<SinkNode>(plan, std::move(inputs), sink_options.generator,
sink_options.backpressure,
sink_options.schema, sink_options.backpressure,
sink_options.backpressure_monitor);
}

Expand Down Expand Up @@ -404,8 +407,9 @@ static Result<ExecNode*> MakeTableConsumingSinkNode(
struct OrderBySinkNode final : public SinkNode {
OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::unique_ptr<OrderByImpl> impl,
AsyncGenerator<std::optional<ExecBatch>>* generator)
: SinkNode(plan, std::move(inputs), generator, /*backpressure=*/{},
AsyncGenerator<std::optional<ExecBatch>>* generator,
std::shared_ptr<Schema>* schema)
: SinkNode(plan, std::move(inputs), generator, schema, /*backpressure=*/{},
/*backpressure_monitor_out=*/nullptr),
impl_(std::move(impl)) {}

Expand All @@ -426,7 +430,8 @@ struct OrderBySinkNode final : public SinkNode {
OrderByImpl::MakeSort(plan->exec_context(), inputs[0]->output_schema(),
sink_options.sort_options));
return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs), std::move(impl),
sink_options.generator);
sink_options.generator,
sink_options.schema);
}

static Status ValidateCommonOrderOptions(const SinkNodeOptions& options) {
Expand Down Expand Up @@ -458,7 +463,8 @@ struct OrderBySinkNode final : public SinkNode {
OrderByImpl::MakeSelectK(plan->exec_context(), inputs[0]->output_schema(),
sink_options.select_k_options));
return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs), std::move(impl),
sink_options.generator);
sink_options.generator,
sink_options.schema);
}

static Status ValidateSelectKOptions(const SelectKSinkNodeOptions& options) {
Expand Down
34 changes: 6 additions & 28 deletions cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,37 +130,15 @@ Result<Datum> GroupByUsingExecPlan(const BatchesWithSchema& input,
keys[i] = FieldRef(key_names[i]);
}

ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make());
AsyncGenerator<std::optional<ExecBatch>> sink_gen;
RETURN_NOT_OK(
Declaration::Sequence(
{
{"source",
SourceNodeOptions{input.schema, input.gen(use_threads, /*slow=*/false)}},
{"aggregate", AggregateNodeOptions{std::move(aggregates), std::move(keys)}},
{"sink", SinkNodeOptions{&sink_gen}},
})
.AddToPlan(plan.get()));

RETURN_NOT_OK(plan->Validate());
RETURN_NOT_OK(plan->StartProducing(ctx->executor()));

auto collected_fut = CollectAsyncGenerator(sink_gen);

auto start_and_collect =
AllFinished({plan->finished(), Future<>(collected_fut)})
.Then([collected_fut]() -> Result<std::vector<ExecBatch>> {
ARROW_ASSIGN_OR_RAISE(auto collected, collected_fut.result());
return ::arrow::internal::MapVector(
[](std::optional<ExecBatch> batch) { return std::move(*batch); },
std::move(collected));
});

std::shared_ptr<Schema> output_schema;
Declaration decl = Declaration::Sequence(
{{"source",
SourceNodeOptions{input.schema, input.gen(use_threads, /*slow=*/false)}},
{"aggregate", AggregateNodeOptions{std::move(aggregates), std::move(keys)}}});
ARROW_ASSIGN_OR_RAISE(std::vector<ExecBatch> output_batches,
start_and_collect.MoveResult());
DeclarationToExecBatches(decl, &output_schema, ctx));

ArrayVector out_arrays(aggregates.size() + key_names.size());
const auto& output_schema = plan->sources()[0]->outputs()[0]->output_schema();
for (size_t i = 0; i < out_arrays.size(); ++i) {
std::vector<std::shared_ptr<Array>> arrays(output_batches.size());
for (size_t j = 0; j < output_batches.size(); ++j) {
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,8 @@ Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
{"filter", compute::FilterNodeOptions{scan_options_->filter}},
{"augmented_project",
compute::ProjectNodeOptions{std::move(exprs), std::move(names)}},
{"sink", compute::SinkNodeOptions{&sink_gen, scan_options_->backpressure}},
{"sink", compute::SinkNodeOptions{&sink_gen, /*schema=*/nullptr,
scan_options_->backpressure}},
})
.AddToPlan(plan.get()));

Expand Down
Loading