Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions velox/functions/remote/client/RemoteVectorFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "velox/expression/VectorFunction.h"
#include "velox/functions/remote/if/GetSerde.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/type/fbhive/HiveTypeSerializer.h"

namespace facebook::velox::functions {
Expand All @@ -28,6 +29,16 @@ std::string serializeType(const TypePtr& type) {
return type::fbhive::HiveTypeSerializer::serialize(type);
}

std::unique_ptr<VectorSerde::Options> getOptions(remote::PageFormat format) {
if (format == remote::PageFormat::PRESTO_PAGE) {
auto options = std::make_unique<
serializer::presto::PrestoVectorSerde::PrestoOptions>();
options->preserveEncodings = true;
return options;
}
return std::make_unique<VectorSerde::Options>();
}

} // namespace

RemoteVectorFunction::RemoteVectorFunction(
Expand All @@ -36,6 +47,7 @@ RemoteVectorFunction::RemoteVectorFunction(
const RemoteVectorFunctionMetadata& metadata)
: functionName_(functionName),
serdeFormat_(metadata.serdeFormat),
serdeOptions_(getOptions(serdeFormat_)),
serde_(getSerde(serdeFormat_)) {
std::vector<TypePtr> types;
types.reserve(inputArgs.size());
Expand Down Expand Up @@ -91,8 +103,8 @@ void RemoteVectorFunction::applyRemote(
requestInputs->pageFormat_ref() = serdeFormat_;

// TODO: serialize only active rows.
requestInputs->payload_ref() = rowVectorToIOBuf(
remoteRowVector, rows.end(), *context.pool(), serde_.get());
requestInputs->payload_ref() = rowVectorToIOBufBatch(
remoteRowVector, *context.pool(), serde_.get(), serdeOptions_.get());

std::unique_ptr<remote::RemoteFunctionResponse> remoteResponse;

Expand Down
1 change: 1 addition & 0 deletions velox/functions/remote/client/RemoteVectorFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class RemoteVectorFunction : public exec::VectorFunction {
const std::string functionName_;

remote::PageFormat serdeFormat_;
std::unique_ptr<VectorSerde::Options> serdeOptions_;
std::unique_ptr<VectorSerde> serde_;

// Structures we construct once to cache:
Expand Down
4 changes: 2 additions & 2 deletions velox/functions/remote/server/RemoteFunctionService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ void RemoteFunctionServiceHandler::handleErrors(
numRows,
std::vector<VectorPtr>{flatVector});
result->errorPayload() =
rowVectorToIOBuf(errorRowVector, *pool_, serde.get());
rowVectorToIOBufBatch(errorRowVector, *pool_, serde.get());
}

void RemoteFunctionServiceHandler::invokeFunction(
Expand Down Expand Up @@ -154,7 +154,7 @@ void RemoteFunctionServiceHandler::invokeFunction(
result->rowCount() = outputRowVector->size();
result->pageFormat() = serdeFormat;
result->payload() =
rowVectorToIOBuf(outputRowVector, rows.end(), *pool_, serde.get());
rowVectorToIOBufBatch(outputRowVector, rows.end(), *pool_, serde.get());

auto evalErrors = evalCtx.errors();
if (evalErrors != nullptr && evalErrors->hasError()) {
Expand Down
27 changes: 27 additions & 0 deletions velox/vector/VectorStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,4 +341,31 @@ RowVectorPtr IOBufToRowVector(
return outputVector;
}

folly::IOBuf rowVectorToIOBufBatch(
const RowVectorPtr& rowVector,
memory::MemoryPool& pool,
VectorSerde* serde,
const VectorSerde::Options* options) {
return rowVectorToIOBufBatch(
rowVector, rowVector->size(), pool, serde, options);
}

folly::IOBuf rowVectorToIOBufBatch(
const RowVectorPtr& rowVector,
vector_size_t rangeEnd,
memory::MemoryPool& pool,
VectorSerde* serde,
const VectorSerde::Options* options) {
if (serde == nullptr) {
serde = getVectorSerde();
}

auto serializer = serde->createBatchSerializer(&pool, options);
IOBufOutputStream stream(pool);
IndexRange range{0, rangeEnd};
Scratch scratch;
serializer->serialize(rowVector, folly::Range(&range, 1), scratch, &stream);
return std::move(*stream.getIOBuf());
}

} // namespace facebook::velox
16 changes: 16 additions & 0 deletions velox/vector/VectorStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,22 @@ folly::IOBuf rowVectorToIOBuf(
memory::MemoryPool& pool,
VectorSerde* serde = nullptr);

/// Convenience function to serialize a single rowVector into an IOBuf using
/// BatchVectorSerializer, which preserves encodings of input vectors.
folly::IOBuf rowVectorToIOBufBatch(
const RowVectorPtr& rowVector,
memory::MemoryPool& pool,
VectorSerde* serde = nullptr,
const VectorSerde::Options* options = nullptr);

/// Same as above but serializes up until row `rangeEnd`.
folly::IOBuf rowVectorToIOBufBatch(
const RowVectorPtr& rowVector,
vector_size_t rangeEnd,
memory::MemoryPool& pool,
VectorSerde* serde = nullptr,
const VectorSerde::Options* options = nullptr);

/// Convenience function to deserialize an IOBuf into a rowVector. If `serde` is
/// nullptr, use the default installed serializer.
RowVectorPtr IOBufToRowVector(
Expand Down
Loading