Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Replace dictionary proxies with nested dictionaries 08/N #683

Merged
merged 1 commit into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion omniscidb/ResultSet/ResultSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,9 @@ std::string ResultSet::getStrScalarVal(const ScalarTargetValue& current_scalar,
} else {
if (col_type->isExtDictionary()) {
const int32_t dict_id = col_type->as<hdk::ir::ExtDictionaryType>()->dictId();
const auto sdp = getStringDictionaryProxy(dict_id);
const auto sdp = data_mgr_ ? row_set_mem_owner_->getOrAddStringDictProxy(dict_id)
: row_set_mem_owner_->getStringDictProxy(
dict_id); // unit tests bypass the DataMgr
const auto string_id = boost::get<int64_t>(current_scalar);
oss << "idx:"
<< ((string_id == inline_int_null_value<int32_t>()) ? "null"
Expand Down
35 changes: 12 additions & 23 deletions omniscidb/StringDictionary/StringDictionaryProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ std::string StringDictionaryProxy::getString(int32_t string_id) const {
}

std::string StringDictionaryProxy::getStringUnlocked(const int32_t string_id) const {
if (string_id >= 0 && storageEntryCount() > 0) {
if (string_id < generation_) {
return string_dict_->getString(string_id);
}
unsigned const string_index = transientIdToIndex(string_id);
Expand All @@ -157,10 +157,10 @@ std::vector<std::string> StringDictionaryProxy::getStrings(
strings.reserve(string_ids.size());
std::shared_lock<std::shared_mutex> read_lock(rw_mutex_);
for (const auto string_id : string_ids) {
if (string_id >= 0) {
strings.emplace_back(string_dict_->getString(string_id));
} else if (inline_int_null_value<int32_t>() == string_id) {
if (inline_int_null_value<int32_t>() == string_id) {
strings.emplace_back("");
} else if (string_id < generation_) {
strings.emplace_back(string_dict_->getString(string_id));
} else {
unsigned const string_index = transientIdToIndex(string_id);
strings.emplace_back(*transient_string_vec_[string_index]);
Expand Down Expand Up @@ -195,7 +195,7 @@ StringDictionaryProxy::buildIntersectionTranslationMapToOtherProxyUnlocked(
std::vector<std::string> transient_lookup_strings(num_transient_entries);
std::transform(transient_string_vec_.cbegin(),
transient_string_vec_.cend(),
transient_lookup_strings.rbegin(),
transient_lookup_strings.begin(),
[](std::string const* ptr) { return *ptr; });

// This lookup may have a different snapshot of
Expand All @@ -208,14 +208,14 @@ StringDictionaryProxy::buildIntersectionTranslationMapToOtherProxyUnlocked(
// a vector of pointer-to-strings so we don't have to materialize
// transient_string_vec_ into transient_lookup_strings.

num_transient_strings_not_translated =
dest_proxy->getTransientBulkImpl(transient_lookup_strings, id_map.data(), false);
num_transient_strings_not_translated = dest_proxy->getTransientBulkImpl(
transient_lookup_strings, id_map.transientData(), false);
}

// Now map strings in dictionary
// We place non-transient strings after the transient strings
// if they exist, otherwise at index 0
int32_t* translation_map_stored_entries_ptr = id_map.storageData();
int32_t* translation_map_stored_entries_ptr = id_map.data();

auto dest_transient_lookup_callback = [dest_proxy, translation_map_stored_entries_ptr](
const std::string_view& source_string,
Expand All @@ -239,8 +239,7 @@ StringDictionaryProxy::buildIntersectionTranslationMapToOtherProxyUnlocked(
: 0UL;

const size_t num_dest_entries = dest_proxy->entryCountUnlocked();
const size_t num_total_entries =
id_map.getVectorMap().size() - 1UL /* account for skipped entry -1 */;
const size_t num_total_entries = id_map.size();
CHECK_GT(num_total_entries, 0UL);
const size_t num_strings_not_translated =
num_transient_strings_not_translated + num_persisted_strings_not_translated;
Expand Down Expand Up @@ -324,26 +323,16 @@ StringDictionaryProxy::IdMap StringDictionaryProxy::buildUnionTranslationMapToOt
}
const int32_t map_domain_start = id_map.domainStart();
const int32_t map_domain_end = id_map.domainEnd();
// First iterate over transient strings and add to dest map
// Todo (todd): Add call to fetch string_views (local) or strings (distributed)
// for all non-translated ids to avoid string-by-string fetch
for (int32_t source_string_id = map_domain_start; source_string_id < -1;
for (int32_t source_string_id = map_domain_start; source_string_id < map_domain_end;
++source_string_id) {
if (id_map[source_string_id] == StringDictionary::INVALID_STR_ID) {
const auto source_string = getStringUnlocked(source_string_id);
const auto dest_string_id = dest_proxy->getOrAddTransientUnlocked(source_string);
id_map[source_string_id] = dest_string_id;
}
}
// Now iterate over stored strings
for (int32_t source_string_id = 0; source_string_id < map_domain_end;
++source_string_id) {
if (id_map[source_string_id] == StringDictionary::INVALID_STR_ID) {
const auto source_string = string_dict_->getString(source_string_id);
const auto dest_string_id = dest_proxy->getOrAddTransientUnlocked(source_string);
id_map[source_string_id] = dest_string_id;
}
}
}
return id_map;
}
Expand Down Expand Up @@ -449,7 +438,7 @@ std::vector<int32_t> StringDictionaryProxy::getRegexpLike(const std::string& pat

std::pair<const char*, size_t> StringDictionaryProxy::getStringBytes(
int32_t string_id) const noexcept {
if (string_id >= 0) {
if (string_id < generation_) {
return string_dict_.get()->getStringBytes(string_id);
}
unsigned const string_index = transientIdToIndex(string_id);
Expand Down Expand Up @@ -525,7 +514,7 @@ class StringLocalCallback : public StringDictionary::StringCallback {
};

std::ostream& operator<<(std::ostream& os, StringDictionaryProxy::IdMap const& id_map) {
return os << "IdMap(offset_(" << id_map.offset_ << ") vector_map_"
return os << "IdMap(dict_size_(" << id_map.dict_size_ << ") vector_map_"
<< shared::printContainer(id_map.vector_map_) << ')';
}

Expand Down
33 changes: 13 additions & 20 deletions omniscidb/StringDictionary/StringDictionaryProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,27 +80,26 @@ class StringDictionaryProxy {
std::pair<const char*, size_t> getStringBytes(int32_t string_id) const noexcept;

class IdMap {
size_t const offset_;
const uint32_t dict_size_;
std::vector<int32_t> vector_map_;
int64_t num_untranslated_strings_{-1};

public:
// +1 is added to skip string_id=-1 reserved for INVALID_STR_ID. id_map[-1]==-1.
IdMap(uint32_t const tran_size, uint32_t const dict_size)
: offset_(tran_size + 1)
, vector_map_(offset_ + dict_size, StringDictionary::INVALID_STR_ID) {}
: dict_size_(dict_size)
, vector_map_(tran_size + dict_size, StringDictionary::INVALID_STR_ID) {}
IdMap(IdMap const&) = delete;
IdMap(IdMap&&) = default;
bool empty() const { return vector_map_.size() == 1; }
inline size_t getIndex(int32_t const id) const { return offset_ + id; }
bool empty() const { return vector_map_.empty(); }
inline size_t getIndex(int32_t const id) const { return id; }
std::vector<int32_t> const& getVectorMap() const { return vector_map_; }
size_t size() const { return vector_map_.size(); }
size_t numTransients() const { return offset_ - 1; }
size_t numNonTransients() const { return vector_map_.size() - offset_; }
size_t numTransients() const { return vector_map_.size() - dict_size_; }
size_t numNonTransients() const { return dict_size_; }
int32_t* data() { return vector_map_.data(); }
int32_t const* data() const { return vector_map_.data(); }
int32_t domainStart() const { return -static_cast<int32_t>(offset_); }
int32_t domainEnd() const { return static_cast<int32_t>(numNonTransients()); }
int32_t domainStart() const { return 0; }
int32_t domainEnd() const { return vector_map_.size(); }
// Next two methods are currently used by buildUnionTranslationMapToOtherProxy to
// short circuit iteration over ids after intersection translation if all
// ids translated. Currently the private num_untranslated_strings_ is initialized
Expand All @@ -114,7 +113,7 @@ class StringDictionaryProxy {
void setNumUntranslatedStrings(const size_t num_untranslated_strings) {
num_untranslated_strings_ = static_cast<int64_t>(num_untranslated_strings);
}
int32_t* storageData() { return vector_map_.data() + offset_; }
int32_t* transientData() { return vector_map_.data() + dict_size_; }
int32_t& operator[](int32_t const id) { return vector_map_[getIndex(id)]; }
int32_t operator[](int32_t const id) const { return vector_map_[getIndex(id)]; }
friend std::ostream& operator<<(std::ostream&, IdMap const&);
Expand Down Expand Up @@ -187,17 +186,11 @@ class StringDictionaryProxy {
void eachStringSerially(StringDictionary::StringCallback&) const;

private:
// INVALID_STR_ID = -1 is reserved for invalid string_ids.
// Thus the greatest valid transient string_id is -2.
static unsigned transientIdToIndex(int32_t const id) {
constexpr int max_transient_string_id = -2;
return static_cast<unsigned>(max_transient_string_id - id);
unsigned transientIdToIndex(int32_t const id) const {
return static_cast<unsigned>(id - generation_);
}

static int32_t transientIndexToId(unsigned const index) {
constexpr int max_transient_string_id = -2;
return static_cast<int32_t>(max_transient_string_id - index);
}
int32_t transientIndexToId(unsigned const index) const { return generation_ + index; }

/**
* @brief Returns the number of string entries in the underlying string dictionary,
Expand Down
1 change: 1 addition & 0 deletions omniscidb/Tests/ColumnarResultsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ void test_columnar_conversion(const std::vector<TargetInfo>& target_infos,
target_infos,
query_mem_desc,
generator,
0,
non_empty_step_size);

// Columnar Conversion:
Expand Down
1 change: 1 addition & 0 deletions omniscidb/Tests/GpuSharedMemoryTestHelpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ std::vector<std::unique_ptr<ResultSet>> create_and_fill_input_result_sets(
target_infos,
query_mem_desc,
generators[i],
0,
steps[i]);
}
return result_sets;
Expand Down
5 changes: 3 additions & 2 deletions omniscidb/Tests/ResultSetBaselineRadixSortTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void fill_storage_buffer_baseline_sort_int(int8_t* buff,
sizeof(K),
row_size_quad);
CHECK(value_slots);
fill_one_entry_baseline(value_slots, val, target_infos);
fill_one_entry_baseline(value_slots, val, target_infos, 0);
}
}

Expand Down Expand Up @@ -160,7 +160,8 @@ void fill_storage_buffer_baseline_sort_fp(int8_t* buff,
sizeof(int64_t),
key_component_count + target_slot_count);
CHECK(value_slots);
fill_one_entry_baseline(value_slots, val, target_infos, false, val == null_pattern);
fill_one_entry_baseline(
value_slots, val, target_infos, 0, false, val == null_pattern);
}
}

Expand Down
58 changes: 39 additions & 19 deletions omniscidb/Tests/ResultSetTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,14 +420,14 @@ void ResultSetEmulator::rse_fill_storage_buffer_perfect_hash_rowwise(
// exersized for null_val test
rs_values[i] = -1;
key_buff = fill_one_entry_no_collisions(
entries_buff, rs_query_mem_desc, v, rs_target_infos, false, true);
entries_buff, rs_query_mem_desc, v, rs_target_infos, 0, false, true);
} else {
key_buff = fill_one_entry_no_collisions(
entries_buff, rs_query_mem_desc, v, rs_target_infos, false, false);
entries_buff, rs_query_mem_desc, v, rs_target_infos, 0, false, false);
}
} else {
key_buff = fill_one_entry_no_collisions(
entries_buff, rs_query_mem_desc, v, rs_target_infos, false);
entries_buff, rs_query_mem_desc, v, rs_target_infos, 0, false);
}
} else {
auto key_buff_i64 = reinterpret_cast<int64_t*>(key_buff);
Expand All @@ -441,10 +441,10 @@ void ResultSetEmulator::rse_fill_storage_buffer_perfect_hash_rowwise(
rs_values[i] = -1;
}
key_buff = fill_one_entry_no_collisions(
entries_buff, rs_query_mem_desc, 0xdeadbeef, rs_target_infos, true, true);
entries_buff, rs_query_mem_desc, 0xdeadbeef, rs_target_infos, 0, true, true);
} else {
key_buff = fill_one_entry_no_collisions(
entries_buff, rs_query_mem_desc, 0xdeadbeef, rs_target_infos, true);
entries_buff, rs_query_mem_desc, 0xdeadbeef, rs_target_infos, 0, true);
}
}
}
Expand Down Expand Up @@ -562,10 +562,10 @@ void ResultSetEmulator::rse_fill_storage_buffer_baseline_rowwise(
if ((rs_flow == 2) &&
(i >= rs_entry_count - 4)) { // null_val test-cases: last four rows
rs_values[i] = -1;
fill_one_entry_baseline(value_slots, v, rs_target_infos, false, true);
fill_one_entry_baseline(value_slots, v, rs_target_infos, 0, false, true);
} else {
rs_values[i] = v;
fill_one_entry_baseline(value_slots, v, rs_target_infos, false, false);
fill_one_entry_baseline(value_slots, v, rs_target_infos, 0, false, false);
}
}
}
Expand Down Expand Up @@ -886,8 +886,12 @@ void test_iterate(const std::vector<TargetInfo>& target_infos,
}
const auto storage = result_set.allocateStorage();
EvenNumberGenerator generator;
fill_storage_buffer(
storage->getUnderlyingBuffer(), target_infos, query_mem_desc, generator, 2);
fill_storage_buffer(storage->getUnderlyingBuffer(),
target_infos,
query_mem_desc,
generator,
sdp->getBaseGeneration(),
2);
int64_t ref_val{0};
while (true) {
const auto row = result_set.getNextRow(true, false);
Expand Down Expand Up @@ -1010,7 +1014,7 @@ void run_reduction(const std::vector<TargetInfo>& target_infos,
auto executor = Executor::getExecutor(getDataMgr());
const auto row_set_mem_owner = std::make_shared<RowSetMemoryOwner>(
g_data_provider.get(), Executor::getArenaBlockSize());
row_set_mem_owner->addStringDict(g_sd, 1, g_sd->storageEntryCount());
auto sdp = row_set_mem_owner->addStringDict(g_sd, 1, g_sd->storageEntryCount());
const auto rs1 = std::make_unique<ResultSet>(target_infos,
ExecutorDeviceType::CPU,
query_mem_desc,
Expand All @@ -1019,8 +1023,12 @@ void run_reduction(const std::vector<TargetInfo>& target_infos,
0,
0);
storage1 = rs1->allocateStorage();
fill_storage_buffer(
storage1->getUnderlyingBuffer(), target_infos, query_mem_desc, generator1, step);
fill_storage_buffer(storage1->getUnderlyingBuffer(),
target_infos,
query_mem_desc,
generator1,
sdp->getBaseGeneration(),
step);
const auto rs2 = std::make_unique<ResultSet>(target_infos,
ExecutorDeviceType::CPU,
query_mem_desc,
Expand All @@ -1029,8 +1037,12 @@ void run_reduction(const std::vector<TargetInfo>& target_infos,
0,
0);
storage2 = rs2->allocateStorage();
fill_storage_buffer(
storage2->getUnderlyingBuffer(), target_infos, query_mem_desc, generator2, step);
fill_storage_buffer(storage2->getUnderlyingBuffer(),
target_infos,
query_mem_desc,
generator2,
sdp->getBaseGeneration(),
step);
ResultSetManager rs_manager;
std::vector<ResultSet*> storage_set{rs1.get(), rs2.get()};
rs_manager.reduce(storage_set, config(), executor.get());
Expand All @@ -1048,7 +1060,7 @@ void test_reduce(const std::vector<TargetInfo>& target_infos,
auto executor = Executor::getExecutor(getDataMgr());
const auto row_set_mem_owner = std::make_shared<RowSetMemoryOwner>(
g_data_provider.get(), Executor::getArenaBlockSize());
row_set_mem_owner->addStringDict(g_sd, 1, g_sd->storageEntryCount());
auto sdp = row_set_mem_owner->addStringDict(g_sd, 1, g_sd->storageEntryCount());
const auto rs1 = std::make_unique<ResultSet>(target_infos,
ExecutorDeviceType::CPU,
query_mem_desc,
Expand All @@ -1057,8 +1069,12 @@ void test_reduce(const std::vector<TargetInfo>& target_infos,
0,
0);
storage1 = rs1->allocateStorage();
fill_storage_buffer(
storage1->getUnderlyingBuffer(), target_infos, query_mem_desc, generator1, step);
fill_storage_buffer(storage1->getUnderlyingBuffer(),
target_infos,
query_mem_desc,
generator1,
sdp->getBaseGeneration(),
step);
const auto rs2 = std::make_unique<ResultSet>(target_infos,
ExecutorDeviceType::CPU,
query_mem_desc,
Expand All @@ -1067,8 +1083,12 @@ void test_reduce(const std::vector<TargetInfo>& target_infos,
0,
0);
storage2 = rs2->allocateStorage();
fill_storage_buffer(
storage2->getUnderlyingBuffer(), target_infos, query_mem_desc, generator2, step);
fill_storage_buffer(storage2->getUnderlyingBuffer(),
target_infos,
query_mem_desc,
generator2,
sdp->getBaseGeneration(),
step);
ResultSetManager rs_manager;
std::vector<ResultSet*> storage_set{rs1.get(), rs2.get()};
auto result_rs = rs_manager.reduce(storage_set, config(), executor.get());
Expand Down
Loading