Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions velox/serializers/CompactRowSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void CompactRowVectorSerde::estimateSerializedSize(
}

namespace {
class CompactRowVectorSerializer : public VectorSerializer {
class CompactRowVectorSerializer : public IterativeVectorSerializer {
public:
using TRowSize = uint32_t;

Expand Down Expand Up @@ -120,7 +120,8 @@ std::string concatenatePartialRow(

} // namespace

std::unique_ptr<VectorSerializer> CompactRowVectorSerde::createSerializer(
std::unique_ptr<IterativeVectorSerializer>
CompactRowVectorSerde::createIterativeSerializer(
RowTypePtr /* type */,
int32_t /* numRows */,
StreamArena* streamArena,
Expand Down
2 changes: 1 addition & 1 deletion velox/serializers/CompactRowSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class CompactRowVectorSerde : public VectorSerde {

// This method is not used in production code. It is only used to
// support round-trip tests for deserialization.
std::unique_ptr<VectorSerializer> createSerializer(
std::unique_ptr<IterativeVectorSerializer> createIterativeSerializer(
RowTypePtr type,
int32_t numRows,
StreamArena* streamArena,
Expand Down
13 changes: 7 additions & 6 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3316,9 +3316,9 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer {
const std::unique_ptr<folly::io::Codec> codec_;
};

class PrestoVectorSerializer : public VectorSerializer {
class PrestoIterativeVectorSerializer : public IterativeVectorSerializer {
public:
PrestoVectorSerializer(
PrestoIterativeVectorSerializer(
const RowTypePtr& rowType,
std::vector<VectorEncoding::Simple> encodings,
int32_t numRows,
Expand Down Expand Up @@ -3349,7 +3349,7 @@ class PrestoVectorSerializer : public VectorSerializer {
// Constructor that takes a row vector instead of only the types. This is
// different because then we know exactly how each vector is encoded
// (recursively).
PrestoVectorSerializer(
PrestoIterativeVectorSerializer(
const RowVectorPtr& rowVector,
StreamArena* streamArena,
bool useLosslessTimestamp,
Expand Down Expand Up @@ -3455,13 +3455,14 @@ void PrestoVectorSerde::estimateSerializedSize(
estimateSerializedSizeInt(vector->loadedVector(), rows, sizes, scratch);
}

std::unique_ptr<VectorSerializer> PrestoVectorSerde::createSerializer(
std::unique_ptr<IterativeVectorSerializer>
PrestoVectorSerde::createIterativeSerializer(
RowTypePtr type,
int32_t numRows,
StreamArena* streamArena,
const Options* options) {
const auto prestoOptions = toPrestoOptions(options);
return std::make_unique<PrestoVectorSerializer>(
return std::make_unique<PrestoIterativeVectorSerializer>(
type,
prestoOptions.encodings,
numRows,
Expand All @@ -3484,7 +3485,7 @@ void PrestoVectorSerde::deprecatedSerializeEncoded(
const Options* options,
OutputStream* out) {
auto prestoOptions = toPrestoOptions(options);
auto serializer = std::make_unique<PrestoVectorSerializer>(
auto serializer = std::make_unique<PrestoIterativeVectorSerializer>(
vector,
streamArena,
prestoOptions.useLosslessTimestamp,
Expand Down
11 changes: 6 additions & 5 deletions velox/serializers/PrestoSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ namespace facebook::velox::serializer::presto {
/// There are two ways to serialize data using PrestoVectorSerde:
///
/// 1. In order to append multiple RowVectors into the same serialized payload,
/// one can first create a VectorSerializer using createSerializer(), then
/// append successive RowVectors using VectorSerializer::append(). In this case,
/// since different RowVector might encode columns differently, data is always
/// flattened in the serialized payload.
/// one can first create an IterativeVectorSerializer using
/// createIterativeSerializer(), then append successive RowVectors using
/// IterativeVectorSerializer::append(). In this case, since different RowVector
/// might encode columns differently, data is always flattened in the serialized
/// payload.
///
/// Note that there are two flavors of append(), one that takes a range of rows,
/// and one that takes a list of row ids. The former is useful when serializing
Expand Down Expand Up @@ -76,7 +77,7 @@ class PrestoVectorSerde : public VectorSerde {
vector_size_t** sizes,
Scratch& scratch) override;

std::unique_ptr<VectorSerializer> createSerializer(
std::unique_ptr<IterativeVectorSerializer> createIterativeSerializer(
RowTypePtr type,
int32_t numRows,
StreamArena* streamArena,
Expand Down
5 changes: 3 additions & 2 deletions velox/serializers/UnsafeRowSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void UnsafeRowVectorSerde::estimateSerializedSize(
}

namespace {
class UnsafeRowVectorSerializer : public VectorSerializer {
class UnsafeRowVectorSerializer : public IterativeVectorSerializer {
public:
using TRowSize = uint32_t;

Expand Down Expand Up @@ -122,7 +122,8 @@ std::string concatenatePartialRow(

} // namespace

std::unique_ptr<VectorSerializer> UnsafeRowVectorSerde::createSerializer(
std::unique_ptr<IterativeVectorSerializer>
UnsafeRowVectorSerde::createIterativeSerializer(
RowTypePtr /* type */,
int32_t /* numRows */,
StreamArena* streamArena,
Expand Down
2 changes: 1 addition & 1 deletion velox/serializers/UnsafeRowSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class UnsafeRowVectorSerde : public VectorSerde {

// This method is not used in production code. It is only used to
// support round-trip tests for deserialization.
std::unique_ptr<VectorSerializer> createSerializer(
std::unique_ptr<IterativeVectorSerializer> createIterativeSerializer(
RowTypePtr type,
int32_t numRows,
StreamArena* streamArena,
Expand Down
3 changes: 2 additions & 1 deletion velox/serializers/tests/CompactRowSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class CompactRowSerializerTest : public ::testing::Test,

auto arena = std::make_unique<StreamArena>(pool_.get());
auto rowType = asRowType(rowVector->type());
auto serializer = serde_->createSerializer(rowType, numRows, arena.get());
auto serializer =
serde_->createIterativeSerializer(rowType, numRows, arena.get());

Scratch scratch;
serializer->append(rowVector, folly::Range(rows.data(), numRows), scratch);
Expand Down
4 changes: 2 additions & 2 deletions velox/serializers/tests/PrestoSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ class PrestoSerializerTest
auto rowType = asRowType(rowVector->type());
auto numRows = rowVector->size();
auto paramOptions = getParamSerdeOptions(serdeOptions);
auto serializer =
serde_->createSerializer(rowType, numRows, arena.get(), &paramOptions);
auto serializer = serde_->createIterativeSerializer(
rowType, numRows, arena.get(), &paramOptions);
vector_size_t sizeEstimate = 0;

Scratch scratch;
Expand Down
3 changes: 2 additions & 1 deletion velox/serializers/tests/UnsafeRowSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class UnsafeRowSerializerTest : public ::testing::Test,

auto arena = std::make_unique<StreamArena>(pool_.get());
auto rowType = std::dynamic_pointer_cast<const RowType>(rowVector->type());
auto serializer = serde_->createSerializer(rowType, numRows, arena.get());
auto serializer =
serde_->createIterativeSerializer(rowType, numRows, arena.get());

Scratch scratch;
serializer->append(rowVector, folly::Range(rows.data(), numRows), scratch);
Expand Down
6 changes: 3 additions & 3 deletions velox/vector/VectorStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class DefaultBatchVectorSerializer : public BatchVectorSerializer {
}

StreamArena arena(pool_);
auto serializer = serde_->createSerializer(
auto serializer = serde_->createIterativeSerializer(
asRowType(vector->type()), numRows, &arena, options_);
serializer->append(vector, ranges, scratch);
serializer->flush(stream);
Expand All @@ -67,7 +67,7 @@ getNamedVectorSerdeImpl() {

} // namespace

void VectorSerializer::append(const RowVectorPtr& vector) {
void IterativeVectorSerializer::append(const RowVectorPtr& vector) {
const IndexRange allRows{0, vector->size()};
Scratch scratch;
append(vector, folly::Range(&allRows, 1), scratch);
Expand Down Expand Up @@ -144,7 +144,7 @@ void VectorStreamGroup::createStreamTree(
RowTypePtr type,
int32_t numRows,
const VectorSerde::Options* options) {
serializer_ = serde_->createSerializer(type, numRows, this, options);
serializer_ = serde_->createIterativeSerializer(type, numRows, this, options);
}

void VectorStreamGroup::append(
Expand Down
20 changes: 16 additions & 4 deletions velox/vector/VectorStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ struct IndexRange {
/// Uses successive calls to `append` to add more rows to the serialization
/// buffer. Then call `flush` to write the aggregate serialized data to an
/// OutputStream.
class VectorSerializer {
class IterativeVectorSerializer {
public:
virtual ~VectorSerializer() = default;
virtual ~IterativeVectorSerializer() = default;

/// Serialize a subset of rows in a vector.
virtual void append(
Expand Down Expand Up @@ -159,7 +159,19 @@ class VectorSerde {
///
/// This is more appropriate if the use case involves many small writes, e.g.
/// partitioning a RowVector across multiple destinations.
virtual std::unique_ptr<VectorSerializer> createSerializer(
///
/// TODO: Remove createSerializer once Presto is updated to call
/// createIterativeSerializer.
virtual std::unique_ptr<IterativeVectorSerializer> createSerializer(
RowTypePtr type,
int32_t numRows,
StreamArena* streamArena,
const Options* options = nullptr) {
return createIterativeSerializer(
std::move(type), numRows, streamArena, options);
}

virtual std::unique_ptr<IterativeVectorSerializer> createIterativeSerializer(
RowTypePtr type,
int32_t numRows,
StreamArena* streamArena,
Expand Down Expand Up @@ -298,7 +310,7 @@ class VectorStreamGroup : public StreamArena {
const VectorSerde::Options* options = nullptr);

private:
std::unique_ptr<VectorSerializer> serializer_;
std::unique_ptr<IterativeVectorSerializer> serializer_;
VectorSerde* serde_{nullptr};
};

Expand Down
2 changes: 1 addition & 1 deletion velox/vector/tests/VectorStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class MockVectorSerde : public VectorSerde {
const folly::Range<const IndexRange*>& ranges,
vector_size_t** sizes) override {}

std::unique_ptr<VectorSerializer> createSerializer(
std::unique_ptr<IterativeVectorSerializer> createIterativeSerializer(
RowTypePtr type,
int32_t numRows,
StreamArena* streamArena,
Expand Down