Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 341dbed

Browse files
committed
Support dictionary proxies in ResultSetRegistry.
Signed-off-by: ienkovich <[email protected]>
1 parent e176743 commit 341dbed

File tree

9 files changed

+206
-12
lines changed

9 files changed

+206
-12
lines changed

omniscidb/QueryEngine/Execute.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -1170,7 +1170,7 @@ ResultSetPtr Executor::reduceMultiDeviceResults(
11701170
return std::make_shared<ResultSet>(targets,
11711171
ExecutorDeviceType::CPU,
11721172
QueryMemoryDescriptor(),
1173-
nullptr,
1173+
row_set_mem_owner,
11741174
data_mgr_,
11751175
blockSize(),
11761176
gridSize());

omniscidb/ResultSet/ResultSet.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,11 @@ StringDictionary* ResultSet::getStringDictionaryProxy(int const dict_id) const {
493493
return row_set_mem_owner_->getOrAddStringDictProxy(dict_id);
494494
}
495495

496+
std::shared_ptr<StringDictionary> ResultSet::getStringDictionaryProxyOwned(
497+
int const dict_id) const {
498+
return row_set_mem_owner_->getStringDictProxyOwned(dict_id);
499+
}
500+
496501
class ResultSet::CellCallback {
497502
std::vector<int32_t> const id_map_;
498503
int64_t const null_int_;

omniscidb/ResultSet/ResultSet.h

+2
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,8 @@ class ResultSet {
524524
getUniqueStringsForDictEncodedTargetCol(const size_t col_idx) const;
525525

526526
StringDictionary* getStringDictionaryProxy(int const dict_id) const;
527+
std::shared_ptr<StringDictionary> getStringDictionaryProxyOwned(
528+
int const dict_id) const;
527529

528530
template <typename ENTRY_TYPE, QueryDescriptionType QUERY_TYPE, bool COLUMNAR_FORMAT>
529531
ENTRY_TYPE getEntryAt(const size_t row_idx,

omniscidb/ResultSet/RowSetMemoryOwner.cpp

+21
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@ StringDictionary* RowSetMemoryOwner::getOrAddStringDictProxy(const int dict_id_i
4848
CHECK_LE(dd->dictNBits, 32);
4949
return addStringDict(dd->stringDict, dict_id, generation);
5050
}
51+
// It's possible the original dictionary has been removed from its storage
52+
// but we still have it in a proxy.
53+
if (dict_id != DictRef::literalsDictId) {
54+
auto res = getStringDictProxyOwned(dict_id_in);
55+
CHECK(res) << "Cannot find dict or proxy " << dict_id_in;
56+
CHECK(generation < 0 || res->getBaseGeneration() == generation);
57+
return res.get();
58+
}
5159
CHECK_EQ(dict_id, DictRef::literalsDictId);
5260
if (!lit_str_dict_proxy_) {
5361
DictRef literal_dict_ref(DictRef::invalidDbId, DictRef::literalsDictId);
@@ -58,6 +66,19 @@ StringDictionary* RowSetMemoryOwner::getOrAddStringDictProxy(const int dict_id_i
5866
return lit_str_dict_proxy_.get();
5967
}
6068

69+
std::shared_ptr<StringDictionary> RowSetMemoryOwner::getStringDictProxyOwned(
70+
const int dict_id) {
71+
std::lock_guard<std::mutex> lock(state_mutex_);
72+
if (dict_id == DictRef::literalsDictId) {
73+
return lit_str_dict_proxy_;
74+
}
75+
auto it = str_dict_proxy_owned_.find(dict_id);
76+
if (it != str_dict_proxy_owned_.end()) {
77+
return it->second;
78+
}
79+
return nullptr;
80+
}
81+
6182
quantile::TDigest* RowSetMemoryOwner::nullTDigest(double const q) {
6283
std::lock_guard<std::mutex> lock(state_mutex_);
6384
return t_digests_

omniscidb/ResultSet/RowSetMemoryOwner.h

+1
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable {
232232

233233
StringDictionary* getOrAddStringDictProxy(const int dict_id_in,
234234
const int64_t generation = -1);
235+
std::shared_ptr<StringDictionary> getStringDictProxyOwned(const int dict_id);
235236

236237
void addLiteralStringDictProxy(std::shared_ptr<StringDictionary> lit_str_dict_proxy) {
237238
std::lock_guard<std::mutex> lock(state_mutex_);

omniscidb/ResultSetRegistry/ResultSetRegistry.cpp

+74-10
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,55 @@ ResultSetTableTokenPtr ResultSetRegistry::put(ResultSetTable table) {
113113
bool has_varlen = false;
114114
bool has_array = false;
115115
for (size_t col_idx = 0; col_idx < first_rs->colCount(); ++col_idx) {
116-
addColumnInfo(db_id_,
117-
table_id,
118-
columnId(col_idx),
119-
first_rs->colName(col_idx),
120-
first_rs->colType(col_idx),
121-
false);
122-
has_varlen = has_varlen || first_rs->colType(col_idx)->isVarLen();
123-
has_array = has_array || first_rs->colType(col_idx)->isArray();
116+
auto col_type = first_rs->colType(col_idx);
117+
if (col_type->isExtDictionary()) {
118+
auto dict_type = col_type->as<hdk::ir::ExtDictionaryType>();
119+
auto dict_proxy = first_rs->getStringDictionaryProxyOwned(dict_type->dictId());
120+
// If there is no proxy and dictionary is owned by the registry then simply
121+
// re-use it.
122+
if (!dict_proxy && getSchemaId(dict_type->dictId()) == schema_id_) {
123+
CHECK(dicts_.count(dict_type->dictId()));
124+
dict_proxy = dicts_.at(dict_type->dictId())->dict_descriptor->stringDict;
125+
}
126+
// If we have a dictionary with no proxy then it's not safe to use because it can
127+
// simply be removed from its storage. To avoid this, we create a proxy dictionary.
128+
// We require result set to have a row set memory owner for that.
129+
if (!dict_proxy) {
130+
CHECK(dict_type->dictId() > 0) << dict_type->toString();
131+
CHECK(first_rs->getRowSetMemOwner());
132+
CHECK(first_rs->getStringDictionaryProxy(dict_type->dictId()));
133+
dict_proxy = first_rs->getStringDictionaryProxyOwned(dict_type->dictId());
134+
CHECK(dict_proxy);
135+
}
136+
// Add proxy as a new dictionary if it exists and is not registered yet.
137+
if (dict_proxy->getDictId() <= 0) {
138+
auto dict_id = addSchemaIdChecked(next_dict_id_++, schema_id_);
139+
dict_proxy->setDictId(dict_id);
140+
auto dict_desc = std::make_unique<DictDescriptor>(db_id_,
141+
dict_id,
142+
first_rs->colName(col_idx),
143+
/*nbits=*/dict_type->size() * 8,
144+
/*is_shared=*/true,
145+
/*refcount=*/1,
146+
table_name,
147+
/*temp=*/true);
148+
dict_desc->stringDict = dict_proxy;
149+
auto dict_data = std::make_unique<DictionaryData>();
150+
dict_data->dict_descriptor = std::move(dict_desc);
151+
CHECK(!dicts_.count(dict_id));
152+
dicts_.emplace(dict_id, std::move(dict_data));
153+
}
154+
// Register table as a dictionary user and fix-up dictionary type.
155+
auto dict_id = dict_proxy->getDictId();
156+
CHECK_EQ(getSchemaId(dict_id), schema_id_);
157+
CHECK(dicts_.count(dict_id));
158+
dicts_.at(dict_id)->table_ids.insert(table_id);
159+
col_type = hdk::ir::Context::defaultCtx().extDict(dict_type->elemType(), dict_id);
160+
}
161+
addColumnInfo(
162+
db_id_, table_id, columnId(col_idx), first_rs->colName(col_idx), col_type, false);
163+
has_varlen = has_varlen || col_type->isVarLen();
164+
has_array = has_array || col_type->isArray();
124165
}
125166
addRowidColumn(db_id_, table_id, columnId(first_rs->colCount()));
126167

@@ -169,11 +210,31 @@ void ResultSetRegistry::drop(const ResultSetTableToken& token) {
169210
mapd_unique_lock<mapd_shared_mutex> schema_lock(schema_mutex_);
170211
mapd_unique_lock<mapd_shared_mutex> data_lock(data_mutex_);
171212

213+
// Drop data.
172214
CHECK(tables_.count(token.tableId()));
173215
std::unique_ptr<TableData> table = std::move(tables_.at(token.tableId()));
174216
mapd_unique_lock<mapd_shared_mutex> table_lock(table->mutex);
175217
tables_.erase(token.tableId());
176218

219+
// Drop dicts.
220+
auto cols = listColumnsNoLock(db_id_, token.tableId());
221+
std::unordered_set<int> used_dicts;
222+
for (auto& col : cols) {
223+
if (auto dict_type = col->type->as<hdk::ir::ExtDictionaryType>()) {
224+
used_dicts.insert(dict_type->dictId());
225+
}
226+
}
227+
for (int dict_id : used_dicts) {
228+
CHECK_EQ(getSchemaId(dict_id), schema_id_);
229+
CHECK(dicts_.count(dict_id));
230+
auto& desc = *dicts_.at(dict_id);
231+
desc.table_ids.erase(token.tableId());
232+
if (desc.table_ids.empty()) {
233+
dicts_.erase(dict_id);
234+
}
235+
}
236+
237+
// Drop schema.
177238
SimpleSchemaProvider::dropTable(token.dbId(), token.tableId());
178239
}
179240

@@ -514,8 +575,11 @@ TableFragmentsInfo ResultSetRegistry::getTableMetadata(int db_id, int table_id)
514575
}
515576

516577
// Returns the descriptor stored in dicts_ for dict_id, or nullptr when dicts_ has no
// entry for it. The load_dict argument is not read anywhere in this body.
// NOTE(review): in this diff view the lines under the `517-`/`518-` markers look like
// the removed pre-change body and the lines under `+` markers its replacement —
// confirm against the committed file.
const DictDescriptor* ResultSetRegistry::getDictMetadata(int dict_id, bool load_dict) {
517-
// Currently, we don't hold any dictionaries in the registry.
518-
UNREACHABLE();
578+
// Shared lock on data_mutex_ for the duration of the lookup; drop() erases from
// dicts_ while holding the same mutex exclusively.
mapd_shared_lock<mapd_shared_mutex> data_lock(data_mutex_);
579+
// The ID is expected to map to this registry's schema.
CHECK_EQ(getSchemaId(dict_id), schema_id_);
580+
if (dicts_.count(dict_id)) {
581+
// NOTE(review): the returned raw pointer outlives data_lock; it presumably stays
// valid only until drop() removes the dictionary — confirm callers do not keep it.
return dicts_.at(dict_id)->dict_descriptor.get();
582+
}
519583
return nullptr;
520584
}
521585

omniscidb/ResultSetRegistry/ResultSetRegistry.h

+7
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,17 @@ class ResultSetRegistry : public SimpleSchemaProvider,
8181
TableStats table_stats;
8282
};
8383

84+
// Per-dictionary record kept by the registry in dicts_.
struct DictionaryData {
85+
// Descriptor of the dictionary; this is what getDictMetadata() hands out.
std::unique_ptr<DictDescriptor> dict_descriptor;
86+
// IDs of tables registered as users of this dictionary. put() inserts into it and
// drop() erases from it, removing the whole record once the set becomes empty.
std::set<int> table_ids;
87+
};
88+
8489
const int db_id_;
8590
const int schema_id_;
8691
int next_table_id_ = 1;
92+
int next_dict_id_ = 1;
8793
std::unordered_map<int, std::unique_ptr<TableData>> tables_;
94+
std::unordered_map<int, std::unique_ptr<DictionaryData>> dicts_;
8895
const ConfigPtr config_;
8996
mutable mapd_shared_mutex data_mutex_;
9097
};

omniscidb/StringDictionary/StringDictionary.h

+6-1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ class StringDictionary {
8989

9090
int32_t getDbId() const noexcept;
9191
int32_t getDictId() const noexcept;
92+
// Assigns a dictionary ID to a dictionary that does not have one yet.
// Both preconditions are enforced with CHECKs: the current ID must be -1 and
// new_id must be strictly positive.
void setDictId(int32_t new_id) {
93+
// Only a dictionary whose ID is still -1 may be given an ID.
CHECK_EQ(dict_ref_.dictId, -1);
94+
CHECK_GT(new_id, 0);
95+
dict_ref_.dictId = new_id;
96+
}
9297

9398
StringDictionary* getBaseDictionary() const noexcept { return base_dict_.get(); }
9499
int64_t getBaseGeneration() const noexcept { return base_generation_; }
@@ -249,7 +254,7 @@ class StringDictionary {
249254
uint32_t hashById(int string_id) const { return hashByIndex(idToIndex(string_id)); }
250255
uint32_t hashByIndex(int string_idx) const { return hash_cache_[string_idx]; }
251256

252-
const DictRef dict_ref_;
257+
DictRef dict_ref_;
253258
const std::shared_ptr<StringDictionary> base_dict_;
254259
const int64_t base_generation_;
255260
size_t str_count_;

omniscidb/Tests/QueryBuilderTest.cpp

+89
Original file line numberDiff line numberDiff line change
@@ -7020,13 +7020,19 @@ class Issue588 : public TestSuite {
70207020
insertCsvValues("test_588_3", "1,1\n2,2\n3,3");
70217021
createTable("test_588_4", {{"id", ctx().int64()}, {"D", ctx().int64()}});
70227022
insertCsvValues("test_588_4", "1,2\n2,3\n3,3");
7023+
createTable("test_588_5",
7024+
{{"id", ctx().int32()},
7025+
{"s1", ctx().extDict(ctx().text(), 0)},
7026+
{"s2", ctx().extDict(ctx().text(), 0)}});
7027+
insertCsvValues("test_588_5", "1,str1,str22\n2,str2,str22\n3,str3,str33");
70237028
}
70247029

70257030
static void TearDownTestSuite() {
70267031
dropTable("test_588_1");
70277032
dropTable("test_588_2");
70287033
dropTable("test_588_3");
70297034
dropTable("test_588_4");
7035+
dropTable("test_588_5");
70307036
}
70317037
};
70327038

@@ -7049,6 +7055,89 @@ TEST_F(Issue588, Reproducer1) {
70497055
auto res3 = runQuery(std::move(dag3));
70507056
}
70517057

7058+
// Projects a table column together with a string constant, then runs a second
// query over the stored result of the first one. No values are asserted; the test
// only exercises the two queries.
TEST_F(Issue588, Reproducer2) {
  QueryBuilder builder(ctx(), getSchemaProvider(), configPtr());

  // Stage 1: column "A" of test_588_1 plus the constant "str".
  auto source_scan = builder.scan("test_588_1");
  auto source_dag =
      source_scan.proj({source_scan.ref("A"), builder.cst("str")}).finalize();
  auto source_res = runQuery(std::move(source_dag));

  // Stage 2: scan the first result by its table name and project both columns.
  auto result_scan = builder.scan(source_res.tableName());
  auto result_dag = result_scan.proj({0, 1}).finalize();
  auto result_res = runQuery(std::move(result_dag));
}
7068+
7069+
// Builds a column that is either the dictionary column "s1" or a string constant
// cast to the type of "s1", then runs a second query over the stored result.
// No values are asserted; the test only exercises the two queries.
TEST_F(Issue588, Reproducer3) {
  QueryBuilder builder(ctx(), getSchemaProvider(), configPtr());

  // Stage 1: id, and (id > 2 ? s1 : "str" cast to the type of s1).
  auto source_scan = builder.scan("test_588_5");
  auto s1_type = source_scan.ref("s1").type();
  auto source_dag =
      source_scan
          .proj({source_scan.ref("id"),
                 builder.ifThenElse(source_scan.ref("id").gt(2),
                                    source_scan.ref("s1"),
                                    builder.cst("str").cast(s1_type))})
          .finalize();
  auto source_res = runQuery(std::move(source_dag));

  // Stage 2: scan the first result by its table name and project both columns.
  auto result_scan = builder.scan(source_res.tableName());
  auto result_dag = result_scan.proj({0, 1}).finalize();
  auto result_res = runQuery(std::move(result_dag));
}
7085+
7086+
// Builds a column that is either the dictionary column "s1" or the column "s2"
// cast to the type of "s1", then runs a second query over the stored result.
// No values are asserted; the test only exercises the two queries.
TEST_F(Issue588, Reproducer4) {
  QueryBuilder builder(ctx(), getSchemaProvider(), configPtr());

  // Stage 1: id, and (id > 2 ? s1 : s2 cast to the type of s1).
  auto source_scan = builder.scan("test_588_5");
  auto s1_type = source_scan.ref("s1").type();
  auto source_dag =
      source_scan
          .proj({source_scan.ref("id"),
                 builder.ifThenElse(source_scan.ref("id").gt(2),
                                    source_scan.ref("s1"),
                                    source_scan.ref("s2").cast(s1_type))})
          .finalize();
  auto source_res = runQuery(std::move(source_dag));

  // Stage 2: scan the first result by its table name and project both columns.
  auto result_scan = builder.scan(source_res.tableName());
  auto result_dag = result_scan.proj({0, 1}).finalize();
  auto result_res = runQuery(std::move(result_dag));
}
7102+
7103+
// Builds a column from a nested conditional that mixes the dictionary column "s1",
// the column "s2" cast to the type of "s1", and a string constant cast to the same
// type, then runs a second query over the stored result.
// No values are asserted; the test only exercises the two queries.
TEST_F(Issue588, Reproducer5) {
  QueryBuilder builder(ctx(), getSchemaProvider(), configPtr());

  // Stage 1: id, and (id > 2 ? s1 : (id > 1 ? s2 cast : "str" cast)).
  auto source_scan = builder.scan("test_588_5");
  auto s1_type = source_scan.ref("s1").type();
  auto source_dag =
      source_scan
          .proj({source_scan.ref("id"),
                 builder.ifThenElse(
                     source_scan.ref("id").gt(2),
                     source_scan.ref("s1"),
                     builder.ifThenElse(source_scan.ref("id").gt(1),
                                        source_scan.ref("s2").cast(s1_type),
                                        builder.cst("str").cast(s1_type)))})
          .finalize();
  auto source_res = runQuery(std::move(source_dag));

  // Stage 2: scan the first result by its table name and project both columns.
  auto result_scan = builder.scan(source_res.tableName());
  auto result_dag = result_scan.proj({0, 1}).finalize();
  auto result_res = runQuery(std::move(result_dag));
}
7122+
7123+
// Fixture for the Issue667 tests: provides the table "test_667" with a single
// dictionary-encoded text column "str".
class Issue667 : public TestSuite {
7124+
protected:
7125+
// Creates "test_667" and fills it with the three rows "1", "2" and "3".
static void SetUpTestSuite() {
7126+
createTable("test_667", {{"str", ctx().extDict(ctx().text(), 0)}});
7127+
insertCsvValues("test_667", "1\n2\n3");
7128+
}
7129+
7130+
// Intentionally empty: the DropInputThenConvertResult test drops "test_667" itself.
static void TearDownTestSuite() {}
7131+
};
7132+
7133+
// Runs a projection of the dictionary column, drops the input table, and only then
// compares the result contents with the inserted strings. The drop must stay
// between the query and the comparison.
TEST_F(Issue667, DropInputThenConvertResult) {
  QueryBuilder builder(ctx(), getSchemaProvider(), configPtr());

  auto query_dag = builder.scan("test_667").proj("str").finalize();
  auto query_res = runQuery(std::move(query_dag));

  // Remove the source table before the result is read.
  dropTable("test_667");

  compare_res_data(query_res, std::vector<std::string>({"1"s, "2"s, "3"s}));
}
7140+
70527141
TEST_F(QueryBuilderTest, RunOnResult) {
70537142
QueryBuilder builder(ctx(), schema_mgr_, configPtr());
70547143

0 commit comments

Comments
 (0)