Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 98 additions & 61 deletions cpp/src/arrow/compute/exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,42 @@ Result<std::shared_ptr<Buffer>> AllocateDataBuffer(KernelContext* ctx, int64_t l
if (bit_width == 1) {
return ctx->AllocateBitmap(length);
} else {
ARROW_CHECK_EQ(bit_width % 8, 0)
<< "Only bit widths with multiple of 8 are currently supported";
int64_t buffer_size = length * bit_width / 8;
int64_t buffer_size = BitUtil::BytesForBits(length * bit_width);
return ctx->Allocate(buffer_size);
}
return Status::OK();
}

bool CanPreallocate(const DataType& type) {
// There are currently cases where NullType is the output type, so we disable
// any preallocation logic when this occurs
return is_fixed_width(type.id()) && type.id() != Type::NA;
struct BufferPreallocation {
explicit BufferPreallocation(int bit_width = -1, int added_length = 0)
: bit_width(bit_width), added_length(added_length) {}

int bit_width;
int added_length;
};

void ComputeDataPreallocate(const DataType& type,
std::vector<BufferPreallocation>* widths) {
if (is_fixed_width(type.id()) && type.id() != Type::NA) {
widths->emplace_back(checked_cast<const FixedWidthType&>(type).bit_width());
return;
}
// Preallocate binary and list offsets
switch (type.id()) {
case Type::BINARY:
case Type::STRING:
case Type::LIST:
case Type::MAP:
widths->emplace_back(32, /*added_length=*/1);
return;
case Type::LARGE_BINARY:
case Type::LARGE_STRING:
case Type::LARGE_LIST:
widths->emplace_back(64, /*added_length=*/1);
return;
default:
break;
}
}

Status GetValueDescriptors(const std::vector<Datum>& args,
Expand All @@ -88,6 +112,16 @@ Status GetValueDescriptors(const std::vector<Datum>& args,

namespace detail {

Status CheckAllValues(const std::vector<Datum>& values) {
for (const auto& value : values) {
if (!value.is_value()) {
return Status::Invalid("Tried executing function with non-value type: ",
value.ToString());
}
}
return Status::OK();
}

ExecBatchIterator::ExecBatchIterator(std::vector<Datum> args, int64_t length,
int64_t max_chunksize)
: args_(std::move(args)),
Expand Down Expand Up @@ -184,6 +218,8 @@ bool ExecBatchIterator::Next(ExecBatch* batch) {
return true;
}

namespace {

bool ArrayHasNulls(const ArrayData& data) {
// As discovered in ARROW-8863 (and not only for that reason)
// ArrayData::null_count can -1 even when buffers[0] is nullptr. So we check
Expand Down Expand Up @@ -393,28 +429,6 @@ class NullPropagator {
bool bitmap_preallocated_ = false;
};

Status PropagateNulls(KernelContext* ctx, const ExecBatch& batch, ArrayData* output) {
DCHECK_NE(nullptr, output);
DCHECK_GT(output->buffers.size(), 0);

if (output->type->id() == Type::NA) {
// Null output type is a no-op (rare when this would happen but we at least
// will test for it)
return Status::OK();
}

// This function is ONLY able to write into output with non-zero offset
// when the bitmap is preallocated. This could be a DCHECK but returning
// error Status for now for emphasis
if (output->offset != 0 && output->buffers[0] == nullptr) {
return Status::Invalid(
"Can only propagate nulls into pre-allocated memory "
"when the output offset is non-zero");
}
NullPropagator propagator(ctx, batch, output);
return propagator.Execute();
}

std::shared_ptr<ChunkedArray> ToChunkedArray(const std::vector<Datum>& values,
const std::shared_ptr<DataType>& type) {
std::vector<std::shared_ptr<Array>> arrays;
Expand All @@ -438,16 +452,6 @@ bool HaveChunkedArray(const std::vector<Datum>& values) {
return false;
}

Status CheckAllValues(const std::vector<Datum>& values) {
for (const auto& value : values) {
if (!value.is_value()) {
return Status::Invalid("Tried executing function with non-value type: ",
value.ToString());
}
}
return Status::OK();
}

template <typename FunctionType>
class FunctionExecutorImpl : public FunctionExecutor {
public:
Expand Down Expand Up @@ -499,10 +503,14 @@ class FunctionExecutorImpl : public FunctionExecutor {
if (validity_preallocated_) {
ARROW_ASSIGN_OR_RAISE(out->buffers[0], kernel_ctx_.AllocateBitmap(length));
}
if (data_preallocated_) {
const auto& fw_type = checked_cast<const FixedWidthType&>(*out->type);
ARROW_ASSIGN_OR_RAISE(
out->buffers[1], AllocateDataBuffer(&kernel_ctx_, length, fw_type.bit_width()));
for (size_t i = 0; i < data_preallocated_.size(); ++i) {
const auto& prealloc = data_preallocated_[i];
if (prealloc.bit_width >= 0) {
ARROW_ASSIGN_OR_RAISE(
out->buffers[i + 1],
AllocateDataBuffer(&kernel_ctx_, length + prealloc.added_length,
prealloc.bit_width));
}
}
return out;
}
Expand All @@ -523,12 +531,13 @@ class FunctionExecutorImpl : public FunctionExecutor {

int output_num_buffers_;

// If true, then the kernel writes into a preallocated data buffer
bool data_preallocated_ = false;

// If true, then memory is preallocated for the validity bitmap with the same
// strategy as the data buffer(s).
bool validity_preallocated_ = false;

// The kernel writes into data buffers preallocated for these bit widths
// (0 indicates no preallocation);
std::vector<BufferPreallocation> data_preallocated_;
};

class ScalarExecutor : public FunctionExecutorImpl<ScalarFunction> {
Expand Down Expand Up @@ -675,24 +684,27 @@ class ScalarExecutor : public FunctionExecutorImpl<ScalarFunction> {
output_num_buffers_ = static_cast<int>(output_descr_.type->layout().buffers.size());

// Decide if we need to preallocate memory for this kernel
data_preallocated_ = ((kernel_->mem_allocation == MemAllocation::PREALLOCATE) &&
CanPreallocate(*output_descr_.type));
validity_preallocated_ =
(kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL);
if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
ComputeDataPreallocate(*output_descr_.type, &data_preallocated_);
}

// Contiguous preallocation only possible if both the VALIDITY and DATA can
// be preallocated. Otherwise, we must go chunk-by-chunk. Note that when
// the DATA cannot be preallocated, the VALIDITY may still be preallocated
// depending on the NullHandling of the kernel
// Contiguous preallocation only possible on non-nested types if all
// buffers are preallocated. Otherwise, we must go chunk-by-chunk.
//
// Some kernels are unable to write into sliced outputs, so we respect the
// kernel's attributes
// Some kernels are also unable to write into sliced outputs, so we respect the
// kernel's attributes.
preallocate_contiguous_ =
(exec_ctx_->preallocate_contiguous() && kernel_->can_write_into_slices &&
data_preallocated_ && validity_preallocated_);
validity_preallocated_ && !is_nested(output_descr_.type->id()) &&
data_preallocated_.size() == static_cast<size_t>(output_num_buffers_ - 1) &&
std::all_of(data_preallocated_.begin(), data_preallocated_.end(),
[](const BufferPreallocation& prealloc) {
return prealloc.bit_width >= 0;
}));
if (preallocate_contiguous_) {
DCHECK_EQ(2, output_num_buffers_);
ARROW_ASSIGN_OR_RAISE(preallocated_, PrepareOutput(total_length));
}
return Status::OK();
Expand Down Expand Up @@ -825,11 +837,12 @@ class VectorExecutor : public FunctionExecutorImpl<VectorFunction> {
output_num_buffers_ = static_cast<int>(output_descr_.type->layout().buffers.size());

// Decide if we need to preallocate memory for this kernel
data_preallocated_ = ((kernel_->mem_allocation == MemAllocation::PREALLOCATE) &&
CanPreallocate(*output_descr_.type));
validity_preallocated_ =
(kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL);
if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
ComputeDataPreallocate(*output_descr_.type, &data_preallocated_);
}
return Status::OK();
}

Expand Down Expand Up @@ -901,15 +914,39 @@ Result<std::unique_ptr<FunctionExecutor>> MakeExecutor(ExecContext* ctx,
return std::unique_ptr<FunctionExecutor>(new ExecutorType(ctx, typed_func, options));
}

} // namespace

Status PropagateNulls(KernelContext* ctx, const ExecBatch& batch, ArrayData* output) {
DCHECK_NE(nullptr, output);
DCHECK_GT(output->buffers.size(), 0);

if (output->type->id() == Type::NA) {
// Null output type is a no-op (rare when this would happen but we at least
// will test for it)
return Status::OK();
}

// This function is ONLY able to write into output with non-zero offset
// when the bitmap is preallocated. This could be a DCHECK but returning
// error Status for now for emphasis
if (output->offset != 0 && output->buffers[0] == nullptr) {
return Status::Invalid(
"Can only propagate nulls into pre-allocated memory "
"when the output offset is non-zero");
}
NullPropagator propagator(ctx, batch, output);
return propagator.Execute();
}

Result<std::unique_ptr<FunctionExecutor>> FunctionExecutor::Make(
ExecContext* ctx, const Function* func, const FunctionOptions* options) {
switch (func->kind()) {
case Function::SCALAR:
return MakeExecutor<detail::ScalarExecutor>(ctx, func, options);
return MakeExecutor<ScalarExecutor>(ctx, func, options);
case Function::VECTOR:
return MakeExecutor<detail::VectorExecutor>(ctx, func, options);
return MakeExecutor<VectorExecutor>(ctx, func, options);
case Function::SCALAR_AGGREGATE:
return MakeExecutor<detail::ScalarAggExecutor>(ctx, func, options);
return MakeExecutor<ScalarAggExecutor>(ctx, func, options);
default:
DCHECK(false);
return nullptr;
Expand Down
59 changes: 30 additions & 29 deletions cpp/src/arrow/compute/kernels/scalar_string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,9 @@ struct UTF8Transform {
ctx->Allocate(output_ncodeunits_max));
output->buffers[2] = values_buffer;

// We could reuse the indices if the data is all ascii, benchmarking showed this
// not to matter.
// output->buffers[1] = input.buffers[1];
KERNEL_ASSIGN_OR_RAISE(output->buffers[1], ctx,
ctx->Allocate((input_nstrings + 1) * sizeof(offset_type)));
uint8_t* output_str = output->buffers[2]->mutable_data();
// String offsets are preallocated
offset_type* output_string_offsets = output->GetMutableValues<offset_type>(1);
uint8_t* output_str = output->buffers[2]->mutable_data();
offset_type output_ncodeunits = 0;

output_string_offsets[0] = 0;
Expand Down Expand Up @@ -200,7 +196,7 @@ struct UTF8Upper : UTF8Transform<Type, UTF8Upper<Type>> {

template <typename Type>
struct UTF8Lower : UTF8Transform<Type, UTF8Lower<Type>> {
static uint32_t TransformCodepoint(uint32_t codepoint) {
inline static uint32_t TransformCodepoint(uint32_t codepoint) {
return codepoint <= kMaxCodepointLookup ? lut_lower_codepoint[codepoint]
: utf8proc_tolower(codepoint);
}
Expand Down Expand Up @@ -910,22 +906,18 @@ struct SplitBaseTransform {
const ArrayData& input = *batch[0].array();
ArrayType input_boxed(batch[0].array());

string_offset_type input_nstrings = static_cast<string_offset_type>(input.length);

BuilderType builder(input.type, ctx->memory_pool());
// a slight overestimate of the data needed
KERNEL_RETURN_IF_ERROR(ctx, builder.ReserveData(input_boxed.total_values_length()));
// the minimum amount of strings needed
KERNEL_RETURN_IF_ERROR(ctx, builder.Resize(input.length));

// ideally we do not allocate this, see
// https://issues.apache.org/jira/browse/ARROW-10207
ListOffsetsBuilderType list_offsets_builder(ctx->memory_pool());
KERNEL_RETURN_IF_ERROR(ctx, list_offsets_builder.Resize(input_nstrings));
ArrayData* output_list = out->mutable_array();
// list offsets were preallocated
auto* list_offsets = output_list->GetMutableValues<list_offset_type>(1);
DCHECK_NE(list_offsets, nullptr);
// initial value
KERNEL_RETURN_IF_ERROR(
ctx, list_offsets_builder.Append(static_cast<list_offset_type>(0)));
*list_offsets++ = 0;
KERNEL_RETURN_IF_ERROR(
ctx,
VisitArrayDataInline<Type>(
Expand All @@ -936,18 +928,14 @@ struct SplitBaseTransform {
std::numeric_limits<list_offset_type>::max())) {
return Status::CapacityError("List offset does not fit into 32 bit");
}
RETURN_NOT_OK(list_offsets_builder.Append(
static_cast<list_offset_type>(builder.length())));
*list_offsets++ = static_cast<list_offset_type>(builder.length());
return Status::OK();
},
[&]() {
// null value is already taken from input
RETURN_NOT_OK(list_offsets_builder.Append(
static_cast<list_offset_type>(builder.length())));
*list_offsets++ = static_cast<list_offset_type>(builder.length());
return Status::OK();
}));
// assign list indices
KERNEL_RETURN_IF_ERROR(ctx, list_offsets_builder.Finish(&output_list->buffers[1]));
// assign list child data
std::shared_ptr<Array> string_array;
KERNEL_RETURN_IF_ERROR(ctx, builder.Finish(&string_array));
Expand Down Expand Up @@ -1282,13 +1270,22 @@ void AddBinaryLength(FunctionRegistry* registry) {
}

template <template <typename> class ExecFunctor>
void MakeUnaryStringBatchKernel(std::string name, FunctionRegistry* registry,
const FunctionDoc* doc) {
void MakeUnaryStringBatchKernel(
std::string name, FunctionRegistry* registry, const FunctionDoc* doc,
MemAllocation::type mem_allocation = MemAllocation::PREALLOCATE) {
auto func = std::make_shared<ScalarFunction>(name, Arity::Unary(), doc);
auto exec_32 = ExecFunctor<StringType>::Exec;
auto exec_64 = ExecFunctor<LargeStringType>::Exec;
DCHECK_OK(func->AddKernel({utf8()}, utf8(), exec_32));
DCHECK_OK(func->AddKernel({large_utf8()}, large_utf8(), exec_64));
{
auto exec_32 = ExecFunctor<StringType>::Exec;
ScalarKernel kernel{{utf8()}, utf8(), exec_32};
kernel.mem_allocation = mem_allocation;
DCHECK_OK(func->AddKernel(std::move(kernel)));
}
{
auto exec_64 = ExecFunctor<LargeStringType>::Exec;
ScalarKernel kernel{{large_utf8()}, large_utf8(), exec_64};
kernel.mem_allocation = mem_allocation;
DCHECK_OK(func->AddKernel(std::move(kernel)));
}
DCHECK_OK(registry->AddFunction(std::move(func)));
}

Expand Down Expand Up @@ -1455,8 +1452,12 @@ const FunctionDoc utf8_lower_doc(
} // namespace

void RegisterScalarStringAscii(FunctionRegistry* registry) {
MakeUnaryStringBatchKernel<AsciiUpper>("ascii_upper", registry, &ascii_upper_doc);
MakeUnaryStringBatchKernel<AsciiLower>("ascii_lower", registry, &ascii_lower_doc);
// ascii_upper and ascii_lower are able to reuse the original offsets buffer,
// so don't preallocate them in the output.
MakeUnaryStringBatchKernel<AsciiUpper>("ascii_upper", registry, &ascii_upper_doc,
MemAllocation::NO_PREALLOCATE);
MakeUnaryStringBatchKernel<AsciiLower>("ascii_lower", registry, &ascii_lower_doc,
MemAllocation::NO_PREALLOCATE);

AddUnaryStringPredicate<IsAscii>("string_is_ascii", registry, &string_is_ascii_doc);

Expand Down