Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
cylee99 committed Aug 19, 2024
1 parent e6e1e6c commit 3e61369
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 39 deletions.
16 changes: 8 additions & 8 deletions aws/kinesis/core/retrier.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ bool Retrier::succeed_if_correct_shard(const std::shared_ptr<UserRecord>& ur,
const boost::optional<std::pair<uint128_t, uint128_t>>& hashrange_actual_shard) {
const uint64_t actual_shard = ShardMap::shard_id_from_str(shard_id);
if (ur->predicted_shard() && *ur->predicted_shard() != actual_shard) {
// retry if shard is not found or hashrange of the user record doesn't fit into the actual shard's hashrange
// retry if shard is not found or hash key of the user record doesn't fit into the actual shard's hashrange
if (!hashrange_actual_shard || !((*hashrange_actual_shard).first <= ur->hash_key() &&
(*hashrange_actual_shard).second >= ur->hash_key())) {
// invalidate because this is a new shard or the hash key fell outside of the actual shard's hashrange.
Expand Down Expand Up @@ -231,14 +231,14 @@ bool Retrier::succeed_if_correct_shard(const std::shared_ptr<UserRecord>& ur,
void Retrier::invalidate_cache(const std::shared_ptr<UserRecord>& ur,
const TimePoint start,
const uint64_t& actual_shard,
bool should_invalidate_on_incorrect_shard) {
if (should_invalidate_on_incorrect_shard) {
LOG(warning) << "Record " << ur->source_id() << " went to shard " << actual_shard << " instead of the "
<< "predicted shard " << *ur->predicted_shard() << "; this "
<< "usually means the sharp map has changed.";
const bool should_invalidate_on_incorrect_shard) {
if (should_invalidate_on_incorrect_shard) {
LOG(warning) << "Record " << ur->source_id() << " went to shard " << actual_shard << " instead of the "
<< "predicted shard " << *ur->predicted_shard() << "; this "
<< "usually means the sharp map has changed.";

shard_map_invalidate_cb_(start, ur->predicted_shard());
}
shard_map_invalidate_cb_(start, ur->predicted_shard());
}
}

void Retrier::finish_user_record(const std::shared_ptr<UserRecord>& ur,
Expand Down
41 changes: 23 additions & 18 deletions aws/kinesis/core/shard_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ const std::chrono::milliseconds ShardMap::kClosedShardTtl{60000};

ShardMap::ShardMap(
std::shared_ptr<aws::utils::Executor> executor,
// std::shared_ptr<Aws::Kinesis::KinesisClient> kinesis_client,
ListShardsCaller list_shards_caller,
ListShardsCallBack list_shards_callback,
std::string stream,
std::string stream_arn,
std::shared_ptr<aws::metrics::MetricsManager> metrics_manager,
Expand Down Expand Up @@ -204,23 +203,29 @@ void ShardMap::sort_all_open_shards() {

void ShardMap::cleanup() {
while (true) {
std::this_thread::sleep_for(closed_shard_ttl_ / 2);
auto now = std::chrono::steady_clock::now();
// readlock on the main mutex and the state_ check ensures that we are not runing list shards so it's safe to
// clean up the map.
ReadLock lock(mutex_);
// if it's been a while since the last shardmap update, we can remove the unused closed shards.
if (updated_at_ + closed_shard_ttl_ < now && state_ == READY) {
if (open_shards_.size() != shard_id_to_shard_hashkey_cache_.size()) {
WriteLock lock(shard_cache_mutex_);
for (auto it = shard_id_to_shard_hashkey_cache_.begin(); it != shard_id_to_shard_hashkey_cache_.end();) {
if (open_shards_.count(it->first) == 0) {
it = shard_id_to_shard_hashkey_cache_.erase(it);
} else {
++it;
try {
std::this_thread::sleep_for(closed_shard_ttl_ / 2);
const auto now = std::chrono::steady_clock::now();
// the read lock on the main mutex and the state_ check ensure that we are not running list shards, so it's safe to
// clean up the map.
ReadLock lock(mutex_);
// if it's been a while since the last shardmap update, we can remove the unused closed shards.
if (updated_at_ + closed_shard_ttl_ < now && state_ == READY) {
if (open_shards_.size() != shard_id_to_shard_hashkey_cache_.size()) {
WriteLock lock(shard_cache_mutex_);
for (auto it = shard_id_to_shard_hashkey_cache_.begin(); it != shard_id_to_shard_hashkey_cache_.end();) {
if (open_shards_.count(it->first) == 0) {
it = shard_id_to_shard_hashkey_cache_.erase(it);
} else {
++it;
}
}
}
}
}
}
} catch (const std::exception &e) {
LOG(error) << "Exception occurred while cleaning up shardmap cache : " << e.what();
} catch (...) {
LOG(error) << "Unknown exception while cleaning up shardmap cache.";
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion aws/kinesis/core/shard_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class ShardMap : boost::noncopyable {
std::chrono::milliseconds closed_shard_ttl = kClosedShardTtl);

virtual boost::optional<uint64_t> shard_id(const uint128_t& hash_key);
boost::optional<std::pair<ShardMap::uint128_t, ShardMap::uint128_t>> hashrange(const uint64_t& shard_id);
boost::optional<std::pair<uint128_t, uint128_t>> hashrange(const uint64_t& shard_id);

void invalidate(const TimePoint& seen_at, const boost::optional<uint64_t> predicted_shard);

Expand Down
23 changes: 11 additions & 12 deletions aws/kinesis/core/test/shard_map_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,44 +83,43 @@ class Wrapper {
Wrapper(
std::list<Aws::Kinesis::Model::ListShardsOutcome> outcomes_list_shards,
int delay = 1500)
: num_req_received_(0) {
mock_kinesis_client_ = std::make_shared<MockKinesisClient>(
: num_req_received_(0),
mock_kinesis_client_(
outcomes_list_shards,
[this] { num_req_received_++; });
shard_map_ =
std::make_shared<aws::kinesis::core::ShardMap>(
[this] { num_req_received_++; }),
shard_map_(
std::make_shared<aws::utils::IoServiceExecutor>(1),
[this](auto& req, auto& handler, auto& context) { mock_kinesis_client_->ListShardsAsync(req, handler, context); },
[this](auto& req, auto& handler, auto& context) { mock_kinesis_client_.ListShardsAsync(req, handler, context); },
kStreamName,
kStreamARN,
std::make_shared<aws::metrics::NullMetricsManager>(),
std::chrono::milliseconds(100),
std::chrono::milliseconds(1000),
std::chrono::milliseconds(100));
std::chrono::milliseconds(100)) {

aws::utils::sleep_for(std::chrono::milliseconds(delay));
}

boost::optional<uint64_t> shard_id(const char* key) {
return shard_map_->shard_id(uint128_t(std::string(key)));
return shard_map_.shard_id(uint128_t(std::string(key)));
}

boost::optional<std::pair<uint128_t, uint128_t>> hashrange(const uint64_t shard_id) {
return shard_map_->hashrange(shard_id);
return shard_map_.hashrange(shard_id);
}

size_t num_req_received() const {
return num_req_received_;
}

void invalidate(std::chrono::steady_clock::time_point tp, boost::optional<uint64_t> shard_id) {
shard_map_->invalidate(tp, shard_id);
shard_map_.invalidate(tp, shard_id);
}

private:
size_t num_req_received_;
std::shared_ptr<aws::kinesis::core::ShardMap> shard_map_;
std::shared_ptr<MockKinesisClient> mock_kinesis_client_;
MockKinesisClient mock_kinesis_client_;
aws::kinesis::core::ShardMap shard_map_;
};


Expand Down

0 comments on commit 3e61369

Please sign in to comment.