Report Statistics About Sending Data To Kinesis #102

Merged
11 commits merged on May 16, 2017
4 changes: 3 additions & 1 deletion CMakeLists.txt
@@ -59,7 +59,9 @@ set(SOURCE_FILES
aws/utils/time_sensitive_queue.h
aws/utils/token_bucket.h
aws/utils/utils.cc
aws/utils/utils.h)
aws/utils/utils.h
aws/utils/processing_statistics_logger.cc
aws/utils/processing_statistics_logger.h)



8 changes: 7 additions & 1 deletion aws/kinesis/core/aggregator.h
@@ -24,6 +24,7 @@
#include <aws/kinesis/core/configuration.h>
#include <aws/utils/concurrent_hash_map.h>
#include <aws/utils/executor.h>
#include <aws/utils/processing_statistics_logger.h>

namespace aws {
namespace kinesis {
@@ -41,12 +42,14 @@ class Aggregator : boost::noncopyable {
const std::shared_ptr<ShardMap> shard_map,
const DeadlineCallback& deadline_callback,
const std::shared_ptr<aws::kinesis::core::Configuration>& config,
aws::utils::flush_statistics_aggregator& flush_stats,
const std::shared_ptr<aws::metrics::MetricsManager>& metrics_manager =
std::make_shared<aws::metrics::NullMetricsManager>())
: executor_(executor),
shard_map_(shard_map),
deadline_callback_(deadline_callback),
config_(config),
flush_stats_(flush_stats),
metrics_manager_(metrics_manager),
reducers_([this](auto) { return this->make_reducer(); }) {}

@@ -79,14 +82,17 @@ class Aggregator : boost::noncopyable {
executor_,
deadline_callback_,
config_->aggregation_max_size(),
config_->aggregation_max_count());
config_->aggregation_max_count(),
flush_stats_
);
}

std::shared_ptr<aws::utils::Executor> executor_;
std::shared_ptr<ShardMap> shard_map_;
DeadlineCallback deadline_callback_;
std::shared_ptr<aws::kinesis::core::Configuration> config_;
std::shared_ptr<aws::metrics::MetricsManager> metrics_manager_;
aws::utils::flush_statistics_aggregator& flush_stats_;
ReducerMap reducers_;
};

3 changes: 3 additions & 0 deletions aws/kinesis/core/collector.h
@@ -18,6 +18,7 @@
#include <aws/kinesis/core/reducer.h>
#include <aws/kinesis/core/configuration.h>
#include <aws/utils/concurrent_hash_map.h>
#include <aws/utils/processing_statistics_logger.h>

namespace aws {
namespace kinesis {
@@ -32,13 +33,15 @@ class Collector : boost::noncopyable {
const std::shared_ptr<aws::utils::Executor>& executor,
const FlushCallback& flush_callback,
const std::shared_ptr<aws::kinesis::core::Configuration>& config,
aws::utils::flush_statistics_aggregator& flush_stats,
const std::shared_ptr<aws::metrics::MetricsManager>& metrics_manager =
std::make_shared<aws::metrics::NullMetricsManager>())
: flush_callback_(flush_callback),
reducer_(executor,
[this](auto prr) { this->handle_flush(std::move(prr)); },
config->collection_max_size(),
config->collection_max_count(),
flush_stats,
[this](auto kr) { return this->should_flush(kr); }),
buffered_data_([](auto) { return new std::atomic<size_t>(0); }) {}

32 changes: 23 additions & 9 deletions aws/kinesis/core/pipeline.h
@@ -15,6 +15,7 @@
#define AWS_KINESIS_CORE_PIPELINE_H_

#include <boost/format.hpp>
#include <iomanip>

#include <aws/kinesis/core/aggregator.h>
#include <aws/kinesis/core/collector.h>
@@ -25,6 +26,7 @@
#include <aws/kinesis/core/retrier.h>
#include <aws/kinesis/KinesisClient.h>
#include <aws/metrics/metrics_manager.h>
#include <aws/utils/processing_statistics_logger.h>

namespace aws {
namespace kinesis {
@@ -46,6 +48,7 @@ class Pipeline : boost::noncopyable {
: stream_(std::move(stream)),
region_(std::move(region)),
config_(std::move(config)),
stats_logger_(stream_, config_->record_max_buffered_time()),
executor_(std::move(executor)),
kinesis_client_(std::move(kinesis_client)),
metrics_manager_(std::move(metrics_manager)),
@@ -58,11 +61,12 @@
metrics_manager_)),
aggregator_(
std::make_shared<Aggregator>(
executor_,
shard_map_,
[this](auto kr) { this->limiter_put(kr); },
config_,
metrics_manager_)),
executor_,
shard_map_,
[this](auto kr) { this->limiter_put(kr); },
config_,
stats_logger_.stage1(),
metrics_manager_)),
limiter_(
std::make_shared<Limiter>(
executor_,
@@ -71,10 +75,11 @@
config_)),
collector_(
std::make_shared<Collector>(
executor_,
[this](auto prr) { this->send_put_records_request(prr); },
config_,
metrics_manager_)),
executor_,
[this](auto prr) { this->send_put_records_request(prr); },
config_,
stats_logger_.stage2(),
metrics_manager_)),
retrier_(
std::make_shared<Retrier>(
config_,
@@ -111,6 +116,7 @@
}

private:

void aggregator_put(const std::shared_ptr<UserRecord>& ur) {
auto kr = aggregator_->put(ur);
if (kr) {
@@ -148,6 +154,7 @@
sdk_ctx));
ctx->set_end(std::chrono::steady_clock::now());
ctx->set_outcome(outcome);
this->request_completed(ctx);
// At the time of writing, the SDK can spawn a large number of
// threads in order to achieve request parallelism. These threads will
// later put items into the IPC manager after they finish the logic in
@@ -161,6 +168,10 @@
prc);
}

void request_completed(std::shared_ptr<PutRecordsContext> context) {
stats_logger_.request_complete(context);
}

void retrier_put_kr(const std::shared_ptr<KinesisRecord>& kr) {
executor_->submit([=] {
retrier_->put(kr,
@@ -172,6 +183,7 @@
std::string stream_;
std::string region_;
std::shared_ptr<Configuration> config_;
aws::utils::processing_statistics_logger stats_logger_;
std::shared_ptr<aws::utils::Executor> executor_;
std::shared_ptr<Aws::Kinesis::KinesisClient> kinesis_client_;
std::shared_ptr<aws::metrics::MetricsManager> metrics_manager_;
@@ -185,6 +197,8 @@

std::shared_ptr<aws::metrics::Metric> user_records_rcvd_metric_;
std::atomic<uint64_t> outstanding_user_records_;


};

} //namespace core
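Note: the new aws/utils/processing_statistics_logger.{h,cc} files added by this PR are collapsed in the diff above. As a rough sketch only, inferred from the call sites in pipeline.h (a constructor taking the stream name and record_max_buffered_time, stage1()/stage2() handing per-stage flush_statistics_aggregator references to the Aggregator and Collector, and request_complete() consuming a finished PutRecordsContext), the interface presumably looks something like the following; every name and member below is a guess, not confirmed by the diff. The flush_statistics_aggregator type itself is sketched after the reducer.h diff below.

// Hypothetical reconstruction of aws/utils/processing_statistics_logger.h.
// Inferred from the call sites in this PR; the real header is collapsed in
// the diff above and may differ in names and details.
namespace aws {
namespace utils {

class processing_statistics_logger : private boost::noncopyable {
 public:
  processing_statistics_logger(const std::string& stream,
                               std::uint64_t max_buffered_time);

  // Stage 1 (Aggregator): user records -> aggregated KinesisRecords.
  flush_statistics_aggregator& stage1() { return stage1_; }
  // Stage 2 (Collector): KinesisRecords -> PutRecords requests.
  flush_statistics_aggregator& stage2() { return stage2_; }

  // Invoked once per completed PutRecords request (see pipeline.h above);
  // presumably accumulates request latency and outcome statistics.
  void request_complete(
      std::shared_ptr<aws::kinesis::core::PutRecordsContext> context);

 private:
  flush_statistics_aggregator stage1_;
  flush_statistics_aggregator stage2_;
};

} // namespace utils
} // namespace aws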
32 changes: 24 additions & 8 deletions aws/kinesis/core/reducer.h
@@ -17,8 +17,8 @@
#include <mutex>

#include <aws/utils/logging.h>

#include <aws/utils/executor.h>
#include <aws/utils/processing_statistics_logger.h>
#include <aws/mutex.h>

namespace aws {
@@ -51,17 +51,20 @@ template <typename T, typename U>
class Reducer : boost::noncopyable {
public:
using FlushPredicate = std::function<bool (const std::shared_ptr<T>&)>;
using FlushReason = aws::utils::flush_statistics_context;

Reducer(
const std::shared_ptr<aws::utils::Executor>& executor,
const std::function<void (std::shared_ptr<U>)>& flush_callback,
size_t size_limit,
size_t count_limit,
aws::utils::flush_statistics_aggregator& flush_stats,
FlushPredicate flush_predicate = [](auto) { return false; })
: executor_(executor),
flush_callback_(flush_callback),
size_limit_(size_limit),
count_limit_(count_limit),
flush_stats_(flush_stats),
flush_predicate_(flush_predicate),
container_(std::make_shared<U>()),
scheduled_callback_(
@@ -79,11 +82,13 @@
auto size = container_->size();
auto estimated_size = container_->estimated_size();
auto flush_predicate_result = flush_predicate_(input);
if (size >= count_limit_ ||
estimated_size >= size_limit_ ||
flush_predicate_result) {

auto output = flush(lock);
FlushReason flush_reason;
flush_reason.record_count(size >= count_limit_).data_size(estimated_size >= size_limit_)
.predicate_match(flush_predicate_result);

if (flush_reason.flush_required()) {
auto output = flush(lock, flush_reason);
if (output && output->size() > 0) {
return output;
}
@@ -96,7 +101,9 @@

// Manually trigger a flush, as though a deadline has been reached
void flush() {
deadline_reached();
FlushReason flush_reason;
flush_reason.manual(true);
trigger_flush(flush_reason);
}

// Records in the process of being flushed won't be counted
@@ -116,7 +123,8 @@
using Mutex = aws::mutex;
using Lock = aws::unique_lock<Mutex>;

std::shared_ptr<U> flush(Lock& lock) {

std::shared_ptr<U> flush(Lock &lock, FlushReason &flush_reason) {
if (!lock) {
lock.lock();
}
@@ -154,6 +162,7 @@
}

set_deadline();
flush_stats_.merge(flush_reason, flush_container->size());

return flush_container;
}
@@ -171,8 +180,14 @@
}

void deadline_reached() {
FlushReason flush_reason;
flush_reason.timed(true);
trigger_flush(flush_reason);
}

void trigger_flush(FlushReason &reason) {
Lock lock(lock_);
auto r = flush(lock);
auto r = flush(lock, reason);
lock.unlock();
if (r && r->size() > 0) {
flush_callback_(r);
@@ -187,6 +202,7 @@
Mutex lock_;
std::shared_ptr<U> container_;
std::shared_ptr<aws::utils::ScheduledCallback> scheduled_callback_;
aws::utils::flush_statistics_aggregator& flush_stats_;
};

} //namespace core
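The flush_statistics_context (aliased as FlushReason) and flush_statistics_aggregator types used throughout reducer.h are likewise defined in the collapsed processing_statistics_logger.h. Below is a minimal sketch consistent with the call sites above: chainable boolean setters for each flush trigger, flush_required() gating the count/size/predicate path in put(), and merge() folding one flush into a stage's running counters. Field names and method bodies are assumptions.

// Hypothetical sketch of the flush-statistics types, inferred from their
// usage in Reducer; not the actual header contents.
namespace aws {
namespace utils {

class flush_statistics_context {
 public:
  // Chainable setters recording which triggers fired for this flush.
  flush_statistics_context& record_count(bool v) { record_count_ = v; return *this; }
  flush_statistics_context& data_size(bool v) { data_size_ = v; return *this; }
  flush_statistics_context& predicate_match(bool v) { predicate_match_ = v; return *this; }
  flush_statistics_context& timed(bool v) { timed_ = v; return *this; }
  flush_statistics_context& manual(bool v) { manual_ = v; return *this; }

  // True if any trigger fired. Reducer::put() uses this to decide whether
  // the count/size/predicate check should flush; timed and manual flushes
  // bypass the check and go straight to trigger_flush().
  bool flush_required() const {
    return record_count_ || data_size_ || predicate_match_ || timed_ || manual_;
  }

 private:
  bool record_count_ = false;
  bool data_size_ = false;
  bool predicate_match_ = false;
  bool timed_ = false;
  bool manual_ = false;
};

class flush_statistics_aggregator {
 public:
  // Labels used when the aggregated statistics are logged, e.g.
  // ("Test", "InRecords", "OutRecords") in aggregator_test.cc below.
  flush_statistics_aggregator(const std::string& stage_name,
                              const std::string& input_label,
                              const std::string& output_label);

  // Folds one flush (its trigger reasons plus the number of records in
  // the flushed container) into this stage's running counters.
  void merge(flush_statistics_context& reason, std::size_t record_count);
};

} // namespace utils
} // namespace aws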
6 changes: 5 additions & 1 deletion aws/kinesis/core/test/aggregator_test.cc
@@ -17,6 +17,7 @@
#include <aws/kinesis/core/test/test_utils.h>
#include <aws/utils/io_service_executor.h>
#include <aws/utils/utils.h>
#include <aws/utils/processing_statistics_logger.h>

namespace {

@@ -67,6 +68,8 @@ class MockShardMap : public aws::kinesis::core::ShardMap {
std::vector<boost::multiprecision::uint128_t> limits_;
};

aws::utils::flush_statistics_aggregator flush_stats("Test", "InRecords", "OutRecords");

auto make_aggregator(
bool shard_map_down = false,
FlushCallback cb = [](auto) {},
@@ -81,7 +84,8 @@ auto make_aggregator(
std::make_shared<aws::utils::IoServiceExecutor>(4),
std::make_shared<MockShardMap>(shard_map_down),
cb,
config);
config,
flush_stats);
}

} //namespace
5 changes: 4 additions & 1 deletion aws/kinesis/core/test/reducer_test.cc
@@ -17,6 +17,7 @@
#include <aws/utils/utils.h>
#include <aws/utils/io_service_executor.h>
#include <aws/kinesis/core/test/test_utils.h>
#include <aws/utils/processing_statistics_logger.h>

namespace {

@@ -27,11 +28,13 @@ using Reducer =
using FlushCallback =
std::function<void (std::shared_ptr<aws::kinesis::core::KinesisRecord>)>;

aws::utils::flush_statistics_aggregator flush_stats("Test", "TestRecords", "TestRecords2");

std::shared_ptr<Reducer> make_reducer(size_t size_limit = 256 * 1024,
size_t count_limit = 1000,
FlushCallback cb = [](auto) {}) {
auto executor = std::make_shared<aws::utils::IoServiceExecutor>(8);
return std::make_shared<Reducer>(executor, cb, size_limit, count_limit);
return std::make_shared<Reducer>(executor, cb, size_limit, count_limit, flush_stats);
}

template <typename T>