diff --git a/api/envoy/extensions/tracers/opentelemetry/samplers/v3/dynatrace_sampler.proto b/api/envoy/extensions/tracers/opentelemetry/samplers/v3/dynatrace_sampler.proto index cf93ab04ed8fd..c86f82d3e4c6f 100644 --- a/api/envoy/extensions/tracers/opentelemetry/samplers/v3/dynatrace_sampler.proto +++ b/api/envoy/extensions/tracers/opentelemetry/samplers/v3/dynatrace_sampler.proto @@ -16,7 +16,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // [#extension: envoy.tracers.opentelemetry.samplers.dynatrace] message DynatraceSamplerConfig { - string tenant_id = 1; + string tenant = 1; string cluster_id = 2; diff --git a/source/extensions/tracers/opentelemetry/samplers/always_on/always_on_sampler.cc b/source/extensions/tracers/opentelemetry/samplers/always_on/always_on_sampler.cc index 8d06116cceb3f..d1965b7812de3 100644 --- a/source/extensions/tracers/opentelemetry/samplers/always_on/always_on_sampler.cc +++ b/source/extensions/tracers/opentelemetry/samplers/always_on/always_on_sampler.cc @@ -18,7 +18,7 @@ SamplingResult AlwaysOnSampler::shouldSample(const absl::optional p OptRef /*trace_context*/, const std::vector& /*links*/) { SamplingResult result; - result.decision = Decision::RECORD_AND_SAMPLE; + result.decision = Decision::RecordAndSample; if (parent_context.has_value()) { result.tracestate = parent_context.value().tracestate(); } diff --git a/source/extensions/tracers/opentelemetry/samplers/dynatrace/BUILD b/source/extensions/tracers/opentelemetry/samplers/dynatrace/BUILD index 551c83ca06da9..6245dc6146199 100644 --- a/source/extensions/tracers/opentelemetry/samplers/dynatrace/BUILD +++ b/source/extensions/tracers/opentelemetry/samplers/dynatrace/BUILD @@ -25,12 +25,17 @@ envoy_cc_library( name = "dynatrace_sampler_lib", srcs = [ "dynatrace_sampler.cc", + "sampler_config.cc", "sampler_config_fetcher.cc", + "sampling_controller.cc", ], hdrs = [ "dynatrace_sampler.h", "sampler_config.h", "sampler_config_fetcher.h", + "sampling_controller.h", + "stream_summary.h", + "tenant_id.h", ], deps = [ "//source/common/config:datasource_lib", diff --git a/source/extensions/tracers/opentelemetry/samplers/dynatrace/config.cc b/source/extensions/tracers/opentelemetry/samplers/dynatrace/config.cc index 9e410e75ea0c8..e19943aa92317 100644 --- a/source/extensions/tracers/opentelemetry/samplers/dynatrace/config.cc +++ b/source/extensions/tracers/opentelemetry/samplers/dynatrace/config.cc @@ -1,4 +1,6 @@ -#include "config.h" +#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/config.h" + +#include #include "envoy/extensions/tracers/opentelemetry/samplers/v3/dynatrace_sampler.pb.validate.h" @@ -16,11 +18,14 @@ DynatraceSamplerFactory::createSampler(const Protobuf::Message& config, Server::Configuration::TracerFactoryContext& context) { auto mptr = Envoy::Config::Utility::translateAnyToFactoryConfig( dynamic_cast(config), context.messageValidationVisitor(), *this); - return std::make_shared( - MessageUtil::downcastAndValidate< - const envoy::extensions::tracers::opentelemetry::samplers::v3::DynatraceSamplerConfig&>( - *mptr, context.messageValidationVisitor()), - context); + + const auto& proto_config = MessageUtil::downcastAndValidate< + const envoy::extensions::tracers::opentelemetry::samplers::v3::DynatraceSamplerConfig&>( + *mptr, context.messageValidationVisitor()); + + SamplerConfigFetcherPtr cf = std::make_unique( + context, proto_config.http_uri(), proto_config.token()); + return std::make_shared(proto_config, context, std::move(cf)); } /** diff --git a/source/extensions/tracers/opentelemetry/samplers/dynatrace/config.h b/source/extensions/tracers/opentelemetry/samplers/dynatrace/config.h index bc1baf6c34515..672040818b651 100644 --- a/source/extensions/tracers/opentelemetry/samplers/dynatrace/config.h +++ b/source/extensions/tracers/opentelemetry/samplers/dynatrace/config.h @@ -18,7 +18,7 @@ namespace OpenTelemetry { class DynatraceSamplerFactory : public SamplerFactory { public: /** - * @brief Create a Sampler which samples every span + * @brief Create a Dynatrace sampler * * @param context * @return SamplerSharedPtr diff --git a/source/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler.cc b/source/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler.cc index 47777bc35aee6..ebf54c6dbd64e 100644 --- a/source/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler.cc +++ b/source/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler.cc @@ -4,36 +4,110 @@ #include #include +#include "source/common/common/hash.h" #include "source/common/config/datasource.h" +#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/tenant_id.h" #include "source/extensions/tracers/opentelemetry/samplers/sampler.h" #include "source/extensions/tracers/opentelemetry/span_context.h" #include "source/extensions/tracers/opentelemetry/trace_state.h" +#include "absl/strings/str_cat.h" + namespace Envoy { namespace Extensions { namespace Tracers { namespace OpenTelemetry { -static const char* SAMPLING_EXTRAPOLATION_SPAN_ATTRIBUTE_NAME = - "sampling_extrapolation_set_in_sampler"; +namespace { + +constexpr std::chrono::minutes SAMPLING_UPDATE_TIMER_DURATION{1}; +const char* SAMPLING_EXTRAPOLATION_SPAN_ATTRIBUTE_NAME = "sampling_extrapolation_set_in_sampler"; + +class DynatraceTag { +public: + static DynatraceTag createInvalid() { return {false, false, 0, 0}; } + + static DynatraceTag create(bool ignored, uint32_t sampling_exponent, uint32_t path_info) { + return {true, ignored, sampling_exponent, path_info}; + } + + static DynatraceTag create(const std::string& value) { + std::vector tracestate_components = + absl::StrSplit(value, ';', absl::AllowEmpty()); + if (tracestate_components.size() < 8) { + return createInvalid(); + } + + if (tracestate_components[0] != "fw4") { + return createInvalid(); + } + bool ignored = tracestate_components[5] == "1"; + uint32_t sampling_exponent; + uint32_t path_info; + if (absl::SimpleAtoi(tracestate_components[6], &sampling_exponent) && + absl::SimpleHexAtoi(tracestate_components[7], &path_info)) { + return {true, ignored, sampling_exponent, path_info}; + } + return createInvalid(); + } + + std::string asString() const { + std::string ret = absl::StrCat("fw4;0;0;0;0;", ignored_ ? "1" : "0", ";", sampling_exponent_, + ";", absl::Hex(path_info_)); + return ret; + } + + bool isValid() const { return valid_; }; + bool isIgnored() const { return ignored_; }; + int getSamplingExponent() const { return sampling_exponent_; }; + uint32_t getPathInfo() const { return path_info_; }; + +private: + DynatraceTag(bool valid, bool ignored, uint32_t sampling_exponent, uint32_t path_info) + : valid_(valid), ignored_(ignored), sampling_exponent_(sampling_exponent), + path_info_(path_info) {} + + bool valid_; + bool ignored_; + uint32_t sampling_exponent_; + uint32_t path_info_; +}; + +} // namespace DynatraceSampler::DynatraceSampler( const envoy::extensions::tracers::opentelemetry::samplers::v3::DynatraceSamplerConfig& config, - Server::Configuration::TracerFactoryContext& context) - : tenant_id_(config.tenant_id()), cluster_id_(config.cluster_id()), - dt_tracestate_key_(absl::StrCat(absl::string_view(config.tenant_id()), "-", + Server::Configuration::TracerFactoryContext& context, + SamplerConfigFetcherPtr sampler_config_fetcher) + : dt_tracestate_key_(absl::StrCat(calculateTenantId(config.tenant()), "-", absl::string_view(config.cluster_id()), "@dt")), - sampler_config_fetcher_(context, config.http_uri(), config.token()), counter_(0) {} + sampling_controller_(std::move(sampler_config_fetcher)) { + + timer_ = context.serverFactoryContext().mainThreadDispatcher().createTimer([this]() -> void { + sampling_controller_.update(); + timer_->enableTimer(SAMPLING_UPDATE_TIMER_DURATION); + }); + timer_->enableTimer(SAMPLING_UPDATE_TIMER_DURATION); +} SamplingResult DynatraceSampler::shouldSample(const absl::optional parent_context, - const std::string& /*trace_id*/, + const std::string& trace_id, const std::string& /*name*/, OTelSpanKind /*kind*/, - OptRef /*trace_context*/, + OptRef trace_context, const std::vector& /*links*/) { SamplingResult result; std::map att; + // trace_context->path() returns path and query. query part is removed in getSamplingKey() + const std::string sampling_key = + trace_context.has_value() + ? sampling_controller_.getSamplingKey(trace_context->path(), trace_context->method()) + : ""; + + // add it to stream summary containing the number of requests + sampling_controller_.offer(sampling_key); + auto trace_state = TraceState::fromHeader(parent_context.has_value() ? parent_context->tracestate() : ""); @@ -41,30 +115,28 @@ SamplingResult DynatraceSampler::shouldSample(const absl::optional if (trace_state->get(dt_tracestate_key_, trace_state_value)) { // we found a DT trace decision in tracestate header - if (FW4Tag fw4_tag = FW4Tag::create(trace_state_value); fw4_tag.isValid()) { - result.decision = fw4_tag.isIgnored() ? Decision::Drop : Decision::RecordAndSample; + if (DynatraceTag dynatrace_tag = DynatraceTag::create(trace_state_value); + dynatrace_tag.isValid()) { + result.decision = dynatrace_tag.isIgnored() ? Decision::Drop : Decision::RecordAndSample; + // TODO: change attribute name and value in scope of OA-26680 att[SAMPLING_EXTRAPOLATION_SPAN_ATTRIBUTE_NAME] = - std::to_string(fw4_tag.getSamplingExponent()); + std::to_string(dynatrace_tag.getSamplingExponent()); result.tracestate = parent_context->tracestate(); } - } else { // make a sampling decision - // this is just a demo, we sample every second request here - uint32_t current_counter = ++counter_; - bool sample; - int sampling_exponent; - if (current_counter % 2) { - sample = true; - sampling_exponent = 1; - } else { - sample = false; - sampling_exponent = 0; - } - + } else { + // do a decision based on the calculated exponent + // at the moment we use a hash of the trace_id as random number + const auto hash = MurmurHash::murmurHash2(trace_id); + const auto sampling_state = sampling_controller_.getSamplingState(sampling_key); + const bool sample = sampling_state.shouldSample(hash); + const auto sampling_exponent = sampling_state.getExponent(); + // TODO: change attribute name and value in scope of OA-26680 att[SAMPLING_EXTRAPOLATION_SPAN_ATTRIBUTE_NAME] = std::to_string(sampling_exponent); result.decision = sample ? Decision::RecordAndSample : Decision::Drop; // create new forward tag and add it to tracestate - FW4Tag new_tag = FW4Tag::create(!sample, sampling_exponent); + DynatraceTag new_tag = + DynatraceTag::create(!sample, sampling_exponent, static_cast(hash)); trace_state = trace_state->set(dt_tracestate_key_, new_tag.asString()); result.tracestate = trace_state->toHeader(); } diff --git a/source/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler.h b/source/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler.h index 9c12458b1357e..4044fe65164d8 100644 --- a/source/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler.h +++ b/source/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler.h @@ -1,66 +1,33 @@ #pragma once +#include + #include "envoy/extensions/tracers/opentelemetry/samplers/v3/dynatrace_sampler.pb.h" #include "envoy/server/factory_context.h" #include "source/common/common/logger.h" #include "source/common/config/datasource.h" #include "source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher.h" +#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/sampling_controller.h" +#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/stream_summary.h" #include "source/extensions/tracers/opentelemetry/samplers/sampler.h" +#include "absl/synchronization/mutex.h" + namespace Envoy { namespace Extensions { namespace Tracers { namespace OpenTelemetry { -class FW4Tag { -public: - static FW4Tag createInvalid() { return {false, false, 0}; } - - static FW4Tag create(bool ignored, int sampling_exponent) { - return {true, ignored, sampling_exponent}; - } - - static FW4Tag create(const std::string& value) { - std::vector tracestate_components = - absl::StrSplit(value, ';', absl::AllowEmpty()); - if (tracestate_components.size() < 7) { - return createInvalid(); - } - - if (tracestate_components[0] != "fw4") { - return createInvalid(); - } - bool ignored = tracestate_components[5] == "1"; - int sampling_exponent = std::stoi(std::string(tracestate_components[6])); - return {true, ignored, sampling_exponent}; - } - - std::string asString() const { - return absl::StrCat("fw4;0;0;0;0;", ignored_ ? "1" : "0", ";", sampling_exponent_, ";0"); - } - - bool isValid() const { return valid_; }; - bool isIgnored() const { return ignored_; }; - int getSamplingExponent() const { return sampling_exponent_; }; - -private: - FW4Tag(bool valid, bool ignored, int sampling_exponent) - : valid_(valid), ignored_(ignored), sampling_exponent_(sampling_exponent) {} - - bool valid_; - bool ignored_; - int sampling_exponent_; -}; - /** - * @brief A Dynatrace specific sampler * + * @brief A Dynatrace specific sampler */ class DynatraceSampler : public Sampler, Logger::Loggable { public: DynatraceSampler( const envoy::extensions::tracers::opentelemetry::samplers::v3::DynatraceSamplerConfig& config, - Server::Configuration::TracerFactoryContext& context); + Server::Configuration::TracerFactoryContext& context, + SamplerConfigFetcherPtr sampler_config_fetcher); SamplingResult shouldSample(const absl::optional parent_context, const std::string& trace_id, const std::string& name, @@ -71,11 +38,9 @@ class DynatraceSampler : public Sampler, Logger::Loggable { std::string getDescription() const override; private: - std::string tenant_id_; - std::string cluster_id_; std::string dt_tracestate_key_; - SamplerConfigFetcher sampler_config_fetcher_; - std::atomic counter_; // request counter for dummy sampling + Event::TimerPtr timer_; + SamplingController sampling_controller_; }; } // namespace OpenTelemetry diff --git a/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config.cc b/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config.cc new file mode 100644 index 0000000000000..ccfb956bb6ced --- /dev/null +++ b/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config.cc @@ -0,0 +1,28 @@ +#include + +#include "source/common/json/json_loader.h" +#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher.h" + +namespace Envoy { +namespace Extensions { +namespace Tracers { +namespace OpenTelemetry { + +void SamplerConfig::parse(const std::string& json) { + const auto result = Envoy::Json::Factory::loadFromStringNoThrow(json); + if (result.ok()) { + const auto& obj = result.value(); + if (obj->hasObject("rootSpansPerMinute")) { + const auto value = obj->getInteger("rootSpansPerMinute", ROOT_SPANS_PER_MINUTE_DEFAULT); + root_spans_per_minute_.store(value); + return; + } + } + // didn't get a value, reset to default + root_spans_per_minute_.store(ROOT_SPANS_PER_MINUTE_DEFAULT); +} + +} // namespace OpenTelemetry +} // namespace Tracers +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config.h b/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config.h index 4b0ec5a9a1a31..af75cbc555a5f 100644 --- a/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config.h +++ b/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config.h @@ -3,9 +3,6 @@ #include #include #include -#include - -#include "source/common/json/json_loader.h" namespace Envoy { namespace Extensions { @@ -14,25 +11,14 @@ namespace OpenTelemetry { class SamplerConfig { public: - static constexpr uint64_t ROOT_SPANS_PER_MINUTE_DEFAULT = 1000; + static constexpr uint32_t ROOT_SPANS_PER_MINUTE_DEFAULT = 1000; - void parse(const std::string& json) { - root_spans_per_minute_.store(ROOT_SPANS_PER_MINUTE_DEFAULT); // reset to default - auto result = Envoy::Json::Factory::loadFromStringNoThrow(json); - if (result.ok()) { - auto obj = result.value(); - if (obj->hasObject("rootSpansPerMinute")) { - auto value = obj->getInteger("rootSpansPerMinute", ROOT_SPANS_PER_MINUTE_DEFAULT); - root_spans_per_minute_.store(value); - } - (void)obj; - } - } + void parse(const std::string& json); - uint64_t getRootSpansPerMinute() const { return root_spans_per_minute_.load(); } + uint32_t getRootSpansPerMinute() const { return root_spans_per_minute_.load(); } private: - std::atomic root_spans_per_minute_ = ROOT_SPANS_PER_MINUTE_DEFAULT; + std::atomic root_spans_per_minute_{ROOT_SPANS_PER_MINUTE_DEFAULT}; }; } // namespace OpenTelemetry diff --git a/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher.cc b/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher.cc index 7c73c058ee9f2..67197f5ecc1ed 100644 --- a/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher.cc +++ b/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher.cc @@ -11,9 +11,9 @@ namespace OpenTelemetry { static constexpr std::chrono::seconds INITIAL_TIMER_DURATION{10}; static constexpr std::chrono::minutes TIMER_INTERVAL{5}; -SamplerConfigFetcher::SamplerConfigFetcher(Server::Configuration::TracerFactoryContext& context, - const envoy::config::core::v3::HttpUri& http_uri, - const std::string& token) +SamplerConfigFetcherImpl::SamplerConfigFetcherImpl( + Server::Configuration::TracerFactoryContext& context, + const envoy::config::core::v3::HttpUri& http_uri, const std::string& token) : cluster_manager_(context.serverFactoryContext().clusterManager()), http_uri_(http_uri), parsed_authorization_header_to_add_( {Http::LowerCaseString("authorization"), absl::StrCat("Api-Token ", token)}), @@ -22,7 +22,7 @@ SamplerConfigFetcher::SamplerConfigFetcher(Server::Configuration::TracerFactoryC timer_ = context.serverFactoryContext().mainThreadDispatcher().createTimer([this]() -> void { const auto thread_local_cluster = cluster_manager_.getThreadLocalCluster(http_uri_.cluster()); if (thread_local_cluster == nullptr) { - ENVOY_LOG(error, "SamplerConfigFetcher failed: [cluster = {}] is not configured", + ENVOY_LOG(error, "SamplerConfigFetcherImpl failed: [cluster = {}] is not configured", http_uri_.cluster()); } else { Http::RequestMessagePtr message = Http::Utility::prepareHeaders(http_uri_); @@ -40,33 +40,32 @@ SamplerConfigFetcher::SamplerConfigFetcher(Server::Configuration::TracerFactoryC timer_->enableTimer(std::chrono::seconds(INITIAL_TIMER_DURATION)); } -SamplerConfigFetcher::~SamplerConfigFetcher() { +SamplerConfigFetcherImpl::~SamplerConfigFetcherImpl() { if (active_request_) { active_request_->cancel(); } } -void SamplerConfigFetcher::onSuccess(const Http::AsyncClient::Request& /*request*/, - Http::ResponseMessagePtr&& http_response) { +void SamplerConfigFetcherImpl::onSuccess(const Http::AsyncClient::Request& /*request*/, + Http::ResponseMessagePtr&& http_response) { onRequestDone(); const auto response_code = Http::Utility::getResponseStatus(http_response->headers()); - if (response_code != enumToInt(Http::Code::OK)) { - ENVOY_LOG(error, "SamplerConfigFetcher received a non-success status code: {}", response_code); - } else { - ENVOY_LOG(info, "SamplerConfigFetcher received success status code: {}", response_code); + if (response_code == enumToInt(Http::Code::OK)) { + ENVOY_LOG(debug, "Received sampling configuration from Dynatrace: {}", + http_response->bodyAsString()); sampler_config_.parse(http_response->bodyAsString()); + } else { + ENVOY_LOG(warn, "Failed to get sampling configuration from Dynatrace: {}", response_code); } } -void SamplerConfigFetcher::onFailure(const Http::AsyncClient::Request& /*request*/, - Http::AsyncClient::FailureReason reason) { +void SamplerConfigFetcherImpl::onFailure(const Http::AsyncClient::Request& /*request*/, + Http::AsyncClient::FailureReason reason) { onRequestDone(); ENVOY_LOG(info, "The OTLP export request failed. Reason {}", enumToInt(reason)); } -void SamplerConfigFetcher::onRequestDone() { - // TODO: should we re-enable timer after send() to avoid having the request duration added to the - // timer? If so, we would need a list containing the active requests (not a single pointer) +void SamplerConfigFetcherImpl::onRequestDone() { active_request_ = nullptr; timer_->enableTimer(std::chrono::seconds(TIMER_INTERVAL)); } diff --git a/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher.h b/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher.h index b8a0a50085b5f..9e88e34d06943 100644 --- a/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher.h +++ b/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -21,11 +22,20 @@ namespace Extensions { namespace Tracers { namespace OpenTelemetry { -class SamplerConfigFetcher : public Logger::Loggable, - public Http::AsyncClient::Callbacks { +class SamplerConfigFetcher { public: - SamplerConfigFetcher(Server::Configuration::TracerFactoryContext& context, - const envoy::config::core::v3::HttpUri& http_uri, const std::string& token); + virtual ~SamplerConfigFetcher() = default; + + virtual const SamplerConfig& getSamplerConfig() const = 0; +}; + +class SamplerConfigFetcherImpl : public SamplerConfigFetcher, + public Logger::Loggable, + public Http::AsyncClient::Callbacks { +public: + SamplerConfigFetcherImpl(Server::Configuration::TracerFactoryContext& context, + const envoy::config::core::v3::HttpUri& http_uri, + const std::string& token); void onSuccess(const Http::AsyncClient::Request& request, Http::ResponseMessagePtr&& response) override; @@ -35,9 +45,9 @@ class SamplerConfigFetcher : public Logger::Loggable, void onBeforeFinalizeUpstreamSpan(Envoy::Tracing::Span& /*span*/, const Http::ResponseHeaderMap* /*response_headers*/) override{}; - const SamplerConfig& getSamplerConfig() const { return sampler_config_; } + const SamplerConfig& getSamplerConfig() const override { return sampler_config_; } - ~SamplerConfigFetcher() override; + ~SamplerConfigFetcherImpl() override; private: Event::TimerPtr timer_; @@ -50,6 +60,8 @@ class SamplerConfigFetcher : public Logger::Loggable, void onRequestDone(); }; +using SamplerConfigFetcherPtr = std::unique_ptr; + } // namespace OpenTelemetry } // namespace Tracers } // namespace Extensions diff --git a/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampling_controller.cc b/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampling_controller.cc new file mode 100644 index 0000000000000..17043609ce2a6 --- /dev/null +++ b/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampling_controller.cc @@ -0,0 +1,170 @@ +#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/sampling_controller.h" + +namespace Envoy { +namespace Extensions { +namespace Tracers { +namespace OpenTelemetry { + +namespace {} + +void SamplingController::update() { + absl::WriterMutexLock lock{&stream_summary_mutex_}; + const auto top_k = stream_summary_->getTopK(); + const auto last_period_count = stream_summary_->getN(); + + // update sampling exponents + update(top_k, last_period_count, + sampler_config_fetcher_->getSamplerConfig().getRootSpansPerMinute()); + // Note: getTopK() returns references to values in StreamSummary. + // Do not destroy it while top_k is used! + stream_summary_ = std::make_unique(STREAM_SUMMARY_SIZE); +} + +void SamplingController::update(const TopKListT& top_k, uint64_t last_period_count, + const uint32_t total_wanted) { + + SamplingExponentsT new_sampling_exponents; + // start with sampling exponent 0, which means multiplicity == 1 (every span is sampled) + for (auto const& counter : top_k) { + new_sampling_exponents[counter.getItem()] = SamplingState(0); + } + + // use the last entry as "rest bucket", which is used for new/unknown requests + rest_bucket_key_ = (!top_k.empty()) ? top_k.back().getItem() : ""; + + calculateSamplingExponents(top_k, total_wanted, new_sampling_exponents); + last_effective_count_ = calculateEffectiveCount(top_k, new_sampling_exponents); + logSamplingInfo(top_k, new_sampling_exponents, last_period_count, total_wanted); + + absl::WriterMutexLock lock{&sampling_exponents_mutex_}; + sampling_exponents_ = std::move(new_sampling_exponents); +} + +uint64_t SamplingController::getEffectiveCount() const { + absl::ReaderMutexLock lock{&stream_summary_mutex_}; + return last_effective_count_; +} + +void SamplingController::offer(const std::string& sampling_key) { + if (!sampling_key.empty()) { + absl::WriterMutexLock lock{&stream_summary_mutex_}; + stream_summary_->offer(sampling_key); + } +} + +SamplingState SamplingController::getSamplingState(const std::string& sampling_key) const { + { // scope for lock + absl::ReaderMutexLock sax_lock{&sampling_exponents_mutex_}; + auto iter = sampling_exponents_.find(sampling_key); + if (iter != sampling_exponents_.end()) { + return iter->second; + } + + // try to use "rest bucket" + auto rest_bucket_iter = sampling_exponents_.find(rest_bucket_key_); + if (rest_bucket_iter != sampling_exponents_.end()) { + return rest_bucket_iter->second; + } + } + + // If we can't find a sampling exponent, we calculate it based on the total number of requests + // in this period. This should also handle the "warmup phase" where no top_k is available + const auto divisor = sampler_config_fetcher_->getSamplerConfig().getRootSpansPerMinute() / 2; + if (divisor == 0) { + return SamplingState{MAX_SAMPLING_EXPONENT}; + } + absl::ReaderMutexLock ss_lock{&stream_summary_mutex_}; + const uint32_t exp = stream_summary_->getN() / divisor; + return SamplingState{exp}; +} + +std::string SamplingController::getSamplingKey(const absl::string_view path_query, + const absl::string_view method) { + const size_t query_offset = path_query.find('?'); + auto path = + path_query.substr(0, query_offset != path_query.npos ? query_offset : path_query.size()); + return absl::StrCat(method, "_", path); +} + +void SamplingController::logSamplingInfo(const TopKListT& top_k, + const SamplingExponentsT& new_sampling_exponents, + uint64_t last_period_count, + const uint32_t total_wanted) const { + ENVOY_LOG(debug, + "Updating sampling info. top_k.size(): {}, last_period_count: {}, total_wanted: {}", + top_k.size(), last_period_count, total_wanted); + for (auto const& counter : top_k) { + auto sampling_state = new_sampling_exponents.find(counter.getItem()); + ENVOY_LOG(debug, "- {}: value: {}, exponent: {}", counter.getItem(), counter.getValue(), + sampling_state->second.getExponent()); + } +} + +uint64_t SamplingController::calculateEffectiveCount(const TopKListT& top_k, + const SamplingExponentsT& sampling_exponents) { + uint64_t cnt = 0; + for (auto const& counter : top_k) { + auto sampling_state = sampling_exponents.find(counter.getItem()); + if (sampling_state == sampling_exponents.end()) { + continue; + } + auto counterVal = counter.getValue(); + auto mul = sampling_state->second.getMultiplicity(); + auto res = counterVal / mul; + cnt += res; + } + return cnt; +} + +void SamplingController::calculateSamplingExponents( + const TopKListT& top_k, const uint32_t total_wanted, + SamplingExponentsT& new_sampling_exponents) const { + const auto top_k_size = top_k.size(); + if (top_k_size == 0 || total_wanted == 0) { + return; + } + + // number of requests which are allowed for every entry + const uint32_t allowed_per_entry = total_wanted / top_k_size; + + for (auto& counter : top_k) { + // allowed multiplicity for this entry + auto wanted_multiplicity = counter.getValue() / allowed_per_entry; + if (wanted_multiplicity < 0) { + wanted_multiplicity = 1; + } + auto sampling_state = new_sampling_exponents.find(counter.getItem()); + // sampling exponent has to be a power of 2. Find the exponent to have multiplicity near to + // wanted_multiplicity + while (wanted_multiplicity > sampling_state->second.getMultiplicity() && + sampling_state->second.getExponent() < MAX_SAMPLING_EXPONENT) { + sampling_state->second.increaseExponent(); + } + if (wanted_multiplicity < sampling_state->second.getMultiplicity()) { + // we want to have multiplicity <= wanted_multiplicity, therefore exponent is decrease once. + sampling_state->second.decreaseExponent(); + } + } + + auto effective_count = calculateEffectiveCount(top_k, new_sampling_exponents); + // There might be entries where allowed_per_entry is greater than their count. + // Therefore, we would sample nubmer of total_wanted requests + // To avoid this, we decrease the exponent of other entries if possible + if (effective_count < total_wanted) { + for (int i = 0; i < 5; i++) { // max tries + for (auto reverse_it = top_k.rbegin(); reverse_it != top_k.rend(); + ++reverse_it) { // start with lowest frequency + auto rev_sampling_state = new_sampling_exponents.find(reverse_it->getItem()); + rev_sampling_state->second.decreaseExponent(); + effective_count = calculateEffectiveCount(top_k, new_sampling_exponents); + if (effective_count >= total_wanted) { // we are done + return; + } + } + } + } +} +} // namespace OpenTelemetry +} // namespace Tracers +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampling_controller.h b/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampling_controller.h new file mode 100644 index 0000000000000..8a2294d23501d --- /dev/null +++ b/source/extensions/tracers/opentelemetry/samplers/dynatrace/sampling_controller.h @@ -0,0 +1,87 @@ +#pragma once + +#include +#include + +#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher.h" +#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/stream_summary.h" + +#include "absl/synchronization/mutex.h" + +namespace Envoy { +namespace Extensions { +namespace Tracers { +namespace OpenTelemetry { + +class SamplingState { +public: + [[nodiscard]] static uint32_t toMultiplicity(uint32_t exponent) { return 1 << exponent; } + [[nodiscard]] uint32_t getExponent() const { return exponent_; } + [[nodiscard]] uint32_t getMultiplicity() const { return toMultiplicity(exponent_); } + void increaseExponent() { exponent_++; } + void decreaseExponent() { + if (exponent_ > 0) { + exponent_--; + } + } + + explicit SamplingState(uint32_t exponent) : exponent_(exponent){}; + + SamplingState() = default; + + bool shouldSample(const uint64_t random_nr) const { return (random_nr % getMultiplicity() == 0); } + +private: + uint32_t exponent_{0}; +}; + +using StreamSummaryT = StreamSummary; +using TopKListT = std::list>; + +class SamplingController : public Logger::Loggable { + +public: + explicit SamplingController(SamplerConfigFetcherPtr sampler_config_fetcher) + : stream_summary_(std::make_unique(STREAM_SUMMARY_SIZE)), + sampler_config_fetcher_(std::move(sampler_config_fetcher)) {} + + void update(); + + SamplingState getSamplingState(const std::string& sampling_key) const; + + uint64_t getEffectiveCount() const; + + void offer(const std::string& sampling_key); + + static std::string getSamplingKey(const absl::string_view path_query, + const absl::string_view method); + + static constexpr size_t STREAM_SUMMARY_SIZE{100}; + +private: + using SamplingExponentsT = absl::flat_hash_map; + SamplingExponentsT sampling_exponents_; + mutable absl::Mutex sampling_exponents_mutex_{}; + std::string rest_bucket_key_{}; + static constexpr uint32_t MAX_SAMPLING_EXPONENT = (1 << 4) - 1; // 15 + std::unique_ptr stream_summary_; + uint64_t last_effective_count_{}; + mutable absl::Mutex stream_summary_mutex_{}; + SamplerConfigFetcherPtr sampler_config_fetcher_; + + void logSamplingInfo(const TopKListT& top_k, const SamplingExponentsT& new_sampling_exponents, + uint64_t last_period_count, const uint32_t total_wanted) const; + + static uint64_t calculateEffectiveCount(const TopKListT& top_k, + const SamplingExponentsT& sampling_exponents); + + void calculateSamplingExponents(const TopKListT& top_k, const uint32_t total_wanted, + SamplingExponentsT& new_sampling_exponents) const; + + void update(const TopKListT& top_k, uint64_t last_period_count, const uint32_t total_wanted); +}; + +} // namespace OpenTelemetry +} // namespace Tracers +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/tracers/opentelemetry/samplers/dynatrace/stream_summary.h b/source/extensions/tracers/opentelemetry/samplers/dynatrace/stream_summary.h new file mode 100644 index 0000000000000..15332497296e1 --- /dev/null +++ b/source/extensions/tracers/opentelemetry/samplers/dynatrace/stream_summary.h @@ -0,0 +1,206 @@ +#pragma once + +#include +#include +#include + +#include "source/common/common/assert.h" + +#include "absl/container/flat_hash_map.h" +#include "absl/status/status.h" +#include "absl/types/optional.h" + +// port of https://github.com/fzakaria/space-saving/tree/master + +namespace Envoy { +namespace Extensions { +namespace Tracers { +namespace OpenTelemetry { + +namespace detail { + +template struct Bucket; + +template using BucketIterator = typename std::list>::iterator; + +template struct Counter { + BucketIterator bucket; + absl::optional item{}; + uint64_t value{}; + uint64_t error{}; + + explicit Counter(BucketIterator bucket) : bucket(bucket) {} + Counter(Counter const&) = delete; + Counter& operator=(Counter const&) = delete; +}; + +template using CounterIterator = typename std::list>::iterator; + +template struct Bucket { + uint64_t value; + std::list> children{}; + + explicit Bucket(uint64_t value) : value(value) {} + Bucket(Bucket const&) = delete; + Bucket& operator=(Bucket const&) = delete; +}; + +} // namespace detail + +template class Counter { +private: + T const& item_; + uint64_t value_; + uint64_t error_; + +public: + Counter(detail::Counter const& c) : item_(*c.item), value_(c.value), error_(c.error) {} + + T const& getItem() const { return item_; } + uint64_t getValue() const { return value_; } + uint64_t getError() const { return error_; } +}; + +template class StreamSummary { +private: + const size_t capacity_; + uint64_t n_{}; + absl::flat_hash_map> cache_{}; + std::list> buckets_{}; + + typename detail::CounterIterator incrementCounter(detail::CounterIterator counter_iter, + const uint64_t increment) { + auto const bucket = counter_iter->bucket; + auto bucket_next = std::prev(bucket); + counter_iter->value += increment; + + detail::CounterIterator elem; + if (bucket_next != buckets_.end() && counter_iter->value == bucket_next->value) { + counter_iter->bucket = bucket_next; + bucket_next->children.splice(bucket_next->children.end(), bucket->children, counter_iter); + elem = std::prev(bucket_next->children.end()); + } else { + auto bucket_new = buckets_.emplace(bucket, counter_iter->value); + counter_iter->bucket = bucket_new; + bucket_new->children.splice(bucket_new->children.end(), bucket->children, counter_iter); + elem = std::prev(bucket_new->children.end()); + } + if (bucket->children.empty()) { + buckets_.erase(bucket); + } + return elem; + } + + absl::Status validateInternal() const { + auto cache_copy = cache_; + auto current_bucket = buckets_.begin(); + uint64_t value_sum = 0; + while (current_bucket != buckets_.end()) { + auto prev = std::prev(current_bucket); + if (prev != buckets_.end() && prev->value <= current_bucket->value) { + return absl::InternalError("buckets should be in descending order."); + } + auto current_child = current_bucket->children.begin(); + while (current_child != current_bucket->children.end()) { + if (current_child->bucket != current_bucket) { + return absl::InternalError("entry should point to its bucket."); + } + if (current_child->value != current_bucket->value) { + return absl::InternalError("entry and bucket should have the same value."); + } + if (current_child->item) { + auto old_iter = cache_copy.find(*current_child->item); + if (old_iter != cache_copy.end()) { + cache_copy.erase(old_iter); + } + } + value_sum += current_child->value; + current_child++; + } + current_bucket++; + } + if (!cache_copy.empty()) { + return absl::InternalError("there should be no dead cached entries."); + } + if (cache_.size() > capacity_) { + return absl::InternalError("cache size must not exceed capacity"); + } + if (value_sum != n_) { + return absl::InternalError("sum of all counter->value() must be equal to n"); + } + return absl::OkStatus(); + } + + inline void validateDbg() { +#if !defined(NDEBUG) + ASSERT(validate().ok()); +#endif + } + +public: + explicit StreamSummary(const size_t capacity) : capacity_(capacity) { + auto& new_bucket = buckets_.emplace_back(0); + for (size_t i = 0; i < capacity; ++i) { + // initialize with empty counters, optional item will not be set + new_bucket.children.emplace_back(buckets_.begin()); + } + validateDbg(); + } + + size_t getCapacity() const { return capacity_; } + + absl::Status validate() const { return validateInternal(); } + + Counter offer(T const& item, const uint64_t increment = 1) { + ++n_; + auto iter = cache_.find(item); + if (iter != cache_.end()) { + iter->second = incrementCounter(iter->second, increment); + validateDbg(); + return *iter->second; + } else { + auto min_element = std::prev(buckets_.back().children.end()); + auto original_min_value = min_element->value; + if (min_element + ->item) { // element was already used (otherwise optional item would be not set) + // remove old from cache + auto old_iter = cache_.find(*min_element->item); + if (old_iter != cache_.end()) { + cache_.erase(old_iter); + } + } + min_element->item = item; + min_element = incrementCounter(min_element, increment); + cache_[item] = min_element; + if (cache_.size() <= capacity_) { + // should always be true, but keep it to be aligned to reference implementation + // originalMinValue will be 0 if element wasn't already used + min_element->error = original_min_value; + } + validateDbg(); + return *min_element; + } + } + + uint64_t getN() const { return n_; } + + typename std::list> getTopK(const size_t k = SIZE_MAX) const { + std::list> r; + for (auto const& bucket : buckets_) { + for (auto const& child : bucket.children) { + if (child.item) { + r.emplace_back(child); + if (r.size() == k) { + return r; + } + } + } + } + return r; + } +}; + +} // namespace OpenTelemetry +} // namespace Tracers +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/tracers/opentelemetry/samplers/dynatrace/tenant_id.h b/source/extensions/tracers/opentelemetry/samplers/dynatrace/tenant_id.h new file mode 100644 index 0000000000000..28da1b3bbd5ea --- /dev/null +++ b/source/extensions/tracers/opentelemetry/samplers/dynatrace/tenant_id.h @@ -0,0 +1,43 @@ +#pragma once + +#include +#include + +#include "absl/strings/str_cat.h" +#include "openssl/md5.h" + +namespace Envoy { +namespace Extensions { +namespace Tracers { +namespace OpenTelemetry { + +namespace { + +absl::Hex calculateTenantId(std::string tenant_uuid) { + if (tenant_uuid.empty()) { + return absl::Hex(0); + } + + for (char& c : tenant_uuid) { + if (c & 0x80) { + c = 0x3f; // '?' + } + } + + uint8_t digest[16]; + MD5(reinterpret_cast(tenant_uuid.data()), tenant_uuid.size(), digest); + + int32_t hash = 0; + for (int i = 0; i < 16; i++) { + const int shift_for_target_byte = (3 - (i % 4)) * 8; + // 24, 16, 8, 0 respectively + hash ^= + (static_cast(digest[i]) << shift_for_target_byte) & (0xff << shift_for_target_byte); + } + return absl::Hex(hash); +} +} // namespace +} // namespace OpenTelemetry +} // namespace Tracers +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/tracers/opentelemetry/tracer.cc b/source/extensions/tracers/opentelemetry/tracer.cc index 7f03a8ddcaefd..a9eb59b84e6ae 100644 --- a/source/extensions/tracers/opentelemetry/tracer.cc +++ b/source/extensions/tracers/opentelemetry/tracer.cc @@ -31,7 +31,7 @@ void callSampler(SamplerSharedPtr sampler, const absl::optional spa return; } const auto sampling_result = - sampler->shouldSample(span_context, operation_name, new_span.getTraceIdAsHex(), + sampler->shouldSample(span_context, new_span.getTraceIdAsHex(), operation_name, new_span.spankind(), trace_context, {}); new_span.setSampled(sampling_result.isSampled()); diff --git a/test/extensions/tracers/opentelemetry/samplers/dynatrace/BUILD b/test/extensions/tracers/opentelemetry/samplers/dynatrace/BUILD index b989c97dc11d2..fbbdba66448f2 100644 --- a/test/extensions/tracers/opentelemetry/samplers/dynatrace/BUILD +++ b/test/extensions/tracers/opentelemetry/samplers/dynatrace/BUILD @@ -30,6 +30,9 @@ envoy_extension_cc_test( "dynatrace_sampler_test.cc", "sampler_config_fetcher_test.cc", "sampler_config_test.cc", + "sampling_controller_test.cc", + "stream_summary_test.cc", + "tenant_id_test.cc", ], extension_names = ["envoy.tracers.opentelemetry.samplers.dynatrace"], deps = [ diff --git a/test/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler_integration_test.cc b/test/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler_integration_test.cc index 998ce7fab4e55..163d95e3310bf 100644 --- a/test/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler_integration_test.cc +++ b/test/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler_integration_test.cc @@ -37,7 +37,7 @@ class DynatraceSamplerIntegrationTest : public Envoy::HttpIntegrationTest, name: envoy.tracers.opentelemetry.samplers.dynatrace typed_config: "@type": type.googleapis.com/envoy.extensions.tracers.opentelemetry.samplers.v3.DynatraceSamplerConfig - tenant_id: "9712ad40" + tenant: "abc12345" cluster_id: "980df25c" )EOF"; @@ -82,7 +82,11 @@ TEST_P(DynatraceSamplerIntegrationTest, TestWithTraceparentAndTracestate) { .get(Http::LowerCaseString("tracestate"))[0] ->value() .getStringView(); - EXPECT_EQ("9712ad40-980df25c@dt=fw4;0;0;0;0;0;1;0,key=value", tracestate_value); + // use StartsWith because pathinfo (last element in trace state contains a random value) + EXPECT_TRUE(absl::StartsWith(tracestate_value, "5b3f9fed-980df25c@dt=fw4;0;0;0;0;0;0;")) + << "Received tracestate: " << tracestate_value; + EXPECT_TRUE(absl::StrContains(tracestate_value, ",key=value")) + << "Received tracestate: " << tracestate_value; } // Sends a request with traceparent but no tracestate header. @@ -110,7 +114,9 @@ TEST_P(DynatraceSamplerIntegrationTest, TestWithTraceparentOnly) { .get(Http::LowerCaseString("tracestate"))[0] ->value() .getStringView(); - EXPECT_EQ("9712ad40-980df25c@dt=fw4;0;0;0;0;0;1;0", tracestate_value); + // use StartsWith because pathinfo (last element in trace state contains a random value) + EXPECT_TRUE(absl::StartsWith(tracestate_value, "5b3f9fed-980df25c@dt=fw4;0;0;0;0;0;0;")) + << "Received tracestate: " << tracestate_value; } // Sends a request without traceparent and tracestate header. @@ -133,7 +139,8 @@ TEST_P(DynatraceSamplerIntegrationTest, TestWithoutTraceparentAndTracestate) { .get(Http::LowerCaseString("tracestate"))[0] ->value() .getStringView(); - EXPECT_EQ("9712ad40-980df25c@dt=fw4;0;0;0;0;0;1;0", tracestate_value); + EXPECT_TRUE(absl::StartsWith(tracestate_value, "5b3f9fed-980df25c@dt=fw4;0;0;0;0;0;0;")) + << "Received tracestate: " << tracestate_value; } } // namespace diff --git a/test/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler_test.cc b/test/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler_test.cc index 7f1cb68297122..8d98000ab2072 100644 --- a/test/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler_test.cc +++ b/test/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler_test.cc @@ -16,57 +16,260 @@ namespace Extensions { namespace Tracers { namespace OpenTelemetry { +namespace { + +const char* trace_id = "67a9a23155e1741b5b35368e08e6ece5"; + +const char* parent_span_id = "9d83def9a4939b7b"; + +const char* dt_tracestate_ignored = + "5b3f9fed-980df25c@dt=fw4;4;4af38366;0;0;1;2;123;8eae;2h01;3h4af38366;4h00;5h01;" + "6h67a9a23155e1741b5b35368e08e6ece5;7h9d83def9a4939b7b"; +const char* dt_tracestate_sampled = + "5b3f9fed-980df25c@dt=fw4;4;4af38366;0;0;0;0;123;8eae;2h01;3h4af38366;4h00;5h01;" + "6h67a9a23155e1741b5b35368e08e6ece5;7h9d83def9a4939b7b"; +const char* dt_tracestate_ignored_different_tenant = + "6666ad40-980df25c@dt=fw4;4;4af38366;0;0;1;2;123;8eae;2h01;3h4af38366;4h00;5h01;" + "6h67a9a23155e1741b5b35368e08e6ece5;7h9d83def9a4939b7b"; + +} // namespace + +class MockSamplerConfigFetcher : public SamplerConfigFetcher { +public: + MOCK_METHOD(const SamplerConfig&, getSamplerConfig, (), (const override)); +}; + class DynatraceSamplerTest : public testing::Test { const std::string yaml_string_ = R"EOF( - tenant_id: "9712ad40" + tenant: "abc12345" cluster_id: "980df25c" )EOF"; public: DynatraceSamplerTest() { - TestUtility::loadFromYaml(yaml_string_, config_); - NiceMock context; - sampler_ = std::make_unique(config_, context); - EXPECT_STREQ(sampler_->getDescription().c_str(), "DynatraceSampler"); + TestUtility::loadFromYaml(yaml_string_, proto_config_); + auto scf = std::make_unique>(); + ON_CALL(*scf, getSamplerConfig()).WillByDefault(testing::ReturnRef(sampler_config_)); + + timer_ = new NiceMock( + &tracer_factory_context_.server_factory_context_.dispatcher_); + ON_CALL(tracer_factory_context_.server_factory_context_.dispatcher_, createTimer_(_)) + .WillByDefault(Invoke([this](Event::TimerCb) { return timer_; })); + sampler_ = + std::make_unique(proto_config_, tracer_factory_context_, std::move(scf)); } protected: - envoy::extensions::tracers::opentelemetry::samplers::v3::DynatraceSamplerConfig config_; + NiceMock tracer_factory_context_; + envoy::extensions::tracers::opentelemetry::samplers::v3::DynatraceSamplerConfig proto_config_; + SamplerConfig sampler_config_; + NiceMock* timer_; std::unique_ptr sampler_; }; +// Verify getDescription +TEST_F(DynatraceSamplerTest, TestGetDescription) { + EXPECT_STREQ(sampler_->getDescription().c_str(), "DynatraceSampler"); +} + // Verify sampler being invoked with an invalid/empty span context TEST_F(DynatraceSamplerTest, TestWithoutParentContext) { - auto sampling_result = - sampler_->shouldSample(absl::nullopt, "operation_name", "12345", + sampler_->shouldSample(absl::nullopt, trace_id, "operation_name", ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER, {}, {}); EXPECT_EQ(sampling_result.decision, Decision::RecordAndSample); EXPECT_EQ(sampling_result.attributes->size(), 1); - EXPECT_STREQ(sampling_result.tracestate.c_str(), "9712ad40-980df25c@dt=fw4;0;0;0;0;0;1;0"); + EXPECT_STREQ(sampling_result.tracestate.c_str(), "5b3f9fed-980df25c@dt=fw4;0;0;0;0;0;0;95"); EXPECT_TRUE(sampling_result.isRecording()); EXPECT_TRUE(sampling_result.isSampled()); } // Verify sampler being invoked with existing Dynatrace trace state tag set TEST_F(DynatraceSamplerTest, TestWithParentContext) { - SpanContext parent_context = - SpanContext("00", "0af7651916cd43dd8448eb211c80319c", "b7ad6b7169203331", true, - "ot=foo:bar,9712ad40-980df25c@dt=fw4;0;0;0;0;0;1;0"); + SpanContext parent_context = SpanContext("00", trace_id, "b7ad6b7169203331", true, + "ot=foo:bar,5b3f9fed-980df25c@dt=fw4;0;0;0;0;0;0;ad"); SamplingResult sampling_result = - sampler_->shouldSample(parent_context, "operation_name", "parent_span", + sampler_->shouldSample(parent_context, trace_id, "parent_span", ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER, {}, {}); + EXPECT_EQ(sampling_result.decision, Decision::RecordAndSample); + EXPECT_EQ(sampling_result.attributes->size(), 1); + EXPECT_STREQ(sampling_result.tracestate.c_str(), + "ot=foo:bar,5b3f9fed-980df25c@dt=fw4;0;0;0;0;0;0;ad"); + EXPECT_TRUE(sampling_result.isRecording()); + EXPECT_TRUE(sampling_result.isSampled()); +} + +// Verify sampler being invoked with parent span context +TEST_F(DynatraceSamplerTest, TestWithUnknownParentContext) { + SpanContext parent_context("00", trace_id, parent_span_id, true, "some_vendor=some_value"); + auto sampling_result = + sampler_->shouldSample(parent_context, trace_id, "operation_name", + ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER, {}, {}); EXPECT_EQ(sampling_result.decision, Decision::RecordAndSample); EXPECT_EQ(sampling_result.attributes->size(), 1); EXPECT_STREQ(sampling_result.tracestate.c_str(), - "ot=foo:bar,9712ad40-980df25c@dt=fw4;0;0;0;0;0;1;0"); + "5b3f9fed-980df25c@dt=fw4;0;0;0;0;0;0;95,some_vendor=some_value"); EXPECT_TRUE(sampling_result.isRecording()); EXPECT_TRUE(sampling_result.isSampled()); } +// Verify sampler being invoked with dynatrace trace parent +TEST_F(DynatraceSamplerTest, TestWithDynatraceParentContextSampled) { + SpanContext parent_context("00", trace_id, parent_span_id, true, dt_tracestate_sampled); + + auto sampling_result = + sampler_->shouldSample(parent_context, trace_id, "operation_name", + ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER, {}, {}); + EXPECT_EQ(sampling_result.decision, Decision::RecordAndSample); + EXPECT_EQ(sampling_result.attributes->size(), 1); + EXPECT_STREQ(sampling_result.tracestate.c_str(), dt_tracestate_sampled); + EXPECT_TRUE(sampling_result.isRecording()); + EXPECT_TRUE(sampling_result.isSampled()); +} + +// Verify sampler being invoked with dynatrace trace parent where ignored flag is set +TEST_F(DynatraceSamplerTest, TestWithDynatraceParentContextIgnored) { + SpanContext parent_context("00", trace_id, parent_span_id, true, dt_tracestate_ignored); + + auto sampling_result = + sampler_->shouldSample(parent_context, trace_id, "operation_name", + ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER, {}, {}); + EXPECT_EQ(sampling_result.decision, Decision::Drop); + EXPECT_EQ(sampling_result.attributes->size(), 1); + EXPECT_STREQ(sampling_result.tracestate.c_str(), dt_tracestate_ignored); + EXPECT_FALSE(sampling_result.isRecording()); + EXPECT_FALSE(sampling_result.isSampled()); +} + +// Verify sampler being invoked with dynatrace trace parent from a different tenant +TEST_F(DynatraceSamplerTest, TestWithDynatraceParentContextFromDifferentTenant) { + SpanContext parent_context("00", trace_id, parent_span_id, true, + dt_tracestate_ignored_different_tenant); + + auto sampling_result = + sampler_->shouldSample(parent_context, trace_id, "operation_name", + ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER, {}, {}); + // sampling decision on tracestate should be ignored because it is from a different tenant. + EXPECT_EQ(sampling_result.decision, Decision::RecordAndSample); + EXPECT_EQ(sampling_result.attributes->size(), 1); + const char* exptected = + "5b3f9fed-980df25c@dt=fw4;0;0;0;0;0;0;95,6666ad40-980df25c@dt=fw4;4;4af38366;0;0;1;2;123;" + "8eae;2h01;3h4af38366;4h00;5h01;6h67a9a23155e1741b5b35368e08e6ece5;7h9d83def9a4939b7b"; + EXPECT_STREQ(sampling_result.tracestate.c_str(), exptected); + EXPECT_TRUE(sampling_result.isRecording()); + EXPECT_TRUE(sampling_result.isSampled()); +} + +// Verify sampler being called during warmup phase (no recent top_k available) +TEST_F(DynatraceSamplerTest, TestWarmup) { + // config should allow 200 root spans per minute + sampler_config_.parse("{\n \"rootSpansPerMinute\" : 200 \n }"); + + Tracing::TestTraceContextImpl trace_context_1{}; + trace_context_1.context_method_ = "GET"; + trace_context_1.context_path_ = "/path"; + + // timer is not invoked, because we want to test warm up phase. + // we use 200 as threshold. As long as number of requests is < (threshold/2), exponent should be 0 + uint32_t ignored = 0; + uint32_t sampled = 0; + for (int i = 0; i < 99; i++) { + auto result = sampler_->shouldSample({}, std::to_string(1000 + i), "operation_name", + ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER, + trace_context_1, {}); + result.isSampled() ? sampled++ : ignored++; + } + EXPECT_EQ(ignored, 0); + EXPECT_EQ(sampled, 99); + + // next (threshold/2) spans will get exponent 1, every second span will be sampled + for (int i = 0; i < 100; i++) { + auto result = sampler_->shouldSample({}, std::to_string(1000 + i), "operation_name", + ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER, + trace_context_1, {}); + result.isSampled() ? sampled++ : ignored++; + } + // should be 50, but the used "random" in shouldSample does not produce the same odd/even numbers. + EXPECT_EQ(ignored, 41); + EXPECT_EQ(sampled, 158); + + for (int i = 0; i < 100; i++) { + auto result = sampler_->shouldSample({}, std::to_string(1000 + i), "operation_name", + ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER, + trace_context_1, {}); + result.isSampled() ? sampled++ : ignored++; + } + EXPECT_EQ(ignored, 113); + EXPECT_EQ(sampled, 186); + + for (int i = 0; i < 700; i++) { + auto result = sampler_->shouldSample({}, std::to_string(1000 + i), "operation_name", + ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER, + trace_context_1, {}); + result.isSampled() ? sampled++ : ignored++; + } + EXPECT_EQ(ignored, 791); + EXPECT_EQ(sampled, 208); +} + +TEST_F(DynatraceSamplerTest, TestSampling) { + // config should allow 200 root spans per minute + sampler_config_.parse("{\n \"rootSpansPerMinute\" : 200 \n }"); + + Tracing::TestTraceContextImpl trace_context_1{}; + trace_context_1.context_method_ = "GET"; + trace_context_1.context_path_ = "/path"; + Tracing::TestTraceContextImpl trace_context_2{}; + trace_context_2.context_method_ = "POST"; + trace_context_2.context_path_ = "/path"; + Tracing::TestTraceContextImpl trace_context_3{}; + trace_context_3.context_method_ = "POST"; + trace_context_3.context_path_ = "/another_path"; + + // send requests + for (int i = 0; i < 180; i++) { + sampler_->shouldSample({}, trace_id, "operation_name", + ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER, + trace_context_1, {}); + sampler_->shouldSample({}, trace_id, "operation_name", + ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER, + trace_context_2, {}); + } + + sampler_->shouldSample({}, trace_id, "operation_name", + ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER, trace_context_3, + {}); + + // sampler should read update sampling exponents + timer_->invokeCallback(); + + // the sampler should not sample every span for 'trace_context_1' + // we call it again 10 times. This should be enough to get at least one ignored span + // 'i' is used as 'random trace_id' + bool ignored = false; + for (int i = 0; i < 10; i++) { + auto result = sampler_->shouldSample({}, std::to_string(i), "operation_name", + ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER, + trace_context_1, {}); + if (!result.isSampled()) { + ignored = true; + break; + } + } + EXPECT_TRUE(ignored); + + // trace_context_3 should be sampled + for (int i = 0; i < 10; i++) { + auto result = sampler_->shouldSample({}, std::to_string(i), "operation_name", + ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER, + trace_context_2, {}); + EXPECT_TRUE(result.isSampled()); + } +} + } // namespace OpenTelemetry } // namespace Tracers } // namespace Extensions diff --git a/test/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher_test.cc b/test/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher_test.cc index 709f42624da28..04650a62040ce 100644 --- a/test/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher_test.cc +++ b/test/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher_test.cc @@ -26,7 +26,7 @@ using testing::ReturnRef; class SamplerConfigFetcherTest : public testing::Test { public: SamplerConfigFetcherTest() - : request_(&tracerFactoryContext_.server_factory_context_.cluster_manager_ + : request_(&tracer_factory_context_.server_factory_context_.cluster_manager_ .thread_local_cluster_.async_client_) { const std::string yaml_string = R"EOF( cluster: "cluster_name" @@ -35,18 +35,18 @@ class SamplerConfigFetcherTest : public testing::Test { )EOF"; TestUtility::loadFromYaml(yaml_string, http_uri_); - ON_CALL(tracerFactoryContext_.server_factory_context_.cluster_manager_, + ON_CALL(tracer_factory_context_.server_factory_context_.cluster_manager_, getThreadLocalCluster(_)) - .WillByDefault(Return( - &tracerFactoryContext_.server_factory_context_.cluster_manager_.thread_local_cluster_)); - timer_ = - new NiceMock(&tracerFactoryContext_.server_factory_context_.dispatcher_); - ON_CALL(tracerFactoryContext_.server_factory_context_.dispatcher_, createTimer_(_)) + .WillByDefault(Return(&tracer_factory_context_.server_factory_context_.cluster_manager_ + .thread_local_cluster_)); + timer_ = new NiceMock( + &tracer_factory_context_.server_factory_context_.dispatcher_); + ON_CALL(tracer_factory_context_.server_factory_context_.dispatcher_, createTimer_(_)) .WillByDefault(Invoke([this](Event::TimerCb) { return timer_; })); } protected: - NiceMock tracerFactoryContext_; + NiceMock tracer_factory_context_; envoy::config::core::v3::HttpUri http_uri_; NiceMock* timer_; Http::MockAsyncClientRequest request_; @@ -65,64 +65,83 @@ MATCHER_P(MessageMatcher, unusedArg, "") { // Test a request is sent if timer fires TEST_F(SamplerConfigFetcherTest, TestRequestIsSent) { - EXPECT_CALL(tracerFactoryContext_.server_factory_context_.cluster_manager_.thread_local_cluster_ + EXPECT_CALL(tracer_factory_context_.server_factory_context_.cluster_manager_.thread_local_cluster_ .async_client_, send_(MessageMatcher("unused-but-machtes-requires-an-arg"), _, _)); - SamplerConfigFetcher configFetcher(tracerFactoryContext_, http_uri_, "tokenval"); + SamplerConfigFetcherImpl config_fetcher(tracer_factory_context_, http_uri_, "tokenval"); timer_->invokeCallback(); } // Test receiving a response with code 200 and valid json TEST_F(SamplerConfigFetcherTest, TestResponseOk) { - SamplerConfigFetcher configFetcher(tracerFactoryContext_, http_uri_, "tokenXASSD"); + SamplerConfigFetcherImpl config_fetcher(tracer_factory_context_, http_uri_, "tokenXASSD"); timer_->invokeCallback(); Http::ResponseMessagePtr message(new Http::ResponseMessageImpl( Http::ResponseHeaderMapPtr{new Http::TestResponseHeaderMapImpl{{":status", "200"}}})); message->body().add("{\n \"rootSpansPerMinute\" : 4356 \n }"); - configFetcher.onSuccess(request_, std::move(message)); - EXPECT_EQ(configFetcher.getSamplerConfig().getRootSpansPerMinute(), 4356); + config_fetcher.onSuccess(request_, std::move(message)); + EXPECT_EQ(config_fetcher.getSamplerConfig().getRootSpansPerMinute(), 4356); EXPECT_TRUE(timer_->enabled()); } // Test receiving a response with code 200 and unexpected json TEST_F(SamplerConfigFetcherTest, TestResponseOkInvalidJson) { - SamplerConfigFetcher configFetcher(tracerFactoryContext_, http_uri_, "tokenXASSD"); + SamplerConfigFetcherImpl config_fetcher(tracer_factory_context_, http_uri_, "tokenXASSD"); timer_->invokeCallback(); Http::ResponseMessagePtr message(new Http::ResponseMessageImpl( Http::ResponseHeaderMapPtr{new Http::TestResponseHeaderMapImpl{{":status", "200"}}})); message->body().add("{\n "); - configFetcher.onSuccess(request_, std::move(message)); - EXPECT_EQ(configFetcher.getSamplerConfig().getRootSpansPerMinute(), + config_fetcher.onSuccess(request_, std::move(message)); + EXPECT_EQ(config_fetcher.getSamplerConfig().getRootSpansPerMinute(), SamplerConfig::ROOT_SPANS_PER_MINUTE_DEFAULT); EXPECT_TRUE(timer_->enabled()); } // Test receiving a response with code != 200 TEST_F(SamplerConfigFetcherTest, TestResponseErrorCode) { - SamplerConfigFetcher configFetcher(tracerFactoryContext_, http_uri_, "tokenXASSD"); + SamplerConfigFetcherImpl config_fetcher(tracer_factory_context_, http_uri_, "tokenXASSD"); timer_->invokeCallback(); Http::ResponseMessagePtr message(new Http::ResponseMessageImpl( Http::ResponseHeaderMapPtr{new Http::TestResponseHeaderMapImpl{{":status", "401"}}})); message->body().add("{\n \"rootSpansPerMinute\" : 4356 \n }"); - configFetcher.onSuccess(request_, std::move(message)); - EXPECT_EQ(configFetcher.getSamplerConfig().getRootSpansPerMinute(), + config_fetcher.onSuccess(request_, std::move(message)); + EXPECT_EQ(config_fetcher.getSamplerConfig().getRootSpansPerMinute(), SamplerConfig::ROOT_SPANS_PER_MINUTE_DEFAULT); EXPECT_TRUE(timer_->enabled()); } // Test sending failed TEST_F(SamplerConfigFetcherTest, TestOnFailure) { - SamplerConfigFetcher configFetcher(tracerFactoryContext_, http_uri_, "tokenXASSD"); + SamplerConfigFetcherImpl config_fetcher(tracer_factory_context_, http_uri_, "tokenXASSD"); timer_->invokeCallback(); - configFetcher.onFailure(request_, Http::AsyncClient::FailureReason::Reset); - EXPECT_EQ(configFetcher.getSamplerConfig().getRootSpansPerMinute(), + config_fetcher.onFailure(request_, Http::AsyncClient::FailureReason::Reset); + EXPECT_EQ(config_fetcher.getSamplerConfig().getRootSpansPerMinute(), SamplerConfig::ROOT_SPANS_PER_MINUTE_DEFAULT); EXPECT_TRUE(timer_->enabled()); } +// Test calling onBeforeFinalizeUpstreamSpan +TEST_F(SamplerConfigFetcherTest, TestOnBeforeFinalizeUpstreamSpan) { + Tracing::MockSpan child_span_; + SamplerConfigFetcherImpl config_fetcher(tracer_factory_context_, http_uri_, "tokenXASSD"); + // onBeforeFinalizeUpstreamSpan() is an empty method, nothing should happen + config_fetcher.onBeforeFinalizeUpstreamSpan(child_span_, nullptr); +} + +// Test invoking the timer if no cluster can be found +TEST_F(SamplerConfigFetcherTest, TestNoCluster) { + // simulate no configured cluster, return nullptr. + ON_CALL(tracer_factory_context_.server_factory_context_.cluster_manager_, + getThreadLocalCluster(_)) + .WillByDefault(Return(nullptr)); + SamplerConfigFetcherImpl config_fetcher(tracer_factory_context_, http_uri_, "tokenXASSD"); + timer_->invokeCallback(); + // should not crash or throw. +} + } // namespace OpenTelemetry } // namespace Tracers } // namespace Extensions diff --git a/test/extensions/tracers/opentelemetry/samplers/dynatrace/sampling_controller_test.cc b/test/extensions/tracers/opentelemetry/samplers/dynatrace/sampling_controller_test.cc new file mode 100644 index 0000000000000..12668ee59a6ba --- /dev/null +++ b/test/extensions/tracers/opentelemetry/samplers/dynatrace/sampling_controller_test.cc @@ -0,0 +1,262 @@ +#include +#include +#include +#include +#include +#include + +#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/dynatrace_sampler.h" +#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/sampling_controller.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace Tracers { +namespace OpenTelemetry { + +namespace { + +void offerEntry(SamplingController& sc, const std::string& value, int count) { + for (int i = 0; i < count; i++) { + sc.offer(value); + } +} + +} // namespace + +class TestSamplerConfigFetcher : public SamplerConfigFetcher { +public: + const SamplerConfig& getSamplerConfig() const { return config; } + SamplerConfig config; +}; + +class SamplingControllerTest : public testing::Test {}; + +TEST_F(SamplingControllerTest, TestManyDifferentRequests) { + auto scf = std::make_unique(); + SamplingController sc(std::move(scf)); + + offerEntry(sc, "1", 2000); + offerEntry(sc, "2", 1000); + offerEntry(sc, "3", 750); + offerEntry(sc, "4", 100); + offerEntry(sc, "5", 50); + for (int64_t i = 0; i < 2100; i++) { + sc.offer(std::to_string(i + 1000000)); + } + + sc.update(); + + EXPECT_EQ(sc.getEffectiveCount(), 1110); + EXPECT_EQ(sc.getSamplingState("1").getMultiplicity(), 128); + EXPECT_EQ(sc.getSamplingState("2").getMultiplicity(), 64); + EXPECT_EQ(sc.getSamplingState("3").getMultiplicity(), 64); + EXPECT_EQ(sc.getSamplingState("4").getMultiplicity(), 8); + EXPECT_EQ(sc.getSamplingState("5").getMultiplicity(), 4); + EXPECT_EQ(sc.getSamplingState("1000000").getMultiplicity(), 2); + EXPECT_EQ(sc.getSamplingState("1000001").getMultiplicity(), 2); + EXPECT_EQ(sc.getSamplingState("1000002").getMultiplicity(), 2); +} + +TEST_F(SamplingControllerTest, TestManyRequests) { + auto scf = std::make_unique(); + SamplingController sc(std::move(scf)); + + offerEntry(sc, "1", 8600); + offerEntry(sc, "2", 5000); + offerEntry(sc, "3", 4000); + offerEntry(sc, "4", 4000); + offerEntry(sc, "5", 3000); + offerEntry(sc, "6", 30); + offerEntry(sc, "7", 3); + offerEntry(sc, "8", 1); + + sc.update(); + + EXPECT_EQ(sc.getEffectiveCount(), 1074); + EXPECT_EQ(sc.getSamplingState("1").getMultiplicity(), 64); + EXPECT_EQ(sc.getSamplingState("2").getMultiplicity(), 32); + EXPECT_EQ(sc.getSamplingState("3").getMultiplicity(), 32); + EXPECT_EQ(sc.getSamplingState("4").getMultiplicity(), 16); + EXPECT_EQ(sc.getSamplingState("5").getMultiplicity(), 8); + EXPECT_EQ(sc.getSamplingState("6").getMultiplicity(), 1); + EXPECT_EQ(sc.getSamplingState("7").getMultiplicity(), 1); + EXPECT_EQ(sc.getSamplingState("8").getMultiplicity(), 1); +} + +TEST_F(SamplingControllerTest, TestSomeRequests) { + auto scf = std::make_unique(); + SamplingController sc(std::move(scf)); + + offerEntry(sc, "1", 7500); + offerEntry(sc, "2", 1000); + offerEntry(sc, "3", 1); + offerEntry(sc, "4", 1); + offerEntry(sc, "5", 1); + for (int64_t i = 0; i < 11; i++) { + sc.offer(std::to_string(i + 1000000)); + } + + sc.update(); + + EXPECT_EQ(sc.getEffectiveCount(), 1451); + EXPECT_EQ(sc.getSamplingState("1").getMultiplicity(), 8); + EXPECT_EQ(sc.getSamplingState("2").getMultiplicity(), 2); + EXPECT_EQ(sc.getSamplingState("3").getMultiplicity(), 1); + EXPECT_EQ(sc.getSamplingState("4").getMultiplicity(), 1); + EXPECT_EQ(sc.getSamplingState("5").getMultiplicity(), 1); + EXPECT_EQ(sc.getSamplingState("1000000").getMultiplicity(), 1); + EXPECT_EQ(sc.getSamplingState("1000001").getMultiplicity(), 1); + EXPECT_EQ(sc.getSamplingState("1000002").getMultiplicity(), 1); + EXPECT_EQ(sc.getSamplingState("1000003").getMultiplicity(), 1); +} + +TEST_F(SamplingControllerTest, TestSimple) { + auto scf = std::make_unique(); + scf->config.parse("{\n \"rootSpansPerMinute\" : 100 \n }"); + SamplingController sc(std::move(scf)); + + offerEntry(sc, "GET_xxxx", 300); + offerEntry(sc, "POST_asdf", 200); + offerEntry(sc, "GET_asdf", 100); + + sc.update(); + + EXPECT_EQ(sc.getSamplingState("GET_xxxx").getExponent(), 3); + EXPECT_EQ(sc.getSamplingState("GET_xxxx").getMultiplicity(), 8); + + EXPECT_EQ(sc.getSamplingState("POST_asdf").getExponent(), 2); + EXPECT_EQ(sc.getSamplingState("POST_asdf").getMultiplicity(), 4); + + EXPECT_EQ(sc.getSamplingState("GET_asdf").getExponent(), 1); + EXPECT_EQ(sc.getSamplingState("GET_asdf").getMultiplicity(), 2); +} + +TEST_F(SamplingControllerTest, TestWarmup) { + auto scf = std::make_unique(); + SamplingController sc(std::move(scf)); + + // offer entries, but don't call update(); + // sampling exponents table will be empty + // exponent will be calculated based on count. + offerEntry(sc, "GET_0", 10); + EXPECT_EQ(sc.getSamplingState("GET_1").getExponent(), 0); + EXPECT_EQ(sc.getSamplingState("GET_2").getExponent(), 0); + EXPECT_EQ(sc.getSamplingState("GET_3").getExponent(), 0); + + offerEntry(sc, "GET_1", 540); + EXPECT_EQ(sc.getSamplingState("GET_1").getExponent(), 1); + EXPECT_EQ(sc.getSamplingState("GET_2").getExponent(), 1); + EXPECT_EQ(sc.getSamplingState("GET_3").getExponent(), 1); + + offerEntry(sc, "GET_0", 300); + EXPECT_EQ(sc.getSamplingState("GET_0").getExponent(), 1); + EXPECT_EQ(sc.getSamplingState("GET_1").getExponent(), 1); + EXPECT_EQ(sc.getSamplingState("GET_10").getExponent(), 1); + + offerEntry(sc, "GET_4", 550); + EXPECT_EQ(sc.getSamplingState("GET_1").getExponent(), 2); + EXPECT_EQ(sc.getSamplingState("GET_2").getExponent(), 2); + EXPECT_EQ(sc.getSamplingState("GET_3").getExponent(), 2); + + offerEntry(sc, "GET_5", 1000); + EXPECT_EQ(sc.getSamplingState("GET_1").getExponent(), 4); + EXPECT_EQ(sc.getSamplingState("GET_2").getExponent(), 4); + EXPECT_EQ(sc.getSamplingState("GET_3").getExponent(), 4); + + offerEntry(sc, "GET_7", 2000); + EXPECT_EQ(sc.getSamplingState("GET_1").getExponent(), 8); + EXPECT_EQ(sc.getSamplingState("GET_2").getExponent(), 8); + EXPECT_EQ(sc.getSamplingState("GET_3").getExponent(), 8); +} + +TEST_F(SamplingControllerTest, TestEmpty) { + auto scf = std::make_unique(); + SamplingController sc(std::move(scf)); + + sc.update(); + + EXPECT_EQ(sc.getSamplingState("GET_something").getExponent(), 0); + EXPECT_EQ(sc.getSamplingState("GET_something").getMultiplicity(), 1); +} + +TEST_F(SamplingControllerTest, TestNonExisting) { + auto scf = std::make_unique(); + SamplingController sc(std::move(scf)); + + sc.offer("key1"); + sc.update(); + + EXPECT_EQ(sc.getSamplingState("key2").getExponent(), 0); + EXPECT_EQ(sc.getSamplingState("key2").getMultiplicity(), 1); +} + +TEST(SamplingStateTest, TestIncreaseDecrease) { + SamplingState sst{}; + EXPECT_EQ(sst.getExponent(), 0); + EXPECT_EQ(sst.getMultiplicity(), 1); + + sst.increaseExponent(); + EXPECT_EQ(sst.getExponent(), 1); + EXPECT_EQ(sst.getMultiplicity(), 2); + + sst.increaseExponent(); + EXPECT_EQ(sst.getExponent(), 2); + EXPECT_EQ(sst.getMultiplicity(), 4); + + for (int i = 0; i < 6; i++) { + sst.increaseExponent(); + } + EXPECT_EQ(sst.getExponent(), 8); + EXPECT_EQ(sst.getMultiplicity(), 256); + + sst.decreaseExponent(); + EXPECT_EQ(sst.getExponent(), 7); + EXPECT_EQ(sst.getMultiplicity(), 128); +} + +TEST(SamplingStateTest, TestShouldSample) { + // default sampling state should sample + SamplingState sst{}; + EXPECT_TRUE(sst.shouldSample(1234)); + EXPECT_TRUE(sst.shouldSample(3456)); + EXPECT_TRUE(sst.shouldSample(12345)); + + // exponent 2, multiplicity 1, even (=not odd) random numbers should be sampled + sst.increaseExponent(); + EXPECT_TRUE(sst.shouldSample(22)); + EXPECT_TRUE(sst.shouldSample(4444444)); + EXPECT_FALSE(sst.shouldSample(21)); + EXPECT_FALSE(sst.shouldSample(111111)); + + for (int i = 0; i < 9; i++) { + sst.increaseExponent(); + } + // exponent 10, multiplicity 1024, + EXPECT_TRUE(sst.shouldSample(1024)); + EXPECT_TRUE(sst.shouldSample(2048)); + EXPECT_TRUE(sst.shouldSample(4096)); + EXPECT_TRUE(sst.shouldSample(10240000000)); + EXPECT_FALSE(sst.shouldSample(1023)); + EXPECT_FALSE(sst.shouldSample(1025)); + EXPECT_FALSE(sst.shouldSample(2047)); + EXPECT_FALSE(sst.shouldSample(2049)); +} + +TEST_F(SamplingControllerTest, TestGetSamplingKey) { + std::string key = SamplingController::getSamplingKey("somepath", "GET"); + EXPECT_STREQ(key.c_str(), "GET_somepath"); + + key = SamplingController::getSamplingKey("somepath?withquery", "POST"); + EXPECT_STREQ(key.c_str(), "POST_somepath"); + + key = SamplingController::getSamplingKey("anotherpath", "PUT"); + EXPECT_STREQ(key.c_str(), "PUT_anotherpath"); +} + +} // namespace OpenTelemetry +} // namespace Tracers +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/tracers/opentelemetry/samplers/dynatrace/stream_summary_test.cc b/test/extensions/tracers/opentelemetry/samplers/dynatrace/stream_summary_test.cc new file mode 100644 index 0000000000000..e610bf10b6521 --- /dev/null +++ b/test/extensions/tracers/opentelemetry/samplers/dynatrace/stream_summary_test.cc @@ -0,0 +1,152 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/stream_summary.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace Tracers { +namespace OpenTelemetry { + +namespace { + +template +void CompareCounter(typename std::list>::iterator counter, T item, uint64_t value, + uint64_t error, int line_num) { + SCOPED_TRACE(absl::StrCat(__FUNCTION__, " called from line ", line_num)); + EXPECT_EQ(counter->getValue(), value); + EXPECT_EQ(counter->getItem(), item); + EXPECT_EQ(counter->getError(), error); +} + +} // namespace + +TEST(StreamSummaryTest, TestEmpty) { + StreamSummary summary(4); + EXPECT_EQ(summary.getN(), 0); + auto top_k = summary.getTopK(); + EXPECT_EQ(top_k.size(), 0); + EXPECT_EQ(top_k.begin(), top_k.end()); + EXPECT_TRUE(summary.validate().ok()); +} + +TEST(StreamSummaryTest, TestSimple) { + StreamSummary summary(4); + summary.offer('a'); + summary.offer('a'); + summary.offer('b'); + summary.offer('a'); + summary.offer('c'); + summary.offer('b'); + summary.offer('a'); + summary.offer('d'); + + EXPECT_TRUE(summary.validate().ok()); + EXPECT_EQ(summary.getN(), 8); + + auto top_k = summary.getTopK(); + EXPECT_EQ(top_k.size(), 4); + auto it = top_k.begin(); + CompareCounter(it, 'a', 4, 0, __LINE__); + CompareCounter(++it, 'b', 2, 0, __LINE__); + CompareCounter(++it, 'c', 1, 0, __LINE__); + CompareCounter(++it, 'd', 1, 0, __LINE__); +} + +// TODO: remove +template std::string toString(T const& list) { + std::ostringstream oss; + for (auto const& counter : list) { + oss << counter.getItem() << ":(" << counter.getValue() << "/" << counter.getError() << ")" + << " "; + } + return oss.str(); +} + +TEST(StreamSummaryTest, TestExtendCapacity) { + StreamSummary summary(3); + EXPECT_TRUE(summary.validate().ok()); + summary.offer('d'); + summary.offer('a'); + summary.offer('b'); + summary.offer('a'); + summary.offer('a'); + summary.offer('a'); + summary.offer('b'); + summary.offer('c'); + summary.offer('b'); + summary.offer('c'); + EXPECT_TRUE(summary.validate().ok()); + + { + auto top_k = summary.getTopK(); + auto it = top_k.begin(); + EXPECT_EQ(top_k.size(), 3); + CompareCounter(it, 'a', 4, 0, __LINE__); + CompareCounter(++it, 'b', 3, 0, __LINE__); + CompareCounter(++it, 'c', 3, 1, __LINE__); + } + + // add item 'e', 'c' should be removed. + summary.offer('e'); + { + auto top_k = summary.getTopK(); + auto it = top_k.begin(); + EXPECT_EQ(top_k.size(), 3); + CompareCounter(it, 'a', 4, 0, __LINE__); + CompareCounter(++it, 'e', 4, 3, __LINE__); + CompareCounter(++it, 'b', 3, 0, __LINE__); + } +} + +TEST(StreamSummaryTest, TestRandomInsertOrder) { + std::vector v{'a', 'a', 'a', 'a', 'a', 'a', 'b', 'b', 'b', 'b', 'b', + 'c', 'c', 'c', 'c', 'd', 'd', 'd', 'e', 'e', 'f'}; + for (int i = 0; i < 5; ++i) { + // insert order should not matter if all items have a different count in input stream + std::shuffle(v.begin(), v.end(), std::default_random_engine()); + StreamSummary summary(10); + for (auto const c : v) { + summary.offer(c); + } + auto top_k = summary.getTopK(); + auto it = top_k.begin(); + CompareCounter(it, 'a', 6, 0, __LINE__); + CompareCounter(++it, 'b', 5, 0, __LINE__); + CompareCounter(++it, 'c', 4, 0, __LINE__); + CompareCounter(++it, 'd', 3, 0, __LINE__); + CompareCounter(++it, 'e', 2, 0, __LINE__); + CompareCounter(++it, 'f', 1, 0, __LINE__); + } +} + +TEST(StreamSummaryTest, TestGetK) { + std::vector v{'a', 'a', 'a', 'a', 'a', 'a', 'b', 'b', 'b', 'b', 'b', + 'c', 'c', 'c', 'c', 'd', 'd', 'd', 'e', 'e', 'f'}; + std::shuffle(v.begin(), v.end(), std::default_random_engine()); + StreamSummary summary(20); + for (auto const c : v) { + summary.offer(c); + } + EXPECT_EQ(summary.getTopK().size(), 6); + EXPECT_EQ(summary.getTopK(1).size(), 1); + EXPECT_EQ(summary.getTopK(2).size(), 2); + EXPECT_EQ(summary.getTopK(3).size(), 3); + EXPECT_EQ(summary.getTopK(4).size(), 4); + EXPECT_EQ(summary.getTopK(5).size(), 5); + EXPECT_EQ(summary.getTopK(6).size(), 6); + EXPECT_EQ(summary.getTopK(7).size(), 6); +} + +} // namespace OpenTelemetry +} // namespace Tracers +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/tracers/opentelemetry/samplers/dynatrace/tenant_id_test.cc b/test/extensions/tracers/opentelemetry/samplers/dynatrace/tenant_id_test.cc new file mode 100644 index 0000000000000..244e949b06ef5 --- /dev/null +++ b/test/extensions/tracers/opentelemetry/samplers/dynatrace/tenant_id_test.cc @@ -0,0 +1,30 @@ +#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/tenant_id.h" + +#include "absl/strings/str_cat.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace Tracers { +namespace OpenTelemetry { + +// Test calculation using an empty string +TEST(TenantIdTest, testEmpty) { EXPECT_EQ(absl::StrCat(calculateTenantId("")), "0"); } + +// Test calculation using strings with whitespace +TEST(TenantIdTest, testWhitespace) { + EXPECT_EQ(absl::StrCat(calculateTenantId("abc 1234")), "182ccac"); + EXPECT_EQ(absl::StrCat(calculateTenantId(" ")), "b173ef2e"); +} + +// Test calculation using some expected strings +TEST(TenantIdTest, testValues) { + EXPECT_EQ(absl::StrCat(calculateTenantId("jmw13303")), "4d10bede"); + EXPECT_EQ(absl::StrCat(calculateTenantId("abc12345")), "5b3f9fed"); + EXPECT_EQ(absl::StrCat(calculateTenantId("?pfel")), "7712d29d"); +} + +} // namespace OpenTelemetry +} // namespace Tracers +} // namespace Extensions +} // namespace Envoy