diff --git a/cpp/src/arrow/acero/filter_node.cc b/cpp/src/arrow/acero/filter_node.cc
index e5435487a00..1414f6108f4 100644
--- a/cpp/src/arrow/acero/filter_node.cc
+++ b/cpp/src/arrow/acero/filter_node.cc
@@ -73,12 +73,12 @@ class FilterNode : public MapNode {
   Result<ExecBatch> ProcessBatch(ExecBatch batch) override {
     ARROW_ASSIGN_OR_RAISE(Expression simplified_filter,
                           SimplifyWithGuarantee(filter_, batch.guarantee));

-    arrow::util::tracing::Span span;
     START_COMPUTE_SPAN(span, "Filter",
                        {{"filter.expression", ToStringExtra()},
                         {"filter.expression.simplified", simplified_filter.ToString()},
-                        {"filter.length", batch.length}});
+                        {"filter.length", batch.length},
+                        {"input_batch.size_bytes", batch.TotalBufferSize()}});

     ARROW_ASSIGN_OR_RAISE(
         Datum mask, ExecuteScalarExpression(simplified_filter, batch,
@@ -87,8 +87,10 @@ class FilterNode : public MapNode {
     if (mask.is_scalar()) {
       const auto& mask_scalar = mask.scalar_as<BooleanScalar>();
       if (mask_scalar.is_valid && mask_scalar.value) {
+        ATTRIBUTE_ON_CURRENT_SPAN("output_batch.size_bytes", batch.TotalBufferSize());
         return batch;
       }
+      ATTRIBUTE_ON_CURRENT_SPAN("output_batch.size_bytes", 0);
       return batch.Slice(0, 0);
     }
@@ -101,7 +103,10 @@ class FilterNode : public MapNode {
       if (value.is_scalar()) continue;
       ARROW_ASSIGN_OR_RAISE(value, Filter(value, mask, FilterOptions::Defaults()));
     }
-    return ExecBatch::Make(std::move(values));
+    auto filtered_batch = ExecBatch::Make(std::move(values));
+    ATTRIBUTE_ON_CURRENT_SPAN("output_batch.size_bytes",
+                              filtered_batch->TotalBufferSize());
+    return filtered_batch;
   }

 protected:
diff --git a/cpp/src/arrow/acero/project_node.cc b/cpp/src/arrow/acero/project_node.cc
index bcabe585b29..d69524fca6d 100644
--- a/cpp/src/arrow/acero/project_node.cc
+++ b/cpp/src/arrow/acero/project_node.cc
@@ -79,12 +79,14 @@ class ProjectNode : public MapNode {
   Result<ExecBatch> ProcessBatch(ExecBatch batch) override {
     std::vector<Datum> values{exprs_.size()};
+    arrow::util::tracing::Span span;
+    START_COMPUTE_SPAN(span, "Project",
+                       {{"project.length", batch.length},
+                        {"input_batch.size_bytes", batch.TotalBufferSize()}});
     for (size_t i = 0; i < exprs_.size(); ++i) {
-      arrow::util::tracing::Span span;
-      START_COMPUTE_SPAN(span, "Project",
-                         {{"project.type", exprs_[i].type()->ToString()},
-                          {"project.length", batch.length},
-                          {"project.expression", exprs_[i].ToString()}});
+      std::string project_name = "project[" + std::to_string(i) + "]";
+      ATTRIBUTE_ON_CURRENT_SPAN(project_name + ".type", exprs_[i].type()->ToString());
+      ATTRIBUTE_ON_CURRENT_SPAN(project_name + ".expression", exprs_[i].ToString());
       ARROW_ASSIGN_OR_RAISE(Expression simplified_expr,
                             SimplifyWithGuarantee(exprs_[i], batch.guarantee));
@@ -92,6 +94,7 @@
           values[i], ExecuteScalarExpression(simplified_expr, batch,
                                              plan()->query_context()->exec_context()));
     }
+    ATTRIBUTE_ON_CURRENT_SPAN("output_batch.size_bytes", batch.TotalBufferSize());
     return ExecBatch{std::move(values), batch.length};
   }
diff --git a/cpp/src/arrow/acero/sink_node.cc b/cpp/src/arrow/acero/sink_node.cc
index 4ab6b4537de..0d7d815cfe5 100644
--- a/cpp/src/arrow/acero/sink_node.cc
+++ b/cpp/src/arrow/acero/sink_node.cc
@@ -539,6 +539,7 @@ struct OrderBySinkNode final : public SinkNode {
   Status Finish() override {
     arrow::util::tracing::Span span;
+    START_SPAN(span, std::string(kind_name()) + "::Finish");
     ARROW_RETURN_NOT_OK(DoFinish());
     return SinkNode::Finish();
   }
diff --git a/cpp/src/arrow/acero/source_node.cc b/cpp/src/arrow/acero/source_node.cc
index 8060e01f074..e6f3e801191 100644
--- a/cpp/src/arrow/acero/source_node.cc
+++ b/cpp/src/arrow/acero/source_node.cc
@@ -133,6 +133,8 @@ struct SourceNode : ExecNode, public TracedNode {
       plan_->query_context()->ScheduleTask(
           [this, morsel_length, use_legacy_batching, initial_batch_index, morsel,
            has_ordering = !ordering_.is_unordered()]() {
+            arrow::util::tracing::Span span;
+            START_SPAN(span, "SourceNode::ProcessMorsel");
             int64_t offset = 0;
             int batch_index = initial_batch_index;
             do {
@@ -163,6 +165,7 @@ struct SourceNode : ExecNode, public TracedNode {

   Status StartProducing() override {
     NoteStartProducing(ToStringExtra());
+
     {
       // If another exec node encountered an error during its StartProducing call
       // it might have already called StopProducing on all of its inputs (including this
@@ -184,6 +187,9 @@ struct SourceNode : ExecNode, public TracedNode {
       options.should_schedule = ShouldSchedule::IfDifferentExecutor;
       ARROW_ASSIGN_OR_RAISE(Future<> scan_task, plan_->query_context()->BeginExternalTask(
                                                     "SourceNode::DatasetScan"));
+      arrow::util::tracing::Span span;
+      START_SPAN(span, "SourceNode::DatasetScan");
+
       if (!scan_task.is_valid()) {
         // Plan has already been aborted, no need to start scanning
         return Status::OK();
@@ -195,9 +201,6 @@ struct SourceNode : ExecNode, public TracedNode {
       }
       lock.unlock();

-      arrow::util::tracing::Span fetch_batch_span;
-      auto fetch_batch_scope =
-          START_SCOPED_SPAN(fetch_batch_span, "SourceNode::ReadBatch");
       return generator_().Then(
           [this](
               const std::optional<ExecBatch>& morsel_or_end) -> Future<ControlFlow<int>> {
diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc
index c0433145dd1..7ec5123c073 100644
--- a/cpp/src/arrow/compute/function.cc
+++ b/cpp/src/arrow/compute/function.cc
@@ -219,7 +219,7 @@ struct FunctionExecutorImpl : public FunctionExecutor {
   }

   Result<Datum> Execute(const std::vector<Datum>& args, int64_t passed_length) override {
-    util::tracing::Span span;
+    arrow::util::tracing::Span span;
     auto func_kind = func.kind();
     const auto& func_name = func.name();
diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc
index bf703b6c6ba..106d9c19cfa 100644
--- a/cpp/src/arrow/csv/reader.cc
+++ b/cpp/src/arrow/csv/reader.cc
@@ -43,12 +43,14 @@
 #include "arrow/type.h"
 #include "arrow/type_fwd.h"
 #include "arrow/util/async_generator.h"
+#include "arrow/util/byte_size.h"
 #include "arrow/util/future.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/task_group.h"
 #include "arrow/util/thread_pool.h"
+#include "arrow/util/tracing_internal.h"
 #include "arrow/util/utf8_internal.h"
 #include "arrow/util/vector.h"

@@ -881,6 +883,8 @@ class StreamingReaderImpl : public ReaderMixin,
   }

   Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
+    util::tracing::Span span;
+    START_SPAN(span, "arrow::csv::ReadNextAsync");
     return record_batch_gen_();
   }

@@ -892,6 +896,11 @@ class StreamingReaderImpl : public ReaderMixin,
       return Status::Invalid("Empty CSV file");
     }

+    // Create an arrow::csv::ReadNextAsync span so that grouping by that name does not
+    // ignore the work performed for this first block.
+    util::tracing::Span read_span;
+    auto scope = START_SCOPED_SPAN(read_span, "arrow::csv::ReadNextAsync");
+
     std::shared_ptr<Buffer> after_header;
     ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed,
                           ProcessHeader(first_buffer, &after_header));
@@ -911,9 +920,12 @@ class StreamingReaderImpl : public ReaderMixin,
     auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), std::move(decoder_op));

     auto self = shared_from_this();
-    return rb_gen().Then([self, rb_gen, max_readahead](const DecodedBlock& first_block) {
+    auto init_finished = rb_gen().Then([self, rb_gen, max_readahead,
+                                        read_span = std::move(read_span)](
+                                           const DecodedBlock& first_block) {
       return self->InitFromBlock(first_block, std::move(rb_gen), max_readahead, 0);
     });
+    return init_finished;
   }

   Future<> InitFromBlock(const DecodedBlock& block,
diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h
index aac437df075..1c206017290 100644
--- a/cpp/src/arrow/dataset/dataset_internal.h
+++ b/cpp/src/arrow/dataset/dataset_internal.h
@@ -29,8 +29,10 @@
 #include "arrow/scalar.h"
 #include "arrow/type.h"
 #include "arrow/util/async_generator.h"
+#include "arrow/util/byte_size.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/iterator.h"
+#include "arrow/util/tracing_internal.h"

 namespace arrow {
 namespace dataset {
@@ -144,6 +146,12 @@ inline RecordBatchGenerator MakeChunkedBatchGenerator(RecordBatchGenerator gen,
       [batch_size](const std::shared_ptr<RecordBatch>& batch)
           -> ::arrow::AsyncGenerator<std::shared_ptr<RecordBatch>> {
         const int64_t rows = batch->num_rows();
+        util::tracing::Span span;
+        START_SPAN(span, "MakeChunkedBatchGenerator",
+                   {{"target_batch_size_rows", batch_size},
+                    {"batch.size_rows", rows},
+                    {"batch.size_bytes", util::TotalBufferSize(*batch)},
+                    {"output_batches", rows / batch_size + (rows % batch_size != 0)}});
         if (rows <= batch_size) {
           return ::arrow::MakeVectorGenerator<std::shared_ptr<RecordBatch>>({batch});
         }
diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc
index a2096d691b4..d660f6f948f 100644
--- a/cpp/src/arrow/dataset/dataset_writer.cc
+++ b/cpp/src/arrow/dataset/dataset_writer.cc
@@ -27,6 +27,7 @@
 #include "arrow/record_batch.h"
 #include "arrow/result.h"
 #include "arrow/table.h"
+#include "arrow/util/byte_size.h"
 #include "arrow/util/future.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/map.h"
@@ -191,9 +192,13 @@ class DatasetWriterFileQueue {
   }

   Result<int64_t> PopAndDeliverStagedBatch() {
+    util::tracing::Span span;
+    START_SPAN(span, "DatasetWriter::Pop");
     ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> next_batch, PopStagedBatch());
     int64_t rows_popped = next_batch->num_rows();
     rows_currently_staged_ -= next_batch->num_rows();
+    ATTRIBUTE_ON_CURRENT_SPAN("batch.size_rows", next_batch->num_rows());
+    ATTRIBUTE_ON_CURRENT_SPAN("rows_currently_staged", rows_currently_staged_);
     ScheduleBatch(std::move(next_batch));
     return rows_popped;
   }
@@ -202,7 +207,15 @@ class DatasetWriterFileQueue {
   Status Push(std::shared_ptr<RecordBatch> batch) {
     uint64_t delta_staged = batch->num_rows();
     rows_currently_staged_ += delta_staged;
-    staged_batches_.push_back(std::move(batch));
+    {
+      util::tracing::Span span;
+      START_SPAN(span, "DatasetWriter::Push",
+                 {{"batch.size_rows", batch->num_rows()},
+                  {"rows_currently_staged", rows_currently_staged_},
+                  {"options_.min_rows_per_group", options_.min_rows_per_group},
+                  {"max_rows_staged", writer_state_->max_rows_staged}});
+      staged_batches_.push_back(std::move(batch));
+    }
     while (!staged_batches_.empty() &&
            (writer_state_->StagingFull() ||
            rows_currently_staged_ >= options_.min_rows_per_group)) {
@@ -233,6 +246,18 @@ class DatasetWriterFileQueue {
     return DeferNotOk(options_.filesystem->io_context().executor()->Submit(
         [self = this, batch = std::move(next)]() {
           int64_t rows_to_release = batch->num_rows();
+#ifdef ARROW_WITH_OPENTELEMETRY
+          uint64_t size_bytes = util::TotalBufferSize(*batch);
+          uint64_t num_buffers = 0;
+          for (auto column : batch->columns()) {
+            num_buffers += column->data()->buffers.size();
+          }
+          util::tracing::Span span;
+          START_SPAN(span, "DatasetWriter::WriteNext",
+                     {{"threadpool", "IO"},
+                      {"batch.size_bytes", size_bytes},
+                      {"batch.num_buffers", num_buffers}});
+#endif
           Status status = self->writer_->Write(batch);
           self->writer_state_->rows_in_flight_throttle.Release(rows_to_release);
           return status;
@@ -261,11 +286,6 @@ class DatasetWriterFileQueue {
   util::AsyncTaskScheduler* file_tasks_ = nullptr;
 };

-struct WriteTask {
-  std::string filename;
-  uint64_t num_rows;
-};
-
 class DatasetWriterDirectoryQueue {
  public:
   DatasetWriterDirectoryQueue(util::AsyncTaskScheduler* scheduler, std::string directory,
@@ -301,7 +321,6 @@ class DatasetWriterDirectoryQueue {
   Status StartWrite(const std::shared_ptr<RecordBatch>& batch) {
     rows_written_ += batch->num_rows();
-    WriteTask task{current_filename_, static_cast<uint64_t>(batch->num_rows())};
     if (!latest_open_file_) {
       ARROW_RETURN_NOT_OK(OpenFileQueue(current_filename_));
     }
@@ -351,6 +370,8 @@ class DatasetWriterDirectoryQueue {
     latest_open_file_tasks_ = util::MakeThrottledAsyncTaskGroup(
         scheduler_, 1, /*queue=*/nullptr, std::move(file_finish_task));
     if (init_future_.is_valid()) {
+      util::tracing::Span span;
+      START_SPAN(span, "arrow::dataset::WaitForDirectoryInit");
       latest_open_file_tasks_->AddSimpleTask(
           [init_future = init_future_]() { return init_future; },
           "DatasetWriter::WaitForDirectoryInit"sv);
@@ -362,6 +383,8 @@ class DatasetWriterDirectoryQueue {
   uint64_t rows_written() const { return rows_written_; }

   void PrepareDirectory() {
+    util::tracing::Span span;
+    START_SPAN(span, "arrow::dataset::SubmitPrepareDirectoryTask");
     if (directory_.empty() || !write_options_.create_dir) {
       return;
     }
@@ -383,6 +406,8 @@ class DatasetWriterDirectoryQueue {
     if (write_options_.existing_data_behavior ==
         ExistingDataBehavior::kDeleteMatchingPartitions) {
       init_task = [this, create_dir_cb, notify_waiters_cb, notify_waiters_on_err_cb] {
+        util::tracing::Span span;
+        START_SPAN(span, "arrow::dataset::PrepareDirectory");
         return write_options_.filesystem
             ->DeleteDirContentsAsync(directory_, /*missing_dir_ok=*/true)
@@ -614,12 +639,14 @@ class DatasetWriter::DatasetWriterImpl {
       backpressure =
           writer_state_.rows_in_flight_throttle.Acquire(next_chunk->num_rows());
       if (!backpressure.is_finished()) {
+        EVENT(scheduler_->span(), "DatasetWriter::Backpressure::TooManyRowsQueued");
         EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyRowsQueued");
         break;
       }
       if (will_open_file) {
         backpressure = writer_state_.open_files_throttle.Acquire(1);
         if (!backpressure.is_finished()) {
+          EVENT(scheduler_->span(), "DatasetWriter::Backpressure::TooManyOpenFiles");
           EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles");
           RETURN_NOT_OK(TryCloseLargestFile());
           break;
diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc
index 09ab775727c..82de5c6c8e2 100644
--- a/cpp/src/arrow/dataset/file_csv.cc
+++ b/cpp/src/arrow/dataset/file_csv.cc
@@ -278,10 +278,6 @@ static inline Result<csv::ReadOptions> GetReadOptions(
 static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
     const FileSource& source, const CsvFileFormat& format,
     const std::shared_ptr<ScanOptions>& scan_options, Executor* cpu_executor) {
-#ifdef ARROW_WITH_OPENTELEMETRY
-  auto tracer = arrow::internal::tracing::GetTracer();
-  auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
-#endif
   ARROW_ASSIGN_OR_RAISE(
       auto fragment_scan_options,
       GetFragmentScanOptions<CsvFragmentScanOptions>(
@@ -300,31 +296,24 @@ static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
   // input->Peek call blocks so we run the whole thing on the I/O thread pool.
   auto reader_fut = DeferNotOk(input->io_context().executor()->Submit(
       [=]() -> Future<std::shared_ptr<csv::StreamingReader>> {
+        ARROW_ASSIGN_OR_RAISE(auto first_block, input->Peek(reader_options.block_size));
         const auto& parse_options = format.parse_options;
         ARROW_ASSIGN_OR_RAISE(
             auto convert_options,
             GetConvertOptions(format, scan_options ? scan_options.get() : nullptr,
                               first_block));
-        return csv::StreamingReader::MakeAsync(io::default_io_context(), std::move(input),
-                                               cpu_executor, reader_options,
-                                               parse_options, convert_options);
+        return csv::StreamingReader::MakeAsync(
+            io::default_io_context(), std::move(input), cpu_executor, reader_options,
+            parse_options, convert_options);
       }));
   return reader_fut.Then(
       // Adds the filename to the error
       [=](const std::shared_ptr<csv::StreamingReader>& reader)
           -> Result<std::shared_ptr<csv::StreamingReader>> {
-#ifdef ARROW_WITH_OPENTELEMETRY
-        span->SetStatus(opentelemetry::trace::StatusCode::kOk);
-        span->End();
-#endif
         return reader;
       },
       [=](const Status& err) -> Result<std::shared_ptr<csv::StreamingReader>> {
-#ifdef ARROW_WITH_OPENTELEMETRY
-        arrow::internal::tracing::MarkSpan(err, span.get());
-        span->End();
-#endif
         return err.WithMessage("Could not open CSV input source '", path, "': ", err);
       });
 }
@@ -384,8 +373,6 @@ Result<RecordBatchGenerator> CsvFileFormat::ScanBatchesAsync(
   auto reader_fut =
       OpenReaderAsync(source, *this, scan_options, ::arrow::internal::GetCpuThreadPool());
   auto generator = GeneratorFromReader(std::move(reader_fut), scan_options->batch_size);
-  WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(
-      generator, "arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next");
   return generator;
 }
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 6e801e1f8ad..ee1fb685809 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -57,6 +57,7 @@
 #include "arrow/util/parallel.h"
 #include "arrow/util/string.h"
 #include "arrow/util/thread_pool.h"
+#include "arrow/util/tracing_internal.h"
 #include "arrow/util/ubsan.h"
 #include "arrow/util/vector.h"
 #include "arrow/visit_type_inline.h"
@@ -523,6 +524,12 @@ Status DecompressBuffers(Compression::type compression, const IpcReadOptions& options,
   return ::arrow::internal::OptionalParallelFor(
       options.use_threads, static_cast<int>(buffers.size()), [&](int i) {
+        util::tracing::Span span;
+        START_SPAN(span, "arrow::ipc::DecompressBuffer",
+                   {{"buffer_index", i},
+                    {"ipc.compression.codec", codec.get()->name().c_str()},
+                    {"ipc.options.use_threads", options.use_threads},
+                    {"size.uncompressed", (*buffers[i])->size() - sizeof(int64_t)}});
         ARROW_ASSIGN_OR_RAISE(*buffers[i],
                               DecompressBuffer(*buffers[i], options, codec.get()));
         return Status::OK();
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index e4b49ed5646..9c9615918e7 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -55,6 +55,7 @@
 #include "arrow/util/key_value_metadata.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/parallel.h"
+#include "arrow/util/tracing_internal.h"
 #include "arrow/visit_array_inline.h"
 #include "arrow/visit_type_inline.h"

@@ -232,6 +233,12 @@ class RecordBatchSerializer {
     auto CompressOne = [&](size_t i) {
       if (out_->body_buffers[i]->size() > 0) {
+        util::tracing::Span span;
+        START_SPAN(span, "arrow::ipc::CompressBuffers",
+                   {{"buffer_index", i},
+                    {"ipc.compression.codec", options_.codec.get()->name().c_str()},
+                    {"ipc.options.use_threads", options_.use_threads},
+                    {"size.uncompressed", out_->body_buffers[i]->size()}});
         RETURN_NOT_OK(CompressBuffer(*out_->body_buffers[i], options_.codec.get(),
                                      &out_->body_buffers[i]));
       }
diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc
index 63e27bfbe57..6d6cf457144 100644
--- a/cpp/src/arrow/util/async_util.cc
+++ b/cpp/src/arrow/util/async_util.cc
@@ -122,28 +122,6 @@ class FifoQueue : public ThrottledAsyncTaskScheduler::Queue {
   std::list<std::unique_ptr<Task>> tasks_;
 };

-#ifdef ARROW_WITH_OPENTELEMETRY
-::arrow::internal::tracing::Scope TraceTaskSubmitted(AsyncTaskScheduler::Task* task,
-                                                     const util::tracing::Span& parent) {
-  if (task->span.valid()) {
-    EVENT(task->span, "task submitted");
-    return ACTIVATE_SPAN(task->span);
-  }
-
-  return START_SCOPED_SPAN_WITH_PARENT_SV(task->span, parent, task->name(),
-                                          {{"task.cost", task->cost()}});
-}
-
-void TraceTaskQueued(AsyncTaskScheduler::Task* task, const util::tracing::Span& parent) {
-  START_SCOPED_SPAN_WITH_PARENT_SV(task->span, parent, task->name(),
-                                   {{"task.cost", task->cost()}});
-}
-
-void TraceTaskFinished(AsyncTaskScheduler::Task* task) { END_SPAN(task->span); }
-
-void TraceSchedulerAbort(const Status& error) { EVENT_ON_CURRENT_SPAN(error.ToString()); }
-#endif
-
 class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
  public:
   using Task = AsyncTaskScheduler::Task;
@@ -168,6 +146,25 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
     if (IsAborted()) {
       return false;
     }
+#ifdef ARROW_WITH_OPENTELEMETRY
+    // Wrap the task to propagate a parent tracing span to it
+    struct WrapperTask : public Task {
+      WrapperTask(std::unique_ptr<Task> target,
+                  opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> parent_span)
+          : target(std::move(target)), parent_span(std::move(parent_span)) {}
+      Result<Future<>> operator()() override {
+        auto scope = arrow::internal::tracing::GetTracer()->WithActiveSpan(parent_span);
+        return (*target)();
+      }
+      int cost() const override { return target->cost(); }
+      std::string_view name() const override { return target->name(); }
+      std::unique_ptr<Task> target;
+      opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> parent_span;
+    };
+    task = std::make_unique<WrapperTask>(
+        std::move(task), arrow::internal::tracing::GetTracer()->GetCurrentSpan());
+#endif
     SubmitTaskUnlocked(std::move(task), std::move(lk));
     return true;
   }
@@ -202,9 +199,6 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
     // Capture `task` to keep it alive until finished
     if (!submit_result->TryAddCallback([this, task_inner = std::move(task)]() mutable {
           return [this, task_inner2 = std::move(task_inner)](const Status& st) mutable {
-#ifdef ARROW_WITH_OPENTELEMETRY
-            TraceTaskFinished(task_inner2.get());
-#endif
            // OnTaskFinished might trigger the scheduler to end. We want to ensure that
            // is the very last thing that happens after all task destructors have run so
            // we eagerly destroy the task first.
@@ -232,7 +226,7 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
     if (!IsAborted()) {
       maybe_error_ = st;
 #ifdef ARROW_WITH_OPENTELEMETRY
-      TraceSchedulerAbort(st);
+      EVENT(span(), "Task aborted", {{"Error", st.ToString()}});
 #endif
      // Add one more "task" to represent running the abort callback.  This
      // will prevent any other task finishing and marking the scheduler finished
@@ -254,12 +248,6 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
       AbortUnlocked(stop_token_.Poll(), std::move(lk));
       return;
     }
-#ifdef ARROW_WITH_OPENTELEMETRY
-    // It's important that the task's span be active while we run the submit function.
-    // Normally the submit function should transfer the span to the thread task as the
-    // active span.
-    auto scope = TraceTaskSubmitted(task.get(), span_);
-#endif
     running_tasks_++;
     lk.unlock();
     return DoSubmitTask(std::move(task));
@@ -298,6 +286,25 @@ class ThrottledAsyncTaskSchedulerImpl
   }

   bool AddTask(std::unique_ptr<Task> task) override {
+#ifdef ARROW_WITH_OPENTELEMETRY
+    // Wrap the task to propagate a parent tracing span to it
+    struct WrapperTask : public Task {
+      WrapperTask(std::unique_ptr<Task> target,
+                  opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> parent_span)
+          : target(std::move(target)), parent_span(std::move(parent_span)) {}
+      Result<Future<>> operator()() override {
+        auto scope = arrow::internal::tracing::GetTracer()->WithActiveSpan(parent_span);
+        return (*target)();
+      }
+      int cost() const override { return target->cost(); }
+      std::string_view name() const override { return target->name(); }
+      std::unique_ptr<Task> target;
+      opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> parent_span;
+    };
+    task = std::make_unique<WrapperTask>(
+        std::move(task), arrow::internal::tracing::GetTracer()->GetCurrentSpan());
+#endif
     std::unique_lock lk(mutex_);
     // If the queue isn't empty then don't even try and acquire the throttle
     // We can safely assume it is either blocked or in the middle of trying to
@@ -310,7 +317,10 @@ class ThrottledAsyncTaskSchedulerImpl
     std::optional<Future<>> maybe_backoff = throttle_->TryAcquire(latched_cost);
     if (maybe_backoff) {
 #ifdef ARROW_WITH_OPENTELEMETRY
-      TraceTaskQueued(task.get(), span());
+      EVENT(span(), "Task submission throttled",
+            {{"task.name", ::opentelemetry::nostd::string_view(task->name().data(),
+                                                               task->name().size())},
+             {"task.cost", task->cost()}});
 #endif
       queue_->Push(std::move(task));
       lk.unlock();
diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc
index ac92618ff66..b42e8ae892d 100644
--- a/cpp/src/arrow/util/io_util.cc
+++ b/cpp/src/arrow/util/io_util.cc
@@ -31,6 +31,7 @@
 #define __EXTENSIONS__
 #endif

+#include "arrow/util/tracing_internal.h"
 #include "arrow/util/windows_compatibility.h"  // IWYU pragma: keep
diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc
index 58668cab18b..583921ce0f5 100644
--- a/cpp/src/arrow/util/tracing_internal.cc
+++ b/cpp/src/arrow/util/tracing_internal.cc
@@ -156,9 +156,9 @@ nostd::shared_ptr<sdktrace::TracerProvider> InitializeSdkTracerProvider() {
   auto exporter = InitializeExporter();
   if (exporter) {
     sdktrace::BatchSpanProcessorOptions options;
-    options.max_queue_size = 16384;
+    options.max_queue_size = 65536;
     options.schedule_delay_millis = std::chrono::milliseconds(500);
-    options.max_export_batch_size = 16384;
+    options.max_export_batch_size = 65536;
     auto processor =
         std::make_unique<sdktrace::BatchSpanProcessor>(std::move(exporter), options);
     return std::make_shared<sdktrace::TracerProvider>(std::move(processor));
diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h
index a031edf08dc..d21ecb9d751 100644
--- a/cpp/src/arrow/util/tracing_internal.h
+++ b/cpp/src/arrow/util/tracing_internal.h
@@ -72,10 +72,11 @@ AsyncGenerator<T> WrapAsyncGenerator(AsyncGenerator<T> wrapped,
   return [=]() mutable -> Future<T> {
     auto span = active_span;
     auto scope = GetTracer()->WithActiveSpan(active_span);
-    auto fut = wrapped();
     if (create_childspan) {
       span = GetTracer()->StartSpan(span_name);
+      scope = GetTracer()->WithActiveSpan(span);
     }
+    auto fut = wrapped();
     fut.AddCallback([span](const Result<T>& result) {
       MarkSpan(result.status(), span.get());
       span->End();
@@ -178,6 +179,9 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent(
 #define EVENT_ON_CURRENT_SPAN(...) \
   ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()->AddEvent(__VA_ARGS__)

+#define ATTRIBUTE_ON_CURRENT_SPAN(...) \
+  ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()->SetAttribute(__VA_ARGS__)
+
 #define EVENT(target_span, ...) \
   ::arrow::internal::tracing::UnwrapSpan(target_span.details.get())->AddEvent(__VA_ARGS__)
@@ -231,6 +235,7 @@ struct Scope {
 #define MARK_SPAN(target_span, status)
 #define EVENT(target_span, ...)
 #define EVENT_ON_CURRENT_SPAN(...)
+#define ATTRIBUTE_ON_CURRENT_SPAN(...)
 #define END_SPAN(target_span)
 #define END_SPAN_ON_FUTURE_COMPLETION(target_span, target_future)
 #define PROPAGATE_SPAN_TO_GENERATOR(generator)
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 99b8a9ccef1..cdbe422f406 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -33,6 +33,7 @@
 #include "arrow/type.h"
 #include "arrow/util/async_generator.h"
 #include "arrow/util/bit_util.h"
+#include "arrow/util/byte_size.h"
 #include "arrow/util/future.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/logging.h"
@@ -275,13 +276,21 @@ class FileReaderImpl : public FileReader {
     std::string phys_type =
         TypeToString(reader_->metadata()->schema()->Column(i)->physical_type());
     ::arrow::util::tracing::Span span;
-    START_SPAN(span, "parquet::arrow::read_column",
+    START_SPAN(span, "parquet::arrow::ReadColumn",
                {{"parquet.arrow.columnindex", i},
                 {"parquet.arrow.columnname", column_name},
                 {"parquet.arrow.physicaltype", phys_type},
-                {"parquet.arrow.records_to_read", records_to_read}});
-#endif
+                {"parquet.arrow.records_to_read", records_to_read}});
+
+    auto status = reader->NextBatch(records_to_read, out);
+
+    uint64_t size_bytes = ::arrow::util::TotalBufferSize(*out->get());
+    ATTRIBUTE_ON_CURRENT_SPAN("parquet.arrow.output_batch_size_bytes", size_bytes);
+    return status;
+#else
     return reader->NextBatch(records_to_read, out);
+#endif
     END_PARQUET_CATCH_EXCEPTIONS
   }
@@ -1035,7 +1044,11 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
     RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
         reader_properties_.use_threads(), static_cast<int>(readers.size()),
-        [&](int i) { return readers[i]->NextBatch(batch_size, &columns[i]); }));
+        [&](int i) {
+          ::arrow::util::tracing::Span span;
+          START_SPAN(span, "parquet::arrow::GetRecordBatchReader::NextBatch");
+          return readers[i]->NextBatch(batch_size, &columns[i]);
+        }));

     for (const auto& column : columns) {
       if (column == nullptr || column->length() == 0) {
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 0c67e8d6bb3..ded81b0e263 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -16,6 +16,7 @@
 // under the License.

 #include "parquet/arrow/writer.h"
+#include "arrow/util/tracing_internal.h"
@@ -129,6 +130,8 @@ class ArrowColumnWriterV2 {
   //
   // Columns are written in DFS order.
   Status Write(ArrowWriteContext* ctx) {
+    ::arrow::util::tracing::Span span;
+    START_SPAN(span, "parquet::arrow::ArrowColumnWriterV2::Write");
     for (int leaf_idx = 0; leaf_idx < leaf_count_; leaf_idx++) {
       ColumnWriter* column_writer;
       if (row_group_writer_->buffered()) {
diff --git a/docs/source/cpp/opentelemetry.rst b/docs/source/cpp/opentelemetry.rst
index db6c0ac4469..b8eca0f4b79 100644
--- a/docs/source/cpp/opentelemetry.rst
+++ b/docs/source/cpp/opentelemetry.rst
@@ -85,3 +85,88 @@ at http://localhost:16686.
 Note that unlike with other methods of exporting traces, no output will be made
 to stdout/stderr. However, if you tail your Docker container logs, you should
 see output when traces are received by the all-in-one container.
+
+Note that the volume of spans produced by Acero can quickly become overwhelming
+for many tracing frameworks. Several spans are produced per input file, per
+input batch, per internal chunk of data (called a morsel, 128k rows by
+default), and per output file (possibly further divided by column). In
+practice, this means Acero produces roughly 10-20 spans per MB of data it
+processes. Choose a dataset that is representative of the real-world workload,
+yet small enough to be inspected with (or even ingested by!) a span visualizer
+such as Jaeger.
+
+Additional background on tracing
+--------------------------------
+Traces produced by Acero are conceptually similar to the information produced
+by profiling tools, but they are not the same. For example, the spans emitted
+by Acero do not necessarily follow the structure of the code, as the call
+stacks and flame graphs produced by a profiler do. Instead, the spans aim to
+highlight:
+
+- code sections that perform significant work on the CPU
+- code sections that perform I/O operations (reading/writing to disk)
+- the way blocks of data flow through the execution graph
+- the way data is reorganized (e.g. a file being split into blocks)
+
+Each span instance can have various attributes added to it when it is created.
+This makes it possible to capture the exact size of each block of data and the
+amount of time each node in the execution graph spent on it.
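+
+For Arrow developers adding instrumentation, spans and attributes are created
+with the internal macros from ``arrow/util/tracing_internal.h``. The following
+is a minimal, hypothetical sketch (the span and attribute names are made up
+for illustration and are not existing instrumentation points):
+
+.. code-block:: cpp
+
+   #include "arrow/util/tracing_internal.h"
+
+   ::arrow::util::tracing::Span span;
+   // Attributes passed here are attached to the span at creation time.
+   START_SPAN(span, "MyNode::ProcessBatch",
+              {{"batch.length", batch.length},
+               {"input_batch.size_bytes", batch.TotalBufferSize()}});
+   // Attributes can also be set later, on whichever span is currently active.
+   ATTRIBUTE_ON_CURRENT_SPAN("output_batch.size_bytes", output.TotalBufferSize());
+
+All of these macros compile to no-ops when Arrow is built without
+``ARROW_WITH_OPENTELEMETRY``, so instrumentation costs nothing in regular
+builds.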
+
+Span hierarchy
+--------------
+Traces are organized hierarchically: every span except the root span has a
+parent, and a span can have any number of children. If a span has a child span
+active during its lifetime, this usually means that the parent span is not
+actually in control of the CPU. Thus, calculating the total CPU time is not as
+easy as adding up all of the span durations; only the time during which a span
+has no active children (often referred to as its "self-time") should count.
+However, Acero is a multi-threaded engine, so it is likely that multiple spans
+are in fact performing work on a CPU at any given time!
+
+To achieve this multi-threaded behavior, many sections of code are executed
+through a task scheduling mechanism. When these tasks are scheduled, they can
+start executing immediately or at some point in the future. Often, the active
+span represents the lifetime of some resource (like a scanner, but also a
+certain batch of data) and functions as the parent of a set of spans in which
+the actual compute happens. Care must be taken when aggregating the durations
+of these spans.
+
+Structure of Acero traces
+-------------------------
+Acero traces are structured so that pieces of data can be followed as they
+flow through the graph. Each node's function (a kernel) is represented as a
+child span of the preceding node. Acero uses "morsel-driven parallelism", in
+which batches of data called morsels flow through the graph. The morsels are
+produced by e.g. a DatasetScanner. First, the DatasetScanner reads files
+(called Fragments) into Batches; depending on the size of a Fragment, it will
+produce several Batches per Fragment. It may then slice the Batches so that
+they conform to the maximum size of a morsel. Each morsel has a toplevel span
+called ProcessMorsel. Currently, the DatasetScanner cannot connect its output
+to the ProcessMorsel spans due to the asynchronous structure of the code.
+
+The dataset writer gathers batches of data in its staging area and issues a
+write operation once it has accumulated enough rows. This is represented by
+the DatasetWriter::Push and DatasetWriter::Pop spans, which also carry the
+current fill level of the staging area. Some morsels will therefore not
+trigger a write: only when a morsel causes the staging area to exceed its
+threshold is a DatasetWriter::Pop triggered, which performs the write
+operation.
+
+Backpressure
+------------
+When a node in the execution graph receives more data than it can process, it
+can ask the preceding nodes to slow down. This process is called
+"backpressure". Reasons for applying backpressure include, for example:
+
+- the buffer capacity of the node is almost full
+- the maximum number of concurrently open files has been reached
+
+Relevant events, such as a node applying/releasing backpressure or an async
+task group/scheduler throttling task submission, are posted as events to the
+toplevel span that belongs to the asynchronous task scheduler, and can also be
+posted to the "local" span that belongs to the block of data that caused the
+backpressure, as in the sketch below.
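+
+As a rough sketch of this pattern, abbreviated from the dataset writer in
+``cpp/src/arrow/dataset/dataset_writer.cc``:
+
+.. code-block:: cpp
+
+   backpressure = writer_state_.rows_in_flight_throttle.Acquire(next_chunk->num_rows());
+   if (!backpressure.is_finished()) {
+     // Post the event to the toplevel span of the asynchronous task scheduler...
+     EVENT(scheduler_->span(), "DatasetWriter::Backpressure::TooManyRowsQueued");
+     // ...and to the currently active ("local") span.
+     EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyRowsQueued");
+     break;
+   }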