Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 194 additions & 14 deletions velox/functions/prestosql/FilterFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "velox/expression/VectorFunction.h"
#include "velox/functions/lib/LambdaFunctionUtil.h"
#include "velox/functions/lib/RowsTranslationUtil.h"
#include "velox/vector/FlatMapVector.h"
#include "velox/vector/FunctionVector.h"

namespace facebook::velox::functions {
Expand All @@ -34,7 +35,7 @@ class FilterFunctionBase : public exec::VectorFunction {
static vector_size_t doApply(
const SelectivityVector& rows,
const std::shared_ptr<T>& input,
const VectorPtr& lambdas,
const VectorPtr& lambda,
const std::vector<VectorPtr>& lambdaArgs,
exec::EvalCtx& context,
BufferPtr& resultOffsets,
Expand All @@ -60,7 +61,7 @@ class FilterFunctionBase : public exec::VectorFunction {
getElementToTopLevelRows(numElements, rows, input.get(), pool);

exec::LocalDecodedVector bitsDecoder(context);
auto iter = lambdas->asUnchecked<FunctionVector>()->iterator(&rows);
auto iter = lambda->asUnchecked<FunctionVector>()->iterator(&rows);
while (auto entry = iter.next()) {
auto elementRows =
toElementRows<T>(numElements, *entry.rows, input.get());
Expand Down Expand Up @@ -173,27 +174,177 @@ class ArrayFilterFunction : public FilterFunctionBase {
// - https://prestodb.io/docs/current/functions/lambda.html
// - https://prestodb.io/blog/2020/03/02/presto-lambda
class MapFilterFunction : public FilterFunctionBase {
public:
void apply(
private:
// Builds a SelectivityVector based upon FlatMapVector's inMap buffer. This
// buffer indicates whether or not a particular key is present in that map's
// row. When applying lambda functions we should avoid executing on key-value
// pairs that are not present in that particular row. SelectivityVector
// rowsToFilterOn is used to filter out those rows.
void buildInMapSelectivityVector(
SelectivityVector& rowsToFilterOn,
BufferPtr flattenedInMap,
BufferPtr inMap,
vector_size_t inMapSize,
const SelectivityVector& rows,
const vector_size_t* decodedIndices) const {
// Flatten inMap buffer.
auto* mutableFlattedInMap = flattenedInMap->asMutable<uint64_t>();
bits::fillBits(mutableFlattedInMap, 0, inMapSize, false);
auto* mutableInMap = inMap->asMutable<uint64_t>();
rows.applyToSelected([&](vector_size_t row) {
if (bits::isBitSet(mutableInMap, decodedIndices[row])) {
bits::setBit(mutableFlattedInMap, decodedIndices[row]);
}
});

// Extract flattened inMap buffer values for next lambda call.
rowsToFilterOn.clearAll();
auto bits = rowsToFilterOn.asMutableRange().bits();
bits::orBits(bits, mutableFlattedInMap, 0, rowsToFilterOn.size());
rowsToFilterOn.updateBounds();
}

// Apply filter function to vector of encoding FlatMapVector. Because the
// entirety of the map values are stored in in a list of vectors (one vector
// per key), we will need to apply the filter function on each vector and
// associated inMap buffer. Additionally, we will have to reduce the number of
// distinct keys stored in the FlatMapVector if they key list changes.
void applyFlatMapVector(
DecodedVector& decodedMap,
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
const TypePtr& outputType,
exec::EvalCtx& context,
VectorPtr& result) const override {
VELOX_CHECK_EQ(args.size(), 2);
exec::LocalDecodedVector mapDecoder(context, *args[0], rows);
auto& decodedMap = *mapDecoder.get();
VectorPtr& result) const {
// Current map and fields
const FlatMapVector& flatMap =
*(decodedMap.base())->template as<FlatMapVector>();
auto distinctKeys = flatMap.distinctKeys();
auto mapValues = flatMap.mapValues();
auto numRows = rows.size();
BufferPtr decodedIndices =
AlignedBuffer::allocate<vector_size_t>(numRows, flatMap.pool());
auto mutableIndices = decodedIndices->asMutable<vector_size_t>();
for (int i = 0; i < decodedMap.size(); i++) {
mutableIndices[i] = decodedMap.indices()[i];
}

// Result map fields
auto filteredKeysIndices = AlignedBuffer::allocate<vector_size_t>(
distinctKeys->size(), context.pool());
std::vector<VectorPtr> filteredMapValues;
std::vector<BufferPtr> filteredInMaps;
uint64_t* filteredInMap;
auto numDistinct = 0;
auto rawIndices = filteredKeysIndices->asMutable<vector_size_t>();

// Lambda function
auto iter = args[1]->asUnchecked<FunctionVector>()->iterator(&rows);
exec::LocalDecodedVector bitsDecoder(context);
SelectivityVector rowsToFilterOn(flatMap.size());
// Selectivity vector to help ignore filtering for key-value pairs
// identified by inMap buffer. Let's allocate here to avoid during each
// iteration.
auto flattenedInMap =
AlignedBuffer::allocate<bool>(flatMap.size(), context.pool(), 0);

// Apply lambda function to each map value vector and its associated key
// from our flat map vector. If the key is not filtered out, we will copy it
// to our result vector.
while (auto entry = iter.next()) {
for (int channel = 0; channel < mapValues.size(); ++channel) {
// Only apply lambda function to values that are in the map.
buildInMapSelectivityVector(
rowsToFilterOn,
flattenedInMap,
flatMap.inMaps()[channel],
flatMap.size(),
*entry.rows,
decodedMap.indices());

// Call lambda function and decode its output bit vector. We will
// use it to determine what will persist to the final result vector.
VectorPtr lambdaResultBits;
entry.callable->apply(
rowsToFilterOn,
nullptr,
nullptr,
&context,
{
BaseVector::wrapInConstant(
flatMap.size(), channel, distinctKeys),
mapValues[channel],
},
decodedIndices,
&lambdaResultBits);
bitsDecoder.get()->decode(*lambdaResultBits);

bool isFilteredIn = false;
entry.rows->applyToSelected([&](vector_size_t row) {
row = decodedMap.indices()[row];
if (rowsToFilterOn.isValid(row) &&
!bitsDecoder.get()->isNullAt(row) &&
bitsDecoder.get()->valueAt<bool>(row)) {
// First time seeing this key; let's copy over its associated values
// vector and define a new filtered inMap buffer. Let's also note
// the index of this key for key filtering.
if (!isFilteredIn) {
filteredMapValues.push_back(
BaseVector::copy(*mapValues[channel]));
filteredInMaps.push_back(
AlignedBuffer::allocate<bool>(numRows, context.pool(), 0));
filteredInMap = filteredInMaps.back()->asMutable<uint64_t>();
rawIndices[numDistinct++] = channel;
isFilteredIn = true;
}
bits::setBit(filteredInMap, row);
}
});
}
}

// Resize filtered distinct keys indices in order to wrap in dictionary and
// create our result filtered flat map vector
filteredKeysIndices->setSize(numDistinct * sizeof(vector_size_t));
auto localResult = std::make_shared<FlatMapVector>(
context.pool(),
outputType,
nullptr,
flatMap.size(),
BaseVector::wrapInDictionary(
BufferPtr(nullptr), filteredKeysIndices, numDistinct, distinctKeys),
std::move(filteredMapValues),
std::move(filteredInMaps));

auto flatMap = flattenMap(rows, args[0], decodedMap);
// Handle wrapped encoding if necessary
if (decodedMap.isIdentityMapping()) {
context.moveOrCopyResult(localResult, rows, result);
} else {
context.moveOrCopyResult(
BaseVector::wrapInDictionary(
nullptr, decodedIndices, decodedMap.size(), localResult),
rows,
result);
}
}

VectorPtr keys = flatMap->mapKeys();
VectorPtr values = flatMap->mapValues();
// Applies filter function on traditional map vector.
void applyMapVector(
DecodedVector& decodedMap,
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
const TypePtr& outputType,
exec::EvalCtx& context,
VectorPtr& result) const {
auto mapVector = flattenMap(rows, args[0], decodedMap);
VectorPtr keys = mapVector->mapKeys();
VectorPtr values = mapVector->mapValues();
BufferPtr resultSizes;
BufferPtr resultOffsets;
BufferPtr selectedIndices;
auto numSelected = doApply(
rows,
flatMap,
mapVector,
args[1],
{keys, values},
context,
Expand All @@ -220,9 +371,9 @@ class MapFilterFunction : public FilterFunctionBase {
true /*flattenIfRedundant*/)
: nullptr;
// Set nulls for rows not present in 'rows'.
BufferPtr newNulls = addNullsForUnselectedRows(flatMap, rows);
BufferPtr newNulls = addNullsForUnselectedRows(mapVector, rows);
auto localResult = std::make_shared<MapVector>(
flatMap->pool(),
mapVector->pool(),
outputType,
std::move(newNulls),
rows.end(),
Expand All @@ -233,6 +384,35 @@ class MapFilterFunction : public FilterFunctionBase {
context.moveOrCopyResult(localResult, rows, result);
}

public:
void apply(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
const TypePtr& outputType,
exec::EvalCtx& context,
VectorPtr& result) const override {
VELOX_CHECK_EQ(args.size(), 2);
exec::LocalDecodedVector mapDecoder(context, *args[0], rows);
auto& decodedMap = *mapDecoder.get();

// Flattening input maps will peel if possible, but may simply cast if the
// vector is an identify mapping.
switch (decodedMap.base()->encoding()) {
case VectorEncoding::Simple::FLAT_MAP: {
applyFlatMapVector(decodedMap, rows, args, outputType, context, result);
break;
}
case VectorEncoding::Simple::MAP: {
applyMapVector(decodedMap, rows, args, outputType, context, result);
break;
}
default:
VELOX_UNSUPPORTED(
"map_filter not supported for encoding: {}",
decodedMap.base()->encoding());
}
}

static std::vector<std::shared_ptr<exec::FunctionSignature>> signatures() {
// map(K,V), function(K,V,boolean) -> map(K,V)
return {exec::FunctionSignatureBuilder()
Expand Down
Loading
Loading