
HomeObject side graceful shutdown
This PR adds counters to record pending requests and starts shutting down HomeStore only once no requests remain pending.
JacksonYao287 committed Jan 15, 2025
1 parent 77d89ef commit ab6ba94
Showing 9 changed files with 173 additions and 31 deletions.
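
The change follows a simple guard pattern on every client-facing call: reject new requests once shutdown has begun, otherwise bump an in-flight counter and decrement it on every exit path, while the shutdown path drains the counter before tearing down HomeStore. Below is a minimal standalone sketch of that pattern; the Service, try_begin_request, end_request, and shutdown names are illustrative stand-ins, not the actual HSHomeObject API.

// Minimal sketch of the graceful-shutdown pattern added in this PR.
// Class and method names are illustrative, not the real HSHomeObject API.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>

class Service {
public:
    // Called at the start of every client-facing operation (put/get/del).
    // Returns false once shutdown has started, so the caller can reject
    // the request with a SHUTTING_DOWN error.
    bool try_begin_request() {
        if (shutting_down_.load()) return false;
        pending_requests_.fetch_add(1);
        return true;
    }

    // Must be called on every exit path of the operation, success or error.
    void end_request() { pending_requests_.fetch_sub(1); }

    // Shutdown path: stop accepting new requests, then drain in-flight ones.
    void shutdown() {
        shutting_down_.store(true);
        while (true) {
            auto pending = pending_requests_.load();
            if (pending == 0) break;
            std::cout << "waiting for " << pending << " pending requests\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }
        // Only now is it safe to shut down the underlying store.
    }

private:
    std::atomic<bool> shutting_down_{false};
    std::atomic<uint64_t> pending_requests_{0};
};

In the diffs below, the same roles are played by incr_pending_request_num() / decr_pending_request_num() on each _put_blob, _get_blob, and _del_blob path, and by the drain loop added to the HSHomeObject destructor.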
2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.2.1"
version = "2.2.2"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
6 changes: 4 additions & 2 deletions src/include/homeobject/blob_manager.hpp
@@ -10,7 +10,8 @@
namespace homeobject {

ENUM(BlobErrorCode, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, UNSUPPORTED_OP, NOT_LEADER, REPLICATION_ERROR,
UNKNOWN_SHARD, UNKNOWN_BLOB, CHECKSUM_MISMATCH, READ_FAILED, INDEX_ERROR, SEALED_SHARD, RETRY_REQUEST);
UNKNOWN_SHARD, UNKNOWN_BLOB, CHECKSUM_MISMATCH, READ_FAILED, INDEX_ERROR, SEALED_SHARD, RETRY_REQUEST,
SHUTTING_DOWN);
struct BlobError {
BlobErrorCode code;
// set when we are not the current leader of the PG.
@@ -27,7 +28,8 @@ struct BlobError {
struct Blob {
Blob() = default;
Blob(sisl::io_blob_safe b, std::string const& u, uint64_t o) : body(std::move(b)), user_key(u), object_off(o) {}
Blob(sisl::io_blob_safe b, std::string const& u, uint64_t o, peer_id_t l) : body(std::move(b)), user_key(u), object_off(o), current_leader(l) {}
Blob(sisl::io_blob_safe b, std::string const& u, uint64_t o, peer_id_t l) :
body(std::move(b)), user_key(u), object_off(o), current_leader(l) {}

Blob clone() const;

2 changes: 1 addition & 1 deletion src/include/homeobject/pg_manager.hpp
@@ -12,7 +12,7 @@
namespace homeobject {

ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, NOT_LEADER, UNKNOWN_PEER, UNSUPPORTED_OP,
CRC_MISMATCH, NO_SPACE_LEFT, DRIVE_WRITE_ERROR, RETRY_REQUEST);
CRC_MISMATCH, NO_SPACE_LEFT, DRIVE_WRITE_ERROR, RETRY_REQUEST, SHUTTING_DOWN);

struct PGMember {
// Max length is based on homestore::replica_member_info::max_name_len - 1. Last byte is null terminated.
4 changes: 2 additions & 2 deletions src/include/homeobject/shard_manager.hpp
@@ -10,7 +10,7 @@
namespace homeobject {

ENUM(ShardError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNSUPPORTED_OP, UNKNOWN_PG, UNKNOWN_SHARD,
PG_NOT_READY, CRC_MISMATCH, NO_SPACE_LEFT, RETRY_REQUEST);
PG_NOT_READY, CRC_MISMATCH, NO_SPACE_LEFT, RETRY_REQUEST, SHUTTING_DOWN);

struct ShardInfo {
enum class State : uint8_t {
@@ -22,7 +22,7 @@ struct ShardInfo {
shard_id_t id;
pg_id_t placement_group;
State state;
uint64_t lsn; // created_lsn
uint64_t lsn; // created_lsn
uint64_t created_time;
uint64_t last_modified_time;
uint64_t available_capacity_bytes;
49 changes: 43 additions & 6 deletions src/lib/homestore_backend/hs_blob_manager.cpp
@@ -82,6 +82,12 @@ struct put_blob_req_ctx : public repl_result_ctx< BlobManager::Result< HSHomeObj
};

BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& shard, Blob&& blob) {
if (is_shutting_down()) {
LOGI("service is being shut down");
return folly::makeUnexpected(BlobErrorCode::SHUTTING_DOWN);
}
incr_pending_request_num();

auto& pg_id = shard.placement_group;
shared< homestore::ReplDev > repl_dev;
blob_id_t new_blob_id;
@@ -103,11 +109,13 @@

if (!repl_dev->is_leader()) {
LOGW("failed to put blob for pg [{}], shard [{}], not leader", pg_id, shard.id);
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::NOT_LEADER, repl_dev->get_leader_id()));
}

if (!repl_dev->is_ready_for_traffic()) {
LOGW("failed to put blob for pg [{}], shard [{}], not ready for traffic", pg_id, shard.id);
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::RETRY_REQUEST));
}

@@ -242,7 +250,10 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis
auto msg_header = r_cast< ReplicationMessageHeader const* >(header.cbytes());
if (msg_header->corrupted()) {
LOGE("replication message header is corrupted with crc error, lsn:{}", lsn);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::CHECKSUM_MISMATCH))); }
if (ctx) {
ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::CHECKSUM_MISMATCH)));
decr_pending_request_num();
}
return;
}

@@ -259,11 +270,18 @@
if (ctx) {
ctx->promise_.setValue(success ? BlobManager::Result< BlobInfo >(blob_info)
: folly::makeUnexpected(BlobError(BlobErrorCode::INDEX_ERROR)));
decr_pending_request_num();
}
}

BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, blob_id_t blob_id, uint64_t req_offset,
uint64_t req_len) const {
if (is_shutting_down()) {
LOGI("service is being shut down");
return folly::makeUnexpected(BlobErrorCode::SHUTTING_DOWN);
}
incr_pending_request_num();

auto& pg_id = shard.placement_group;
shared< BlobIndexTable > index_table;
shared< homestore::ReplDev > repl_dev;
@@ -280,12 +298,14 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard,

if (!repl_dev->is_ready_for_traffic()) {
LOGW("failed to get blob for pg [{}], shard [{}], not ready for traffic", pg_id, shard.id);
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::RETRY_REQUEST));
}

auto r = get_blob_from_index_table(index_table, shard.id, blob_id);
if (!r) {
BLOGE(shard.id, blob_id, "Blob not found in index during get blob");
decr_pending_request_num();
return folly::makeUnexpected(r.error());
}

@@ -309,17 +329,20 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< home
read_buf = std::move(read_buf)](auto&& result) mutable -> BlobManager::AsyncResult< Blob > {
if (result) {
BLOGE(shard_id, blob_id, "Failed to get blob: err={}", blob_id, shard_id, result.value());
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED));
}

BlobHeader const* header = r_cast< BlobHeader const* >(read_buf.cbytes());
if (!header->valid()) {
BLOGE(shard_id, blob_id, "Invalid header found: [header={}]", header->to_string());
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED));
}

if (header->shard_id != shard_id) {
BLOGE(shard_id, blob_id, "Invalid shard_id in header: [header={}]", header->to_string());
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED));
}

@@ -336,12 +359,14 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< home
if (std::memcmp(computed_hash, header->hash, BlobHeader::blob_max_hash_len) != 0) {
BLOGE(shard_id, blob_id, "Hash mismatch header [{}] [computed={:np}]", header->to_string(),
spdlog::to_hex(computed_hash, computed_hash + BlobHeader::blob_max_hash_len));
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::CHECKSUM_MISMATCH));
}

if (req_offset + req_len > header->blob_size) {
BLOGE(shard_id, blob_id, "Invalid offset length requested in get blob offset={} len={} size={}",
req_offset, req_len, header->blob_size);
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::INVALID_ARG));
}

@@ -352,6 +377,7 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< home
std::memcpy(body.bytes(), blob_bytes + req_offset, res_len);

BLOGT(blob_id, shard_id, "Blob get success: blkid={}", blkid.to_string());
decr_pending_request_num();
return Blob(std::move(body), std::move(user_key), header->object_offset, repl_dev->get_leader_id());
});
}
@@ -366,14 +392,16 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
auto msg_header = r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes()));
if (msg_header->corrupted()) {
LOGE("replication message header is corrupted with crc error shard:{}", msg_header->shard_id);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::CHECKSUM_MISMATCH))); }
if (ctx) {
ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::CHECKSUM_MISMATCH)));
decr_pending_request_num();
}
return folly::makeUnexpected(homestore::ReplServiceError::FAILED);
}

auto pg_iter = _pg_map.find(msg_header->pg_id);
if (pg_iter == _pg_map.end()) {
LOGW("Received a blob_put on an unknown pg:{}, underlying engine will retry this later",
msg_header->pg_id);
LOGW("Received a blob_put on an unknown pg:{}, underlying engine will retry this later", msg_header->pg_id);
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

@@ -392,7 +420,7 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
hints.chunk_id_hint = hs_shard->sb_->p_chunk_id;

if (msg_header->blob_id != 0) {
//check if the blob already exists, if yes, return the blk id
// check if the blob already exists, if yes, return the blk id
auto index_table = d_cast< HS_PG* >(pg_iter->second.get())->index_table_;
auto r = get_blob_from_index_table(index_table, msg_header->shard_id, msg_header->blob_id);
if (r.hasValue()) {
@@ -405,6 +433,12 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
}

BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blob_id_t blob_id) {
if (is_shutting_down()) {
LOGI("service is being shut down");
return folly::makeUnexpected(BlobErrorCode::SHUTTING_DOWN);
}
incr_pending_request_num();

BLOGT(shard.id, blob_id, "deleting blob");
auto& pg_id = shard.placement_group;
shared< homestore::ReplDev > repl_dev;
@@ -419,11 +453,13 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo

if (!repl_dev->is_leader()) {
LOGW("failed to del blob for pg [{}], shard [{}], blob_id [{}], not leader", pg_id, shard.id, blob_id);
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::NOT_LEADER, repl_dev->get_leader_id()));
}

if (!repl_dev->is_ready_for_traffic()) {
LOGW("failed to del blob for pg [{}], shard [{}], not ready for traffic", pg_id, shard.id);
decr_pending_request_num();
return folly::makeUnexpected(BlobError(BlobErrorCode::RETRY_REQUEST));
}

@@ -441,7 +477,8 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo
std::memcpy(req->key_buf().bytes(), &blob_id, sizeof(blob_id_t));

repl_dev->async_alloc_write(req->cheader_buf(), req->ckey_buf(), sisl::sg_list{}, req);
return req->result().deferValue([repl_dev](const auto& result) -> folly::Expected< folly::Unit, BlobError > {
return req->result().deferValue([this, repl_dev](const auto& result) -> folly::Expected< folly::Unit, BlobError > {
decr_pending_request_num();
if (result.hasError()) {
auto err = result.error();
if (err.getCode() == BlobErrorCode::NOT_LEADER) { err.current_leader = repl_dev->get_leader_id(); }
19 changes: 15 additions & 4 deletions src/lib/homestore_backend/hs_homeobject.cpp
@@ -60,9 +60,7 @@ class HSReplApplication : public homestore::ReplApplication {
return it->second;
}

void on_repl_devs_init_completed() override {
_home_object->on_replica_restart();
}
void on_repl_devs_init_completed() override { _home_object->on_replica_restart(); }

std::pair< std::string, uint16_t > lookup_peer(homestore::replica_id_t uuid) const override {
std::string endpoint;
@@ -298,9 +296,22 @@ HSHomeObject::~HSHomeObject() {
}
trigger_timed_events();
#endif

start_shutting_down();
// Wait for all pending requests to complete
while (true) {
auto pending_reqs = get_pending_request_num();
if (0 == pending_reqs) break;
LOGI("waiting for {} pending requests to complete", pending_reqs);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
LOGI("start shutting down HomeStore");

homestore::HomeStore::instance()->shutdown();
homestore::HomeStore::reset_instance();
iomanager.stop();

LOGI("complete shutting down HomeStore");
}

HomeObjectStats HSHomeObject::_get_stats() const {
Expand All @@ -318,7 +329,7 @@ HomeObjectStats HSHomeObject::_get_stats() const {

stats.num_open_shards = num_open_shards;
stats.avail_open_shards = chunk_selector()->total_chunks() - num_open_shards;
stats.num_disks = chunk_selector()->total_disks();
stats.num_disks = chunk_selector()->total_disks();
return stats;
}

23 changes: 19 additions & 4 deletions src/lib/homestore_backend/hs_homeobject.hpp
@@ -161,7 +161,7 @@ class HSHomeObject : public HomeObjectImpl {
homestore::chunk_num_t p_chunk_id;
homestore::chunk_num_t v_chunk_id;
};
//TODO this blk is used to store snapshot metadata/status for recovery
// TODO this blk is used to store snapshot metadata/status for recovery
struct snapshot_info_superblk {};
#pragma pack()

@@ -318,7 +318,8 @@ class HSHomeObject : public HomeObjectImpl {
bool update_cursor(objId id);
objId expected_next_obj_id();
bool generate_shard_blob_list();
BlobManager::AsyncResult< sisl::io_blob_safe > load_blob_data(const BlobInfo& blob_info, ResyncBlobState& state);
BlobManager::AsyncResult< sisl::io_blob_safe > load_blob_data(const BlobInfo& blob_info,
ResyncBlobState& state);
bool create_pg_snapshot_data(sisl::io_blob_safe& meta_blob);
bool create_shard_snapshot_data(sisl::io_blob_safe& meta_blob);
bool create_blobs_snapshot_data(sisl::io_blob_safe& data_blob);
@@ -334,7 +335,7 @@

objId cur_obj_id_{0, 0};
int64_t cur_shard_idx_{-1};
std::vector<BlobInfo> cur_blob_list_{0};
std::vector< BlobInfo > cur_blob_list_{0};
uint64_t cur_start_blob_idx_{0};
uint64_t cur_batch_blob_count_{0};
flatbuffers::FlatBufferBuilder builder_;
@@ -579,7 +580,7 @@ class HSHomeObject : public HomeObjectImpl {
const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx);
void on_blob_del_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
bool local_add_blob_info(pg_id_t pg_id, BlobInfo const &blob_info);
bool local_add_blob_info(pg_id_t pg_id, BlobInfo const& blob_info);
homestore::ReplResult< homestore::blk_alloc_hints >
blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< homestore::repl_req_ctx >& ctx);
void compute_blob_payload_hash(BlobHeader::HashAlgorithm algorithm, const uint8_t* blob_bytes, size_t blob_size,
@@ -642,6 +643,20 @@ class HSHomeObject : public HomeObjectImpl {
* @param pg_id The ID of the PG to be cleaned.
*/
void cleanup_pg_resources(pg_id_t pg_id);

// graceful shutdown related
private:
std::atomic_bool shutting_down{false};
mutable std::atomic_uint64_t pending_request_num{0};

bool is_shutting_down() const { return shutting_down.load(); }
void start_shutting_down() { shutting_down = true; }

uint64_t get_pending_request_num() const { return pending_request_num.load(); }

// only leader will call incr and decr pending request num
void incr_pending_request_num() const { pending_request_num++; }
void decr_pending_request_num() const { pending_request_num--; }
};

class BlobIndexServiceCallbacks : public homestore::IndexServiceCallbacks {
