diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc
index 71bbc34187e..64919f3f3d1 100644
--- a/cpp/src/arrow/compute/exec.cc
+++ b/cpp/src/arrow/compute/exec.cc
@@ -62,18 +62,42 @@ Result<std::shared_ptr<Buffer>> AllocateDataBuffer(KernelContext* ctx, int64_t l
   if (bit_width == 1) {
     return ctx->AllocateBitmap(length);
   } else {
-    ARROW_CHECK_EQ(bit_width % 8, 0)
-        << "Only bit widths with multiple of 8 are currently supported";
-    int64_t buffer_size = length * bit_width / 8;
+    int64_t buffer_size = BitUtil::BytesForBits(length * bit_width);
     return ctx->Allocate(buffer_size);
   }
   return Status::OK();
 }
 
-bool CanPreallocate(const DataType& type) {
-  // There are currently cases where NullType is the output type, so we disable
-  // any preallocation logic when this occurs
-  return is_fixed_width(type.id()) && type.id() != Type::NA;
+struct BufferPreallocation {
+  explicit BufferPreallocation(int bit_width = -1, int added_length = 0)
+      : bit_width(bit_width), added_length(added_length) {}
+
+  int bit_width;
+  int added_length;
+};
+
+void ComputeDataPreallocate(const DataType& type,
+                            std::vector<BufferPreallocation>* widths) {
+  if (is_fixed_width(type.id()) && type.id() != Type::NA) {
+    widths->emplace_back(checked_cast<const FixedWidthType&>(type).bit_width());
+    return;
+  }
+  // Preallocate binary and list offsets
+  switch (type.id()) {
+    case Type::BINARY:
+    case Type::STRING:
+    case Type::LIST:
+    case Type::MAP:
+      widths->emplace_back(32, /*added_length=*/1);
+      return;
+    case Type::LARGE_BINARY:
+    case Type::LARGE_STRING:
+    case Type::LARGE_LIST:
+      widths->emplace_back(64, /*added_length=*/1);
+      return;
+    default:
+      break;
+  }
 }
 
 Status GetValueDescriptors(const std::vector<Datum>& args,
@@ -88,6 +112,16 @@ Status GetValueDescriptors(const std::vector<Datum>& args,
 
 namespace detail {
 
+Status CheckAllValues(const std::vector<Datum>& values) {
+  for (const auto& value : values) {
+    if (!value.is_value()) {
+      return Status::Invalid("Tried executing function with non-value type: ",
+                             value.ToString());
+    }
+  }
+  return Status::OK();
+}
+
 ExecBatchIterator::ExecBatchIterator(std::vector<Datum> args, int64_t length,
                                      int64_t max_chunksize)
     : args_(std::move(args)),
@@ -184,6 +218,8 @@ bool ExecBatchIterator::Next(ExecBatch* batch) {
   return true;
 }
 
+namespace {
+
 bool ArrayHasNulls(const ArrayData& data) {
   // As discovered in ARROW-8863 (and not only for that reason)
   // ArrayData::null_count can -1 even when buffers[0] is nullptr. So we check
@@ -393,28 +429,6 @@ class NullPropagator {
   bool bitmap_preallocated_ = false;
 };
 
-Status PropagateNulls(KernelContext* ctx, const ExecBatch& batch, ArrayData* output) {
-  DCHECK_NE(nullptr, output);
-  DCHECK_GT(output->buffers.size(), 0);
-
-  if (output->type->id() == Type::NA) {
-    // Null output type is a no-op (rare when this would happen but we at least
-    // will test for it)
-    return Status::OK();
-  }
-
-  // This function is ONLY able to write into output with non-zero offset
-  // when the bitmap is preallocated. This could be a DCHECK but returning
-  // error Status for now for emphasis
-  if (output->offset != 0 && output->buffers[0] == nullptr) {
-    return Status::Invalid(
-        "Can only propagate nulls into pre-allocated memory "
-        "when the output offset is non-zero");
-  }
-  NullPropagator propagator(ctx, batch, output);
-  return propagator.Execute();
-}
-
 std::shared_ptr<ChunkedArray> ToChunkedArray(const std::vector<Datum>& values,
                                              const std::shared_ptr<DataType>& type) {
   std::vector<std::shared_ptr<Array>> arrays;
@@ -438,16 +452,6 @@ bool HaveChunkedArray(const std::vector<Datum>& values) {
   return false;
 }
 
-Status CheckAllValues(const std::vector<Datum>& values) {
-  for (const auto& value : values) {
-    if (!value.is_value()) {
-      return Status::Invalid("Tried executing function with non-value type: ",
-                             value.ToString());
-    }
-  }
-  return Status::OK();
-}
-
 template <typename FunctionType>
 class FunctionExecutorImpl : public FunctionExecutor {
  public:
@@ -499,10 +503,14 @@ class FunctionExecutorImpl : public FunctionExecutor {
     if (validity_preallocated_) {
       ARROW_ASSIGN_OR_RAISE(out->buffers[0], kernel_ctx_.AllocateBitmap(length));
     }
-    if (data_preallocated_) {
-      const auto& fw_type = checked_cast<const FixedWidthType&>(*out->type);
-      ARROW_ASSIGN_OR_RAISE(
-          out->buffers[1], AllocateDataBuffer(&kernel_ctx_, length, fw_type.bit_width()));
+    for (size_t i = 0; i < data_preallocated_.size(); ++i) {
+      const auto& prealloc = data_preallocated_[i];
+      if (prealloc.bit_width >= 0) {
+        ARROW_ASSIGN_OR_RAISE(
+            out->buffers[i + 1],
+            AllocateDataBuffer(&kernel_ctx_, length + prealloc.added_length,
+                               prealloc.bit_width));
+      }
     }
     return out;
   }
@@ -523,12 +531,13 @@ class FunctionExecutorImpl : public FunctionExecutor {
 
   int output_num_buffers_;
 
-  // If true, then the kernel writes into a preallocated data buffer
-  bool data_preallocated_ = false;
-
   // If true, then memory is preallocated for the validity bitmap with the same
   // strategy as the data buffer(s).
   bool validity_preallocated_ = false;
+
+  // The kernel writes into data buffers preallocated for these bit widths
+  // (0 indicates no preallocation);
+  std::vector<BufferPreallocation> data_preallocated_;
 };
 
 class ScalarExecutor : public FunctionExecutorImpl<ScalarFunction> {
@@ -675,24 +684,27 @@ class ScalarExecutor : public FunctionExecutorImpl<ScalarFunction> {
     output_num_buffers_ = static_cast<int>(output_descr_.type->layout().buffers.size());
     // Decide if we need to preallocate memory for this kernel
-    data_preallocated_ = ((kernel_->mem_allocation == MemAllocation::PREALLOCATE) &&
-                          CanPreallocate(*output_descr_.type));
     validity_preallocated_ =
         (kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
         kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL);
+    if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
+      ComputeDataPreallocate(*output_descr_.type, &data_preallocated_);
+    }
 
-    // Contiguous preallocation only possible if both the VALIDITY and DATA can
-    // be preallocated. Otherwise, we must go chunk-by-chunk. Note that when
-    // the DATA cannot be preallocated, the VALIDITY may still be preallocated
-    // depending on the NullHandling of the kernel
+    // Contiguous preallocation only possible on non-nested types if all
+    // buffers are preallocated. Otherwise, we must go chunk-by-chunk.
     //
-    // Some kernels are unable to write into sliced outputs, so we respect the
-    // kernel's attributes
+    // Some kernels are also unable to write into sliced outputs, so we respect the
+    // kernel's attributes.
     preallocate_contiguous_ =
         (exec_ctx_->preallocate_contiguous() && kernel_->can_write_into_slices &&
-         data_preallocated_ && validity_preallocated_);
+         validity_preallocated_ && !is_nested(output_descr_.type->id()) &&
+         data_preallocated_.size() == static_cast<size_t>(output_num_buffers_ - 1) &&
+         std::all_of(data_preallocated_.begin(), data_preallocated_.end(),
+                     [](const BufferPreallocation& prealloc) {
+                       return prealloc.bit_width >= 0;
+                     }));
     if (preallocate_contiguous_) {
-      DCHECK_EQ(2, output_num_buffers_);
       ARROW_ASSIGN_OR_RAISE(preallocated_, PrepareOutput(total_length));
     }
     return Status::OK();
   }
@@ -825,11 +837,12 @@ class VectorExecutor : public FunctionExecutorImpl<VectorFunction> {
     output_num_buffers_ = static_cast<int>(output_descr_.type->layout().buffers.size());
     // Decide if we need to preallocate memory for this kernel
-    data_preallocated_ = ((kernel_->mem_allocation == MemAllocation::PREALLOCATE) &&
-                          CanPreallocate(*output_descr_.type));
     validity_preallocated_ =
         (kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
         kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL);
+    if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
+      ComputeDataPreallocate(*output_descr_.type, &data_preallocated_);
+    }
     return Status::OK();
   }
@@ -901,15 +914,39 @@ Result<std::unique_ptr<FunctionExecutor>> MakeExecutor(ExecContext* ctx,
   return std::unique_ptr<FunctionExecutor>(new ExecutorType(ctx, typed_func, options));
 }
 
+}  // namespace
+
+Status PropagateNulls(KernelContext* ctx, const ExecBatch& batch, ArrayData* output) {
+  DCHECK_NE(nullptr, output);
+  DCHECK_GT(output->buffers.size(), 0);
+
+  if (output->type->id() == Type::NA) {
+    // Null output type is a no-op (rare when this would happen but we at least
+    // will test for it)
+    return Status::OK();
+  }
+
+  // This function is ONLY able to write into output with non-zero offset
+  // when the bitmap is preallocated. This could be a DCHECK but returning
+  // error Status for now for emphasis
+  if (output->offset != 0 && output->buffers[0] == nullptr) {
+    return Status::Invalid(
+        "Can only propagate nulls into pre-allocated memory "
+        "when the output offset is non-zero");
+  }
+  NullPropagator propagator(ctx, batch, output);
+  return propagator.Execute();
+}
+
 Result<std::unique_ptr<FunctionExecutor>> FunctionExecutor::Make(
     ExecContext* ctx, const Function* func, const FunctionOptions* options) {
   switch (func->kind()) {
     case Function::SCALAR:
-      return MakeExecutor(ctx, func, options);
+      return MakeExecutor(ctx, func, options);
     case Function::VECTOR:
-      return MakeExecutor(ctx, func, options);
+      return MakeExecutor(ctx, func, options);
     case Function::SCALAR_AGGREGATE:
-      return MakeExecutor(ctx, func, options);
+      return MakeExecutor(ctx, func, options);
     default:
       DCHECK(false);
       return nullptr;
diff --git a/cpp/src/arrow/compute/kernels/scalar_string.cc b/cpp/src/arrow/compute/kernels/scalar_string.cc
index 0a1c9a0ab4e..3f83a9f7a41 100644
--- a/cpp/src/arrow/compute/kernels/scalar_string.cc
+++ b/cpp/src/arrow/compute/kernels/scalar_string.cc
@@ -131,13 +131,9 @@ struct UTF8Transform {
                             ctx->Allocate(output_ncodeunits_max));
     output->buffers[2] = values_buffer;
 
-    // We could reuse the indices if the data is all ascii, benchmarking showed this
-    // not to matter.
-    // output->buffers[1] = input.buffers[1];
-    KERNEL_ASSIGN_OR_RAISE(output->buffers[1], ctx,
-                           ctx->Allocate((input_nstrings + 1) * sizeof(offset_type)));
-    uint8_t* output_str = output->buffers[2]->mutable_data();
+    // String offsets are preallocated
     offset_type* output_string_offsets = output->GetMutableValues<offset_type>(1);
+    uint8_t* output_str = output->buffers[2]->mutable_data();
     offset_type output_ncodeunits = 0;
 
     output_string_offsets[0] = 0;
@@ -200,7 +196,7 @@ struct UTF8Upper : UTF8Transform<Type, UTF8Upper<Type>> {
 
 template <typename Type>
 struct UTF8Lower : UTF8Transform<Type, UTF8Lower<Type>> {
-  static uint32_t TransformCodepoint(uint32_t codepoint) {
+  inline static uint32_t TransformCodepoint(uint32_t codepoint) {
     return codepoint <= kMaxCodepointLookup ? lut_lower_codepoint[codepoint]
                                             : utf8proc_tolower(codepoint);
   }
@@ -910,22 +906,18 @@ struct SplitBaseTransform {
     const ArrayData& input = *batch[0].array();
     ArrayType input_boxed(batch[0].array());
 
-    string_offset_type input_nstrings = static_cast<string_offset_type>(input.length);
-
     BuilderType builder(input.type, ctx->memory_pool());
     // a slight overestimate of the data needed
     KERNEL_RETURN_IF_ERROR(ctx, builder.ReserveData(input_boxed.total_values_length()));
     // the minimum amount of strings needed
     KERNEL_RETURN_IF_ERROR(ctx, builder.Resize(input.length));
-    // ideally we do not allocate this, see
-    // https://issues.apache.org/jira/browse/ARROW-10207
-    ListOffsetsBuilderType list_offsets_builder(ctx->memory_pool());
-    KERNEL_RETURN_IF_ERROR(ctx, list_offsets_builder.Resize(input_nstrings));
 
     ArrayData* output_list = out->mutable_array();
+    // list offsets were preallocated
+    auto* list_offsets = output_list->GetMutableValues(1);
+    DCHECK_NE(list_offsets, nullptr);
     // initial value
-    KERNEL_RETURN_IF_ERROR(
-        ctx, list_offsets_builder.Append(static_cast(0)));
+    *list_offsets++ = 0;
 
     KERNEL_RETURN_IF_ERROR(
         ctx, VisitArrayDataInline(
@@ -936,18 +928,14 @@ struct SplitBaseTransform {
                       std::numeric_limits::max())) {
                 return Status::CapacityError("List offset does not fit into 32 bit");
               }
-              RETURN_NOT_OK(list_offsets_builder.Append(
-                  static_cast(builder.length())));
+              *list_offsets++ = static_cast(builder.length());
               return Status::OK();
             },
             [&]() {
               // null value is already taken from input
-              RETURN_NOT_OK(list_offsets_builder.Append(
-                  static_cast(builder.length())));
+              *list_offsets++ = static_cast(builder.length());
              return Status::OK();
            }));
-    // assign list indices
-    KERNEL_RETURN_IF_ERROR(ctx, list_offsets_builder.Finish(&output_list->buffers[1]));
     // assign list child data
     std::shared_ptr<Array> string_array;
     KERNEL_RETURN_IF_ERROR(ctx, builder.Finish(&string_array));
@@ -1282,13 +1270,22 @@ void AddBinaryLength(FunctionRegistry* registry) {
 }
 
 template