Commit b52e9dd

Reclaim resources on PG creation failure and add concurrency support for PG creation
- Reclaim resources if PG creation fails:
  - Ensure chunks are returned to the device heap if PG creation fails.
  - Remove the replication device if PG creation fails.
- Add concurrency support for receiving PG creation requests:
  - Use `select_chunks_for_pg()` to support concurrent requests instead of only checking size.
  - Add concurrency tests to verify the behavior of concurrent PG creation requests.
1 parent 38ab472 commit b52e9dd
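
Taken together, the changes make chunk selection for a PG idempotent and reversible: chunks are reserved atomically under the selector mutex, a repeated request from the leader gets the same reservation back instead of an error, and a failed creation returns everything to the device heap. A minimal standalone C++ model of that contract, for illustration only (the class name `ChunkSelectorModel` and its members are hypothetical simplifications, not the real `HeapChunkSelector`):

    // Minimal model of the idempotent reserve/reclaim contract (hypothetical types).
    #include <cstdint>
    #include <map>
    #include <mutex>
    #include <optional>

    using pg_id_t = std::uint16_t;

    class ChunkSelectorModel {
    public:
        // Reserve chunks for a PG; calling again with the same pg_id is a no-op
        // that returns the original count, which makes leader retries harmless.
        std::optional< std::uint32_t > select_chunks_for_pg(pg_id_t pg_id, std::uint64_t pg_size) {
            std::unique_lock lk(mtx_);
            if (pg_size < chunk_size_) return std::nullopt;   // smaller than one chunk
            auto const num_chunk = static_cast< std::uint32_t >(pg_size / chunk_size_);
            if (auto it = reserved_.find(pg_id); it != reserved_.end()) {
                return it->second;                            // idempotent re-entry
            }
            if (num_chunk > available_) return std::nullopt;  // not enough free chunks
            available_ -= num_chunk;                          // reserve under the same lock
            reserved_[pg_id] = num_chunk;
            return num_chunk;
        }

        // Reclaim path used when PG creation fails downstream.
        bool return_pg_chunks_to_dev_heap(pg_id_t pg_id) {
            std::unique_lock lk(mtx_);
            auto it = reserved_.find(pg_id);
            if (it == reserved_.end()) return false;
            available_ += it->second;                         // chunks go back to the heap
            reserved_.erase(it);
            return true;
        }

    private:
        std::mutex mtx_;
        std::uint64_t chunk_size_{16ull * 1024 * 1024};       // arbitrary model value
        std::uint32_t available_{128};                        // arbitrary model value
        std::map< pg_id_t, std::uint32_t > reserved_;
    };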

File tree: 5 files changed (+139, -16)

conanfile.py (+1, -1)

@@ -9,7 +9,7 @@
 
 class HomeObjectConan(ConanFile):
     name = "homeobject"
-    version = "2.2.0"
+    version = "2.2.1"
 
     homepage = "https://github.com/eBay/HomeObject"
     description = "Blob Store built on HomeReplication"

src/lib/homestore_backend/heap_chunk_selector.cpp (+7, -6)

@@ -192,19 +192,20 @@ bool HeapChunkSelector::is_chunk_available(const pg_id_t pg_id, const chunk_num_
 
 std::optional< uint32_t > HeapChunkSelector::select_chunks_for_pg(pg_id_t pg_id, uint64_t pg_size) {
     std::unique_lock lock_guard(m_chunk_selector_mtx);
-    if (m_per_pg_chunks.find(pg_id) != m_per_pg_chunks.end()) {
-        LOGWARNMOD(homeobject, "PG had already created, pg_id {}", pg_id);
-        return std::nullopt;
-    }
-
     const auto chunk_size = get_chunk_size();
     if (pg_size < chunk_size) {
         LOGWARNMOD(homeobject, "pg_size {} is less than chunk_size {}", pg_size, chunk_size);
         return std::nullopt;
     }
-
     const uint32_t num_chunk = sisl::round_down(pg_size, chunk_size) / chunk_size;
 
+    if (m_per_pg_chunks.find(pg_id) != m_per_pg_chunks.end()) {
+        // the leader may call select_chunks_for_pg multiple times
+        RELEASE_ASSERT(num_chunk == m_per_pg_chunks[pg_id]->m_pg_chunks.size(), "num_chunk should be same");
+        LOGWARNMOD(homeobject, "PG has already been created, pg_id {}", pg_id);
+        return num_chunk;
+    }
+
     // Select a pdev with the most available num chunk
     auto most_avail_dev_it = std::max_element(m_per_dev_heap.begin(), m_per_dev_heap.end(),
                                               [](const std::pair< const uint32_t, std::shared_ptr< ChunkHeap > >& lhs,
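
Note the reordering above: `num_chunk` is now computed before the already-created check so the `RELEASE_ASSERT` can verify that a retry asks for the same number of chunks. A tiny usage sketch of that contract, reusing the hypothetical `ChunkSelectorModel` from the sketch above:

    #include <cassert>

    int main() {
        ChunkSelectorModel sel;
        auto first = sel.select_chunks_for_pg(1, 64ull * 1024 * 1024);
        auto second = sel.select_chunks_for_pg(1, 64ull * 1024 * 1024); // leader retry
        assert(first && second && *first == *second); // same reservation, no error
        assert(sel.return_pg_chunks_to_dev_heap(1));  // reclaim after a failure
        assert(!sel.return_pg_chunks_to_dev_heap(1)); // nothing left to reclaim
        return 0;
    }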

src/lib/homestore_backend/hs_pg_manager.cpp (+40, -8)

@@ -64,32 +64,58 @@ PGManager::NullAsyncResult HSHomeObject::_create_pg(PGInfo&& pg_info, std::set<
     auto pg_id = pg_info.id;
     if (auto lg = std::shared_lock(_pg_lock); _pg_map.end() != _pg_map.find(pg_id)) return folly::Unit();
 
-    const auto most_avail_num_chunks = chunk_selector()->most_avail_num_chunks();
     const auto chunk_size = chunk_selector()->get_chunk_size();
     if (pg_info.size < chunk_size) {
         LOGW("Not support to create PG which pg_size {} < chunk_size {}", pg_info.size, chunk_size);
         return folly::makeUnexpected(PGError::INVALID_ARG);
     }
 
-    const auto needed_num_chunks = sisl::round_down(pg_info.size, chunk_size) / chunk_size;
-    if (needed_num_chunks > most_avail_num_chunks) {
-        LOGW("No enough space to create pg, pg_id {}, needed_num_chunks {}, most_avail_num_chunks {}", pg_id,
-             needed_num_chunks, most_avail_num_chunks);
+    auto const num_chunk = chunk_selector()->select_chunks_for_pg(pg_id, pg_info.size);
+    if (!num_chunk.has_value()) {
+        LOGW("Failed to select chunks for pg {}", pg_id);
         return folly::makeUnexpected(PGError::NO_SPACE_LEFT);
     }
 
     pg_info.chunk_size = chunk_size;
     pg_info.replica_set_uuid = boost::uuids::random_generator()();
+    const auto repl_dev_group_id = pg_info.replica_set_uuid;
     return hs_repl_service()
         .create_repl_dev(pg_info.replica_set_uuid, peers)
         .via(executor_)
         .thenValue([this, pg_info = std::move(pg_info)](auto&& v) mutable -> PGManager::NullAsyncResult {
+#ifdef _PRERELEASE
+            if (iomgr_flip::instance()->test_flip("create_pg_create_repl_dev_error")) {
+                LOGW("Simulating create repl dev error in creating pg");
+                v = folly::makeUnexpected(ReplServiceError::FAILED);
+            }
+#endif
+
             if (v.hasError()) { return folly::makeUnexpected(toPgError(v.error())); }
             // we will write a PGHeader across the raft group and when it is committed
             // all raft members will create PGinfo and index table for this PG.
 
             // FIXME: https://github.com/eBay/HomeObject/pull/136#discussion_r1470504271
             return do_create_pg(v.value(), std::move(pg_info));
+        })
+        .thenValue([this, pg_id, repl_dev_group_id](auto&& r) -> PGManager::NullAsyncResult {
+            // reclaim resources if pg creation failed
+            if (r.hasError()) {
+                bool res = chunk_selector_->return_pg_chunks_to_dev_heap(pg_id);
+                RELEASE_ASSERT(res, "Failed to return pg {} chunks to dev_heap", pg_id);
+                // remove the repl dev regardless of whether it was created successfully;
+                // if it does not exist, remove_repl_dev returns ReplServiceError::SERVER_NOT_FOUND
+                return hs_repl_service()
+                    .remove_repl_dev(repl_dev_group_id)
+                    .deferValue([r, repl_dev_group_id](auto&& e) -> PGManager::NullAsyncResult {
+                        if (e != ReplServiceError::OK) {
+                            LOGW("Failed to remove repl device which group_id {}, error: {}", repl_dev_group_id, e);
+                        }
+
+                        // still return the original error
+                        return folly::makeUnexpected(r.error());
+                    });
+            }
+            return folly::Unit();
         });
 }
 
@@ -104,6 +130,13 @@ PGManager::NullAsyncResult HSHomeObject::do_create_pg(cshared< homestore::ReplDe
     req->header()->seal();
     std::memcpy(req->header_extn(), serailized_pg_info.data(), info_size);
 
+#ifdef _PRERELEASE
+    if (iomgr_flip::instance()->test_flip("create_pg_raft_message_error")) {
+        LOGW("Simulating raft message error in creating pg");
+        return folly::makeUnexpected(PGError::UNKNOWN);
+    }
+#endif
+
     // replicate this create pg message to all raft members of this group
     repl_dev->async_alloc_write(req->header_buf(), sisl::blob{}, sisl::sg_list{}, req);
     return req->result().deferValue([req](auto const& e) -> PGManager::NullAsyncResult {
 
@@ -291,7 +324,7 @@ void HSHomeObject::mark_pg_destroyed(pg_id_t pg_id) {
     auto lg = std::scoped_lock(_pg_lock);
     auto iter = _pg_map.find(pg_id);
     if (iter == _pg_map.end()) {
-        LOGW("on pg destroy with unknown pg_id {}", pg_id);
+        LOGW("mark pg destroyed with unknown pg_id {}", pg_id);
        return;
     }
     auto& pg = iter->second;
 
@@ -301,8 +334,7 @@ void HSHomeObject::mark_pg_destroyed(pg_id_t pg_id) {
 }
 
 void HSHomeObject::reset_pg_chunks(pg_id_t pg_id) {
-    bool res = chunk_selector_->reset_pg_chunks(pg_id);
-    RELEASE_ASSERT(res, "Failed to reset all chunks in pg {}", pg_id);
+    chunk_selector_->reset_pg_chunks(pg_id);
     auto fut = homestore::hs()->cp_mgr().trigger_cp_flush(true /* force */);
     auto on_complete = [&](auto success) {
         RELEASE_ASSERT(success, "Failed to trigger CP flush");
src/lib/homestore_backend/hs_shard_manager.cpp (-1)

@@ -553,7 +553,6 @@ void HSHomeObject::destroy_shards(pg_id_t pg_id) {
 
     auto& pg = iter->second;
     for (auto& shard : pg->shards_) {
-        // release open shard v_chunk
         auto hs_shard = s_cast< HS_Shard* >(shard.get());
         // destroy shard super blk
         hs_shard->sb_.destroy();

src/lib/homestore_backend/tests/hs_pg_tests.cpp (+91)

@@ -1,4 +1,5 @@
 #include "homeobj_fixture.hpp"
+#include <homestore/replication_service.hpp>
 
 TEST_F(HomeObjectFixture, PGStatsTest) {
     LOGINFO("HomeObject replica={} setup completed", g_helper->replica_num());

@@ -157,3 +158,93 @@ TEST_F(HomeObjectFixture, PGRecoveryTest) {
         verify_hs_pg(reserved_pg, recovered_pg);
     }
 }
+
+TEST_F(HomeObjectFixture, ConcurrencyCreatePG) {
+    g_helper->sync();
+
+    LOGINFO("print num chunks {}", _obj_inst->chunk_selector()->m_chunks.size());
+    auto const pg_num = 10;
+    // create pgs concurrently
+    std::vector< std::future< void > > futures;
+    for (pg_id_t i = 1; i <= pg_num; ++i) {
+        futures.emplace_back(std::async(std::launch::async, [this, i]() { create_pg(i); }));
+    }
+    for (auto& future : futures) {
+        future.get();
+    }
+
+    // verify all pgs are created
+    for (pg_id_t i = 1; i <= pg_num; ++i) {
+        ASSERT_TRUE(pg_exist(i));
+        LOGINFO("Create pg {} successfully", i);
+    }
+}
+
+TEST_F(HomeObjectFixture, CreatePGFailed) {
+#ifdef _PRERELEASE
+    set_basic_flip("create_pg_create_repl_dev_error", 1); // simulate a repl dev creation error
+    set_basic_flip("create_pg_raft_message_error", 1);    // simulate a raft message error
+#endif
+
+    // run twice so each simulated error is triggered once
+    for (auto i = 0; i < 2; ++i) {
+        g_helper->sync();
+        auto const pg_id = 1;
+        const uint8_t leader_replica_num = 0;
+        auto my_replica_num = g_helper->replica_num();
+        auto pg_size = SISL_OPTIONS["pg_size"].as< uint64_t >() * Mi;
+        auto name = g_helper->test_name();
+        if (leader_replica_num == my_replica_num) {
+            auto members = g_helper->members();
+            auto info = homeobject::PGInfo(pg_id);
+            info.size = pg_size;
+            for (const auto& member : members) {
+                if (leader_replica_num == member.second) {
+                    // by default, the leader is the first member
+                    info.members.insert(homeobject::PGMember{member.first, name + std::to_string(member.second), 1});
+                } else {
+                    info.members.insert(homeobject::PGMember{member.first, name + std::to_string(member.second), 0});
+                }
+            }
+            auto p = _obj_inst->pg_manager()->create_pg(std::move(info)).get();
+            ASSERT_FALSE(p);
+            ASSERT_EQ(PGError::UNKNOWN, p.error());
+
+            // verify pg resources:
+            // since pg creation failed, the pg chunks should not exist
+            ASSERT_TRUE(_obj_inst->chunk_selector()->m_per_pg_chunks.find(pg_id) ==
+                        _obj_inst->chunk_selector()->m_per_pg_chunks.end());
+            // wait for repl gc.
+            std::this_thread::sleep_for(std::chrono::seconds(70));
+            int num_repl = 0;
+            _obj_inst->hs_repl_service().iterate_repl_devs([&num_repl](cshared< homestore::ReplDev >&) { num_repl++; });
+            LOGINFO("Failed to create pg {} at leader, times {}, num_repl {}", pg_id, i, num_repl);
+            ASSERT_EQ(0, num_repl);
+
+        } else {
+            auto start_time = std::chrono::steady_clock::now();
+            bool res = true;
+            // followers need to wait for pg creation
+            while (!pg_exist(pg_id)) {
+                auto current_time = std::chrono::steady_clock::now();
+                auto duration = std::chrono::duration_cast< std::chrono::seconds >(current_time - start_time).count();
+                if (duration >= 20) {
+                    LOGINFO("Failed to create pg {} at follower", pg_id);
+                    res = false;
+                    break;
+                }
+                std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+            }
+            ASSERT_FALSE(res);
+        }
+    }
+
+    // now pg creation should succeed
+    g_helper->sync();
+    auto const pg_id = 1;
+    create_pg(pg_id);
+    ASSERT_TRUE(pg_exist(pg_id));
+    LOGINFO("create pg {} successfully", pg_id);
+    restart();
+    ASSERT_TRUE(pg_exist(pg_id));
+}
