Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions velox/expression/ComplexWriterTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class VectorWriterBase {
}
// Commits the value produced for the current row.
// NOTE(review): isSet presumably distinguishes a real value from a null;
// confirm against the concrete writers.
virtual void commit(bool isSet) = 0;
// NOTE(review): presumably makes sure the underlying vector can hold at least
// `size` rows without ever shrinking it (resizeTo() is documented as the
// variant that can shrink); confirm against the concrete writers.
virtual void ensureSize(vector_size_t size) = 0;
// Resizes the vector to exactly the specified size. Unlike ensureSize(),
// this can shrink the vector. Implementations should update internal
// pointers after resize to ensure they remain valid.
virtual void resizeTo(vector_size_t size) = 0;
// Optional hook with an empty default implementation; writers that need
// end-of-batch work override it.
virtual void finish() {}
// Implementations that write variable length data or complex types should
// override this to reset their state and that of their children.
Expand Down Expand Up @@ -789,6 +793,7 @@ class MapWriter {
auto index = indexOfLast();
if constexpr (!requires_commit<K>) {
VELOX_DCHECK(provide_std_interface<K>);
keysVector_->setNull(index, false);
return keysWriter_->data_[index];
} else {
keyNeedsCommit_ = true;
Expand Down
70 changes: 66 additions & 4 deletions velox/expression/VectorWriters.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ struct VectorWriter : public VectorWriterBase {
}
}

// Sets the underlying vector's length to exactly `size` (it may grow or
// shrink) and then re-reads the raw values pointer, because the resize may
// have reallocated the buffer that data_ pointed into.
void resizeTo(vector_size_t size) override {
  constexpr bool kSetNotNull = false;
  vector_->resize(size, kSetNotNull);
  data_ = vector_->mutableRawValues();
}

// Default constructor with an empty body; it performs no setup of its own.
VectorWriter() {}

FOLLY_ALWAYS_INLINE exec_out_t& current() {
Expand Down Expand Up @@ -104,7 +111,8 @@ struct VectorWriter<Array<V>> : public VectorWriterBase {

// Call exactly once, after every row has been written.
void finish() override {
  // Trim the elements to the number actually written. Going through the
  // child writer (rather than the vector directly) lets the child refresh
  // its data_ pointer after the resize.
  const auto usedElements = writer_.valuesOffset_;
  childWriter_.resizeTo(usedElements);
  arrayVector_ = nullptr;
  childWriter_.finish();
}
Expand All @@ -126,6 +134,11 @@ struct VectorWriter<Array<V>> : public VectorWriterBase {
}
}

// Resizes the array vector owned by this writer to exactly `size` rows.
//
// `size` is the number of arrays (rows of arrayVector_), not the number of
// elements. Callers such as a parent Array/Map writer's finish(), or a Row
// writer's resizeTo(), pass the desired length of *this writer's vector*.
// The elements vector therefore must not be resized here: its length is
// unrelated to the row count, and truncating it to `size` would drop
// elements that were already written. finish() trims the elements to
// writer_.valuesOffset_ separately.
//
// NOTE(review): assumes resizing the array vector leaves the elements vector
// (and thus the child writer's cached pointers) untouched; confirm against
// ArrayVectorBase::resize.
void resizeTo(vector_size_t size) override {
  arrayVector_->resize(size, /*setNotNull*/ false);
}

// Commit a not null value.
void commit() {
arrayVector_->setOffsetAndSize(
Expand Down Expand Up @@ -182,9 +195,9 @@ struct VectorWriter<Map<K, V>> : public VectorWriterBase {

// Call exactly once, after every row has been written.
void finish() override {
  // Trim keys and values to the number of entries actually written. Going
  // through the child writers (rather than the vectors directly) lets each
  // of them refresh its data_ pointer after the resize.
  const auto usedEntries = writer_.innerOffset_;
  keyWriter_.resizeTo(usedEntries);
  valWriter_.resizeTo(usedEntries);
  mapVector_ = nullptr;
  keyWriter_.finish();
  valWriter_.finish();
}
}
}

// Resizes the map vector owned by this writer to exactly `size` rows.
//
// `size` is the number of maps (rows of mapVector_), not the number of
// key/value entries. Callers such as a parent Array/Map writer's finish(),
// or a Row writer's resizeTo(), pass the desired length of *this writer's
// vector*. The keys and values vectors therefore must not be resized here:
// their length is unrelated to the row count, and truncating them to `size`
// would drop entries that were already written. finish() trims keys and
// values to writer_.innerOffset_ separately.
//
// NOTE(review): assumes resizing the map vector leaves the keys/values
// vectors (and thus the child writers' cached pointers) untouched; confirm
// against ArrayVectorBase::resize.
void resizeTo(vector_size_t size) override {
  mapVector_->resize(size, /*setNotNull*/ false);
}

// Commit a not null value.
void commit() {
mapVector_->setOffsetAndSize(
Expand Down Expand Up @@ -288,6 +308,11 @@ struct VectorWriter<Row<T...>> : public VectorWriterBase {
}
}

// Resizes the row vector and every child to exactly `size` rows.
void resizeTo(vector_size_t size) override {
  // Children go first, each through its own writer, and the row vector is
  // resized afterwards.
  resizeVectorWritersTo<0>(size);
  constexpr bool kSetNotNull = false;
  rowVector_->resize(size, kSetNotNull);
}

void finalizeNull() override {
// TODO: we could pull the logic out to here also.
writer_.finalizeNullOnChildren();
Expand Down Expand Up @@ -333,6 +358,16 @@ struct VectorWriter<Row<T...>> : public VectorWriterBase {
}
}

// Compile-time recursion over the child writers: calls resizeTo(size) on
// child I, then on I + 1, and stops once I reaches the number of children.
template <size_t I>
void resizeVectorWritersTo(size_t size) {
  if constexpr (I != sizeof...(T)) {
    std::get<I>(writer_.childrenWriters_).resizeTo(size);
    resizeVectorWritersTo<I + 1>(size);
  }
}

template <size_t I>
void initVectorWriters() {
if constexpr (I == sizeof...(T)) {
Expand Down Expand Up @@ -371,6 +406,10 @@ struct VectorWriter<
}
}

// Resizes the vector held by the proxy to exactly `size` rows without
// marking rows as not-null.
void resizeTo(vector_size_t size) override {
  constexpr bool kSetNotNull = false;
  proxy_.vector_->resize(size, kSetNotNull);
}

// Default constructor with an empty body; it performs no setup of its own.
VectorWriter() {}

exec_out_t& current() {
Expand Down Expand Up @@ -423,6 +462,10 @@ struct VectorWriter<T, std::enable_if_t<std::is_same_v<T, bool>>>
}
}

// Resizes the underlying vector to exactly `size` rows without marking rows
// as not-null.
void resizeTo(vector_size_t size) override {
  constexpr bool kSetNotNull = false;
  vector_->resize(size, kSetNotNull);
}

// Default constructor with an empty body; it performs no setup of its own.
VectorWriter() {}

bool& current() {
Expand Down Expand Up @@ -471,6 +514,10 @@ struct VectorWriter<std::shared_ptr<T>> : public VectorWriterBase {
}
}

// Resizes the underlying vector to exactly `size` rows without marking rows
// as not-null.
void resizeTo(vector_size_t size) override {
  constexpr bool kSetNotNull = false;
  vector_->resize(size, kSetNotNull);
}

// Returns a reference to the vector this writer points at. Dereferences
// vector_ unconditionally, so vector_ must be non-null when this is called.
vector_t& vector() {
return *vector_;
}
Expand Down Expand Up @@ -565,6 +612,14 @@ struct VectorWriter<Generic<T, comparable, orderable>>
}
}

// Resizes to exactly `size` rows. When a cast type is set the request is
// delegated to the cast writer; otherwise the underlying vector is resized
// directly without marking rows as not-null.
void resizeTo(vector_size_t size) override {
  if (!castType_) {
    vector_->resize(size, false);
    return;
  }
  castWriter_->resizeTo(size);
}

// Returns a mutable reference to the writer_ member.
FOLLY_ALWAYS_INLINE exec_out_t& current() {
return writer_;
}
Expand Down Expand Up @@ -742,6 +797,13 @@ struct VectorWriter<DynamicRow, void> : public VectorWriterBase {
}
}

// Resizes the row vector and every child to exactly `size` rows.
void resizeTo(vector_size_t size) override {
  // Children go first, each through its own writer, and the row vector is
  // resized afterwards.
  int childIdx = 0;
  while (childIdx < writer_.childrenCount_) {
    writer_.childrenWriters_[childIdx]->resizeTo(size);
    ++childIdx;
  }
  constexpr bool kSetNotNull = false;
  rowVector_->resize(size, kSetNotNull);
}

// Delegates to the row writer's finalizeNullOnChildren().
void finalizeNull() override {
writer_.finalizeNullOnChildren();
}
Expand Down
Loading