Skip to content

Commit

Permalink
Flush Reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
pfifer committed May 15, 2017
1 parent 2722572 commit 55ca106
Show file tree
Hide file tree
Showing 9 changed files with 322 additions and 21 deletions.
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ set(SOURCE_FILES
aws/utils/time_sensitive_queue.h
aws/utils/token_bucket.h
aws/utils/utils.cc
aws/utils/utils.h)
aws/utils/utils.h
aws/utils/processing_statistics_logger.cc
aws/utils/processing_statistics_logger.h)



Expand Down
8 changes: 7 additions & 1 deletion aws/kinesis/core/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <aws/kinesis/core/configuration.h>
#include <aws/utils/concurrent_hash_map.h>
#include <aws/utils/executor.h>
#include <aws/utils/processing_statistics_logger.h>

namespace aws {
namespace kinesis {
Expand All @@ -41,12 +42,14 @@ class Aggregator : boost::noncopyable {
const std::shared_ptr<ShardMap> shard_map,
const DeadlineCallback& deadline_callback,
const std::shared_ptr<aws::kinesis::core::Configuration>& config,
aws::utils::flush_statistics_aggregator& flush_stats,
const std::shared_ptr<aws::metrics::MetricsManager>& metrics_manager =
std::make_shared<aws::metrics::NullMetricsManager>())
: executor_(executor),
shard_map_(shard_map),
deadline_callback_(deadline_callback),
config_(config),
flush_stats_(flush_stats),
metrics_manager_(metrics_manager),
reducers_([this](auto) { return this->make_reducer(); }) {}

Expand Down Expand Up @@ -79,14 +82,17 @@ class Aggregator : boost::noncopyable {
executor_,
deadline_callback_,
config_->aggregation_max_size(),
config_->aggregation_max_count());
config_->aggregation_max_count(),
flush_stats_
);
}

std::shared_ptr<aws::utils::Executor> executor_;
std::shared_ptr<ShardMap> shard_map_;
DeadlineCallback deadline_callback_;
std::shared_ptr<aws::kinesis::core::Configuration> config_;
std::shared_ptr<aws::metrics::MetricsManager> metrics_manager_;
aws::utils::flush_statistics_aggregator& flush_stats_;
ReducerMap reducers_;
};

Expand Down
3 changes: 3 additions & 0 deletions aws/kinesis/core/collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <aws/kinesis/core/reducer.h>
#include <aws/kinesis/core/configuration.h>
#include <aws/utils/concurrent_hash_map.h>
#include <aws/utils/processing_statistics_logger.h>

namespace aws {
namespace kinesis {
Expand All @@ -32,13 +33,15 @@ class Collector : boost::noncopyable {
const std::shared_ptr<aws::utils::Executor>& executor,
const FlushCallback& flush_callback,
const std::shared_ptr<aws::kinesis::core::Configuration>& config,
aws::utils::flush_statistics_aggregator& flush_stats,
const std::shared_ptr<aws::metrics::MetricsManager>& metrics_manager =
std::make_shared<aws::metrics::NullMetricsManager>())
: flush_callback_(flush_callback),
reducer_(executor,
[this](auto prr) { this->handle_flush(std::move(prr)); },
config->collection_max_size(),
config->collection_max_count(),
flush_stats,
[this](auto kr) { return this->should_flush(kr); }),
buffered_data_([](auto) { return new std::atomic<size_t>(0); }) {}

Expand Down
32 changes: 23 additions & 9 deletions aws/kinesis/core/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#define AWS_KINESIS_CORE_PIPELINE_H_

#include <boost/format.hpp>
#include <iomanip>

#include <aws/kinesis/core/aggregator.h>
#include <aws/kinesis/core/collector.h>
Expand All @@ -25,6 +26,7 @@
#include <aws/kinesis/core/retrier.h>
#include <aws/kinesis/KinesisClient.h>
#include <aws/metrics/metrics_manager.h>
#include <aws/utils/processing_statistics_logger.h>

namespace aws {
namespace kinesis {
Expand All @@ -46,6 +48,7 @@ class Pipeline : boost::noncopyable {
: stream_(std::move(stream)),
region_(std::move(region)),
config_(std::move(config)),
stats_logger_(stream_, config_->record_max_buffered_time()),
executor_(std::move(executor)),
kinesis_client_(std::move(kinesis_client)),
metrics_manager_(std::move(metrics_manager)),
Expand All @@ -58,11 +61,12 @@ class Pipeline : boost::noncopyable {
metrics_manager_)),
aggregator_(
std::make_shared<Aggregator>(
executor_,
shard_map_,
[this](auto kr) { this->limiter_put(kr); },
config_,
metrics_manager_)),
executor_,
shard_map_,
[this](auto kr) { this->limiter_put(kr); },
config_,
stats_logger_.stage1(),
metrics_manager_)),
limiter_(
std::make_shared<Limiter>(
executor_,
Expand All @@ -71,10 +75,11 @@ class Pipeline : boost::noncopyable {
config_)),
collector_(
std::make_shared<Collector>(
executor_,
[this](auto prr) { this->send_put_records_request(prr); },
config_,
metrics_manager_)),
executor_,
[this](auto prr) { this->send_put_records_request(prr); },
config_,
stats_logger_.stage2(),
metrics_manager_)),
retrier_(
std::make_shared<Retrier>(
config_,
Expand Down Expand Up @@ -111,6 +116,7 @@ class Pipeline : boost::noncopyable {
}

private:

void aggregator_put(const std::shared_ptr<UserRecord>& ur) {
auto kr = aggregator_->put(ur);
if (kr) {
Expand Down Expand Up @@ -148,6 +154,7 @@ class Pipeline : boost::noncopyable {
sdk_ctx));
ctx->set_end(std::chrono::steady_clock::now());
ctx->set_outcome(outcome);
this->request_completed(ctx);
// At the time of writing, the SDK can spawn a large number of
// threads in order to achieve request parallelism. These threads will
// later put items into the IPC manager after they finish the logic in
Expand All @@ -161,6 +168,10 @@ class Pipeline : boost::noncopyable {
prc);
}

// Forwards a finished PutRecords request context to the statistics logger.
//
// The context is taken by const reference, matching aggregator_put() and
// retrier_put_kr(), so the hand-off does not pay for an extra shared_ptr
// reference-count increment/decrement on every completed request.
void request_completed(const std::shared_ptr<PutRecordsContext>& context) {
  stats_logger_.request_complete(context);
}

void retrier_put_kr(const std::shared_ptr<KinesisRecord>& kr) {
executor_->submit([=] {
retrier_->put(kr,
Expand All @@ -172,6 +183,7 @@ class Pipeline : boost::noncopyable {
std::string stream_;
std::string region_;
std::shared_ptr<Configuration> config_;
aws::utils::processing_statistics_logger stats_logger_;
std::shared_ptr<aws::utils::Executor> executor_;
std::shared_ptr<Aws::Kinesis::KinesisClient> kinesis_client_;
std::shared_ptr<aws::metrics::MetricsManager> metrics_manager_;
Expand All @@ -185,6 +197,8 @@ class Pipeline : boost::noncopyable {

std::shared_ptr<aws::metrics::Metric> user_records_rcvd_metric_;
std::atomic<uint64_t> outstanding_user_records_;


};

} //namespace core
Expand Down
30 changes: 22 additions & 8 deletions aws/kinesis/core/reducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
#include <mutex>

#include <aws/utils/logging.h>

#include <aws/utils/executor.h>
#include <aws/utils/processing_statistics_logger.h>
#include <aws/mutex.h>

namespace aws {
Expand Down Expand Up @@ -57,11 +57,13 @@ class Reducer : boost::noncopyable {
const std::function<void (std::shared_ptr<U>)>& flush_callback,
size_t size_limit,
size_t count_limit,
aws::utils::flush_statistics_aggregator& flush_stats,
FlushPredicate flush_predicate = [](auto) { return false; })
: executor_(executor),
flush_callback_(flush_callback),
size_limit_(size_limit),
count_limit_(count_limit),
flush_stats_(flush_stats),
flush_predicate_(flush_predicate),
container_(std::make_shared<U>()),
scheduled_callback_(
Expand All @@ -79,11 +81,13 @@ class Reducer : boost::noncopyable {
auto size = container_->size();
auto estimated_size = container_->estimated_size();
auto flush_predicate_result = flush_predicate_(input);
if (size >= count_limit_ ||
estimated_size >= size_limit_ ||
flush_predicate_result) {

auto output = flush(lock);
aws::utils::flush_statistics_context flush_reason;
flush_reason.record_count(size >= count_limit_).data_size(estimated_size >= size_limit_)
.predicate_match(flush_predicate_result);

if (flush_reason.flush_required()) {
auto output = flush(lock, flush_reason);
if (output && output->size() > 0) {
return output;
}
Expand All @@ -96,7 +100,9 @@ class Reducer : boost::noncopyable {

// Manually trigger a flush.
//
// The flush is reported with the "manual" reason. It deliberately does not go
// through deadline_reached(): that path marks the reason as timed and would
// also perform a second flush in addition to the one triggered here.
void flush() {
  aws::utils::flush_statistics_context flush_reason;
  flush_reason.manual(true);
  trigger_flush(flush_reason);
}

// Records in the process of being flushed won't be counted
Expand All @@ -116,7 +122,7 @@ class Reducer : boost::noncopyable {
using Mutex = aws::mutex;
using Lock = aws::unique_lock<Mutex>;

std::shared_ptr<U> flush(Lock& lock) {
std::shared_ptr<U> flush(Lock &lock, aws::utils::flush_statistics_context &flush_reason) {
if (!lock) {
lock.lock();
}
Expand Down Expand Up @@ -154,6 +160,7 @@ class Reducer : boost::noncopyable {
}

set_deadline();
flush_stats_.merge(flush_reason, flush_container->size());

return flush_container;
}
Expand All @@ -171,8 +178,14 @@ class Reducer : boost::noncopyable {
}

void deadline_reached() {
aws::utils::flush_statistics_context flush_reason;
flush_reason.timed(true);
trigger_flush(flush_reason);
}

void trigger_flush(aws::utils::flush_statistics_context &reason) {
Lock lock(lock_);
auto r = flush(lock);
auto r = flush(lock, reason);
lock.unlock();
if (r && r->size() > 0) {
flush_callback_(r);
Expand All @@ -187,6 +200,7 @@ class Reducer : boost::noncopyable {
Mutex lock_;
std::shared_ptr<U> container_;
std::shared_ptr<aws::utils::ScheduledCallback> scheduled_callback_;
aws::utils::flush_statistics_aggregator& flush_stats_;
};

} //namespace core
Expand Down
6 changes: 5 additions & 1 deletion aws/kinesis/core/test/aggregator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <aws/kinesis/core/test/test_utils.h>
#include <aws/utils/io_service_executor.h>
#include <aws/utils/utils.h>
#include <aws/utils/processing_statistics_logger.h>

namespace {

Expand Down Expand Up @@ -67,6 +68,8 @@ class MockShardMap : public aws::kinesis::core::ShardMap {
std::vector<boost::multiprecision::uint128_t> limits_;
};

aws::utils::flush_statistics_aggregator flush_stats("Test", "InRecords", "OutRecords");

auto make_aggregator(
bool shard_map_down = false,
FlushCallback cb = [](auto) {},
Expand All @@ -81,7 +84,8 @@ auto make_aggregator(
std::make_shared<aws::utils::IoServiceExecutor>(4),
std::make_shared<MockShardMap>(shard_map_down),
cb,
config);
config,
flush_stats);
}

} //namespace
Expand Down
5 changes: 4 additions & 1 deletion aws/kinesis/core/test/reducer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <aws/utils/utils.h>
#include <aws/utils/io_service_executor.h>
#include <aws/kinesis/core/test/test_utils.h>
#include <aws/utils/processing_statistics_logger.h>

namespace {

Expand All @@ -27,11 +28,13 @@ using Reducer =
using FlushCallback =
std::function<void (std::shared_ptr<aws::kinesis::core::KinesisRecord>)>;

aws::utils::flush_statistics_aggregator flush_stats("Test", "TestRecords", "TestRecords2");

// Test helper: builds a Reducer on a fresh IoServiceExecutor (constructed
// with 8) that reports its flushes into the file-level flush_stats aggregator.
//
// size_limit / count_limit are passed straight to the Reducer constructor;
// cb is the flush callback and defaults to a no-op.
std::shared_ptr<Reducer> make_reducer(size_t size_limit = 256 * 1024,
                                      size_t count_limit = 1000,
                                      FlushCallback cb = [](auto) {}) {
  auto executor = std::make_shared<aws::utils::IoServiceExecutor>(8);
  // The Reducer constructor requires the statistics aggregator after the
  // limits, so it must always be supplied here.
  return std::make_shared<Reducer>(executor, cb, size_limit, count_limit, flush_stats);
}

template <typename T>
Expand Down
Loading

0 comments on commit 55ca106

Please sign in to comment.