diff --git a/velox/functions/remote/client/RemoteVectorFunction.cpp b/velox/functions/remote/client/RemoteVectorFunction.cpp index b4a68a08403..ff838c42128 100644 --- a/velox/functions/remote/client/RemoteVectorFunction.cpp +++ b/velox/functions/remote/client/RemoteVectorFunction.cpp @@ -18,6 +18,7 @@ #include "velox/expression/VectorFunction.h" #include "velox/functions/remote/if/GetSerde.h" +#include "velox/serializers/PrestoSerializer.h" #include "velox/type/fbhive/HiveTypeSerializer.h" namespace facebook::velox::functions { @@ -28,6 +29,16 @@ std::string serializeType(const TypePtr& type) { return type::fbhive::HiveTypeSerializer::serialize(type); } +std::unique_ptr getOptions(remote::PageFormat format) { + if (format == remote::PageFormat::PRESTO_PAGE) { + auto options = std::make_unique< + serializer::presto::PrestoVectorSerde::PrestoOptions>(); + options->preserveEncodings = true; + return options; + } + return std::make_unique(); +} + } // namespace RemoteVectorFunction::RemoteVectorFunction( @@ -36,6 +47,7 @@ RemoteVectorFunction::RemoteVectorFunction( const RemoteVectorFunctionMetadata& metadata) : functionName_(functionName), serdeFormat_(metadata.serdeFormat), + serdeOptions_(getOptions(serdeFormat_)), serde_(getSerde(serdeFormat_)) { std::vector types; types.reserve(inputArgs.size()); @@ -91,8 +103,8 @@ void RemoteVectorFunction::applyRemote( requestInputs->pageFormat_ref() = serdeFormat_; // TODO: serialize only active rows. - requestInputs->payload_ref() = rowVectorToIOBuf( - remoteRowVector, rows.end(), *context.pool(), serde_.get()); + requestInputs->payload_ref() = rowVectorToIOBufBatch( + remoteRowVector, *context.pool(), serde_.get(), serdeOptions_.get()); std::unique_ptr remoteResponse; diff --git a/velox/functions/remote/client/RemoteVectorFunction.h b/velox/functions/remote/client/RemoteVectorFunction.h index 677b5acb153..abfc5ccfa11 100644 --- a/velox/functions/remote/client/RemoteVectorFunction.h +++ b/velox/functions/remote/client/RemoteVectorFunction.h @@ -64,6 +64,7 @@ class RemoteVectorFunction : public exec::VectorFunction { const std::string functionName_; remote::PageFormat serdeFormat_; + std::unique_ptr serdeOptions_; std::unique_ptr serde_; // Structures we construct once to cache: diff --git a/velox/functions/remote/server/RemoteFunctionService.cpp b/velox/functions/remote/server/RemoteFunctionService.cpp index 0d573a8c464..29122e7af19 100644 --- a/velox/functions/remote/server/RemoteFunctionService.cpp +++ b/velox/functions/remote/server/RemoteFunctionService.cpp @@ -105,7 +105,7 @@ void RemoteFunctionServiceHandler::handleErrors( numRows, std::vector{flatVector}); result->errorPayload() = - rowVectorToIOBuf(errorRowVector, *pool_, serde.get()); + rowVectorToIOBufBatch(errorRowVector, *pool_, serde.get()); } void RemoteFunctionServiceHandler::invokeFunction( @@ -154,7 +154,7 @@ void RemoteFunctionServiceHandler::invokeFunction( result->rowCount() = outputRowVector->size(); result->pageFormat() = serdeFormat; result->payload() = - rowVectorToIOBuf(outputRowVector, rows.end(), *pool_, serde.get()); + rowVectorToIOBufBatch(outputRowVector, rows.end(), *pool_, serde.get()); auto evalErrors = evalCtx.errors(); if (evalErrors != nullptr && evalErrors->hasError()) { diff --git a/velox/vector/VectorStream.cpp b/velox/vector/VectorStream.cpp index c62dc1c6040..85f68276795 100644 --- a/velox/vector/VectorStream.cpp +++ b/velox/vector/VectorStream.cpp @@ -341,4 +341,31 @@ RowVectorPtr IOBufToRowVector( return outputVector; } +folly::IOBuf rowVectorToIOBufBatch( + const RowVectorPtr& rowVector, + memory::MemoryPool& pool, + VectorSerde* serde, + const VectorSerde::Options* options) { + return rowVectorToIOBufBatch( + rowVector, rowVector->size(), pool, serde, options); +} + +folly::IOBuf rowVectorToIOBufBatch( + const RowVectorPtr& rowVector, + vector_size_t rangeEnd, + memory::MemoryPool& pool, + VectorSerde* serde, + const VectorSerde::Options* options) { + if (serde == nullptr) { + serde = getVectorSerde(); + } + + auto serializer = serde->createBatchSerializer(&pool, options); + IOBufOutputStream stream(pool); + IndexRange range{0, rangeEnd}; + Scratch scratch; + serializer->serialize(rowVector, folly::Range(&range, 1), scratch, &stream); + return std::move(*stream.getIOBuf()); +} + } // namespace facebook::velox diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h index 009da665c78..dc5751b5139 100644 --- a/velox/vector/VectorStream.h +++ b/velox/vector/VectorStream.h @@ -507,6 +507,22 @@ folly::IOBuf rowVectorToIOBuf( memory::MemoryPool& pool, VectorSerde* serde = nullptr); +/// Convenience function to serialize a single rowVector into an IOBuf using +/// BatchVectorSerializer, which preserves encodings of input vectors. +folly::IOBuf rowVectorToIOBufBatch( + const RowVectorPtr& rowVector, + memory::MemoryPool& pool, + VectorSerde* serde = nullptr, + const VectorSerde::Options* options = nullptr); + +/// Same as above but serializes up until row `rangeEnd`. +folly::IOBuf rowVectorToIOBufBatch( + const RowVectorPtr& rowVector, + vector_size_t rangeEnd, + memory::MemoryPool& pool, + VectorSerde* serde = nullptr, + const VectorSerde::Options* options = nullptr); + /// Convenience function to deserialize an IOBuf into a rowVector. If `serde` is /// nullptr, use the default installed serializer. RowVectorPtr IOBufToRowVector(