Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/common/memory/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ envoy_cc_library(
"//include/envoy/event:dispatcher_interface",
"//include/envoy/server:overload_manager_interface",
"//include/envoy/stats:stats_interface",
"//source/common/stats:symbol_table_lib",
],
)
5 changes: 4 additions & 1 deletion source/common/memory/heap_shrinker.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "common/memory/heap_shrinker.h"

#include "common/memory/utils.h"
#include "common/stats/symbol_table_impl.h"

#include "absl/strings/str_cat.h"

Expand All @@ -18,7 +19,9 @@ HeapShrinker::HeapShrinker(Event::Dispatcher& dispatcher, Server::OverloadManage
[this](Server::OverloadActionState state) {
active_ = (state == Server::OverloadActionState::Active);
})) {
shrink_counter_ = &stats.counter(absl::StrCat("overload.", action_name, ".shrink_count"));
Envoy::Stats::StatNameManagedStorage stat_name(
absl::StrCat("overload.", action_name, ".shrink_count"), stats.symbolTable());
shrink_counter_ = &stats.counterFromStatName(stat_name.statName());
timer_ = dispatcher.createTimer([this] {
shrinkHeap();
timer_->enableTimer(kTimerInterval);
Expand Down
23 changes: 12 additions & 11 deletions source/extensions/filters/http/dynamo/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ envoy_cc_library(
hdrs = ["dynamo_filter.h"],
deps = [
":dynamo_request_parser_lib",
":dynamo_utility_lib",
":dynamo_stats_lib",
"//include/envoy/http:filter_interface",
"//include/envoy/runtime:runtime_interface",
"//source/common/buffer:buffer_lib",
Expand All @@ -37,16 +37,6 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "dynamo_utility_lib",
srcs = ["dynamo_utility.cc"],
hdrs = ["dynamo_utility.h"],
deps = [
"//include/envoy/stats:stats_interface",
"//source/common/stats:stats_lib",
],
)

envoy_cc_library(
name = "config",
srcs = ["config.cc"],
Expand All @@ -59,3 +49,14 @@ envoy_cc_library(
"//source/extensions/filters/http/common:empty_http_filter_config_lib",
],
)

envoy_cc_library(
name = "dynamo_stats_lib",
srcs = ["dynamo_stats.cc"],
hdrs = ["dynamo_stats.h"],
deps = [
":dynamo_request_parser_lib",
"//include/envoy/stats:stats_interface",
"//source/common/stats:symbol_table_lib",
],
)
8 changes: 5 additions & 3 deletions source/extensions/filters/http/dynamo/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "envoy/registry/registry.h"

#include "extensions/filters/http/dynamo/dynamo_filter.h"
#include "extensions/filters/http/dynamo/dynamo_stats.h"

namespace Envoy {
namespace Extensions {
Expand All @@ -14,9 +15,10 @@ namespace Dynamo {
Http::FilterFactoryCb
DynamoFilterConfig::createFilter(const std::string& stat_prefix,
Server::Configuration::FactoryContext& context) {
return [&context, stat_prefix](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamFilter(Http::StreamFilterSharedPtr{new Dynamo::DynamoFilter(
context.runtime(), stat_prefix, context.scope(), context.dispatcher().timeSource())});
auto stats = std::make_shared<DynamoStats>(context.scope(), stat_prefix);
return [&context, stat_prefix, stats](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamFilter(std::make_shared<Dynamo::DynamoFilter>(
context.runtime(), stats, context.dispatcher().timeSource()));
};
}

Expand Down
69 changes: 32 additions & 37 deletions source/extensions/filters/http/dynamo/dynamo_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "common/json/json_loader.h"

#include "extensions/filters/http/dynamo/dynamo_request_parser.h"
#include "extensions/filters/http/dynamo/dynamo_utility.h"
#include "extensions/filters/http/dynamo/dynamo_stats.h"

namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -62,7 +62,7 @@ void DynamoFilter::onDecodeComplete(const Buffer::Instance& data) {
table_descriptor_ = RequestParser::parseTable(operation_, *json_body);
} catch (const Json::Exception& jsonEx) {
// Body parsing failed. This should not happen, just put a stat for that.
scope_.counter(fmt::format("{}invalid_req_body", stat_prefix_)).inc();
stats_->counter({stats_->invalid_req_body_}).inc();
}
}
}
Expand All @@ -89,7 +89,7 @@ void DynamoFilter::onEncodeComplete(const Buffer::Instance& data) {
}
} catch (const Json::Exception&) {
// Body parsing failed. This should not happen, just put a stat for that.
scope_.counter(fmt::format("{}invalid_resp_body", stat_prefix_)).inc();
stats_->counter({stats_->invalid_resp_body_}).inc();
}
}
}
Expand Down Expand Up @@ -158,15 +158,15 @@ void DynamoFilter::chargeBasicStats(uint64_t status) {
if (!operation_.empty()) {
chargeStatsPerEntity(operation_, "operation", status);
} else {
scope_.counter(fmt::format("{}operation_missing", stat_prefix_)).inc();
stats_->counter({stats_->operation_missing_}).inc();
}

if (!table_descriptor_.table_name.empty()) {
chargeStatsPerEntity(table_descriptor_.table_name, "table", status);
} else if (table_descriptor_.is_single_table) {
scope_.counter(fmt::format("{}table_missing", stat_prefix_)).inc();
stats_->counter({stats_->table_missing_}).inc();
} else {
scope_.counter(fmt::format("{}multiple_tables", stat_prefix_)).inc();
stats_->counter({stats_->multiple_tables_}).inc();
}
}

Expand All @@ -175,39 +175,33 @@ void DynamoFilter::chargeStatsPerEntity(const std::string& entity, const std::st
std::chrono::milliseconds latency = std::chrono::duration_cast<std::chrono::milliseconds>(
time_source_.monotonicTime() - start_decode_);

std::string group_string =
Http::CodeUtility::groupStringForResponseCode(static_cast<Http::Code>(status));
size_t group_index = DynamoStats::groupIndex(status);
const Stats::StatName entity_type_name = stats_->getStatName(entity_type);
const Stats::StatName entity_name = stats_->getStatName(entity);
const Stats::StatName total_name =
stats_->getStatName(absl::StrCat("upstream_rq_total_", status));
const Stats::StatName time_name = stats_->getStatName(absl::StrCat("upstream_rq_time_", status));

scope_.counter(fmt::format("{}{}.{}.upstream_rq_total", stat_prefix_, entity_type, entity)).inc();
scope_
.counter(fmt::format("{}{}.{}.upstream_rq_total_{}", stat_prefix_, entity_type, entity,
group_string))
.inc();
scope_
.counter(fmt::format("{}{}.{}.upstream_rq_total_{}", stat_prefix_, entity_type, entity,
std::to_string(status)))
.inc();
stats_->counter({entity_type_name, entity_name, stats_->upstream_rq_total_}).inc();
const Stats::StatName total_group = stats_->upstream_rq_total_groups_[group_index];
stats_->counter({entity_type_name, entity_name, total_group}).inc();
stats_->counter({entity_type_name, entity_name, total_name}).inc();

scope_.histogram(fmt::format("{}{}.{}.upstream_rq_time", stat_prefix_, entity_type, entity))
.recordValue(latency.count());
scope_
.histogram(fmt::format("{}{}.{}.upstream_rq_time_{}", stat_prefix_, entity_type, entity,
group_string))
.recordValue(latency.count());
scope_
.histogram(fmt::format("{}{}.{}.upstream_rq_time_{}", stat_prefix_, entity_type, entity,
std::to_string(status)))
stats_->histogram({entity_type_name, entity_name, stats_->upstream_rq_time_})
.recordValue(latency.count());
const Stats::StatName time_group = stats_->upstream_rq_time_groups_[group_index];
stats_->histogram({entity_type_name, entity_name, time_group}).recordValue(latency.count());
stats_->histogram({entity_type_name, entity_name, time_name}).recordValue(latency.count());
}

void DynamoFilter::chargeUnProcessedKeysStats(const Json::Object& json_body) {
// The unprocessed keys block contains a list of tables and keys for that table that did not
// complete apart of the batch operation. Only the table names will be logged for errors.
std::vector<std::string> unprocessed_tables = RequestParser::parseBatchUnProcessedKeys(json_body);
for (const std::string& unprocessed_table : unprocessed_tables) {
scope_
.counter(
fmt::format("{}error.{}.BatchFailureUnprocessedKeys", stat_prefix_, unprocessed_table))
stats_
->counter({stats_->error_, stats_->getStatName(unprocessed_table),
stats_->batch_failure_unprocessed_keys_})
.inc();
}
}
Expand All @@ -217,15 +211,15 @@ void DynamoFilter::chargeFailureSpecificStats(const Json::Object& json_body) {

if (!error_type.empty()) {
if (table_descriptor_.table_name.empty()) {
scope_.counter(fmt::format("{}error.no_table.{}", stat_prefix_, error_type)).inc();
stats_->counter({stats_->error_, stats_->no_table_, stats_->getStatName(error_type)}).inc();
} else {
scope_
.counter(
fmt::format("{}error.{}.{}", stat_prefix_, table_descriptor_.table_name, error_type))
stats_
->counter({stats_->error_, stats_->getStatName(table_descriptor_.table_name),
stats_->getStatName(error_type)})
.inc();
}
} else {
scope_.counter(fmt::format("{}empty_response_body", stat_prefix_)).inc();
stats_->counter({stats_->empty_response_body_}).inc();
}
}

Expand All @@ -237,9 +231,10 @@ void DynamoFilter::chargeTablePartitionIdStats(const Json::Object& json_body) {
std::vector<RequestParser::PartitionDescriptor> partitions =
RequestParser::parsePartitions(json_body);
for (const RequestParser::PartitionDescriptor& partition : partitions) {
std::string scope_string = Utility::buildPartitionStatString(
stat_prefix_, table_descriptor_.table_name, operation_, partition.partition_id_);
scope_.counter(scope_string).add(partition.capacity_);
stats_
->buildPartitionStatCounter(table_descriptor_.table_name, operation_,
partition.partition_id_)
.add(partition.capacity_);
}
}

Expand Down
10 changes: 4 additions & 6 deletions source/extensions/filters/http/dynamo/dynamo_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "common/json/json_loader.h"

#include "extensions/filters/http/dynamo/dynamo_request_parser.h"
#include "extensions/filters/http/dynamo/dynamo_stats.h"

namespace Envoy {
namespace Extensions {
Expand All @@ -24,10 +25,8 @@ namespace Dynamo {
*/
class DynamoFilter : public Http::StreamFilter {
public:
DynamoFilter(Runtime::Loader& runtime, const std::string& stat_prefix, Stats::Scope& scope,
TimeSource& time_system)
: runtime_(runtime), stat_prefix_(stat_prefix + "dynamodb."), scope_(scope),
time_source_(time_system) {
DynamoFilter(Runtime::Loader& runtime, const DynamoStatsSharedPtr& stats, TimeSource& time_source)
: runtime_(runtime), stats_(stats), time_source_(time_source) {
enabled_ = runtime_.snapshot().featureEnabled("dynamodb.filter_enabled", 100);
}

Expand Down Expand Up @@ -68,8 +67,7 @@ class DynamoFilter : public Http::StreamFilter {
void chargeTablePartitionIdStats(const Json::Object& json_body);

Runtime::Loader& runtime_;
std::string stat_prefix_;
Stats::Scope& scope_;
const DynamoStatsSharedPtr stats_;

bool enabled_{};
std::string operation_{};
Expand Down
18 changes: 18 additions & 0 deletions source/extensions/filters/http/dynamo/dynamo_request_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,24 @@ RequestParser::parsePartitions(const Json::Object& json_data) {
return partition_descriptors;
}

void RequestParser::forEachStatString(const StringFn& fn) {
for (const std::string& str : SINGLE_TABLE_OPERATIONS) {
fn(str);
}
for (const std::string& str : SUPPORTED_ERROR_TYPES) {
fn(str);
}
for (const std::string& str : BATCH_OPERATIONS) {
fn(str);
}
for (const std::string& str : TRANSACT_OPERATIONS) {
fn(str);
}
for (const std::string& str : TRANSACT_ITEM_OPERATIONS) {
fn(str);
}
}

} // namespace Dynamo
} // namespace HttpFilters
} // namespace Extensions
Expand Down
11 changes: 11 additions & 0 deletions source/extensions/filters/http/dynamo/dynamo_request_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ class RequestParser {
*/
static std::vector<PartitionDescriptor> parsePartitions(const Json::Object& json_data);

using StringFn = std::function<void(const std::string&)>;

/**
* Calls a function for every string that is likely to be included as a token
* in a stat. This is not functionally necessary, but can reduce potentially
* contented access to create entries in the symbol table in the hot path.
*
* @param fn the function to call for every potential stat name.
*/
static void forEachStatString(const StringFn& fn);

private:
static const Http::LowerCaseString X_AMZ_TARGET;
static const std::vector<std::string> SINGLE_TABLE_OPERATIONS;
Expand Down
Loading