diff --git a/source/common/config/utility.cc b/source/common/config/utility.cc
index d86148d83ac8d..7acd462ce677a 100644
--- a/source/common/config/utility.cc
+++ b/source/common/config/utility.cc
@@ -60,13 +60,13 @@ void Utility::translateApiConfigSource(
 }
 
 void Utility::checkCluster(absl::string_view error_prefix, absl::string_view cluster_name,
-                           Upstream::ClusterManager& cm) {
+                           Upstream::ClusterManager& cm, bool allow_added_via_api) {
   Upstream::ThreadLocalCluster* cluster = cm.get(cluster_name);
   if (cluster == nullptr) {
     throw EnvoyException(fmt::format("{}: unknown cluster '{}'", error_prefix, cluster_name));
   }
 
-  if (cluster->info()->addedViaApi()) {
+  if (!allow_added_via_api && cluster->info()->addedViaApi()) {
     throw EnvoyException(fmt::format("{}: invalid cluster '{}': currently only "
                                      "static (non-CDS) clusters are supported",
                                      error_prefix, cluster_name));
diff --git a/source/common/config/utility.h b/source/common/config/utility.h
index ce7b0040fd61e..7ad472d779fa9 100644
--- a/source/common/config/utility.h
+++ b/source/common/config/utility.h
@@ -114,9 +114,11 @@ class Utility {
    * @param error_prefix supplies the prefix to use in error messages.
    * @param cluster_name supplies the cluster name to check.
    * @param cm supplies the cluster manager.
+   * @param allow_added_via_api when true, a cluster added via API (e.g. CDS) is accepted;
+   *                            when false (the default), such a cluster is rejected.
    */
   static void checkCluster(absl::string_view error_prefix, absl::string_view cluster_name,
-                           Upstream::ClusterManager& cm);
+                           Upstream::ClusterManager& cm, bool allow_added_via_api = false);
 
   /**
    * Check cluster/local info for API config sanity. Throws on error.
diff --git a/source/common/upstream/BUILD b/source/common/upstream/BUILD
index ff34518138d56..5687eac985080 100644
--- a/source/common/upstream/BUILD
+++ b/source/common/upstream/BUILD
@@ -75,6 +75,15 @@ envoy_cc_library(
     ],
 )
 
+envoy_cc_library(
+    name = "cluster_update_tracker_lib",
+    srcs = ["cluster_update_tracker.cc"],
+    hdrs = ["cluster_update_tracker.h"],
+    deps = [
+        "//include/envoy/upstream:cluster_manager_interface",
+    ],
+)
+
 envoy_cc_library(
     name = "conn_pool_map",
     hdrs = ["conn_pool_map.h"],
diff --git a/source/common/upstream/cluster_update_tracker.cc b/source/common/upstream/cluster_update_tracker.cc
new file mode 100644
index 0000000000000..42ee974ee0ae3
--- /dev/null
+++ b/source/common/upstream/cluster_update_tracker.cc
@@ -0,0 +1,31 @@
+#include "common/upstream/cluster_update_tracker.h"
+
+namespace Envoy {
+namespace Upstream {
+
+ClusterUpdateTracker::ClusterUpdateTracker(ClusterManager& cm, const std::string& cluster_name)
+    : cluster_name_(cluster_name),
+      cluster_update_callbacks_handle_(cm.addThreadLocalClusterUpdateCallbacks(*this)) {
+  // Seed the state with whatever the cluster manager knows right now; later changes arrive
+  // through the callbacks registered above.
+  Upstream::ThreadLocalCluster* cluster = cm.get(cluster_name_);
+  cluster_info_ = cluster ? cluster->info() : nullptr;
+}
+
+void ClusterUpdateTracker::onClusterAddOrUpdate(ThreadLocalCluster& cluster) {
+  // Callbacks fire for every cluster; only the tracked one is of interest.
+  if (cluster.info()->name() != cluster_name_) {
+    return;
+  }
+  cluster_info_ = cluster.info();
+}
+
+void ClusterUpdateTracker::onClusterRemoval(const std::string& cluster) {
+  if (cluster != cluster_name_) {
+    return;
+  }
+  cluster_info_.reset();
+}
+
+} // namespace Upstream
+} // namespace Envoy
diff --git a/source/common/upstream/cluster_update_tracker.h b/source/common/upstream/cluster_update_tracker.h
new file mode 100644
index 0000000000000..b55d1b0d54831
--- /dev/null
+++ b/source/common/upstream/cluster_update_tracker.h
@@ -0,0 +1,49 @@
+#pragma once
+
+#include "envoy/upstream/cluster_manager.h"
+
+namespace Envoy {
+namespace Upstream {
+
+/**
+ * Keeps track of cluster updates in order to spot addition and removal.
+ *
+ * Use this class as a performance optimization to avoid going through ClusterManager::get()
+ * on the hot path.
+ */
+class ClusterUpdateTracker : public ClusterUpdateCallbacks {
+public:
+  /**
+   * Registers this tracker for cluster update callbacks on the given cluster manager and
+   * looks up the tracked cluster once to capture its state at construction time.
+   * @param cm supplies the cluster manager to register with and to query.
+   * @param cluster_name supplies the name of the cluster to track.
+   */
+  ClusterUpdateTracker(ClusterManager& cm, const std::string& cluster_name);
+
+  /**
+   * @return true if the tracked cluster is currently known, false otherwise.
+   */
+  bool exists() const { return cluster_info_ != nullptr; }
+
+  /**
+   * @return the last seen info of the tracked cluster, or nullptr if the cluster is not known.
+   */
+  ClusterInfoConstSharedPtr info() const { return cluster_info_; }
+
+  // ClusterUpdateCallbacks
+  void onClusterAddOrUpdate(ThreadLocalCluster& cluster) override;
+  void onClusterRemoval(const std::string& cluster) override;
+
+private:
+  // Name of the only cluster this tracker reacts to.
+  const std::string cluster_name_;
+  // Handle returned by ClusterManager::addThreadLocalClusterUpdateCallbacks().
+  const ClusterUpdateCallbacksHandlePtr cluster_update_callbacks_handle_;
+
+  // Info of the tracked cluster as of the latest update; nullptr while the cluster is absent.
+  ClusterInfoConstSharedPtr cluster_info_;
+};
+
+} // namespace Upstream
+} // namespace Envoy
diff --git a/source/extensions/tracers/datadog/BUILD b/source/extensions/tracers/datadog/BUILD
index 0402595135267..ea38e6dfc7783 100644
--- a/source/extensions/tracers/datadog/BUILD
+++ b/source/extensions/tracers/datadog/BUILD
@@ -24,6 +24,7 @@ envoy_cc_library(
         "//source/common/config:utility_lib",
         "//source/common/http:async_client_utility_lib",
"//source/common/tracing:http_tracer_lib", + "//source/common/upstream:cluster_update_tracker_lib", "//source/extensions/tracers:well_known_names", "//source/extensions/tracers/common/ot:opentracing_driver_lib", "@envoy_api//envoy/config/trace/v3:pkg_cc_proto", diff --git a/source/extensions/tracers/datadog/datadog_tracer_impl.cc b/source/extensions/tracers/datadog/datadog_tracer_impl.cc index c3d659bc121ff..4b529d5bf4595 100644 --- a/source/extensions/tracers/datadog/datadog_tracer_impl.cc +++ b/source/extensions/tracers/datadog/datadog_tracer_impl.cc @@ -29,9 +29,9 @@ Driver::Driver(const envoy::config::trace::v3::DatadogConfig& datadog_config, POOL_COUNTER_PREFIX(scope, "tracing.datadog."))}, tls_(tls.allocateSlot()), runtime_(runtime) { - Config::Utility::checkCluster(TracerNames::get().Datadog, datadog_config.collector_cluster(), - cm_); - cluster_ = cm_.get(datadog_config.collector_cluster())->info(); + Config::Utility::checkCluster(TracerNames::get().Datadog, datadog_config.collector_cluster(), cm_, + /* allow_added_via_api */ true); + cluster_ = datadog_config.collector_cluster(); // Default tracer options. 
tracer_options_.operation_name_override = "envoy.proxy"; @@ -60,7 +60,8 @@ opentracing::Tracer& Driver::tracer() { return *tls_->getTyped().trac TraceReporter::TraceReporter(TraceEncoderSharedPtr encoder, Driver& driver, Event::Dispatcher& dispatcher) - : driver_(driver), encoder_(encoder) { + : driver_(driver), encoder_(encoder), + collector_cluster_(driver_.clusterManager(), driver_.cluster()) { flush_timer_ = dispatcher.createTimer([this]() -> void { for (auto& h : encoder_->headers()) { lower_case_headers_.emplace(h.first, Http::LowerCaseString{h.first}); @@ -89,7 +90,7 @@ void TraceReporter::flushTraces() { Http::RequestMessagePtr message(new Http::RequestMessageImpl()); message->headers().setReferenceMethod(Http::Headers::get().MethodValues.Post); message->headers().setReferencePath(encoder_->path()); - message->headers().setReferenceHost(driver_.cluster()->name()); + message->headers().setReferenceHost(driver_.cluster()); for (auto& h : encoder_->headers()) { message->headers().setReferenceKey(lower_case_headers_.at(h.first), h.second); } @@ -100,13 +101,19 @@ void TraceReporter::flushTraces() { ENVOY_LOG(debug, "submitting {} trace(s) to {} with payload size {}", pendingTraces, encoder_->path(), encoder_->payload().size()); - Http::AsyncClient::Request* request = - driver_.clusterManager() - .httpAsyncClientForCluster(driver_.cluster()->name()) - .send(std::move(message), *this, + if (collector_cluster_.exists()) { + Http::AsyncClient::Request* request = + driver_.clusterManager() + .httpAsyncClientForCluster(collector_cluster_.info()->name()) + .send( + std::move(message), *this, Http::AsyncClient::RequestOptions().setTimeout(std::chrono::milliseconds(1000U))); - if (request) { - active_requests_.add(*request); + if (request) { + active_requests_.add(*request); + } + } else { + ENVOY_LOG(debug, "collector cluster '{}' does not exist", driver_.cluster()); + driver_.tracerStats().reports_skipped_no_cluster_.inc(); } encoder_->clearTraces(); diff --git 
a/source/extensions/tracers/datadog/datadog_tracer_impl.h b/source/extensions/tracers/datadog/datadog_tracer_impl.h index 8a6f9d382770f..774e34665a85d 100644 --- a/source/extensions/tracers/datadog/datadog_tracer_impl.h +++ b/source/extensions/tracers/datadog/datadog_tracer_impl.h @@ -12,6 +12,7 @@ #include "common/http/async_client_utility.h" #include "common/http/header_map_impl.h" #include "common/json/json_loader.h" +#include "common/upstream/cluster_update_tracker.h" #include "extensions/tracers/common/ot/opentracing_driver_impl.h" @@ -23,6 +24,7 @@ namespace Datadog { #define DATADOG_TRACER_STATS(COUNTER) \ COUNTER(traces_sent) \ COUNTER(timer_flushed) \ + COUNTER(reports_skipped_no_cluster) \ COUNTER(reports_sent) \ COUNTER(reports_dropped) \ COUNTER(reports_failed) @@ -49,7 +51,7 @@ class Driver : public Common::Ot::OpenTracingDriver { // Getters to return the DatadogDriver's key members. Upstream::ClusterManager& clusterManager() { return cm_; } - Upstream::ClusterInfoConstSharedPtr cluster() { return cluster_; } + const std::string& cluster() { return cluster_; } Runtime::Loader& runtime() { return runtime_; } DatadogTracerStats& tracerStats() { return tracer_stats_; } const datadog::opentracing::TracerOptions& tracerOptions() { return tracer_options_; } @@ -74,7 +76,7 @@ class Driver : public Common::Ot::OpenTracingDriver { }; Upstream::ClusterManager& cm_; - Upstream::ClusterInfoConstSharedPtr cluster_; + std::string cluster_; DatadogTracerStats tracer_stats_; datadog::opentracing::TracerOptions tracer_options_; ThreadLocal::SlotPtr tls_; @@ -129,6 +131,7 @@ class TraceReporter : public Http::AsyncClient::Callbacks, std::map lower_case_headers_; + Upstream::ClusterUpdateTracker collector_cluster_; // Track active HTTP requests to be able to cancel them on destruction. 
Http::AsyncClientRequestTracker active_requests_; }; diff --git a/source/extensions/tracers/lightstep/BUILD b/source/extensions/tracers/lightstep/BUILD index a92a3a6f5ddde..a72d39b37376a 100644 --- a/source/extensions/tracers/lightstep/BUILD +++ b/source/extensions/tracers/lightstep/BUILD @@ -25,6 +25,7 @@ envoy_cc_library( "//source/common/grpc:context_lib", "//source/common/stats:symbol_table_lib", "//source/common/tracing:http_tracer_lib", + "//source/common/upstream:cluster_update_tracker_lib", "//source/extensions/tracers:well_known_names", "//source/extensions/tracers/common/ot:opentracing_driver_lib", "@envoy_api//envoy/config/trace/v3:pkg_cc_proto", diff --git a/source/extensions/tracers/lightstep/lightstep_tracer_impl.cc b/source/extensions/tracers/lightstep/lightstep_tracer_impl.cc index fe4ff0cc5013d..baabe750c60cb 100644 --- a/source/extensions/tracers/lightstep/lightstep_tracer_impl.cc +++ b/source/extensions/tracers/lightstep/lightstep_tracer_impl.cc @@ -60,7 +60,7 @@ void LightStepLogger::operator()(lightstep::LogLevel level, const size_t LightStepDriver::DefaultMinFlushSpans = 200U; LightStepDriver::LightStepTransporter::LightStepTransporter(LightStepDriver& driver) - : driver_(driver) {} + : driver_(driver), collector_cluster_(driver_.clusterManager(), driver_.cluster()) {} LightStepDriver::LightStepTransporter::~LightStepTransporter() { if (active_request_ != nullptr) { @@ -70,14 +70,14 @@ LightStepDriver::LightStepTransporter::~LightStepTransporter() { void LightStepDriver::LightStepTransporter::onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&& /*response*/) { - driver_.grpc_context_.chargeStat(*driver_.cluster(), driver_.request_stat_names_, true); + driver_.grpc_context_.chargeStat(*active_cluster_, driver_.request_stat_names_, true); active_callback_->OnSuccess(*active_report_); reset(); } void LightStepDriver::LightStepTransporter::onFailure( const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason 
/*failure_reason*/) { - driver_.grpc_context_.chargeStat(*driver_.cluster(), driver_.request_stat_names_, false); + driver_.grpc_context_.chargeStat(*active_cluster_, driver_.request_stat_names_, false); active_callback_->OnFailure(*active_report_); reset(); } @@ -95,24 +95,31 @@ void LightStepDriver::LightStepTransporter::Send(std::unique_ptrname(), lightstep::CollectorServiceFullName(), - lightstep::CollectorMethodName(), absl::optional(timeout)); - message->body() = serializeGrpcMessage(*active_report_); - - active_request_ = - driver_.clusterManager() - .httpAsyncClientForCluster(driver_.cluster()->name()) - .send(std::move(message), *this, - Http::AsyncClient::RequestOptions().setTimeout(std::chrono::milliseconds(timeout))); + driver_.cluster(), lightstep::CollectorServiceFullName(), lightstep::CollectorMethodName(), + absl::optional(timeout)); + message->body() = serializeGrpcMessage(*report); + + if (collector_cluster_.exists()) { + active_report_ = std::move(report); + active_callback_ = &callback; + active_cluster_ = collector_cluster_.info(); + active_request_ = driver_.clusterManager() + .httpAsyncClientForCluster(collector_cluster_.info()->name()) + .send(std::move(message), *this, + Http::AsyncClient::RequestOptions().setTimeout( + std::chrono::milliseconds(timeout))); + } else { + ENVOY_LOG(debug, "collector cluster '{}' does not exist", driver_.cluster()); + driver_.tracerStats().reports_skipped_no_cluster_.inc(); + } } void LightStepDriver::LightStepTransporter::reset() { + active_cluster_ = nullptr; active_request_ = nullptr; active_callback_ = nullptr; active_report_ = nullptr; @@ -164,12 +171,12 @@ LightStepDriver::LightStepDriver(const envoy::config::trace::v3::LightstepConfig pool_.add(lightstep::CollectorMethodName())} { Config::Utility::checkCluster(TracerNames::get().Lightstep, lightstep_config.collector_cluster(), - cm_); - cluster_ = cm_.get(lightstep_config.collector_cluster())->info(); + cm_, /* allow_added_via_api */ true); + cluster_ = 
lightstep_config.collector_cluster(); - if (!(cluster_->features() & Upstream::ClusterInfo::Features::HTTP2)) { + if (!(cm_.get(cluster_)->info()->features() & Upstream::ClusterInfo::Features::HTTP2)) { throw EnvoyException( - fmt::format("{} collector cluster must support http2 for gRPC calls", cluster_->name())); + fmt::format("{} collector cluster must support http2 for gRPC calls", cluster_)); } tls_->set([this](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr { diff --git a/source/extensions/tracers/lightstep/lightstep_tracer_impl.h b/source/extensions/tracers/lightstep/lightstep_tracer_impl.h index 629ecb8d16763..ee572f47f6eb6 100644 --- a/source/extensions/tracers/lightstep/lightstep_tracer_impl.h +++ b/source/extensions/tracers/lightstep/lightstep_tracer_impl.h @@ -17,6 +17,7 @@ #include "common/json/json_loader.h" #include "common/protobuf/protobuf.h" #include "common/stats/symbol_table_impl.h" +#include "common/upstream/cluster_update_tracker.h" #include "extensions/tracers/common/ot/opentracing_driver_impl.h" @@ -33,7 +34,8 @@ namespace Lightstep { #define LIGHTSTEP_TRACER_STATS(COUNTER) \ COUNTER(spans_sent) \ COUNTER(spans_dropped) \ - COUNTER(timer_flushed) + COUNTER(timer_flushed) \ + COUNTER(reports_skipped_no_cluster) struct LightstepTracerStats { LIGHTSTEP_TRACER_STATS(GENERATE_COUNTER_STRUCT) @@ -62,7 +64,7 @@ class LightStepDriver : public Common::Ot::OpenTracingDriver { PropagationMode propagation_mode, Grpc::Context& grpc_context); Upstream::ClusterManager& clusterManager() { return cm_; } - Upstream::ClusterInfoConstSharedPtr cluster() { return cluster_; } + const std::string& cluster() { return cluster_; } Runtime::Loader& runtime() { return runtime_; } LightstepTracerStats& tracerStats() { return tracer_stats_; } @@ -75,7 +77,9 @@ class LightStepDriver : public Common::Ot::OpenTracingDriver { PropagationMode propagationMode() const override { return propagation_mode_; } private: - class LightStepTransporter : public 
lightstep::AsyncTransporter, Http::AsyncClient::Callbacks { + class LightStepTransporter : Logger::Loggable, + public lightstep::AsyncTransporter, + public Http::AsyncClient::Callbacks { public: explicit LightStepTransporter(LightStepDriver& driver); @@ -95,8 +99,10 @@ class LightStepDriver : public Common::Ot::OpenTracingDriver { private: std::unique_ptr active_report_; Callback* active_callback_ = nullptr; + Upstream::ClusterInfoConstSharedPtr active_cluster_; Http::AsyncClient::Request* active_request_ = nullptr; LightStepDriver& driver_; + Upstream::ClusterUpdateTracker collector_cluster_; void reset(); }; @@ -129,7 +135,7 @@ class LightStepDriver : public Common::Ot::OpenTracingDriver { }; Upstream::ClusterManager& cm_; - Upstream::ClusterInfoConstSharedPtr cluster_; + std::string cluster_; LightstepTracerStats tracer_stats_; ThreadLocal::SlotPtr tls_; Runtime::Loader& runtime_; diff --git a/source/extensions/tracers/zipkin/BUILD b/source/extensions/tracers/zipkin/BUILD index a514c5bbd15e6..f2321bab87101 100644 --- a/source/extensions/tracers/zipkin/BUILD +++ b/source/extensions/tracers/zipkin/BUILD @@ -57,6 +57,7 @@ envoy_cc_library( "//source/common/network:address_lib", "//source/common/singleton:const_singleton", "//source/common/tracing:http_tracer_lib", + "//source/common/upstream:cluster_update_tracker_lib", "//source/extensions/tracers:well_known_names", "@com_github_openzipkin_zipkinapi//:zipkin_cc_proto", "@envoy_api//envoy/config/trace/v3:pkg_cc_proto", diff --git a/source/extensions/tracers/zipkin/zipkin_tracer_impl.cc b/source/extensions/tracers/zipkin/zipkin_tracer_impl.cc index da52eae352b51..6b0fc0240666d 100644 --- a/source/extensions/tracers/zipkin/zipkin_tracer_impl.cc +++ b/source/extensions/tracers/zipkin/zipkin_tracer_impl.cc @@ -74,8 +74,9 @@ Driver::Driver(const envoy::config::trace::v3::ZipkinConfig& zipkin_config, POOL_COUNTER_PREFIX(scope, "tracing.zipkin."))}, tls_(tls.allocateSlot()), runtime_(runtime), local_info_(local_info), 
time_source_(time_source) { - Config::Utility::checkCluster(TracerNames::get().Zipkin, zipkin_config.collector_cluster(), cm_); - cluster_ = cm_.get(zipkin_config.collector_cluster())->info(); + Config::Utility::checkCluster(TracerNames::get().Zipkin, zipkin_config.collector_cluster(), cm_, + /* allow_added_via_api */ true); + cluster_ = zipkin_config.collector_cluster(); CollectorInfo collector; if (!zipkin_config.collector_endpoint().empty()) { @@ -133,7 +134,8 @@ ReporterImpl::ReporterImpl(Driver& driver, Event::Dispatcher& dispatcher, const CollectorInfo& collector) : driver_(driver), collector_(collector), span_buffer_{std::make_unique( - collector.version_, collector.shared_span_context_)} { + collector.version_, collector.shared_span_context_)}, + collector_cluster_(driver_.clusterManager(), driver_.cluster()) { flush_timer_ = dispatcher.createTimer([this]() -> void { driver_.tracerStats().timer_flushed_.inc(); flushSpans(); @@ -176,7 +178,7 @@ void ReporterImpl::flushSpans() { Http::RequestMessagePtr message = std::make_unique(); message->headers().setReferenceMethod(Http::Headers::get().MethodValues.Post); message->headers().setPath(collector_.endpoint_); - message->headers().setHost(driver_.cluster()->name()); + message->headers().setHost(driver_.cluster()); message->headers().setReferenceContentType( collector_.version_ == envoy::config::trace::v3::ZipkinConfig::HTTP_PROTO ? 
Http::Headers::get().ContentTypeValues.Protobuf @@ -188,13 +190,20 @@ void ReporterImpl::flushSpans() { const uint64_t timeout = driver_.runtime().snapshot().getInteger("tracing.zipkin.request_timeout", 5000U); - Http::AsyncClient::Request* request = driver_.clusterManager() - .httpAsyncClientForCluster(driver_.cluster()->name()) - .send(std::move(message), *this, - Http::AsyncClient::RequestOptions().setTimeout( - std::chrono::milliseconds(timeout))); - if (request) { - active_requests_.add(*request); + + if (collector_cluster_.exists()) { + Http::AsyncClient::Request* request = + driver_.clusterManager() + .httpAsyncClientForCluster(collector_cluster_.info()->name()) + .send(std::move(message), *this, + Http::AsyncClient::RequestOptions().setTimeout( + std::chrono::milliseconds(timeout))); + if (request) { + active_requests_.add(*request); + } + } else { + ENVOY_LOG(debug, "collector cluster '{}' does not exist", driver_.cluster()); + driver_.tracerStats().reports_skipped_no_cluster_.inc(); } span_buffer_->clear(); diff --git a/source/extensions/tracers/zipkin/zipkin_tracer_impl.h b/source/extensions/tracers/zipkin/zipkin_tracer_impl.h index e09eeb2fe89f9..36866fd52b9e1 100644 --- a/source/extensions/tracers/zipkin/zipkin_tracer_impl.h +++ b/source/extensions/tracers/zipkin/zipkin_tracer_impl.h @@ -10,6 +10,7 @@ #include "common/http/async_client_utility.h" #include "common/http/header_map_impl.h" #include "common/json/json_loader.h" +#include "common/upstream/cluster_update_tracker.h" #include "extensions/tracers/zipkin/span_buffer.h" #include "extensions/tracers/zipkin/tracer.h" @@ -23,6 +24,7 @@ namespace Zipkin { #define ZIPKIN_TRACER_STATS(COUNTER) \ COUNTER(spans_sent) \ COUNTER(timer_flushed) \ + COUNTER(reports_skipped_no_cluster) \ COUNTER(reports_sent) \ COUNTER(reports_dropped) \ COUNTER(reports_failed) @@ -116,7 +118,7 @@ class Driver : public Tracing::Driver { // Getters to return the ZipkinDriver's key members. 
Upstream::ClusterManager& clusterManager() { return cm_; } - Upstream::ClusterInfoConstSharedPtr cluster() { return cluster_; } + const std::string& cluster() { return cluster_; } Runtime::Loader& runtime() { return runtime_; } ZipkinTracerStats& tracerStats() { return tracer_stats_; } @@ -132,7 +134,7 @@ class Driver : public Tracing::Driver { }; Upstream::ClusterManager& cm_; - Upstream::ClusterInfoConstSharedPtr cluster_; + std::string cluster_; ZipkinTracerStats tracer_stats_; ThreadLocal::SlotPtr tls_; Runtime::Loader& runtime_; @@ -171,7 +173,9 @@ struct CollectorInfo { * * The default values for the runtime parameters are 5 spans and 5000ms. */ -class ReporterImpl : public Reporter, Http::AsyncClient::Callbacks { +class ReporterImpl : Logger::Loggable, + public Reporter, + public Http::AsyncClient::Callbacks { public: /** * Constructor. @@ -227,6 +231,7 @@ class ReporterImpl : public Reporter, Http::AsyncClient::Callbacks { Event::TimerPtr flush_timer_; const CollectorInfo collector_; SpanBufferPtr span_buffer_; + Upstream::ClusterUpdateTracker collector_cluster_; // Track active HTTP requests to be able to cancel them on destruction. 
Http::AsyncClientRequestTracker active_requests_; }; diff --git a/test/common/upstream/BUILD b/test/common/upstream/BUILD index c53b8368e9ed9..e88728115d486 100644 --- a/test/common/upstream/BUILD +++ b/test/common/upstream/BUILD @@ -42,6 +42,15 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "cluster_update_tracker_test", + srcs = ["cluster_update_tracker_test.cc"], + deps = [ + "//source/common/upstream:cluster_update_tracker_lib", + "//test/mocks/upstream:upstream_mocks", + ], +) + envoy_cc_test( name = "conn_pool_map_impl_test", srcs = ["conn_pool_map_impl_test.cc"], diff --git a/test/common/upstream/cluster_update_tracker_test.cc b/test/common/upstream/cluster_update_tracker_test.cc new file mode 100644 index 0000000000000..3d3dc9c56e5bb --- /dev/null +++ b/test/common/upstream/cluster_update_tracker_test.cc @@ -0,0 +1,91 @@ +#include "common/upstream/cluster_update_tracker.h" + +#include "test/mocks/upstream/mocks.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::NiceMock; +using testing::Return; + +namespace Envoy { +namespace Upstream { +namespace { + +class ClusterUpdateTrackerTest : public testing::Test { +public: + ClusterUpdateTrackerTest() { + expected_.cluster_.info_->name_ = cluster_name_; + irrelevant_.cluster_.info_->name_ = "unrelated_cluster"; + } + + NiceMock cm_; + NiceMock expected_; + NiceMock irrelevant_; + const std::string cluster_name_{"fake_cluster"}; +}; + +TEST_F(ClusterUpdateTrackerTest, ClusterDoesNotExistAtConstructionTime) { + EXPECT_CALL(cm_, get(cluster_name_)).WillOnce(Return(nullptr)); + + ClusterUpdateTracker cluster_tracker(cm_, cluster_name_); + + EXPECT_FALSE(cluster_tracker.exists()); + EXPECT_EQ(cluster_tracker.info(), nullptr); +} + +TEST_F(ClusterUpdateTrackerTest, ClusterDoesExistAtConstructionTime) { + EXPECT_CALL(cm_, get(cluster_name_)).WillOnce(Return(&expected_)); + + ClusterUpdateTracker cluster_tracker(cm_, cluster_name_); + + EXPECT_TRUE(cluster_tracker.exists()); + 
EXPECT_EQ(cluster_tracker.info(), expected_.cluster_.info_); +} + +TEST_F(ClusterUpdateTrackerTest, ShouldProperlyHandleUpdateCallbacks) { + EXPECT_CALL(cm_, get(cluster_name_)).WillOnce(Return(nullptr)); + + ClusterUpdateTracker cluster_tracker(cm_, cluster_name_); + + { + EXPECT_FALSE(cluster_tracker.exists()); + EXPECT_EQ(cluster_tracker.info(), nullptr); + } + + { + // Simulate addition of an irrelevant cluster. + cluster_tracker.onClusterAddOrUpdate(irrelevant_); + + EXPECT_FALSE(cluster_tracker.exists()); + EXPECT_EQ(cluster_tracker.info(), nullptr); + } + + { + // Simulate addition of the relevant cluster. + cluster_tracker.onClusterAddOrUpdate(expected_); + + EXPECT_TRUE(cluster_tracker.exists()); + EXPECT_EQ(cluster_tracker.info(), expected_.cluster_.info_); + } + + { + // Simulate removal of an irrelevant cluster. + cluster_tracker.onClusterRemoval(irrelevant_.cluster_.info_->name_); + + EXPECT_TRUE(cluster_tracker.exists()); + EXPECT_EQ(cluster_tracker.info(), expected_.cluster_.info_); + } + + { + // Simulate removal of the relevant cluster. 
+ cluster_tracker.onClusterRemoval(cluster_name_); + + EXPECT_FALSE(cluster_tracker.exists()); + EXPECT_EQ(cluster_tracker.info(), nullptr); + } +} + +} // namespace +} // namespace Upstream +} // namespace Envoy diff --git a/test/extensions/tracers/datadog/datadog_tracer_impl_test.cc b/test/extensions/tracers/datadog/datadog_tracer_impl_test.cc index e2f4c6734dc29..d60f1bdf0fa91 100644 --- a/test/extensions/tracers/datadog/datadog_tracer_impl_test.cc +++ b/test/extensions/tracers/datadog/datadog_tracer_impl_test.cc @@ -29,6 +29,7 @@ #include "gtest/gtest.h" using testing::_; +using testing::AnyNumber; using testing::DoAll; using testing::Eq; using testing::Invoke; @@ -47,6 +48,7 @@ namespace { class DatadogDriverTest : public testing::Test { public: void setup(envoy::config::trace::v3::DatadogConfig& datadog_config, bool init_timer) { + cm_.thread_local_cluster_.cluster_.info_->name_ = "fake_cluster"; ON_CALL(cm_, httpAsyncClientForCluster("fake_cluster")) .WillByDefault(ReturnRef(cm_.async_client_)); @@ -125,6 +127,21 @@ TEST_F(DatadogDriverTest, InitializeDriver) { } } +TEST_F(DatadogDriverTest, AllowCollectorClusterToBeAddedViaApi) { + EXPECT_CALL(cm_, get(Eq("fake_cluster"))).WillRepeatedly(Return(&cm_.thread_local_cluster_)); + ON_CALL(*cm_.thread_local_cluster_.cluster_.info_, features()) + .WillByDefault(Return(Upstream::ClusterInfo::Features::HTTP2)); + ON_CALL(*cm_.thread_local_cluster_.cluster_.info_, addedViaApi()).WillByDefault(Return(true)); + + const std::string yaml_string = R"EOF( + collector_cluster: fake_cluster + )EOF"; + envoy::config::trace::v3::DatadogConfig datadog_config; + TestUtility::loadFromYaml(yaml_string, datadog_config); + + setup(datadog_config, true); +} + TEST_F(DatadogDriverTest, FlushSpansTimer) { setupValidDriver(); @@ -163,11 +180,139 @@ TEST_F(DatadogDriverTest, FlushSpansTimer) { callback->onSuccess(request, std::move(msg)); + EXPECT_EQ(0U, stats_.counter("tracing.datadog.reports_skipped_no_cluster").value()); EXPECT_EQ(1U, 
stats_.counter("tracing.datadog.reports_sent").value()); EXPECT_EQ(0U, stats_.counter("tracing.datadog.reports_dropped").value()); EXPECT_EQ(0U, stats_.counter("tracing.datadog.reports_failed").value()); } +TEST_F(DatadogDriverTest, SkipReportIfCollectorClusterHasBeenRemoved) { + Upstream::ClusterUpdateCallbacks* cluster_update_callbacks; + EXPECT_CALL(cm_, addThreadLocalClusterUpdateCallbacks_(_)) + .WillOnce(DoAll(SaveArgAddress(&cluster_update_callbacks), Return(nullptr))); + + setupValidDriver(); + + EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(900), _)).Times(AnyNumber()); + + // Verify the effect of onClusterAddOrUpdate()/onClusterRemoval() on reporting logic, + // keeping in mind that they will be called both for relevant and irrelevant clusters. + + { + // Simulate removal of the relevant cluster. + cluster_update_callbacks->onClusterRemoval("fake_cluster"); + + // Verify that no report will be sent. + EXPECT_CALL(cm_, httpAsyncClientForCluster(_)).Times(0); + EXPECT_CALL(cm_.async_client_, send_(_, _, _)).Times(0); + + // Trigger flush of a span. + driver_ + ->startSpan(config_, request_headers_, operation_name_, start_time_, + {Tracing::Reason::Sampling, true}) + ->finishSpan(); + timer_->invokeCallback(); + + // Verify observability. + EXPECT_EQ(1U, stats_.counter("tracing.datadog.timer_flushed").value()); + EXPECT_EQ(1U, stats_.counter("tracing.datadog.traces_sent").value()); + EXPECT_EQ(1U, stats_.counter("tracing.datadog.reports_skipped_no_cluster").value()); + EXPECT_EQ(0U, stats_.counter("tracing.datadog.reports_sent").value()); + EXPECT_EQ(0U, stats_.counter("tracing.datadog.reports_dropped").value()); + EXPECT_EQ(0U, stats_.counter("tracing.datadog.reports_failed").value()); + } + + { + // Simulate addition of an irrelevant cluster. 
+ NiceMock unrelated_cluster; + unrelated_cluster.cluster_.info_->name_ = "unrelated_cluster"; + cluster_update_callbacks->onClusterAddOrUpdate(unrelated_cluster); + + // Verify that no report will be sent. + EXPECT_CALL(cm_, httpAsyncClientForCluster(_)).Times(0); + EXPECT_CALL(cm_.async_client_, send_(_, _, _)).Times(0); + + // Trigger flush of a span. + driver_ + ->startSpan(config_, request_headers_, operation_name_, start_time_, + {Tracing::Reason::Sampling, true}) + ->finishSpan(); + timer_->invokeCallback(); + + // Verify observability. + EXPECT_EQ(2U, stats_.counter("tracing.datadog.timer_flushed").value()); + EXPECT_EQ(2U, stats_.counter("tracing.datadog.traces_sent").value()); + EXPECT_EQ(2U, stats_.counter("tracing.datadog.reports_skipped_no_cluster").value()); + EXPECT_EQ(0U, stats_.counter("tracing.datadog.reports_sent").value()); + EXPECT_EQ(0U, stats_.counter("tracing.datadog.reports_dropped").value()); + EXPECT_EQ(0U, stats_.counter("tracing.datadog.reports_failed").value()); + } + + { + // Simulate addition of the relevant cluster. + cluster_update_callbacks->onClusterAddOrUpdate(cm_.thread_local_cluster_); + + // Verify that report will be sent. + EXPECT_CALL(cm_, httpAsyncClientForCluster("fake_cluster")) + .WillOnce(ReturnRef(cm_.async_client_)); + Http::MockAsyncClientRequest request(&cm_.async_client_); + Http::AsyncClient::Callbacks* callback{}; + EXPECT_CALL(cm_.async_client_, send_(_, _, _)) + .WillOnce(DoAll(WithArg<1>(SaveArgAddress(&callback)), Return(&request))); + + // Trigger flush of a span. + driver_ + ->startSpan(config_, request_headers_, operation_name_, start_time_, + {Tracing::Reason::Sampling, true}) + ->finishSpan(); + timer_->invokeCallback(); + + // Complete in-flight request. + callback->onFailure(request, Http::AsyncClient::FailureReason::Reset); + + // Verify observability. 
+ EXPECT_EQ(3U, stats_.counter("tracing.datadog.timer_flushed").value()); + EXPECT_EQ(3U, stats_.counter("tracing.datadog.traces_sent").value()); + EXPECT_EQ(2U, stats_.counter("tracing.datadog.reports_skipped_no_cluster").value()); + EXPECT_EQ(0U, stats_.counter("tracing.datadog.reports_sent").value()); + EXPECT_EQ(0U, stats_.counter("tracing.datadog.reports_dropped").value()); + EXPECT_EQ(1U, stats_.counter("tracing.datadog.reports_failed").value()); + } + + { + // Simulate removal of an irrelevant cluster. + cluster_update_callbacks->onClusterRemoval("unrelated_cluster"); + + // Verify that report will be sent. + EXPECT_CALL(cm_, httpAsyncClientForCluster("fake_cluster")) + .WillOnce(ReturnRef(cm_.async_client_)); + Http::MockAsyncClientRequest request(&cm_.async_client_); + Http::AsyncClient::Callbacks* callback{}; + EXPECT_CALL(cm_.async_client_, send_(_, _, _)) + .WillOnce(DoAll(WithArg<1>(SaveArgAddress(&callback)), Return(&request))); + + // Trigger flush of a span. + driver_ + ->startSpan(config_, request_headers_, operation_name_, start_time_, + {Tracing::Reason::Sampling, true}) + ->finishSpan(); + timer_->invokeCallback(); + + // Complete in-flight request. + Http::ResponseMessagePtr msg(new Http::ResponseMessageImpl( + Http::ResponseHeaderMapPtr{new Http::TestResponseHeaderMapImpl{{":status", "404"}}})); + callback->onSuccess(request, std::move(msg)); + + // Verify observability. 
+ EXPECT_EQ(4U, stats_.counter("tracing.datadog.timer_flushed").value()); + EXPECT_EQ(4U, stats_.counter("tracing.datadog.traces_sent").value()); + EXPECT_EQ(2U, stats_.counter("tracing.datadog.reports_skipped_no_cluster").value()); + EXPECT_EQ(0U, stats_.counter("tracing.datadog.reports_sent").value()); + EXPECT_EQ(1U, stats_.counter("tracing.datadog.reports_dropped").value()); + EXPECT_EQ(1U, stats_.counter("tracing.datadog.reports_failed").value()); + } +} + TEST_F(DatadogDriverTest, CancelInflightRequestsOnDestruction) { setupValidDriver(); diff --git a/test/extensions/tracers/lightstep/lightstep_tracer_impl_test.cc b/test/extensions/tracers/lightstep/lightstep_tracer_impl_test.cc index 0234b1a579b27..29aaba9ab6bd3 100644 --- a/test/extensions/tracers/lightstep/lightstep_tracer_impl_test.cc +++ b/test/extensions/tracers/lightstep/lightstep_tracer_impl_test.cc @@ -33,11 +33,13 @@ using testing::_; using testing::AtLeast; +using testing::DoAll; using testing::Eq; using testing::Invoke; using testing::NiceMock; using testing::Return; using testing::ReturnRef; +using testing::WithArg; namespace Envoy { namespace Extensions { @@ -71,6 +73,7 @@ class LightStepDriverTest : public testing::Test { opts->access_token = "sample_token"; opts->component_name = "component"; + cm_.thread_local_cluster_.cluster_.info_->name_ = "fake_cluster"; ON_CALL(cm_, httpAsyncClientForCluster("fake_cluster")) .WillByDefault(ReturnRef(cm_.async_client_)); @@ -187,6 +190,21 @@ TEST_F(LightStepDriverTest, InitializeDriver) { } } +TEST_F(LightStepDriverTest, AllowCollectorClusterToBeAddedViaApi) { + EXPECT_CALL(cm_, get(Eq("fake_cluster"))).WillRepeatedly(Return(&cm_.thread_local_cluster_)); + ON_CALL(*cm_.thread_local_cluster_.cluster_.info_, features()) + .WillByDefault(Return(Upstream::ClusterInfo::Features::HTTP2)); + ON_CALL(*cm_.thread_local_cluster_.cluster_.info_, addedViaApi()).WillByDefault(Return(true)); + + const std::string yaml_string = R"EOF( + collector_cluster: fake_cluster + 
)EOF"; + envoy::config::trace::v3::LightstepConfig lightstep_config; + TestUtility::loadFromYaml(yaml_string, lightstep_config); + + setup(lightstep_config, true); +} + TEST_F(LightStepDriverTest, FlushSeveralSpans) { setupValidDriver(2); @@ -239,6 +257,123 @@ TEST_F(LightStepDriverTest, FlushSeveralSpans) { .counter("grpc.lightstep.collector.CollectorService.Report.total") .value()); EXPECT_EQ(2U, stats_.counter("tracing.lightstep.spans_sent").value()); + EXPECT_EQ(0U, stats_.counter("tracing.lightstep.reports_skipped_no_cluster").value()); +} + +TEST_F(LightStepDriverTest, SkipReportIfCollectorClusterHasBeenRemoved) { + Upstream::ClusterUpdateCallbacks* cluster_update_callbacks; + EXPECT_CALL(cm_, addThreadLocalClusterUpdateCallbacks_(_)) + .WillOnce(DoAll(SaveArgAddress(&cluster_update_callbacks), Return(nullptr))); + + setupValidDriver(1); + + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.lightstep.request_timeout", 5000U)) + .WillRepeatedly(Return(5000U)); + + // Verify the effect of onClusterAddOrUpdate()/onClusterRemoval() on reporting logic, + // keeping in mind that they will be called both for relevant and irrelevant clusters. + + { + // Simulate removal of the relevant cluster. + cluster_update_callbacks->onClusterRemoval("fake_cluster"); + + // Verify that no report will be sent. + EXPECT_CALL(cm_, httpAsyncClientForCluster(_)).Times(0); + EXPECT_CALL(cm_.async_client_, send_(_, _, _)).Times(0); + + // Trigger flush of a span. + driver_ + ->startSpan(config_, request_headers_, operation_name_, start_time_, + {Tracing::Reason::Sampling, true}) + ->finishSpan(); + driver_->flush(); + + // Verify observability. + EXPECT_EQ(1U, stats_.counter("tracing.lightstep.reports_skipped_no_cluster").value()); + EXPECT_EQ(0U, stats_.counter("tracing.lightstep.spans_sent").value()); + EXPECT_EQ(0U, stats_.counter("tracing.lightstep.spans_dropped").value()); + } + + { + // Simulate addition of an irrelevant cluster. 
+ NiceMock unrelated_cluster; + unrelated_cluster.cluster_.info_->name_ = "unrelated_cluster"; + cluster_update_callbacks->onClusterAddOrUpdate(unrelated_cluster); + + // Verify that no report will be sent. + EXPECT_CALL(cm_, httpAsyncClientForCluster(_)).Times(0); + EXPECT_CALL(cm_.async_client_, send_(_, _, _)).Times(0); + + // Trigger flush of a span. + driver_ + ->startSpan(config_, request_headers_, operation_name_, start_time_, + {Tracing::Reason::Sampling, true}) + ->finishSpan(); + driver_->flush(); + + // Verify observability. + EXPECT_EQ(2U, stats_.counter("tracing.lightstep.reports_skipped_no_cluster").value()); + EXPECT_EQ(0U, stats_.counter("tracing.lightstep.spans_sent").value()); + EXPECT_EQ(0U, stats_.counter("tracing.lightstep.spans_dropped").value()); + } + + { + // Simulate addition of the relevant cluster. + cluster_update_callbacks->onClusterAddOrUpdate(cm_.thread_local_cluster_); + + // Verify that report will be sent. + EXPECT_CALL(cm_, httpAsyncClientForCluster("fake_cluster")) + .WillOnce(ReturnRef(cm_.async_client_)); + Http::MockAsyncClientRequest request(&cm_.async_client_); + Http::AsyncClient::Callbacks* callback{}; + EXPECT_CALL(cm_.async_client_, send_(_, _, _)) + .WillOnce(DoAll(WithArg<1>(SaveArgAddress(&callback)), Return(&request))); + + // Trigger flush of a span. + driver_ + ->startSpan(config_, request_headers_, operation_name_, start_time_, + {Tracing::Reason::Sampling, true}) + ->finishSpan(); + driver_->flush(); + + // Complete in-flight request. + callback->onFailure(request, Http::AsyncClient::FailureReason::Reset); + + // Verify observability. + EXPECT_EQ(2U, stats_.counter("tracing.lightstep.reports_skipped_no_cluster").value()); + EXPECT_EQ(0U, stats_.counter("tracing.lightstep.spans_sent").value()); + EXPECT_EQ(1U, stats_.counter("tracing.lightstep.spans_dropped").value()); + } + + { + // Simulate removal of an irrelevant cluster. 
+ cluster_update_callbacks->onClusterRemoval("unrelated_cluster"); + + // Verify that report will be sent. + EXPECT_CALL(cm_, httpAsyncClientForCluster("fake_cluster")) + .WillOnce(ReturnRef(cm_.async_client_)); + Http::MockAsyncClientRequest request(&cm_.async_client_); + Http::AsyncClient::Callbacks* callback{}; + EXPECT_CALL(cm_.async_client_, send_(_, _, _)) + .WillOnce(DoAll(WithArg<1>(SaveArgAddress(&callback)), Return(&request))); + + // Trigger flush of a span. + driver_ + ->startSpan(config_, request_headers_, operation_name_, start_time_, + {Tracing::Reason::Sampling, true}) + ->finishSpan(); + driver_->flush(); + + // Complete in-flight request. + Http::ResponseMessagePtr msg(new Http::ResponseMessageImpl( + Http::ResponseHeaderMapPtr{new Http::TestResponseHeaderMapImpl{{":status", "200"}}})); + callback->onSuccess(request, std::move(msg)); + + // Verify observability. + EXPECT_EQ(2U, stats_.counter("tracing.lightstep.reports_skipped_no_cluster").value()); + EXPECT_EQ(1U, stats_.counter("tracing.lightstep.spans_sent").value()); + EXPECT_EQ(1U, stats_.counter("tracing.lightstep.spans_dropped").value()); + } } TEST_F(LightStepDriverTest, FlushOneFailure) { @@ -286,6 +421,7 @@ TEST_F(LightStepDriverTest, FlushOneFailure) { .counter("grpc.lightstep.collector.CollectorService.Report.total") .value()); EXPECT_EQ(1U, stats_.counter("tracing.lightstep.spans_dropped").value()); + EXPECT_EQ(0U, stats_.counter("tracing.lightstep.reports_skipped_no_cluster").value()); } TEST_F(LightStepDriverTest, FlushWithActiveReport) { @@ -327,6 +463,7 @@ TEST_F(LightStepDriverTest, FlushWithActiveReport) { driver_->flush(); EXPECT_EQ(1U, stats_.counter("tracing.lightstep.spans_dropped").value()); + EXPECT_EQ(0U, stats_.counter("tracing.lightstep.reports_skipped_no_cluster").value()); EXPECT_CALL(request, cancel()); @@ -375,6 +512,7 @@ TEST_F(LightStepDriverTest, OnFullWithActiveReport) { ->finishSpan(); EXPECT_EQ(1U, stats_.counter("tracing.lightstep.spans_dropped").value()); + 
EXPECT_EQ(0U, stats_.counter("tracing.lightstep.reports_skipped_no_cluster").value()); EXPECT_CALL(request, cancel()); @@ -415,6 +553,7 @@ TEST_F(LightStepDriverTest, FlushSpansTimer) { EXPECT_EQ(1U, stats_.counter("tracing.lightstep.timer_flushed").value()); EXPECT_EQ(1U, stats_.counter("tracing.lightstep.spans_sent").value()); + EXPECT_EQ(0U, stats_.counter("tracing.lightstep.reports_skipped_no_cluster").value()); } TEST_F(LightStepDriverTest, CancelRequestOnDestruction) { diff --git a/test/extensions/tracers/zipkin/zipkin_tracer_impl_test.cc b/test/extensions/tracers/zipkin/zipkin_tracer_impl_test.cc index 899b0d04eaec7..f0b160ac56c13 100644 --- a/test/extensions/tracers/zipkin/zipkin_tracer_impl_test.cc +++ b/test/extensions/tracers/zipkin/zipkin_tracer_impl_test.cc @@ -49,6 +49,7 @@ class ZipkinDriverTest : public testing::Test { ZipkinDriverTest() : time_source_(test_time_.timeSystem()) {} void setup(envoy::config::trace::v3::ZipkinConfig& zipkin_config, bool init_timer) { + cm_.thread_local_cluster_.cluster_.info_->name_ = "fake_cluster"; ON_CALL(cm_, httpAsyncClientForCluster("fake_cluster")) .WillByDefault(ReturnRef(cm_.async_client_)); @@ -117,6 +118,7 @@ class ZipkinDriverTest : public testing::Test { callback->onSuccess(request, std::move(msg)); EXPECT_EQ(2U, stats_.counter("tracing.zipkin.spans_sent").value()); + EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_skipped_no_cluster").value()); EXPECT_EQ(1U, stats_.counter("tracing.zipkin.reports_sent").value()); EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_dropped").value()); EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_failed").value()); @@ -161,7 +163,7 @@ TEST_F(ZipkinDriverTest, InitializeDriver) { } { - // Valid config but not valid cluster. + // Valid config but collector cluster doesn't exist. 
EXPECT_CALL(cm_, get(Eq("fake_cluster"))).WillOnce(Return(nullptr)); const std::string yaml_string = R"EOF( collector_cluster: fake_cluster @@ -189,6 +191,21 @@ TEST_F(ZipkinDriverTest, InitializeDriver) { } } +TEST_F(ZipkinDriverTest, AllowCollectorClusterToBeAddedViaApi) { + EXPECT_CALL(cm_, get(Eq("fake_cluster"))).WillRepeatedly(Return(&cm_.thread_local_cluster_)); + ON_CALL(*cm_.thread_local_cluster_.cluster_.info_, features()).WillByDefault(Return(0)); + ON_CALL(*cm_.thread_local_cluster_.cluster_.info_, addedViaApi()).WillByDefault(Return(true)); + + const std::string yaml_string = R"EOF( + collector_cluster: fake_cluster + collector_endpoint: /api/v1/spans + )EOF"; + envoy::config::trace::v3::ZipkinConfig zipkin_config; + TestUtility::loadFromYaml(yaml_string, zipkin_config); + + setup(zipkin_config, true); +} + TEST_F(ZipkinDriverTest, FlushSeveralSpans) { expectValidFlushSeveralSpans("HTTP_JSON_V1", "application/json"); } @@ -242,11 +259,134 @@ TEST_F(ZipkinDriverTest, FlushOneSpanReportFailure) { callback->onSuccess(request, std::move(msg)); EXPECT_EQ(1U, stats_.counter("tracing.zipkin.spans_sent").value()); + EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_skipped_no_cluster").value()); EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_sent").value()); EXPECT_EQ(1U, stats_.counter("tracing.zipkin.reports_dropped").value()); EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_failed").value()); } +TEST_F(ZipkinDriverTest, SkipReportIfCollectorClusterHasBeenRemoved) { + Upstream::ClusterUpdateCallbacks* cluster_update_callbacks; + EXPECT_CALL(cm_, addThreadLocalClusterUpdateCallbacks_(_)) + .WillOnce(DoAll(SaveArgAddress(&cluster_update_callbacks), Return(nullptr))); + + setupValidDriver("HTTP_JSON_V1"); + + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.zipkin.min_flush_spans", 5)) + .WillRepeatedly(Return(1)); + EXPECT_CALL(runtime_.snapshot_, getInteger("tracing.zipkin.request_timeout", 5000U)) + .WillRepeatedly(Return(5000U)); + + // 
Verify the effect of onClusterAddOrUpdate()/onClusterRemoval() on reporting logic, + // keeping in mind that they will be called both for relevant and irrelevant clusters. + + { + // Simulate removal of the relevant cluster. + cluster_update_callbacks->onClusterRemoval("fake_cluster"); + + // Verify that no report will be sent. + EXPECT_CALL(cm_, httpAsyncClientForCluster(_)).Times(0); + EXPECT_CALL(cm_.async_client_, send_(_, _, _)).Times(0); + + // Trigger flush of a span. + driver_ + ->startSpan(config_, request_headers_, operation_name_, start_time_, + {Tracing::Reason::Sampling, true}) + ->finishSpan(); + + // Verify observability. + EXPECT_EQ(1U, stats_.counter("tracing.zipkin.spans_sent").value()); + EXPECT_EQ(1U, stats_.counter("tracing.zipkin.reports_skipped_no_cluster").value()); + EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_sent").value()); + EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_dropped").value()); + EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_failed").value()); + } + + { + // Simulate addition of an irrelevant cluster. + NiceMock unrelated_cluster; + unrelated_cluster.cluster_.info_->name_ = "unrelated_cluster"; + cluster_update_callbacks->onClusterAddOrUpdate(unrelated_cluster); + + // Verify that no report will be sent. + EXPECT_CALL(cm_, httpAsyncClientForCluster(_)).Times(0); + EXPECT_CALL(cm_.async_client_, send_(_, _, _)).Times(0); + + // Trigger flush of a span. + driver_ + ->startSpan(config_, request_headers_, operation_name_, start_time_, + {Tracing::Reason::Sampling, true}) + ->finishSpan(); + + // Verify observability. 
+ EXPECT_EQ(2U, stats_.counter("tracing.zipkin.spans_sent").value()); + EXPECT_EQ(2U, stats_.counter("tracing.zipkin.reports_skipped_no_cluster").value()); + EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_sent").value()); + EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_dropped").value()); + EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_failed").value()); + } + + { + // Simulate addition of the relevant cluster. + cluster_update_callbacks->onClusterAddOrUpdate(cm_.thread_local_cluster_); + + // Verify that report will be sent. + EXPECT_CALL(cm_, httpAsyncClientForCluster("fake_cluster")) + .WillOnce(ReturnRef(cm_.async_client_)); + Http::MockAsyncClientRequest request(&cm_.async_client_); + Http::AsyncClient::Callbacks* callback{}; + EXPECT_CALL(cm_.async_client_, send_(_, _, _)) + .WillOnce(DoAll(WithArg<1>(SaveArgAddress(&callback)), Return(&request))); + + // Trigger flush of a span. + driver_ + ->startSpan(config_, request_headers_, operation_name_, start_time_, + {Tracing::Reason::Sampling, true}) + ->finishSpan(); + + // Complete in-flight request. + callback->onFailure(request, Http::AsyncClient::FailureReason::Reset); + + // Verify observability. + EXPECT_EQ(3U, stats_.counter("tracing.zipkin.spans_sent").value()); + EXPECT_EQ(2U, stats_.counter("tracing.zipkin.reports_skipped_no_cluster").value()); + EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_sent").value()); + EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_dropped").value()); + EXPECT_EQ(1U, stats_.counter("tracing.zipkin.reports_failed").value()); + } + + { + // Simulate removal of an irrelevant cluster. + cluster_update_callbacks->onClusterRemoval("unrelated_cluster"); + + // Verify that report will be sent. 
+ EXPECT_CALL(cm_, httpAsyncClientForCluster("fake_cluster")) + .WillOnce(ReturnRef(cm_.async_client_)); + Http::MockAsyncClientRequest request(&cm_.async_client_); + Http::AsyncClient::Callbacks* callback{}; + EXPECT_CALL(cm_.async_client_, send_(_, _, _)) + .WillOnce(DoAll(WithArg<1>(SaveArgAddress(&callback)), Return(&request))); + + // Trigger flush of a span. + driver_ + ->startSpan(config_, request_headers_, operation_name_, start_time_, + {Tracing::Reason::Sampling, true}) + ->finishSpan(); + + // Complete in-flight request. + Http::ResponseMessagePtr msg(new Http::ResponseMessageImpl( + Http::ResponseHeaderMapPtr{new Http::TestResponseHeaderMapImpl{{":status", "202"}}})); + callback->onSuccess(request, std::move(msg)); + + // Verify observability. + EXPECT_EQ(4U, stats_.counter("tracing.zipkin.spans_sent").value()); + EXPECT_EQ(2U, stats_.counter("tracing.zipkin.reports_skipped_no_cluster").value()); + EXPECT_EQ(1U, stats_.counter("tracing.zipkin.reports_sent").value()); + EXPECT_EQ(0U, stats_.counter("tracing.zipkin.reports_dropped").value()); + EXPECT_EQ(1U, stats_.counter("tracing.zipkin.reports_failed").value()); + } +} + TEST_F(ZipkinDriverTest, CancelInflightRequestsOnDestruction) { setupValidDriver("HTTP_JSON_V1");