diff --git a/source/extensions/stat_sinks/common/statsd/statsd.cc b/source/extensions/stat_sinks/common/statsd/statsd.cc index 4f30a049443df..edba7ce99af04 100644 --- a/source/extensions/stat_sinks/common/statsd/statsd.cc +++ b/source/extensions/stat_sinks/common/statsd/statsd.cc @@ -38,8 +38,10 @@ void Writer::write(const std::string& message) { } UdpStatsdSink::UdpStatsdSink(ThreadLocal::SlotAllocator& tls, - Network::Address::InstanceConstSharedPtr address, const bool use_tag) - : tls_(tls.allocateSlot()), server_address_(std::move(address)), use_tag_(use_tag) { + Network::Address::InstanceConstSharedPtr address, const bool use_tag, + const std::string& prefix) + : tls_(tls.allocateSlot()), server_address_(std::move(address)), use_tag_(use_tag), + prefix_(prefix.empty() ? Statsd::getDefaultPrefix() : prefix) { tls_->set([this](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { return std::make_shared(this->server_address_); }); @@ -47,19 +49,19 @@ UdpStatsdSink::UdpStatsdSink(ThreadLocal::SlotAllocator& tls, void UdpStatsdSink::flushCounter(const Stats::Counter& counter, uint64_t delta) { const std::string message( - fmt::format("envoy.{}:{}|c{}", getName(counter), delta, buildTagStr(counter.tags()))); + fmt::format("{}.{}:{}|c{}", prefix_, getName(counter), delta, buildTagStr(counter.tags()))); tls_->getTyped().write(message); } void UdpStatsdSink::flushGauge(const Stats::Gauge& gauge, uint64_t value) { const std::string message( - fmt::format("envoy.{}:{}|g{}", getName(gauge), value, buildTagStr(gauge.tags()))); + fmt::format("{}.{}:{}|g{}", prefix_, getName(gauge), value, buildTagStr(gauge.tags()))); tls_->getTyped().write(message); } void UdpStatsdSink::onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) { // For statsd histograms are all timers. - const std::string message(fmt::format("envoy.{}:{}|ms{}", getName(histogram), + const std::string message(fmt::format("{}.{}:{}|ms{}", prefix_, getName(histogram), std::chrono::milliseconds(value).count(), buildTagStr(histogram.tags()))); tls_->getTyped().write(message); @@ -86,13 +88,12 @@ const std::string UdpStatsdSink::buildTagStr(const std::vector& tags return "|#" + StringUtil::join(tag_strings, ","); } -char TcpStatsdSink::STAT_PREFIX[] = "envoy."; - TcpStatsdSink::TcpStatsdSink(const LocalInfo::LocalInfo& local_info, const std::string& cluster_name, ThreadLocal::SlotAllocator& tls, - Upstream::ClusterManager& cluster_manager, Stats::Scope& scope) - : tls_(tls.allocateSlot()), cluster_manager_(cluster_manager), - cx_overflow_stat_(scope.counter("statsd.cx_overflow")) { + Upstream::ClusterManager& cluster_manager, Stats::Scope& scope, + const std::string& prefix) + : prefix_(prefix.empty() ? Statsd::getDefaultPrefix() : prefix), tls_(tls.allocateSlot()), + cluster_manager_(cluster_manager), cx_overflow_stat_(scope.counter("statsd.cx_overflow")) { Config::Utility::checkClusterAndLocalInfo("tcp statsd", cluster_name, cluster_manager, local_info); @@ -124,8 +125,9 @@ void TcpStatsdSink::TlsSink::beginFlush(bool expect_empty_buffer) { void TcpStatsdSink::TlsSink::commonFlush(const std::string& name, uint64_t value, char stat_type) { ASSERT(current_slice_mem_ != nullptr); - // 40 > 6 (prefix) + 4 (random chars) + 30 for number (bigger than it will ever be) - const uint32_t max_size = name.size() + 40; + // 36 > 1 ("." after prefix) + 1 (":" after name) + 4 (postfix chars, e.g., "|ms\n") + 30 for + // number (bigger than it will ever be) + const uint32_t max_size = name.size() + parent_.getPrefix().size() + 36; if (current_buffer_slice_.len_ - usedBuffer() < max_size) { endFlush(false); beginFlush(false); @@ -135,8 +137,9 @@ void TcpStatsdSink::TlsSink::commonFlush(const std::string& name, uint64_t value // This written this way for maximum perf since with a large number of stats and at a high flush // rate this can become expensive. const char* snapped_current = current_slice_mem_; - memcpy(current_slice_mem_, STAT_PREFIX, sizeof(STAT_PREFIX) - 1); - current_slice_mem_ += sizeof(STAT_PREFIX) - 1; + memcpy(current_slice_mem_, parent_.getPrefix().c_str(), parent_.getPrefix().size()); + current_slice_mem_ += parent_.getPrefix().size(); + *current_slice_mem_++ = '.'; memcpy(current_slice_mem_, name.c_str(), name.size()); current_slice_mem_ += name.size(); *current_slice_mem_++ = ':'; @@ -178,7 +181,8 @@ void TcpStatsdSink::TlsSink::onTimespanComplete(const std::string& name, // Ultimately it would be nice to perf optimize this path also, but it's not very frequent. It's // also currently not possible that this interleaves with any counter/gauge flushing. ASSERT(current_slice_mem_ == nullptr); - Buffer::OwnedImpl buffer(fmt::format("envoy.{}:{}|ms\n", name, ms.count())); + Buffer::OwnedImpl buffer( + fmt::format("{}.{}:{}|ms\n", parent_.getPrefix().c_str(), name, ms.count())); write(buffer); } diff --git a/source/extensions/stat_sinks/common/statsd/statsd.h b/source/extensions/stat_sinks/common/statsd/statsd.h index 78f0a76fc9a7d..f891cec85bedb 100644 --- a/source/extensions/stat_sinks/common/statsd/statsd.h +++ b/source/extensions/stat_sinks/common/statsd/statsd.h @@ -7,6 +7,7 @@ #include "envoy/upstream/cluster_manager.h" #include "common/buffer/buffer_impl.h" +#include "common/common/macros.h" namespace Envoy { namespace Extensions { @@ -14,6 +15,8 @@ namespace StatSinks { namespace Common { namespace Statsd { +static const std::string& getDefaultPrefix() { CONSTRUCT_ON_FIRST_USE(std::string, "envoy"); } + /** * This is a simple UDP localhost writer for statsd messages. */ @@ -38,11 +41,12 @@ class Writer : public ThreadLocal::ThreadLocalObject { class UdpStatsdSink : public Stats::Sink { public: UdpStatsdSink(ThreadLocal::SlotAllocator& tls, Network::Address::InstanceConstSharedPtr address, - const bool use_tag); + const bool use_tag, const std::string& prefix = getDefaultPrefix()); // For testing. UdpStatsdSink(ThreadLocal::SlotAllocator& tls, const std::shared_ptr& writer, - const bool use_tag) - : tls_(tls.allocateSlot()), use_tag_(use_tag) { + const bool use_tag, const std::string& prefix = getDefaultPrefix()) + : tls_(tls.allocateSlot()), use_tag_(use_tag), + prefix_(prefix.empty() ? getDefaultPrefix() : prefix) { tls_->set( [writer](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { return writer; }); } @@ -57,6 +61,7 @@ class UdpStatsdSink : public Stats::Sink { // Called in unit test to validate writer construction and address. int getFdForTests() { return tls_->getTyped().getFdForTests(); } bool getUseTagForTest() { return use_tag_; } + const std::string& getPrefix() { return prefix_; } private: const std::string getName(const Stats::Metric& metric); @@ -65,6 +70,8 @@ class UdpStatsdSink : public Stats::Sink { ThreadLocal::SlotPtr tls_; Network::Address::InstanceConstSharedPtr server_address_; const bool use_tag_; + // Prefix for all flushed stats. + const std::string prefix_; }; /** @@ -74,7 +81,7 @@ class TcpStatsdSink : public Stats::Sink { public: TcpStatsdSink(const LocalInfo::LocalInfo& local_info, const std::string& cluster_name, ThreadLocal::SlotAllocator& tls, Upstream::ClusterManager& cluster_manager, - Stats::Scope& scope); + Stats::Scope& scope, const std::string& prefix = getDefaultPrefix()); // Stats::Sink void beginFlush() override { tls_->getTyped().beginFlush(true); } @@ -95,6 +102,8 @@ class TcpStatsdSink : public Stats::Sink { std::chrono::milliseconds(value)); } + const std::string& getPrefix() { return prefix_; } + private: struct TlsSink : public ThreadLocal::ThreadLocalObject, public Network::ConnectionCallbacks { TlsSink(TcpStatsdSink& parent, Event::Dispatcher& dispatcher); @@ -130,7 +139,7 @@ class TcpStatsdSink : public Stats::Sink { static constexpr uint32_t FLUSH_SLICE_SIZE_BYTES = (1024 * 16); // Prefix for all flushed stats. - static char STAT_PREFIX[]; + const std::string prefix_; Upstream::ClusterInfoConstSharedPtr cluster_info_; ThreadLocal::SlotPtr tls_; diff --git a/source/extensions/stat_sinks/statsd/config.cc b/source/extensions/stat_sinks/statsd/config.cc index 5fa1ebbd51fe9..eb7fe0c3f496c 100644 --- a/source/extensions/stat_sinks/statsd/config.cc +++ b/source/extensions/stat_sinks/statsd/config.cc @@ -25,13 +25,13 @@ Stats::SinkPtr StatsdSinkFactory::createStatsSink(const Protobuf::Message& confi Network::Address::resolveProtoAddress(statsd_sink.address()); ENVOY_LOG(debug, "statsd UDP ip address: {}", address->asString()); return std::make_unique(server.threadLocal(), std::move(address), - false); + false, statsd_sink.prefix()); } case envoy::config::metrics::v2::StatsdSink::kTcpClusterName: ENVOY_LOG(debug, "statsd TCP cluster: {}", statsd_sink.tcp_cluster_name()); return std::make_unique( server.localInfo(), statsd_sink.tcp_cluster_name(), server.threadLocal(), - server.clusterManager(), server.stats()); + server.clusterManager(), server.stats(), statsd_sink.prefix()); default: // Verified by schema. NOT_REACHED; diff --git a/test/extensions/stats_sinks/statsd/config_test.cc b/test/extensions/stats_sinks/statsd/config_test.cc index e01101d261006..40ed862e2461f 100644 --- a/test/extensions/stats_sinks/statsd/config_test.cc +++ b/test/extensions/stats_sinks/statsd/config_test.cc @@ -45,6 +45,109 @@ TEST(StatsConfigTest, ValidTcpStatsd) { EXPECT_NE(dynamic_cast(sink.get()), nullptr); } +TEST(StatsConfigTest, UdpSinkDefaultPrefix) { + const std::string name = StatsSinkNames::get().STATSD; + auto defaultPrefix = Common::Statsd::getDefaultPrefix(); + + envoy::config::metrics::v2::StatsdSink sink_config; + envoy::api::v2::core::Address& address = *sink_config.mutable_address(); + envoy::api::v2::core::SocketAddress& socket_address = *address.mutable_socket_address(); + socket_address.set_protocol(envoy::api::v2::core::SocketAddress::UDP); + socket_address.set_address("127.0.0.1"); + socket_address.set_port_value(8125); + EXPECT_EQ(sink_config.prefix(), ""); + + Server::Configuration::StatsSinkFactory* factory = + Registry::FactoryRegistry::getFactory(name); + ASSERT_NE(factory, nullptr); + ProtobufTypes::MessagePtr message = factory->createEmptyConfigProto(); + MessageUtil::jsonConvert(sink_config, *message); + + NiceMock server; + Stats::SinkPtr sink = factory->createStatsSink(*message, server); + ASSERT_NE(sink, nullptr); + + auto udp_sink = dynamic_cast(sink.get()); + ASSERT_NE(udp_sink, nullptr); + EXPECT_EQ(udp_sink->getPrefix(), defaultPrefix); +} + +TEST(StatsConfigTest, UdpSinkCustomPrefix) { + const std::string name = StatsSinkNames::get().STATSD; + const std::string customPrefix = "prefix.test"; + + envoy::config::metrics::v2::StatsdSink sink_config; + envoy::api::v2::core::Address& address = *sink_config.mutable_address(); + envoy::api::v2::core::SocketAddress& socket_address = *address.mutable_socket_address(); + socket_address.set_protocol(envoy::api::v2::core::SocketAddress::UDP); + socket_address.set_address("127.0.0.1"); + socket_address.set_port_value(8125); + sink_config.set_prefix(customPrefix); + EXPECT_NE(sink_config.prefix(), ""); + + Server::Configuration::StatsSinkFactory* factory = + Registry::FactoryRegistry::getFactory(name); + ASSERT_NE(factory, nullptr); + ProtobufTypes::MessagePtr message = factory->createEmptyConfigProto(); + MessageUtil::jsonConvert(sink_config, *message); + + NiceMock server; + Stats::SinkPtr sink = factory->createStatsSink(*message, server); + ASSERT_NE(sink, nullptr); + + auto udp_sink = dynamic_cast(sink.get()); + ASSERT_NE(udp_sink, nullptr); + EXPECT_EQ(udp_sink->getPrefix(), customPrefix); +} + +TEST(StatsConfigTest, TcpSinkDefaultPrefix) { + const std::string name = StatsSinkNames::get().STATSD; + + envoy::config::metrics::v2::StatsdSink sink_config; + auto defaultPrefix = Common::Statsd::getDefaultPrefix(); + sink_config.set_tcp_cluster_name("fake_cluster"); + + Server::Configuration::StatsSinkFactory* factory = + Registry::FactoryRegistry::getFactory(name); + ASSERT_NE(factory, nullptr); + EXPECT_EQ(sink_config.prefix(), ""); + ProtobufTypes::MessagePtr message = factory->createEmptyConfigProto(); + MessageUtil::jsonConvert(sink_config, *message); + + NiceMock server; + Stats::SinkPtr sink = factory->createStatsSink(*message, server); + ASSERT_NE(sink, nullptr); + + auto tcp_sink = dynamic_cast(sink.get()); + ASSERT_NE(tcp_sink, nullptr); + EXPECT_EQ(tcp_sink->getPrefix(), defaultPrefix); +} + +TEST(StatsConfigTest, TcpSinkCustomPrefix) { + const std::string name = StatsSinkNames::get().STATSD; + + envoy::config::metrics::v2::StatsdSink sink_config; + std::string prefix = "prefixTest"; + sink_config.set_tcp_cluster_name("fake_cluster"); + ASSERT_NE(sink_config.prefix(), prefix); + sink_config.set_prefix(prefix); + EXPECT_EQ(sink_config.prefix(), prefix); + Server::Configuration::StatsSinkFactory* factory = + Registry::FactoryRegistry::getFactory(name); + ASSERT_NE(factory, nullptr); + + ProtobufTypes::MessagePtr message = factory->createEmptyConfigProto(); + MessageUtil::jsonConvert(sink_config, *message); + + NiceMock server; + Stats::SinkPtr sink = factory->createStatsSink(*message, server); + ASSERT_NE(sink, nullptr); + + auto tcp_sink = dynamic_cast(sink.get()); + ASSERT_NE(tcp_sink, nullptr); + EXPECT_EQ(tcp_sink->getPrefix(), prefix); +} + class StatsConfigLoopbackTest : public testing::TestWithParam {}; INSTANTIATE_TEST_CASE_P(IpVersions, StatsConfigLoopbackTest, testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),