Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Add ResultSetRegistry storage [2/N] #348

Merged
merged 1 commit into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 27 additions & 25 deletions omniscidb/ArrowStorage/ArrowStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,22 +546,21 @@ void ArrowStorage::appendArrowTable(std::shared_ptr<arrow::Table> at, int table_
static_cast<size_t>(at->num_rows()) - frag.offset)
: first_frag_size;

auto meta = std::make_shared<ChunkMetadata>();
meta->type = col_info->type;
meta->numElements = frag.row_count;
size_t num_bytes;
if (col_type->isFixedLenArray()) {
meta->numBytes = frag.row_count * col_type->size();
num_bytes = frag.row_count * col_type->size();
} else if (col_type->isVarLenArray()) {
meta->numBytes =
num_bytes =
computeTotalStringsLength(col_arr, frag.offset, frag.row_count);
} else {
meta->numBytes = frag.row_count * col_type->size();
num_bytes = frag.row_count * col_type->size();
}
auto meta = std::make_shared<ChunkMetadata>(
col_info->type, num_bytes, frag.row_count);

computeStats(
meta->fillChunkStats(computeStats(
col_arr->Slice(frag.offset, frag.row_count * elems_count),
col_type,
meta->chunkStats);
col_type));
frag.metadata[col_idx] = meta;
}
}); // each fragment
Expand All @@ -574,14 +573,14 @@ void ArrowStorage::appendArrowTable(std::shared_ptr<arrow::Table> at, int table_
frag_idx ? std::min(table.fragment_size,
static_cast<size_t>(at->num_rows()) - frag.offset)
: first_frag_size;
auto meta = std::make_shared<ChunkMetadata>();
meta->type = col_info->type;
meta->numElements = frag.row_count;
CHECK(col_type->isText());
meta->numBytes =
computeTotalStringsLength(col_arr, frag.offset, frag.row_count);
meta->chunkStats.has_nulls =
col_arr->Slice(frag.offset, frag.row_count)->null_count();
auto meta = std::make_shared<ChunkMetadata>(
col_info->type,
computeTotalStringsLength(col_arr, frag.offset, frag.row_count),
frag.row_count);
meta->fillStringChunkStats(
col_arr->Slice(frag.offset, frag.row_count)->null_count());

frag.metadata[col_idx] = meta;
}
}
Expand All @@ -607,12 +606,14 @@ void ArrowStorage::appendArrowTable(std::shared_ptr<arrow::Table> at, int table_
last_frag.row_count += first_frag.row_count;
for (size_t col_idx = 0; col_idx < last_frag.metadata.size(); ++col_idx) {
auto col_type = getColumnInfo(db_id_, table_id, col_idx + 1)->type;
last_frag.metadata[col_idx]->numElements +=
first_frag.metadata[col_idx]->numElements;
last_frag.metadata[col_idx]->numBytes += first_frag.metadata[col_idx]->numBytes;
mergeStats(last_frag.metadata[col_idx]->chunkStats,
first_frag.metadata[col_idx]->chunkStats,
col_type);
size_t num_elems = last_frag.metadata[col_idx]->numElements() +
first_frag.metadata[col_idx]->numElements();
size_t num_bytes = last_frag.metadata[col_idx]->numBytes() +
first_frag.metadata[col_idx]->numBytes();
auto stats = last_frag.metadata[col_idx]->chunkStats();
mergeStats(stats, first_frag.metadata[col_idx]->chunkStats(), col_type);
last_frag.metadata[col_idx] =
std::make_shared<ChunkMetadata>(col_type, num_bytes, num_elems, stats);
}
start_frag = 1;
}
Expand Down Expand Up @@ -904,9 +905,8 @@ void ArrowStorage::compareSchemas(std::shared_ptr<arrow::Schema> lhs,
}
}

void ArrowStorage::computeStats(std::shared_ptr<arrow::ChunkedArray> arr,
const hdk::ir::Type* type,
ChunkStats& stats) {
ChunkStats ArrowStorage::computeStats(std::shared_ptr<arrow::ChunkedArray> arr,
const hdk::ir::Type* type) {
auto elem_type =
type->isArray() ? type->as<hdk::ir::ArrayBaseType>()->elemType() : type;
std::unique_ptr<Encoder> encoder(Encoder::Create(nullptr, elem_type));
Expand All @@ -933,7 +933,9 @@ void ArrowStorage::computeStats(std::shared_ptr<arrow::ChunkedArray> arr,
}
}

ChunkStats stats;
encoder->fillChunkStats(stats, elem_type);
return stats;
}

std::shared_ptr<arrow::Table> ArrowStorage::parseCsvFile(
Expand Down
5 changes: 2 additions & 3 deletions omniscidb/ArrowStorage/ArrowStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,8 @@ class ArrowStorage : public SimpleSchemaProvider, public AbstractDataProvider {
const TableOptions& options) const;
void compareSchemas(std::shared_ptr<arrow::Schema> lhs,
std::shared_ptr<arrow::Schema> rhs);
void computeStats(std::shared_ptr<arrow::ChunkedArray> arr,
const hdk::ir::Type* type,
ChunkStats& stats);
ChunkStats computeStats(std::shared_ptr<arrow::ChunkedArray> arr,
const hdk::ir::Type* type);
std::shared_ptr<arrow::Table> parseCsvFile(const std::string& file_name,
const CsvParseOptions parse_options,
const ColumnInfoList& col_infos = {});
Expand Down
7 changes: 4 additions & 3 deletions omniscidb/DataMgr/ArrayNoneEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ class ArrayNoneEncoder : public Encoder {
return n - start_idx;
}

void getMetadata(const std::shared_ptr<ChunkMetadata>& chunkMetadata) override {
Encoder::getMetadata(chunkMetadata); // call on parent class
chunkMetadata->fillChunkStats(elem_min, elem_max, has_nulls);
std::shared_ptr<ChunkMetadata> getMetadata() override {
auto res = Encoder::getMetadata();
res->fillChunkStats(elem_min, elem_max, has_nulls);
return res;
}

// Only called from the executor for synthesized meta-information.
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/Chunk/Chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ ChunkIter Chunk::begin_iterator(const std::shared_ptr<ChunkMetadata>& chunk_meta
it.end_pos = buffer_->getMemoryPtr() + buffer_->size();
it.second_buf = nullptr;
}
it.num_elems = chunk_metadata->numElements;
it.num_elems = chunk_metadata->numElements();
return it;
}
} // namespace Chunk_NS
152 changes: 105 additions & 47 deletions omniscidb/DataMgr/ChunkMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,76 +154,134 @@ inline void mergeStats(ChunkStats& lhs,
}
}

struct ChunkMetadata {
const hdk::ir::Type* type;
size_t numBytes;
size_t numElements;
ChunkStats chunkStats;
class ChunkMetadata {
public:
using StatsMaterializeFn = std::function<void(ChunkStats&)>;

ChunkMetadata(const hdk::ir::Type* type,
const size_t num_bytes,
const size_t num_elements)
: type_(type), num_bytes_(num_bytes), num_elements_(num_elements) {}

ChunkMetadata(const hdk::ir::Type* type,
const size_t num_bytes,
const size_t num_elements,
const ChunkStats& chunk_stats)
: type_(type)
, num_bytes_(num_bytes)
, num_elements_(num_elements)
, chunk_stats_(chunk_stats) {}

ChunkMetadata(const hdk::ir::Type* type,
const size_t num_bytes,
const size_t num_elements,
StatsMaterializeFn stats_materialize_fn)
: type_(type)
, num_bytes_(num_bytes)
, num_elements_(num_elements)
, stats_materialize_fn_(std::move(stats_materialize_fn)) {}

const hdk::ir::Type* type() const { return type_; }
size_t numBytes() const { return num_bytes_; }
size_t numElements() const { return num_elements_; }
const ChunkStats& chunkStats() const {
maybeMaterializeStats();
return chunk_stats_;
}

#ifndef __CUDACC__
std::string dump() const {
std::string res = "type: " + type_->toString() +
" numBytes: " + to_string(num_bytes_) + " numElements " +
to_string(num_elements_);
auto elem_type =
type->isArray() ? type->as<hdk::ir::ArrayBaseType>()->elemType() : type;
// Unencoded strings have no min/max.
if (elem_type->isString()) {
return "type: " + type->toString() + " numBytes: " + to_string(numBytes) +
" numElements " + to_string(numElements) + " min: <invalid>" +
" max: <invalid>" + " has_nulls: " + to_string(chunkStats.has_nulls);
type_->isArray() ? type_->as<hdk::ir::ArrayBaseType>()->elemType() : type_;
if (stats_materialize_fn_) {
res +=
" min: <not materialized> max: <not materialized> has_nulls: <not "
"materialized>";
} else if (elem_type->isString()) {
// Unencoded strings have no min/max.
res += " min: <invalid> max: <invalid> has_nulls: " +
to_string(chunk_stats_.has_nulls);
} else if (elem_type->isExtDictionary()) {
return "type: " + type->toString() + " numBytes: " + to_string(numBytes) +
" numElements " + to_string(numElements) +
" min: " + to_string(chunkStats.min.intval) +
" max: " + to_string(chunkStats.max.intval) +
" has_nulls: " + to_string(chunkStats.has_nulls);
res += " min: " + to_string(chunk_stats_.min.intval) +
" max: " + to_string(chunk_stats_.max.intval) +
" has_nulls: " + to_string(chunk_stats_.has_nulls);
} else {
return "type: " + type->toString() + " numBytes: " + to_string(numBytes) +
" numElements " + to_string(numElements) +
" min: " + DatumToString(chunkStats.min, elem_type) +
" max: " + DatumToString(chunkStats.max, elem_type) +
" has_nulls: " + to_string(chunkStats.has_nulls);
res += " min: " + DatumToString(chunk_stats_.min, elem_type) +
" max: " + DatumToString(chunk_stats_.max, elem_type) +
" has_nulls: " + to_string(chunk_stats_.has_nulls);
}

return res;
}

std::string toString() const {
return dump();
}
#endif

ChunkMetadata(const hdk::ir::Type* type_,
const size_t num_bytes,
const size_t num_elements,
const ChunkStats& chunk_stats)
: type(type_)
, numBytes(num_bytes)
, numElements(num_elements)
, chunkStats(chunk_stats) {}

ChunkMetadata() {}

template <typename T>
void fillChunkStats(const T min, const T max, const bool has_nulls) {
::fillChunkStats(chunkStats, type, min, max, has_nulls);
StatsMaterializeFn().swap(stats_materialize_fn_);
::fillChunkStats(chunk_stats_, type_, min, max, has_nulls);
}

void fillChunkStats(const ChunkStats& new_stats) {
StatsMaterializeFn().swap(stats_materialize_fn_);
chunk_stats_ = new_stats;
}

void fillChunkStats(const Datum min, const Datum max, const bool has_nulls) {
chunkStats.has_nulls = has_nulls;
chunkStats.min = min;
chunkStats.max = max;
StatsMaterializeFn().swap(stats_materialize_fn_);
chunk_stats_.has_nulls = has_nulls;
chunk_stats_.min = min;
chunk_stats_.max = max;
}

void fillStringChunkStats(const bool has_nulls) {
StatsMaterializeFn().swap(stats_materialize_fn_);
chunk_stats_.has_nulls = has_nulls;
#ifndef __CUDACC__
chunk_stats_.min.stringval = nullptr;
chunk_stats_.max.stringval = nullptr;
#endif
}

bool operator==(const ChunkMetadata& that) const {
return type->equal(that.type) && numBytes == that.numBytes &&
numElements == that.numElements &&
DatumEqual(
chunkStats.min,
that.chunkStats.min,
type->isArray() ? type->as<hdk::ir::ArrayBaseType>()->elemType() : type) &&
DatumEqual(
chunkStats.max,
that.chunkStats.max,
type->isArray() ? type->as<hdk::ir::ArrayBaseType>()->elemType() : type) &&
chunkStats.has_nulls == that.chunkStats.has_nulls;
if (!type_->equal(that.type_) || num_bytes_ != that.num_bytes_ ||
num_elements_ != that.num_elements_) {
return false;
}

maybeMaterializeStats();
that.maybeMaterializeStats();

return DatumEqual(chunk_stats_.min,
that.chunk_stats_.min,
type_->isArray() ? type_->as<hdk::ir::ArrayBaseType>()->elemType()
: type_) &&
DatumEqual(chunk_stats_.max,
that.chunk_stats_.max,
type_->isArray() ? type_->as<hdk::ir::ArrayBaseType>()->elemType()
: type_) &&
chunk_stats_.has_nulls == that.chunk_stats_.has_nulls;
}

private:
void maybeMaterializeStats() const {
if (stats_materialize_fn_) {
stats_materialize_fn_(chunk_stats_);
StatsMaterializeFn().swap(stats_materialize_fn_);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be some logging or a fatal error for the failure mode here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd expect any logging and exceptions to be in the materializer, because that's where synchronization happens. But I don't expect stats computation to ever fail, because we can always use the default stats.

}

const hdk::ir::Type* type_;
size_t num_bytes_;
size_t num_elements_;
mutable ChunkStats chunk_stats_;
mutable StatsMaterializeFn stats_materialize_fn_;
};

inline int64_t extract_min_stat_int_type(const ChunkStats& stats,
Expand Down
7 changes: 4 additions & 3 deletions omniscidb/DataMgr/DateDaysEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ class DateDaysEncoder : public Encoder {
resetChunkStats();
}

void getMetadata(const std::shared_ptr<ChunkMetadata>& chunkMetadata) override {
Encoder::getMetadata(chunkMetadata);
chunkMetadata->fillChunkStats(dataMin, dataMax, has_nulls);
std::shared_ptr<ChunkMetadata> getMetadata() override {
auto res = Encoder::getMetadata();
res->fillChunkStats(dataMin, dataMax, has_nulls);
return res;
}

// Only called from the executor for synthesized meta-information.
Expand Down
6 changes: 2 additions & 4 deletions omniscidb/DataMgr/Encoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ Encoder::Encoder(Data_Namespace::AbstractBuffer* buffer)
, decimal_overflow_validator_(buffer ? buffer->type() : nullptr)
, date_days_overflow_validator_(buffer ? buffer->type() : nullptr){};

void Encoder::getMetadata(const std::shared_ptr<ChunkMetadata>& chunkMetadata) {
chunkMetadata->type = buffer_->type();
chunkMetadata->numBytes = buffer_->size();
chunkMetadata->numElements = num_elems_;
std::shared_ptr<ChunkMetadata> Encoder::getMetadata() {
return std::make_shared<ChunkMetadata>(buffer_->type(), buffer_->size(), num_elems_);
}
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/Encoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class Encoder {
Encoder(Data_Namespace::AbstractBuffer* buffer);
virtual ~Encoder() {}

virtual void getMetadata(const std::shared_ptr<ChunkMetadata>& chunkMetadata);
virtual std::shared_ptr<ChunkMetadata> getMetadata();
// Only called from the executor for synthesized meta-information.
virtual std::shared_ptr<ChunkMetadata> getMetadata(const hdk::ir::Type* type) = 0;
virtual void updateStats(const int64_t val, const bool is_null) = 0;
Expand Down
7 changes: 4 additions & 3 deletions omniscidb/DataMgr/FixedLengthArrayNoneEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ class FixedLengthArrayNoneEncoder : public Encoder {
return dataSize / array_size;
}

void getMetadata(const std::shared_ptr<ChunkMetadata>& chunkMetadata) override {
Encoder::getMetadata(chunkMetadata); // call on parent class
chunkMetadata->fillChunkStats(elem_min, elem_max, has_nulls);
std::shared_ptr<ChunkMetadata> getMetadata() override {
auto res = Encoder::getMetadata();
res->fillChunkStats(elem_min, elem_max, has_nulls);
return res;
}

// Only called from the executor for synthesized meta-information.
Expand Down
7 changes: 4 additions & 3 deletions omniscidb/DataMgr/FixedLengthEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ class FixedLengthEncoder : public Encoder {
resetChunkStats();
}

void getMetadata(const std::shared_ptr<ChunkMetadata>& chunkMetadata) override {
Encoder::getMetadata(chunkMetadata); // call on parent class
chunkMetadata->fillChunkStats(dataMin, dataMax, has_nulls);
std::shared_ptr<ChunkMetadata> getMetadata() override {
auto res = Encoder::getMetadata();
res->fillChunkStats(dataMin, dataMax, has_nulls);
return res;
}

// Only called from the executor for synthesized meta-information.
Expand Down
7 changes: 4 additions & 3 deletions omniscidb/DataMgr/NoneEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ class NoneEncoder : public Encoder {
resetChunkStats();
}

void getMetadata(const std::shared_ptr<ChunkMetadata>& chunkMetadata) override {
Encoder::getMetadata(chunkMetadata); // call on parent class
chunkMetadata->fillChunkStats(dataMin, dataMax, has_nulls);
std::shared_ptr<ChunkMetadata> getMetadata() override {
auto res = Encoder::getMetadata();
res->fillChunkStats(dataMin, dataMax, has_nulls);
return res;
}

// Only called from the executor for synthesized meta-information.
Expand Down
Loading