Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve retry logic during stream scaling by succeeding records that landed on the correct shard #576

Merged
merged 2 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ if(CMAKE_BUILD_TYPE MATCHES Debug)
endif(CMAKE_BUILD_TYPE MATCHES Debug)

if(APPLE)
add_compile_options("-Wno-enum-constexpr-conversion")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mmacosx-version-min=10.13 -framework Foundation -framework SystemConfiguration")
endif()

Expand Down
3 changes: 3 additions & 0 deletions aws/kinesis/core/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class Aggregator : boost::noncopyable {
}
if (!shard_id) {
auto kr = std::make_shared<KinesisRecord>();
// during retries, the records can have the predicted shard set from the last run. Clearing out the state here
// because retrier expects these records to not have predicted shard so they don't get retried due to this.
ur->reset_predicted_shard();
kr->add(ur);
return kr;
} else {
Expand Down
4 changes: 3 additions & 1 deletion aws/kinesis/core/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <aws/kinesis/core/put_records_context.h>
#include <aws/kinesis/core/retrier.h>
#include <aws/kinesis/KinesisClient.h>
#include <aws/kinesis/model/ListShardsRequest.h>
#include <aws/metrics/metrics_manager.h>
#include <aws/utils/processing_statistics_logger.h>
#include <aws/sts/STSClient.h>
Expand Down Expand Up @@ -69,7 +70,7 @@ class Pipeline : boost::noncopyable {
shard_map_(
std::make_shared<ShardMap>(
executor_,
kinesis_client_,
[this](auto& req, auto& handler, auto& context) { kinesis_client_->ListShardsAsync(req, handler, context); },
stream_,
stream_arn_,
metrics_manager_)),
Expand Down Expand Up @@ -99,6 +100,7 @@ class Pipeline : boost::noncopyable {
config_,
[this](auto& ur) { this->finish_user_record(ur); },
[this](auto& ur) { this->aggregator_put(ur); },
[this](auto& actual_shard) { return shard_map_->hashrange(actual_shard); },
[this](auto& tp, auto predicted_shard) { shard_map_->invalidate(tp, predicted_shard); },
[this](auto& code, auto& msg) {
limiter_->add_error(code, msg);
Expand Down
72 changes: 44 additions & 28 deletions aws/kinesis/core/retrier.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,15 @@ Retrier::handle_put_records_result(std::shared_ptr<PutRecordsContext> prc) {
//So either all user records in a kinesis record landed on the correct shard
//or none did. So we can invalidate just once per kinesis record.
bool should_invalidate_on_incorrect_shard = true;
for (auto& ur : kr->items()) {
auto hashrange_actual_shard = shard_map_hashrange_cb_(ShardMap::shard_id_from_str(put_result.GetShardId()));
for (auto& ur : kr->items()) {
should_invalidate_on_incorrect_shard &= succeed_if_correct_shard(ur,
start,
end,
put_result.GetShardId(),
put_result.GetSequenceNumber(),
should_invalidate_on_incorrect_shard);
should_invalidate_on_incorrect_shard,
hashrange_actual_shard);
}
} else {
auto& err_code = put_result.GetErrorCode();
Expand Down Expand Up @@ -155,7 +157,7 @@ void Retrier::retry_not_expired(const std::shared_ptr<UserRecord>& ur,
std::chrono::steady_clock::now(),
std::chrono::steady_clock::now(),
"Expired",
"Record has reached expiration");
"Record " + std::to_string(ur->source_id()) +" has reached expiration");
} else {
// TimeSensitive automatically sets the deadline to the expiration if
// the given deadline is later than the expiration.
Expand Down Expand Up @@ -192,37 +194,51 @@ void Retrier::fail(const std::shared_ptr<UserRecord>& ur,
bool Retrier::succeed_if_correct_shard(const std::shared_ptr<UserRecord>& ur,
TimePoint start,
TimePoint end,
const std::string& shard_id,
const std::string& shard_id,
const std::string& sequence_number,
const bool should_invalidate_on_incorrect_shard) {
const bool should_invalidate_on_incorrect_shard,
const boost::optional<std::pair<uint128_t, uint128_t>>& hashrange_actual_shard) {
const uint64_t actual_shard = ShardMap::shard_id_from_str(shard_id);
if (ur->predicted_shard() &&
*ur->predicted_shard() != actual_shard) {
//We should call invalidate only if:
// 1. If we are told to invalidate on incorrect shard.
if (should_invalidate_on_incorrect_shard) {
LOG(warning) << "Record went to shard " << shard_id << " instead of the "
if (ur->predicted_shard() && *ur->predicted_shard() != actual_shard) {
// retry if shard is not found or hashrange of the user record doesn't fit into the actual shard's hashrange
if (!hashrange_actual_shard || !((*hashrange_actual_shard).first <= ur->hash_key() &&
chenylee-aws marked this conversation as resolved.
Show resolved Hide resolved
(*hashrange_actual_shard).second >= ur->hash_key())) {
// invalidate because this is a new shard or shard felt outside of actual shards hashrange.
invalidate_cache(ur, start, actual_shard, should_invalidate_on_incorrect_shard);

retry_not_expired(ur,
start,
end,
"Wrong Shard",
"Record " + std::to_string(ur->source_id()) + " did not end up in expected shard.");
return false;
}
// child shard is numbered higher than the ancestor. if we landed on a child shard it means the parent shard can
// be removed now from the in-memory map. This will help the shardmap to converge
else if (*ur->predicted_shard() < actual_shard) {
invalidate_cache(ur, start, actual_shard, should_invalidate_on_incorrect_shard);
}
}
finish_user_record(
ur,
Attempt()
.set_start(start)
.set_end(end)
.set_result(shard_id, sequence_number));
return true;
}

void Retrier::invalidate_cache(const std::shared_ptr<UserRecord>& ur,
const TimePoint start,
const uint64_t& actual_shard,
bool should_invalidate_on_incorrect_shard) {
if (should_invalidate_on_incorrect_shard) {
chenylee-aws marked this conversation as resolved.
Show resolved Hide resolved
LOG(warning) << "Record " << ur->source_id() << " went to shard " << actual_shard << " instead of the "
<< "predicted shard " << *ur->predicted_shard() << "; this "
<< "usually means the sharp map has changed.";
<< "usually means the sharp map has changed.";

shard_map_invalidate_cb_(start, ur->predicted_shard());
}

retry_not_expired(ur,
start,
end,
"Wrong Shard",
"Record did not end up in expected shard.");
return false;
} else {
finish_user_record(
ur,
Attempt()
.set_start(start)
.set_end(end)
.set_result(shard_id, sequence_number));
return true;
}
}

void Retrier::finish_user_record(const std::shared_ptr<UserRecord>& ur,
Expand Down
15 changes: 14 additions & 1 deletion aws/kinesis/core/retrier.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <aws/kinesis/core/put_records_context.h>
#include <aws/kinesis/core/put_records_request.h>
#include <aws/kinesis/core/shard_map.h>
#include <aws/kinesis/model/Shard.h>
#include <aws/metrics/metrics_manager.h>

namespace aws {
Expand Down Expand Up @@ -53,25 +54,29 @@ class MetricsPutter {

class Retrier {
public:
using uint128_t = boost::multiprecision::uint128_t;
using Configuration = aws::kinesis::core::Configuration;
using TimePoint = std::chrono::steady_clock::time_point;
// using Result = std::shared_ptr<aws::http::HttpResult>;
using UserRecordCallback =
std::function<void (const std::shared_ptr<UserRecord>&)>;
using ShardMapGetHashrangeCallback = std::function<boost::optional<std::pair<uint128_t, uint128_t>> (const uint64_t&)>;
using ShardMapInvalidateCallback = std::function<void (const TimePoint&, const boost::optional<uint64_t>)>;
using ErrorCallback =
std::function<void (const std::string&, const std::string&)>;

Retrier(std::shared_ptr<Configuration> config,
UserRecordCallback finish_cb,
UserRecordCallback retry_cb,
ShardMapGetHashrangeCallback shard_map_hashrange_cb,
ShardMapInvalidateCallback shard_map_invalidate_cb,
ErrorCallback error_cb = ErrorCallback(),
std::shared_ptr<aws::metrics::MetricsManager> metrics_manager =
std::make_shared<aws::metrics::NullMetricsManager>())
: config_(config),
finish_cb_(finish_cb),
retry_cb_(retry_cb),
shard_map_hashrange_cb_(shard_map_hashrange_cb),
shard_map_invalidate_cb_(shard_map_invalidate_cb),
error_cb_(error_cb),
metrics_manager_(metrics_manager) {}
Expand Down Expand Up @@ -119,7 +124,8 @@ class Retrier {
TimePoint end,
const std::string& shard_id,
const std::string& sequence_number,
const bool should_invalidate_on_incorrect_shard);
const bool should_invalidate_on_incorrect_shard,
const boost::optional<std::pair<uint128_t, uint128_t>>& hashrange_actual_shard);

void finish_user_record(const std::shared_ptr<UserRecord>& ur,
const Attempt& final_attempt);
Expand All @@ -128,12 +134,19 @@ class Retrier {

void emit_metrics(const std::shared_ptr<PutRecordsContext>& prc);

void invalidate_cache(const std::shared_ptr<UserRecord>& ur,
const TimePoint start,
const uint64_t& actual_shard,
bool should_invalidate_on_incorrect_shard);

std::shared_ptr<Configuration> config_;
UserRecordCallback finish_cb_;
UserRecordCallback retry_cb_;
ShardMapGetHashrangeCallback shard_map_hashrange_cb_;
ShardMapInvalidateCallback shard_map_invalidate_cb_;
ErrorCallback error_cb_;
std::shared_ptr<aws::metrics::MetricsManager> metrics_manager_;
std::shared_ptr<ShardMap> shard_map_;
};

} //namespace core
Expand Down
90 changes: 61 additions & 29 deletions aws/kinesis/core/shard_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* limitations under the License.
*/

#include <thread>
#include <aws/kinesis/core/shard_map.h>

#include <aws/kinesis/model/ListShardsRequest.h>
Expand All @@ -23,25 +24,31 @@ namespace core {

const std::chrono::milliseconds ShardMap::kMinBackoff{1000};
const std::chrono::milliseconds ShardMap::kMaxBackoff{30000};
const std::chrono::milliseconds ShardMap::kClosedShardTtl{60000};

/**
 * Builds the shard map and kicks off the first asynchronous shard listing.
 *
 * @param list_shards_caller callback that issues ListShardsAsync; injected so
 *        the map is decoupled from the concrete KinesisClient (and testable)
 * @param closed_shard_ttl how long closed shards stay in the hash-range cache
 *        before the background cleanup loop may evict them
 */
ShardMap::ShardMap(
    std::shared_ptr<aws::utils::Executor> executor,
    ListShardsCaller list_shards_caller,
    std::string stream,
    std::string stream_arn,
    std::shared_ptr<aws::metrics::MetricsManager> metrics_manager,
    std::chrono::milliseconds min_backoff,
    std::chrono::milliseconds max_backoff,
    std::chrono::milliseconds closed_shard_ttl)
    : executor_(std::move(executor)),
      stream_(std::move(stream)),
      stream_arn_(std::move(stream_arn)),
      metrics_manager_(std::move(metrics_manager)),
      state_(INVALID),
      min_backoff_(min_backoff),
      max_backoff_(max_backoff),
      closed_shard_ttl_(closed_shard_ttl),
      backoff_(min_backoff_),
      // Bug fix: the initializer referenced `list_shards_callback`, which is
      // not a name in scope here (the parameter is `list_shards_caller`).
      list_shards_callback_(std::move(list_shards_caller)) {
  update();
  // Background eviction of closed shards from the hash-range cache; detached
  // because cleanup() loops forever. NOTE(review): a detached thread touching
  // members is only safe if ShardMap outlives the process — confirm, or
  // prefer a joinable thread with a stop flag.
  std::thread(&ShardMap::cleanup, this).detach();
}

boost::optional<uint64_t> ShardMap::shard_id(const uint128_t& hash_key) {
Expand All @@ -65,13 +72,21 @@ boost::optional<uint64_t> ShardMap::shard_id(const uint128_t& hash_key) {
return boost::none;
}

/// Returns the cached (starting, ending) hash-key range for the given shard
/// id, or boost::none when the shard is not in the cache.
boost::optional<std::pair<ShardMap::uint128_t, ShardMap::uint128_t>> ShardMap::hashrange(const uint64_t& shard_id) {
  ReadLock lock(shard_cache_mutex_);
  auto entry = shard_id_to_shard_hashkey_cache_.find(shard_id);
  if (entry == shard_id_to_shard_hashkey_cache_.end()) {
    return boost::none;
  }
  return entry->second;
}


void ShardMap::invalidate(const TimePoint& seen_at, const boost::optional<uint64_t> predicted_shard) {
WriteLock lock(mutex_);

if (seen_at > updated_at_ && state_ == READY) {
if (!predicted_shard || std::binary_search(open_shard_ids_.begin(), open_shard_ids_.end(), *predicted_shard)) {
if (!predicted_shard || open_shards_.count(*predicted_shard)) {
std::chrono::duration<double, std::milli> fp_ms = seen_at - updated_at_;
LOG(info) << "Deciding to update shard map for \"" << stream_
<<"\" with a gap between seen_at and updated_at_ of " << fp_ms.count() << " ms " << "predicted shard: " << predicted_shard;
Expand Down Expand Up @@ -110,13 +125,12 @@ void ShardMap::list_shards(const Aws::String& next_token) {
shardFilter.SetType(Aws::Kinesis::Model::ShardFilterType::AT_LATEST);
req.SetShardFilter(shardFilter);
}

kinesis_client_->ListShardsAsync(
req,
[this](auto /*client*/, auto& /*req*/, auto& outcome, auto& /*ctx*/) {
this->list_shards_callback(outcome);
},
std::shared_ptr<const Aws::Client::AsyncCallerContext>());
list_shards_callback_(
req,
[this](auto /*client*/, auto& /*req*/, auto& outcome, auto& /*ctx*/) {
this->list_shards_callback(outcome);
},
std::shared_ptr<const Aws::Client::AsyncCallerContext>());
}

void ShardMap::list_shards_callback(
Expand All @@ -126,16 +140,21 @@ void ShardMap::list_shards_callback(
update_fail(e.GetExceptionName(), e.GetMessage());
return;
}

auto& shards = outcome.GetResult().GetShards();
for (auto& shard : shards) {
// We use shard filter for server end to filter out closed shards
store_open_shard(shard_id_from_str(shard.GetShardId()),
uint128_t(shard.GetHashKeyRange().GetEndingHashKey()));
}

{
WriteLock lock(shard_cache_mutex_);
for (auto& shard : shards) {
const auto& range = shard.GetHashKeyRange();
const auto& hashkey_start = uint128_t(range.GetStartingHashKey());
const auto& hashkey_end = uint128_t(range.GetEndingHashKey());
const auto& shard_id = shard_id_from_str(shard.GetShardId());
end_hash_key_to_shard_id_.push_back({hashkey_end, shard_id});
open_shards_.insert({shard_id, shard});
shard_id_to_shard_hashkey_cache_.insert({shard_id, {hashkey_start, hashkey_end}});
}
}
backoff_ = min_backoff_;

auto& next_token = outcome.GetResult().GetNextToken();
if (!next_token.empty()) {
list_shards(next_token);
Expand All @@ -147,7 +166,6 @@ void ShardMap::list_shards_callback(
WriteLock lock(mutex_);
state_ = READY;
updated_at_ = std::chrono::steady_clock::now();

LOG(info) << "Successfully updated shard map for stream \""
<< stream_ << (stream_arn_.empty() ? "\"" : "\" (arn: \"" + stream_arn_ + "\"). Found ")
<< end_hash_key_to_shard_id_.size() << " shards";
Expand All @@ -174,24 +192,38 @@ void ShardMap::update_fail(const std::string &code, const std::string &msg) {
backoff_ = std::min(backoff_ * 3 / 2, max_backoff_);
}


/// Drops every stored shard entry; the next list-shards pass repopulates them.
void ShardMap::clear_all_stored_shards() {
  end_hash_key_to_shard_id_.clear();
  open_shards_.clear();
}

/// Orders the (end_hash_key, shard_id) pairs ascending by end hash key so
/// shard lookup can binary-search the vector.
void ShardMap::sort_all_open_shards() {
  std::sort(end_hash_key_to_shard_id_.begin(), end_hash_key_to_shard_id_.end());
}

/**
 * Background loop (runs forever on a detached thread started by the
 * constructor) that evicts closed shards from the hash-range cache once the
 * shard map has been stable for at least closed_shard_ttl_.
 */
void ShardMap::cleanup() {
  while (true) {
    std::this_thread::sleep_for(closed_shard_ttl_ / 2);
    const auto now = std::chrono::steady_clock::now();
    // Read lock on the main mutex plus the state_ check ensures list shards
    // is not running, so it is safe to clean up the cache.
    ReadLock main_lock(mutex_);
    // Only prune once it has been a while since the last shard map update;
    // recently-closed shards may still be referenced by in-flight records.
    if (updated_at_ + closed_shard_ttl_ < now && state_ == READY) {
      // A size mismatch means the cache still holds shards that are no
      // longer open.
      if (open_shards_.size() != shard_id_to_shard_hashkey_cache_.size()) {
        // Renamed from `lock` — the original shadowed the outer ReadLock.
        WriteLock cache_lock(shard_cache_mutex_);
        for (auto it = shard_id_to_shard_hashkey_cache_.begin();
             it != shard_id_to_shard_hashkey_cache_.end();) {
          if (open_shards_.count(it->first) == 0) {
            it = shard_id_to_shard_hashkey_cache_.erase(it);
          } else {
            ++it;
          }
        }
      }
    }
  }
}

} //namespace core
} //namespace kinesis
Expand Down
Loading