Skip to content

Commit eb99f51

Browse files
Support multiple versions of schema when index write (vesoft-inc#2073)
1 parent bf6af1a commit eb99f51

11 files changed

+150
-41
lines changed

src/graph/test/IndexTest.cpp

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,83 @@ TEST_F(IndexTest, EdgeIndexTTL) {
687687
}
688688
}
689689

690+
TEST_F(IndexTest, AlterTag) {
691+
auto client = gEnv->getClient();
692+
ASSERT_NE(nullptr, client);
693+
{
694+
cpp2::ExecutionResponse resp;
695+
std::string query = "CREATE SPACE tag_index_space(partition_num=1, replica_factor=1)";
696+
auto code = client->execute(query, resp);
697+
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
698+
699+
query = "USE tag_index_space";
700+
code = client->execute(query, resp);
701+
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
702+
703+
query = "CREATE TAG person(name string, age int, gender string, email string)";
704+
code = client->execute(query, resp);
705+
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
706+
}
707+
sleep(FLAGS_heartbeat_interval_secs + 1);
708+
// Single Tag Single Field
709+
{
710+
cpp2::ExecutionResponse resp;
711+
std::string query = "CREATE TAG INDEX single_person_index ON person(name)";
712+
auto code = client->execute(query, resp);
713+
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
714+
}
715+
sleep(FLAGS_heartbeat_interval_secs + 1);
716+
{
717+
cpp2::ExecutionResponse resp;
718+
auto query = "INSERT VERTEX person(name, age, gender, email) VALUES "
719+
"100: (\"Tim\", 18, \"M\", \"[email protected]\")";
720+
auto code = client->execute(query, resp);
721+
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
722+
}
723+
{
724+
cpp2::ExecutionResponse resp;
725+
auto query = "ALTER TAG person ADD (col1 int)";
726+
auto code = client->execute(query, resp);
727+
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
728+
}
729+
sleep(FLAGS_heartbeat_interval_secs + 1);
730+
// Single Tag Single Field
731+
{
732+
cpp2::ExecutionResponse resp;
733+
std::string query = "CREATE TAG INDEX single_person_index2 ON person(col1)";
734+
auto code = client->execute(query, resp);
735+
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
736+
}
737+
sleep(FLAGS_heartbeat_interval_secs + 1);
738+
{
739+
cpp2::ExecutionResponse resp;
740+
auto query = "INSERT VERTEX person(name, age, gender, email, col1) VALUES "
741+
"100:(\"Tim\", 18, \"M\", \"[email protected]\", 5)";
742+
auto code = client->execute(query, resp);
743+
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
744+
}
745+
{
746+
cpp2::ExecutionResponse resp;
747+
auto query = "LOOKUP ON person WHERE person.col1 == 5 YIELD person.col1, person.name";
748+
auto code = client->execute(query, resp);
749+
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
750+
std::vector<std::tuple<VertexID, int64_t, std::string>> expected = {
751+
{100, 5, "Tim"},
752+
};
753+
ASSERT_TRUE(verifyResult(resp, expected));
754+
}
755+
{
756+
cpp2::ExecutionResponse resp;
757+
auto query = "LOOKUP ON person where person.name == \"Tim\" YIELD person.name, person.col1";
758+
auto code = client->execute(query, resp);
759+
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
760+
std::vector<std::tuple<VertexID, std::string, int64_t>> expected = {
761+
{100, "Tim", 5},
762+
};
763+
ASSERT_TRUE(verifyResult(resp, expected));
764+
}
765+
}
766+
690767
} // namespace graph
691768
} // namespace nebula
692769

src/storage/BaseProcessor.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,8 @@ class BaseProcessor {
150150
return tHost;
151151
}
152152

153-
IndexValues collectIndexValues(RowReader* reader,
154-
const std::vector<nebula::cpp2::ColumnDef>& cols);
153+
StatusOr<IndexValues> collectIndexValues(RowReader* reader,
154+
const std::vector<nebula::cpp2::ColumnDef>& cols);
155155

156156
void collectProps(RowReader* reader, const std::vector<PropContext>& props,
157157
Collector* collector);

src/storage/BaseProcessor.inl

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,20 +118,22 @@ kvstore::ResultCode BaseProcessor<RESP>::doRangeWithPrefix(
118118
}
119119

120120
template <typename RESP>
121-
IndexValues
121+
StatusOr<IndexValues>
122122
BaseProcessor<RESP>::collectIndexValues(RowReader* reader,
123123
const std::vector<nebula::cpp2::ColumnDef>& cols) {
124124
IndexValues values;
125125
if (reader == nullptr) {
126-
return values;
126+
return Status::Error("Invalid row reader");
127127
}
128128
for (auto& col : cols) {
129129
auto res = RowReader::getPropByName(reader, col.get_name());
130-
if (!ok(res)) {
131-
LOG(ERROR) << "Skip bad column prop " << col.get_name();
130+
if (ok(res)) {
131+
auto val = NebulaKeyUtils::encodeVariant(value(std::move(res)));
132+
values.emplace_back(col.get_type().get_type(), std::move(val));
133+
} else {
134+
LOG(ERROR) << "Skip bad column prop : " << col.get_name();
135+
return Status::Error("Skip bad column prop : %s", col.get_name().c_str());
132136
}
133-
auto val = NebulaKeyUtils::encodeVariant(value(std::move(res)));
134-
values.emplace_back(col.get_type().get_type(), std::move(val));
135137
}
136138
return values;
137139
}

src/storage/admin/RebuildEdgeIndexProcessor.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,11 @@ void RebuildEdgeIndexProcessor::process(const cpp2::RebuildIndexRequest& req) {
8787
continue;
8888
}
8989
auto values = collectIndexValues(reader.get(), item->get_fields());
90+
if (!values.ok()) {
91+
continue;
92+
}
9093
auto indexKey = NebulaKeyUtils::edgeIndexKey(part, indexID, source,
91-
ranking, destination, values);
94+
ranking, destination, values.value());
9295
data.emplace_back(std::move(indexKey), "");
9396
batchNum += 1;
9497
iter->next();

src/storage/admin/RebuildTagIndexProcessor.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,11 @@ void RebuildTagIndexProcessor::process(const cpp2::RebuildIndexRequest& req) {
8484
continue;
8585
}
8686
auto values = collectIndexValues(reader.get(), item->get_fields());
87-
88-
auto indexKey = NebulaKeyUtils::vertexIndexKey(part, indexID, vertex, values);
87+
if (!values.ok()) {
88+
continue;
89+
}
90+
auto indexKey = NebulaKeyUtils::vertexIndexKey(part, indexID,
91+
vertex, values.value());
8992
data.emplace_back(std::move(indexKey), "");
9093
batchNum += 1;
9194
iter->next();

src/storage/mutate/AddEdgesProcessor.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,9 @@ std::string AddEdgesProcessor::addEdges(int64_t version, PartitionID partId,
125125
}
126126
}
127127
auto ni = indexKey(partId, nReader.get(), e.first, index);
128-
batchHolder->put(std::move(ni), "");
128+
if (!ni.empty()) {
129+
batchHolder->put(std::move(ni), "");
130+
}
129131
}
130132
}
131133
/*
@@ -164,12 +166,15 @@ std::string AddEdgesProcessor::indexKey(PartitionID partId,
164166
const folly::StringPiece& rawKey,
165167
std::shared_ptr<nebula::cpp2::IndexItem> index) {
166168
auto values = collectIndexValues(reader, index->get_fields());
169+
if (!values.ok()) {
170+
return "";
171+
}
167172
return NebulaKeyUtils::edgeIndexKey(partId,
168173
index->get_index_id(),
169174
NebulaKeyUtils::getSrcId(rawKey),
170175
NebulaKeyUtils::getRank(rawKey),
171176
NebulaKeyUtils::getDstId(rawKey),
172-
values);
177+
values.value());
173178
}
174179

175180
} // namespace storage

src/storage/mutate/AddVerticesProcessor.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,9 @@ std::string AddVerticesProcessor::addVertices(int64_t version, PartitionID partI
142142
}
143143
}
144144
auto ni = indexKey(partId, vId, nReader.get(), index);
145-
batchHolder->put(std::move(ni), "");
145+
if (!ni.empty()) {
146+
batchHolder->put(std::move(ni), "");
147+
}
146148
}
147149
}
148150
/*
@@ -177,9 +179,12 @@ std::string AddVerticesProcessor::indexKey(PartitionID partId,
177179
RowReader* reader,
178180
std::shared_ptr<nebula::cpp2::IndexItem> index) {
179181
auto values = collectIndexValues(reader, index->get_fields());
182+
if (!values.ok()) {
183+
return "";
184+
}
180185
return NebulaKeyUtils::vertexIndexKey(partId,
181186
index->get_index_id(),
182-
vId, values);
187+
vId, values.value());
183188
}
184189

185190
} // namespace storage

src/storage/mutate/DeleteEdgesProcessor.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,15 @@ DeleteEdgesProcessor::deleteEdges(GraphSpaceID spaceId,
114114
}
115115
auto values = collectIndexValues(reader.get(),
116116
index->get_fields());
117+
if (!values.ok()) {
118+
continue;
119+
}
117120
auto indexKey = NebulaKeyUtils::edgeIndexKey(partId,
118121
indexId,
119122
srcId,
120123
rank,
121124
dstId,
122-
values);
125+
values.value());
123126
batchHolder->remove(std::move(indexKey));
124127
}
125128
}

src/storage/mutate/DeleteVerticesProcessor.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,13 @@ DeleteVerticesProcessor::deleteVertices(GraphSpaceID spaceId,
135135
}
136136
const auto& cols = index->get_fields();
137137
auto values = collectIndexValues(reader.get(), cols);
138+
if (!values.ok()) {
139+
continue;
140+
}
138141
auto indexKey = NebulaKeyUtils::vertexIndexKey(partId,
139142
indexId,
140143
vertex,
141-
values);
144+
values.value());
142145
batchHolder->remove(std::move(indexKey));
143146
}
144147
}

src/storage/mutate/UpdateEdgeProcessor.cpp

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -296,13 +296,15 @@ std::string UpdateEdgeProcessor::updateAndWriteBack(PartitionID partId,
296296
}
297297
auto rValues = collectIndexValues(rReader.get(),
298298
index->get_fields());
299-
auto rIndexKey = NebulaKeyUtils::edgeIndexKey(partId,
300-
indexId,
301-
edgeKey.src,
302-
edgeKey.ranking,
303-
edgeKey.dst,
304-
rValues);
305-
batchHolder->remove(std::move(rIndexKey));
299+
if (rValues.ok()) {
300+
auto rIndexKey = NebulaKeyUtils::edgeIndexKey(partId,
301+
indexId,
302+
edgeKey.src,
303+
edgeKey.ranking,
304+
edgeKey.dst,
305+
rValues.value());
306+
batchHolder->remove(std::move(rIndexKey));
307+
}
306308
}
307309
if (reader == nullptr) {
308310
reader = RowReader::getEdgePropReader(this->schemaMan_,
@@ -313,13 +315,15 @@ std::string UpdateEdgeProcessor::updateAndWriteBack(PartitionID partId,
313315

314316
auto values = collectIndexValues(reader.get(),
315317
index->get_fields());
316-
auto indexKey = NebulaKeyUtils::edgeIndexKey(partId,
317-
indexId,
318-
edgeKey.src,
319-
edgeKey.ranking,
320-
edgeKey.dst,
321-
values);
322-
batchHolder->put(std::move(indexKey), "");
318+
if (values.ok()) {
319+
auto indexKey = NebulaKeyUtils::edgeIndexKey(partId,
320+
indexId,
321+
edgeKey.src,
322+
edgeKey.ranking,
323+
edgeKey.dst,
324+
values.value());
325+
batchHolder->put(std::move(indexKey), "");
326+
}
323327
}
324328
}
325329
}

0 commit comments

Comments
 (0)