Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Add ResultSetRegistry storage [6/N] #442

Merged
merged 1 commit into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion omniscidb/ArrowStorage/ArrowStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ std::unique_ptr<AbstractDataToken> ArrowStorage::getZeroCopyBufferMemory(
const int8_t* ptr =
chunk->data()->GetValues<int8_t>(1, chunk->data()->offset * arrow_elem_size);
size_t chunk_size = chunk->length() * arrow_elem_size;
return std::make_unique<ArrowChunkDataToken>(std::move(chunk), ptr, chunk_size);
return std::make_unique<ArrowChunkDataToken>(
std::move(chunk), col_type, ptr, chunk_size);
}
}

Expand Down
5 changes: 4 additions & 1 deletion omniscidb/ArrowStorage/ArrowStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,18 @@ class ArrowStorage : public SimpleSchemaProvider, public AbstractDataProvider {
class ArrowChunkDataToken : public Data_Namespace::AbstractDataToken {
public:
ArrowChunkDataToken(std::shared_ptr<arrow::Array> chunk,
const hdk::ir::Type* type,
const int8_t* ptr,
size_t size)
: chunk_(std::move(chunk)), ptr_(ptr), size_(size) {}
: chunk_(std::move(chunk)), type_(type), ptr_(ptr), size_(size) {}

const int8_t* getMemoryPtr() const override { return ptr_; }
size_t getSize() const override { return size_; }
const hdk::ir::Type* getType() const override { return type_; }

private:
std::shared_ptr<arrow::Array> chunk_;
const hdk::ir::Type* type_;
const int8_t* ptr_;
size_t size_;
};
Expand Down
1 change: 1 addition & 0 deletions omniscidb/DataMgr/AbstractBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class AbstractDataToken {

virtual const int8_t* getMemoryPtr() const = 0;
virtual size_t getSize() const = 0;
virtual const hdk::ir::Type* getType() const = 0;
};

class AbstractBuffer {
Expand Down
1 change: 1 addition & 0 deletions omniscidb/DataMgr/BufferMgr/Buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Buffer::Buffer(BufferMgr* bm,
, token_(std::move(token)) {
pin();
setSize(token_->getSize());
initEncoder(token_->getType());
}

Buffer::~Buffer() {}
Expand Down
141 changes: 135 additions & 6 deletions omniscidb/ResultSet/ResultSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -890,20 +890,21 @@ bool ResultSet::isDirectColumnarConversionPossible() const {
bool ResultSet::isZeroCopyColumnarConversionPossible(size_t column_idx) const {
return query_mem_desc_.didOutputColumnar() &&
query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection &&
appended_storage_.empty() && storage_ &&
!colType(column_idx)->isVarLen() && appended_storage_.empty() && storage_ &&
(lazy_fetch_info_.empty() || !lazy_fetch_info_[column_idx].is_lazily_fetched);
}

bool ResultSet::isChunkedZeroCopyColumnarConversionPossible(size_t column_idx) const {
return query_mem_desc_.didOutputColumnar() &&
query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection &&
storage_ &&
!colType(column_idx)->isVarLen() && storage_ &&
(lazy_fetch_info_.empty() || !lazy_fetch_info_[column_idx].is_lazily_fetched);
}

const int8_t* ResultSet::getColumnarBuffer(size_t column_idx) const {
CHECK(isZeroCopyColumnarConversionPossible(column_idx));
return storage_->getUnderlyingBuffer() + query_mem_desc_.getColOffInBytes(column_idx);
size_t slot_idx = query_mem_desc_.getSlotIndexForSingleSlotCol(column_idx);
return storage_->getUnderlyingBuffer() + query_mem_desc_.getColOffInBytes(slot_idx);
}

std::vector<std::pair<const int8_t*, size_t>> ResultSet::getChunkedColumnarBuffer(
Expand All @@ -917,14 +918,15 @@ std::vector<std::pair<const int8_t*, size_t>> ResultSet::getChunkedColumnarBuffe
size_t rows_to_skip = drop_first_;
// RowCount value should be cached and take into account size, limit and offset
size_t rows_to_fetch = rowCount();
size_t slot_idx = query_mem_desc_.getSlotIndexForSingleSlotCol(column_idx);

if (current_storage_rows <= rows_to_skip) {
rows_to_skip -= current_storage_rows;
} else {
size_t fetch_from_current_storage =
std::min(current_storage_rows - rows_to_skip, rows_to_fetch);
retval.emplace_back(storage_->getUnderlyingBuffer() +
storage_->getColOffInBytes(column_idx) +
storage_->getColOffInBytes(slot_idx) +
colType(column_idx)->size() * rows_to_skip,
fetch_from_current_storage);
rows_to_fetch -= fetch_from_current_storage;
Expand All @@ -936,7 +938,7 @@ std::vector<std::pair<const int8_t*, size_t>> ResultSet::getChunkedColumnarBuffe
break;
}
const int8_t* ptr =
storage_uptr->getUnderlyingBuffer() + storage_uptr->getColOffInBytes(column_idx);
storage_uptr->getUnderlyingBuffer() + storage_uptr->getColOffInBytes(slot_idx);
current_storage_rows = storage_uptr->binSearchRowCount();
if (current_storage_rows <= rows_to_skip) {
rows_to_skip -= current_storage_rows;
Expand All @@ -952,6 +954,132 @@ std::vector<std::pair<const int8_t*, size_t>> ResultSet::getChunkedColumnarBuffe
return retval;
}

size_t ResultSet::computeVarLenOffsets(size_t col_idx, int32_t* offsets) const {
auto type = colType(col_idx);
CHECK(type->isVarLen());
size_t arr_elem_size =
type->isVarLenArray() ? type->as<hdk::ir::ArrayBaseType>()->elemType()->size() : 1;
bool lazy_fetch =
!lazy_fetch_info_.empty() && lazy_fetch_info_[col_idx].is_lazily_fetched;

size_t data_slot_idx = 0;
size_t data_slot_offs = 0;
size_t size_slot_idx = 0;
size_t size_slot_offs = 0;
// Compute required slot index.
for (size_t i = 0; i < col_idx; ++i) {
// slot offset in a row is computed for rowwise access.
if (!query_mem_desc_.didOutputColumnar()) {
data_slot_offs = advance_target_ptr_row_wise(data_slot_offs,
targets_[i],
data_slot_idx,
query_mem_desc_,
separate_varlen_storage_valid_);
}
data_slot_idx =
advance_slot(data_slot_idx, targets_[i], separate_varlen_storage_valid_);
}
if (!separate_varlen_storage_valid_ && !lazy_fetch) {
size_slot_offs =
data_slot_offs + query_mem_desc_.getPaddedSlotWidthBytes(data_slot_idx);
size_slot_idx = data_slot_idx + 1;
} else {
size_slot_idx = data_slot_idx;
size_slot_offs = data_slot_offs;
}

// Translate varlen value to its length. Return -1 for NULLs.
auto slot_val_to_length = [this, lazy_fetch, col_idx, type](
size_t storage_idx,
int64_t val,
const int8_t* size_slot_ptr,
size_t size_slot_sz) -> int32_t {
if (separate_varlen_storage_valid_ && !targets_[col_idx].is_agg) {
if (val >= 0) {
const auto& varlen_buffer_for_storage = serialized_varlen_buffer_[storage_idx];
return varlen_buffer_for_storage[val].size();
}
return -1;
}

if (lazy_fetch) {
auto& frag_col_buffers = getColumnFrag(storage_idx, col_idx, val);
bool is_end{false};
if (type->isString()) {
VarlenDatum vd;
ChunkIter_get_nth(reinterpret_cast<ChunkIter*>(const_cast<int8_t*>(
frag_col_buffers[lazy_fetch_info_[col_idx].local_col_id])),
val,
false,
&vd,
&is_end);
CHECK(!is_end);
return vd.is_null ? -1 : vd.length;
} else {
ArrayDatum ad;
ChunkIter_get_nth(reinterpret_cast<ChunkIter*>(const_cast<int8_t*>(
frag_col_buffers[lazy_fetch_info_[col_idx].local_col_id])),
val,
&ad,
&is_end);
CHECK(!is_end);
return ad.is_null ? -1 : ad.length;
}
}

if (val)
return read_int_from_buff(size_slot_ptr, size_slot_sz);
return -1;
};

offsets[0] = 0;
size_t row_idx = 0;
ResultSetRowIterator iter(this);
++iter;
const auto data_elem_size = query_mem_desc_.getPaddedSlotWidthBytes(data_slot_idx);
const auto size_elem_size = query_mem_desc_.getPaddedSlotWidthBytes(size_slot_idx);
while (iter.global_entry_idx_valid_) {
const auto storage_lookup_result = findStorage(iter.global_entry_idx_);
auto storage = storage_lookup_result.storage_ptr;
auto local_entry_idx = storage_lookup_result.fixedup_entry_idx;

const int8_t* elem_ptr = nullptr;
const int8_t* size_ptr = nullptr;
if (query_mem_desc_.didOutputColumnar()) {
auto col_ptr =
storage->buff_ + storage->query_mem_desc_.getColOffInBytes(data_slot_idx);
elem_ptr = col_ptr + data_elem_size * local_entry_idx;
auto size_col_ptr =
storage->buff_ + storage->query_mem_desc_.getColOffInBytes(size_slot_idx);
size_ptr = size_col_ptr + size_elem_size * local_entry_idx;
} else {
auto keys_ptr = row_ptr_rowwise(storage->buff_, query_mem_desc_, local_entry_idx);
const auto key_bytes_with_padding =
align_to_int64(get_key_bytes_rowwise(query_mem_desc_));
elem_ptr = keys_ptr + key_bytes_with_padding + data_slot_offs;
size_ptr = keys_ptr + key_bytes_with_padding + size_slot_offs;
}

auto val = read_int_from_buff(elem_ptr, data_elem_size);
auto elem_length = slot_val_to_length(
storage_lookup_result.storage_idx, val, size_ptr, size_elem_size);
if (elem_length < 0) {
if (type->isString()) {
offsets[row_idx + 1] = offsets[row_idx];
} else {
offsets[row_idx + 1] = -std::abs(offsets[row_idx]);
}
} else {
offsets[row_idx + 1] = std::abs(offsets[row_idx]) + elem_length * arr_elem_size;
}

++iter;
++row_idx;
}

return row_idx + 1;
}

// Returns a bitmap (and total number) of all single slot targets
std::tuple<std::vector<bool>, size_t> ResultSet::getSingleSlotTargetBitmap() const {
std::vector<bool> target_bitmap(targets_.size(), true);
Expand All @@ -976,7 +1104,8 @@ std::tuple<std::vector<bool>, size_t> ResultSet::getSingleSlotTargetBitmap() con
*
* The final goal is to remove the need for such selection, but at the moment for any
* target that doesn't qualify for direct columnarization, we use the traditional
* result set's iteration to handle it (e.g., count distinct, approximate count distinct)
* result set's iteration to handle it (e.g., count distinct, approximate count
* distinct)
*/
std::tuple<std::vector<bool>, size_t> ResultSet::getSupportedSingleSlotTargetBitmap()
const {
Expand Down
6 changes: 6 additions & 0 deletions omniscidb/ResultSet/ResultSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,12 @@ class ResultSet {
std::vector<std::pair<const int8_t*, size_t>> getChunkedColumnarBuffer(
size_t column_idx) const;

// For columns with varlen data writes element offsets to the output buffer.
// It is 0 for the first element and cumulative length of all previous elements
// for others. The total length is written at the end.
// Returns the number of values written.
size_t computeVarLenOffsets(size_t col_idx, int32_t* offsets) const;

QueryDescriptionType getQueryDescriptionType() const {
return query_mem_desc_.getQueryDescriptionType();
}
Expand Down
42 changes: 8 additions & 34 deletions omniscidb/ResultSet/ResultSetIteration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -892,41 +892,15 @@ const VarlenOutputInfo* ResultSet::getVarlenOutputInfo(const size_t entry_idx) c
void ResultSet::copyColumnIntoBuffer(const size_t column_idx,
int8_t* output_buffer,
const size_t output_buffer_size) const {
CHECK(isDirectColumnarConversionPossible());
CHECK_LT(column_idx, query_mem_desc_.getSlotCount());
CHECK(output_buffer_size > 0);
CHECK(output_buffer);
const auto column_width_size = query_mem_desc_.getPaddedSlotWidthBytes(column_idx);
const size_t slot_idx = query_mem_desc_.getSlotIndexForSingleSlotCol(column_idx);
const auto column_width_size = query_mem_desc_.getPaddedSlotWidthBytes(slot_idx);
auto chunks = getChunkedColumnarBuffer(column_idx);
size_t out_buff_offset = 0;

// the main storage:
const size_t crt_storage_row_count = storage_->query_mem_desc_.getEntryCount();
const size_t crt_buffer_size = crt_storage_row_count * column_width_size;
const size_t column_offset = storage_->query_mem_desc_.getColOffInBytes(column_idx);
const int8_t* storage_buffer = storage_->getUnderlyingBuffer() + column_offset;
CHECK(crt_buffer_size <= output_buffer_size);
std::memcpy(output_buffer, storage_buffer, crt_buffer_size);

out_buff_offset += crt_buffer_size;

// the appended storages:
for (size_t i = 0; i < appended_storage_.size(); i++) {
const size_t crt_storage_row_count =
appended_storage_[i]->query_mem_desc_.getEntryCount();
if (crt_storage_row_count == 0) {
// skip an empty appended storage
continue;
}
CHECK_LT(out_buff_offset, output_buffer_size);
const size_t crt_buffer_size = crt_storage_row_count * column_width_size;
const size_t column_offset =
appended_storage_[i]->query_mem_desc_.getColOffInBytes(column_idx);
const int8_t* storage_buffer =
appended_storage_[i]->getUnderlyingBuffer() + column_offset;
CHECK(out_buff_offset + crt_buffer_size <= output_buffer_size);
std::memcpy(output_buffer + out_buff_offset, storage_buffer, crt_buffer_size);

out_buff_offset += crt_buffer_size;
for (auto& chunk : chunks) {
size_t bytes_to_copy = chunk.second * column_width_size;
CHECK_LE(out_buff_offset + bytes_to_copy, output_buffer_size);
std::memcpy(output_buffer + out_buff_offset, chunk.first, bytes_to_copy);
out_buff_offset += bytes_to_copy;
}
}

Expand Down
Loading