Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions source/extensions/stat_sinks/common/statsd/statsd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,30 @@ void Writer::write(const std::string& message) {
}

UdpStatsdSink::UdpStatsdSink(ThreadLocal::SlotAllocator& tls,
Network::Address::InstanceConstSharedPtr address, const bool use_tag)
: tls_(tls.allocateSlot()), server_address_(std::move(address)), use_tag_(use_tag) {
Network::Address::InstanceConstSharedPtr address, const bool use_tag,
const std::string& prefix)
: tls_(tls.allocateSlot()), server_address_(std::move(address)), use_tag_(use_tag),
prefix_(prefix.empty() ? Statsd::getDefaultPrefix() : prefix) {
tls_->set([this](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
return std::make_shared<Writer>(this->server_address_);
});
}

void UdpStatsdSink::flushCounter(const Stats::Counter& counter, uint64_t delta) {
const std::string message(
fmt::format("envoy.{}:{}|c{}", getName(counter), delta, buildTagStr(counter.tags())));
fmt::format("{}.{}:{}|c{}", prefix_, getName(counter), delta, buildTagStr(counter.tags())));
tls_->getTyped<Writer>().write(message);
}

void UdpStatsdSink::flushGauge(const Stats::Gauge& gauge, uint64_t value) {
const std::string message(
fmt::format("envoy.{}:{}|g{}", getName(gauge), value, buildTagStr(gauge.tags())));
fmt::format("{}.{}:{}|g{}", prefix_, getName(gauge), value, buildTagStr(gauge.tags())));
tls_->getTyped<Writer>().write(message);
}

void UdpStatsdSink::onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) {
// For statsd histograms are all timers.
const std::string message(fmt::format("envoy.{}:{}|ms{}", getName(histogram),
const std::string message(fmt::format("{}.{}:{}|ms{}", prefix_, getName(histogram),
std::chrono::milliseconds(value).count(),
buildTagStr(histogram.tags())));
tls_->getTyped<Writer>().write(message);
Expand All @@ -86,13 +88,12 @@ const std::string UdpStatsdSink::buildTagStr(const std::vector<Stats::Tag>& tags
return "|#" + StringUtil::join(tag_strings, ",");
}

char TcpStatsdSink::STAT_PREFIX[] = "envoy.";

TcpStatsdSink::TcpStatsdSink(const LocalInfo::LocalInfo& local_info,
const std::string& cluster_name, ThreadLocal::SlotAllocator& tls,
Upstream::ClusterManager& cluster_manager, Stats::Scope& scope)
: tls_(tls.allocateSlot()), cluster_manager_(cluster_manager),
cx_overflow_stat_(scope.counter("statsd.cx_overflow")) {
Upstream::ClusterManager& cluster_manager, Stats::Scope& scope,
const std::string& prefix)
: prefix_(prefix.empty() ? Statsd::getDefaultPrefix() : prefix), tls_(tls.allocateSlot()),
cluster_manager_(cluster_manager), cx_overflow_stat_(scope.counter("statsd.cx_overflow")) {

Config::Utility::checkClusterAndLocalInfo("tcp statsd", cluster_name, cluster_manager,
local_info);
Expand Down Expand Up @@ -124,8 +125,9 @@ void TcpStatsdSink::TlsSink::beginFlush(bool expect_empty_buffer) {

void TcpStatsdSink::TlsSink::commonFlush(const std::string& name, uint64_t value, char stat_type) {
ASSERT(current_slice_mem_ != nullptr);
// 40 > 6 (prefix) + 4 (random chars) + 30 for number (bigger than it will ever be)
const uint32_t max_size = name.size() + 40;
// 36 > 1 ("." after prefix) + 1 (":" after name) + 4 (postfix chars, e.g., "|ms\n") + 30 for
// number (bigger than it will ever be)
const uint32_t max_size = name.size() + parent_.getPrefix().size() + 36;
if (current_buffer_slice_.len_ - usedBuffer() < max_size) {
endFlush(false);
beginFlush(false);
Expand All @@ -135,8 +137,9 @@ void TcpStatsdSink::TlsSink::commonFlush(const std::string& name, uint64_t value
// This written this way for maximum perf since with a large number of stats and at a high flush
// rate this can become expensive.
const char* snapped_current = current_slice_mem_;
memcpy(current_slice_mem_, STAT_PREFIX, sizeof(STAT_PREFIX) - 1);
current_slice_mem_ += sizeof(STAT_PREFIX) - 1;
memcpy(current_slice_mem_, parent_.getPrefix().c_str(), parent_.getPrefix().size());
current_slice_mem_ += parent_.getPrefix().size();
*current_slice_mem_++ = '.';
memcpy(current_slice_mem_, name.c_str(), name.size());
current_slice_mem_ += name.size();
*current_slice_mem_++ = ':';
Expand Down Expand Up @@ -178,7 +181,8 @@ void TcpStatsdSink::TlsSink::onTimespanComplete(const std::string& name,
// Ultimately it would be nice to perf optimize this path also, but it's not very frequent. It's
// also currently not possible that this interleaves with any counter/gauge flushing.
ASSERT(current_slice_mem_ == nullptr);
Buffer::OwnedImpl buffer(fmt::format("envoy.{}:{}|ms\n", name, ms.count()));
Buffer::OwnedImpl buffer(
fmt::format("{}.{}:{}|ms\n", parent_.getPrefix().c_str(), name, ms.count()));
write(buffer);
}

Expand Down
19 changes: 14 additions & 5 deletions source/extensions/stat_sinks/common/statsd/statsd.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
#include "envoy/upstream/cluster_manager.h"

#include "common/buffer/buffer_impl.h"
#include "common/common/macros.h"

namespace Envoy {
namespace Extensions {
namespace StatSinks {
namespace Common {
namespace Statsd {

static const std::string& getDefaultPrefix() { CONSTRUCT_ON_FIRST_USE(std::string, "envoy"); }

/**
* This is a simple UDP localhost writer for statsd messages.
*/
Expand All @@ -38,11 +41,12 @@ class Writer : public ThreadLocal::ThreadLocalObject {
class UdpStatsdSink : public Stats::Sink {
public:
UdpStatsdSink(ThreadLocal::SlotAllocator& tls, Network::Address::InstanceConstSharedPtr address,
const bool use_tag);
const bool use_tag, const std::string& prefix = getDefaultPrefix());
// For testing.
UdpStatsdSink(ThreadLocal::SlotAllocator& tls, const std::shared_ptr<Writer>& writer,
const bool use_tag)
: tls_(tls.allocateSlot()), use_tag_(use_tag) {
const bool use_tag, const std::string& prefix = getDefaultPrefix())
: tls_(tls.allocateSlot()), use_tag_(use_tag),
prefix_(prefix.empty() ? getDefaultPrefix() : prefix) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we want to disallow an empty prefix since the user already has the option to not provide the argument and get the default? (same for TCP)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An empty prefix doesn't make sense, imo. If an empty prefix is passed in, we just use the default instead. Also, if a prefix isn't specified statsd_sink.prefix() in createStatsSink will be an empty string. We can avoid multiple checks for an empty string by doing it here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM - I didn't realize this was coming directly from the proto, so there is no way to specify a default.

tls_->set(
[writer](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { return writer; });
}
Expand All @@ -57,6 +61,7 @@ class UdpStatsdSink : public Stats::Sink {
// Called in unit test to validate writer construction and address.
int getFdForTests() { return tls_->getTyped<Writer>().getFdForTests(); }
bool getUseTagForTest() { return use_tag_; }
const std::string& getPrefix() { return prefix_; }

private:
const std::string getName(const Stats::Metric& metric);
Expand All @@ -65,6 +70,8 @@ class UdpStatsdSink : public Stats::Sink {
ThreadLocal::SlotPtr tls_;
Network::Address::InstanceConstSharedPtr server_address_;
const bool use_tag_;
// Prefix for all flushed stats.
const std::string prefix_;
};

/**
Expand All @@ -74,7 +81,7 @@ class TcpStatsdSink : public Stats::Sink {
public:
TcpStatsdSink(const LocalInfo::LocalInfo& local_info, const std::string& cluster_name,
ThreadLocal::SlotAllocator& tls, Upstream::ClusterManager& cluster_manager,
Stats::Scope& scope);
Stats::Scope& scope, const std::string& prefix = getDefaultPrefix());

// Stats::Sink
void beginFlush() override { tls_->getTyped<TlsSink>().beginFlush(true); }
Expand All @@ -95,6 +102,8 @@ class TcpStatsdSink : public Stats::Sink {
std::chrono::milliseconds(value));
}

const std::string& getPrefix() { return prefix_; }

private:
struct TlsSink : public ThreadLocal::ThreadLocalObject, public Network::ConnectionCallbacks {
TlsSink(TcpStatsdSink& parent, Event::Dispatcher& dispatcher);
Expand Down Expand Up @@ -130,7 +139,7 @@ class TcpStatsdSink : public Stats::Sink {
static constexpr uint32_t FLUSH_SLICE_SIZE_BYTES = (1024 * 16);

// Prefix for all flushed stats.
static char STAT_PREFIX[];
const std::string prefix_;

Upstream::ClusterInfoConstSharedPtr cluster_info_;
ThreadLocal::SlotPtr tls_;
Expand Down
4 changes: 2 additions & 2 deletions source/extensions/stat_sinks/statsd/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ Stats::SinkPtr StatsdSinkFactory::createStatsSink(const Protobuf::Message& confi
Network::Address::resolveProtoAddress(statsd_sink.address());
ENVOY_LOG(debug, "statsd UDP ip address: {}", address->asString());
return std::make_unique<Common::Statsd::UdpStatsdSink>(server.threadLocal(), std::move(address),
false);
false, statsd_sink.prefix());
}
case envoy::config::metrics::v2::StatsdSink::kTcpClusterName:
ENVOY_LOG(debug, "statsd TCP cluster: {}", statsd_sink.tcp_cluster_name());
return std::make_unique<Common::Statsd::TcpStatsdSink>(
server.localInfo(), statsd_sink.tcp_cluster_name(), server.threadLocal(),
server.clusterManager(), server.stats());
server.clusterManager(), server.stats(), statsd_sink.prefix());
default:
// Verified by schema.
NOT_REACHED;
Expand Down
103 changes: 103 additions & 0 deletions test/extensions/stats_sinks/statsd/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,109 @@ TEST(StatsConfigTest, ValidTcpStatsd) {
EXPECT_NE(dynamic_cast<Common::Statsd::TcpStatsdSink*>(sink.get()), nullptr);
}

TEST(StatsConfigTest, UdpSinkDefaultPrefix) {
const std::string name = StatsSinkNames::get().STATSD;
auto defaultPrefix = Common::Statsd::getDefaultPrefix();

envoy::config::metrics::v2::StatsdSink sink_config;
envoy::api::v2::core::Address& address = *sink_config.mutable_address();
envoy::api::v2::core::SocketAddress& socket_address = *address.mutable_socket_address();
socket_address.set_protocol(envoy::api::v2::core::SocketAddress::UDP);
socket_address.set_address("127.0.0.1");
socket_address.set_port_value(8125);
EXPECT_EQ(sink_config.prefix(), "");

Server::Configuration::StatsSinkFactory* factory =
Registry::FactoryRegistry<Server::Configuration::StatsSinkFactory>::getFactory(name);
ASSERT_NE(factory, nullptr);
ProtobufTypes::MessagePtr message = factory->createEmptyConfigProto();
MessageUtil::jsonConvert(sink_config, *message);

NiceMock<Server::MockInstance> server;
Stats::SinkPtr sink = factory->createStatsSink(*message, server);
ASSERT_NE(sink, nullptr);

auto udp_sink = dynamic_cast<Common::Statsd::UdpStatsdSink*>(sink.get());
ASSERT_NE(udp_sink, nullptr);
EXPECT_EQ(udp_sink->getPrefix(), defaultPrefix);
}

TEST(StatsConfigTest, UdpSinkCustomPrefix) {
const std::string name = StatsSinkNames::get().STATSD;
const std::string customPrefix = "prefix.test";

envoy::config::metrics::v2::StatsdSink sink_config;
envoy::api::v2::core::Address& address = *sink_config.mutable_address();
envoy::api::v2::core::SocketAddress& socket_address = *address.mutable_socket_address();
socket_address.set_protocol(envoy::api::v2::core::SocketAddress::UDP);
socket_address.set_address("127.0.0.1");
socket_address.set_port_value(8125);
sink_config.set_prefix(customPrefix);
EXPECT_NE(sink_config.prefix(), "");

Server::Configuration::StatsSinkFactory* factory =
Registry::FactoryRegistry<Server::Configuration::StatsSinkFactory>::getFactory(name);
ASSERT_NE(factory, nullptr);
ProtobufTypes::MessagePtr message = factory->createEmptyConfigProto();
MessageUtil::jsonConvert(sink_config, *message);

NiceMock<Server::MockInstance> server;
Stats::SinkPtr sink = factory->createStatsSink(*message, server);
ASSERT_NE(sink, nullptr);

auto udp_sink = dynamic_cast<Common::Statsd::UdpStatsdSink*>(sink.get());
ASSERT_NE(udp_sink, nullptr);
EXPECT_EQ(udp_sink->getPrefix(), customPrefix);
}

TEST(StatsConfigTest, TcpSinkDefaultPrefix) {
const std::string name = StatsSinkNames::get().STATSD;

envoy::config::metrics::v2::StatsdSink sink_config;
auto defaultPrefix = Common::Statsd::getDefaultPrefix();
sink_config.set_tcp_cluster_name("fake_cluster");

Server::Configuration::StatsSinkFactory* factory =
Registry::FactoryRegistry<Server::Configuration::StatsSinkFactory>::getFactory(name);
ASSERT_NE(factory, nullptr);
EXPECT_EQ(sink_config.prefix(), "");
ProtobufTypes::MessagePtr message = factory->createEmptyConfigProto();
MessageUtil::jsonConvert(sink_config, *message);

NiceMock<Server::MockInstance> server;
Stats::SinkPtr sink = factory->createStatsSink(*message, server);
ASSERT_NE(sink, nullptr);

auto tcp_sink = dynamic_cast<Common::Statsd::TcpStatsdSink*>(sink.get());
ASSERT_NE(tcp_sink, nullptr);
EXPECT_EQ(tcp_sink->getPrefix(), defaultPrefix);
}

TEST(StatsConfigTest, TcpSinkCustomPrefix) {
const std::string name = StatsSinkNames::get().STATSD;

envoy::config::metrics::v2::StatsdSink sink_config;
std::string prefix = "prefixTest";
sink_config.set_tcp_cluster_name("fake_cluster");
ASSERT_NE(sink_config.prefix(), prefix);
sink_config.set_prefix(prefix);
EXPECT_EQ(sink_config.prefix(), prefix);
Server::Configuration::StatsSinkFactory* factory =
Registry::FactoryRegistry<Server::Configuration::StatsSinkFactory>::getFactory(name);
ASSERT_NE(factory, nullptr);

ProtobufTypes::MessagePtr message = factory->createEmptyConfigProto();
MessageUtil::jsonConvert(sink_config, *message);

NiceMock<Server::MockInstance> server;
Stats::SinkPtr sink = factory->createStatsSink(*message, server);
ASSERT_NE(sink, nullptr);

auto tcp_sink = dynamic_cast<Common::Statsd::TcpStatsdSink*>(sink.get());
ASSERT_NE(tcp_sink, nullptr);
EXPECT_EQ(tcp_sink->getPrefix(), prefix);
}

class StatsConfigLoopbackTest : public testing::TestWithParam<Network::Address::IpVersion> {};
INSTANTIATE_TEST_CASE_P(IpVersions, StatsConfigLoopbackTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
Expand Down