Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions source/common/config/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ void Utility::translateApiConfigSource(
}

void Utility::checkCluster(absl::string_view error_prefix, absl::string_view cluster_name,
Upstream::ClusterManager& cm) {
Upstream::ClusterManager& cm, bool allow_added_via_api) {
Upstream::ThreadLocalCluster* cluster = cm.get(cluster_name);
if (cluster == nullptr) {
throw EnvoyException(fmt::format("{}: unknown cluster '{}'", error_prefix, cluster_name));
}

if (cluster->info()->addedViaApi()) {
if (!allow_added_via_api && cluster->info()->addedViaApi()) {
throw EnvoyException(fmt::format("{}: invalid cluster '{}': currently only "
"static (non-CDS) clusters are supported",
error_prefix, cluster_name));
Expand Down
4 changes: 3 additions & 1 deletion source/common/config/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,11 @@ class Utility {
* @param error_prefix supplies the prefix to use in error messages.
* @param cluster_name supplies the cluster name to check.
* @param cm supplies the cluster manager.
* @param allow_added_via_api indicates whether a cluster is allowed to be added via api
* rather than be a static resource from the bootstrap config.
*/
static void checkCluster(absl::string_view error_prefix, absl::string_view cluster_name,
Upstream::ClusterManager& cm);
Upstream::ClusterManager& cm, bool allow_added_via_api = false);

/**
* Check cluster/local info for API config sanity. Throws on error.
Expand Down
9 changes: 9 additions & 0 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "cluster_update_tracker_lib",
srcs = ["cluster_update_tracker.cc"],
hdrs = ["cluster_update_tracker.h"],
deps = [
"//include/envoy/upstream:cluster_manager_interface",
],
)

envoy_cc_library(
name = "conn_pool_map",
hdrs = ["conn_pool_map.h"],
Expand Down
28 changes: 28 additions & 0 deletions source/common/upstream/cluster_update_tracker.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#include "common/upstream/cluster_update_tracker.h"

namespace Envoy {
namespace Upstream {

ClusterUpdateTracker::ClusterUpdateTracker(ClusterManager& cm, const std::string& cluster_name)
: cluster_name_(cluster_name),
cluster_update_callbacks_handle_(cm.addThreadLocalClusterUpdateCallbacks(*this)) {
Upstream::ThreadLocalCluster* cluster = cm.get(cluster_name_);
cluster_info_ = cluster ? cluster->info() : nullptr;
}

void ClusterUpdateTracker::onClusterAddOrUpdate(ThreadLocalCluster& cluster) {
if (cluster.info()->name() != cluster_name_) {
return;
}
cluster_info_ = cluster.info();
}

void ClusterUpdateTracker::onClusterRemoval(const std::string& cluster) {
if (cluster != cluster_name_) {
return;
}
cluster_info_.reset();
}

} // namespace Upstream
} // namespace Envoy
33 changes: 33 additions & 0 deletions source/common/upstream/cluster_update_tracker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include "envoy/upstream/cluster_manager.h"

namespace Envoy {
namespace Upstream {

/**
* Keeps track of cluster updates in order to spot addition and removal.
*
* Use this class as a performance optimization to avoid going through ClusterManager::get()
* on the hot path.
*/
class ClusterUpdateTracker : public ClusterUpdateCallbacks {
public:
ClusterUpdateTracker(ClusterManager& cm, const std::string& cluster_name);

bool exists() { return cluster_info_ != nullptr; }
ClusterInfoConstSharedPtr info() { return cluster_info_; }

// ClusterUpdateCallbacks
void onClusterAddOrUpdate(ThreadLocalCluster& cluster) override;
void onClusterRemoval(const std::string& cluster) override;

private:
const std::string cluster_name_;
const ClusterUpdateCallbacksHandlePtr cluster_update_callbacks_handle_;

ClusterInfoConstSharedPtr cluster_info_;
};

} // namespace Upstream
} // namespace Envoy
1 change: 1 addition & 0 deletions source/extensions/tracers/datadog/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ envoy_cc_library(
"//source/common/config:utility_lib",
"//source/common/http:async_client_utility_lib",
"//source/common/tracing:http_tracer_lib",
"//source/common/upstream:cluster_update_tracker_lib",
"//source/extensions/tracers:well_known_names",
"//source/extensions/tracers/common/ot:opentracing_driver_lib",
"@envoy_api//envoy/config/trace/v3:pkg_cc_proto",
Expand Down
29 changes: 18 additions & 11 deletions source/extensions/tracers/datadog/datadog_tracer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ Driver::Driver(const envoy::config::trace::v3::DatadogConfig& datadog_config,
POOL_COUNTER_PREFIX(scope, "tracing.datadog."))},
tls_(tls.allocateSlot()), runtime_(runtime) {

Config::Utility::checkCluster(TracerNames::get().Datadog, datadog_config.collector_cluster(),
cm_);
cluster_ = cm_.get(datadog_config.collector_cluster())->info();
Config::Utility::checkCluster(TracerNames::get().Datadog, datadog_config.collector_cluster(), cm_,
/* allow_added_via_api */ true);
cluster_ = datadog_config.collector_cluster();

// Default tracer options.
tracer_options_.operation_name_override = "envoy.proxy";
Expand Down Expand Up @@ -60,7 +60,8 @@ opentracing::Tracer& Driver::tracer() { return *tls_->getTyped<TlsTracer>().trac

TraceReporter::TraceReporter(TraceEncoderSharedPtr encoder, Driver& driver,
Event::Dispatcher& dispatcher)
: driver_(driver), encoder_(encoder) {
: driver_(driver), encoder_(encoder),
collector_cluster_(driver_.clusterManager(), driver_.cluster()) {
flush_timer_ = dispatcher.createTimer([this]() -> void {
for (auto& h : encoder_->headers()) {
lower_case_headers_.emplace(h.first, Http::LowerCaseString{h.first});
Expand Down Expand Up @@ -89,7 +90,7 @@ void TraceReporter::flushTraces() {
Http::RequestMessagePtr message(new Http::RequestMessageImpl());
message->headers().setReferenceMethod(Http::Headers::get().MethodValues.Post);
message->headers().setReferencePath(encoder_->path());
message->headers().setReferenceHost(driver_.cluster()->name());
message->headers().setReferenceHost(driver_.cluster());
for (auto& h : encoder_->headers()) {
message->headers().setReferenceKey(lower_case_headers_.at(h.first), h.second);
}
Expand All @@ -100,13 +101,19 @@ void TraceReporter::flushTraces() {
ENVOY_LOG(debug, "submitting {} trace(s) to {} with payload size {}", pendingTraces,
encoder_->path(), encoder_->payload().size());

Http::AsyncClient::Request* request =
driver_.clusterManager()
.httpAsyncClientForCluster(driver_.cluster()->name())
.send(std::move(message), *this,
if (collector_cluster_.exists()) {
Http::AsyncClient::Request* request =
driver_.clusterManager()
.httpAsyncClientForCluster(collector_cluster_.info()->name())
.send(
std::move(message), *this,
Http::AsyncClient::RequestOptions().setTimeout(std::chrono::milliseconds(1000U)));
if (request) {
active_requests_.add(*request);
if (request) {
active_requests_.add(*request);
}
} else {
ENVOY_LOG(debug, "collector cluster '{}' does not exist", driver_.cluster());
driver_.tracerStats().reports_skipped_no_cluster_.inc();
}

encoder_->clearTraces();
Expand Down
7 changes: 5 additions & 2 deletions source/extensions/tracers/datadog/datadog_tracer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "common/http/async_client_utility.h"
#include "common/http/header_map_impl.h"
#include "common/json/json_loader.h"
#include "common/upstream/cluster_update_tracker.h"

#include "extensions/tracers/common/ot/opentracing_driver_impl.h"

Expand All @@ -23,6 +24,7 @@ namespace Datadog {
#define DATADOG_TRACER_STATS(COUNTER) \
COUNTER(traces_sent) \
COUNTER(timer_flushed) \
COUNTER(reports_skipped_no_cluster) \
COUNTER(reports_sent) \
COUNTER(reports_dropped) \
COUNTER(reports_failed)
Expand All @@ -49,7 +51,7 @@ class Driver : public Common::Ot::OpenTracingDriver {

// Getters to return the DatadogDriver's key members.
Upstream::ClusterManager& clusterManager() { return cm_; }
Upstream::ClusterInfoConstSharedPtr cluster() { return cluster_; }
const std::string& cluster() { return cluster_; }
Runtime::Loader& runtime() { return runtime_; }
DatadogTracerStats& tracerStats() { return tracer_stats_; }
const datadog::opentracing::TracerOptions& tracerOptions() { return tracer_options_; }
Expand All @@ -74,7 +76,7 @@ class Driver : public Common::Ot::OpenTracingDriver {
};

Upstream::ClusterManager& cm_;
Upstream::ClusterInfoConstSharedPtr cluster_;
std::string cluster_;
DatadogTracerStats tracer_stats_;
datadog::opentracing::TracerOptions tracer_options_;
ThreadLocal::SlotPtr tls_;
Expand Down Expand Up @@ -129,6 +131,7 @@ class TraceReporter : public Http::AsyncClient::Callbacks,

std::map<std::string, Http::LowerCaseString> lower_case_headers_;

Upstream::ClusterUpdateTracker collector_cluster_;
// Track active HTTP requests to be able to cancel them on destruction.
Http::AsyncClientRequestTracker active_requests_;
};
Expand Down
1 change: 1 addition & 0 deletions source/extensions/tracers/lightstep/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ envoy_cc_library(
"//source/common/grpc:context_lib",
"//source/common/stats:symbol_table_lib",
"//source/common/tracing:http_tracer_lib",
"//source/common/upstream:cluster_update_tracker_lib",
"//source/extensions/tracers:well_known_names",
"//source/extensions/tracers/common/ot:opentracing_driver_lib",
"@envoy_api//envoy/config/trace/v3:pkg_cc_proto",
Expand Down
43 changes: 25 additions & 18 deletions source/extensions/tracers/lightstep/lightstep_tracer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void LightStepLogger::operator()(lightstep::LogLevel level,
const size_t LightStepDriver::DefaultMinFlushSpans = 200U;

LightStepDriver::LightStepTransporter::LightStepTransporter(LightStepDriver& driver)
: driver_(driver) {}
: driver_(driver), collector_cluster_(driver_.clusterManager(), driver_.cluster()) {}

LightStepDriver::LightStepTransporter::~LightStepTransporter() {
if (active_request_ != nullptr) {
Expand All @@ -70,14 +70,14 @@ LightStepDriver::LightStepTransporter::~LightStepTransporter() {

void LightStepDriver::LightStepTransporter::onSuccess(const Http::AsyncClient::Request&,
Http::ResponseMessagePtr&& /*response*/) {
driver_.grpc_context_.chargeStat(*driver_.cluster(), driver_.request_stat_names_, true);
driver_.grpc_context_.chargeStat(*active_cluster_, driver_.request_stat_names_, true);
active_callback_->OnSuccess(*active_report_);
reset();
}

void LightStepDriver::LightStepTransporter::onFailure(
const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason /*failure_reason*/) {
driver_.grpc_context_.chargeStat(*driver_.cluster(), driver_.request_stat_names_, false);
driver_.grpc_context_.chargeStat(*active_cluster_, driver_.request_stat_names_, false);
active_callback_->OnFailure(*active_report_);
reset();
}
Expand All @@ -95,24 +95,31 @@ void LightStepDriver::LightStepTransporter::Send(std::unique_ptr<lightstep::Buff
callback.OnFailure(*report);
return;
}
active_report_ = std::move(report);
active_callback_ = &callback;

const uint64_t timeout =
driver_.runtime().snapshot().getInteger("tracing.lightstep.request_timeout", 5000U);
Http::RequestMessagePtr message = Grpc::Common::prepareHeaders(
driver_.cluster()->name(), lightstep::CollectorServiceFullName(),
lightstep::CollectorMethodName(), absl::optional<std::chrono::milliseconds>(timeout));
message->body() = serializeGrpcMessage(*active_report_);

active_request_ =
driver_.clusterManager()
.httpAsyncClientForCluster(driver_.cluster()->name())
.send(std::move(message), *this,
Http::AsyncClient::RequestOptions().setTimeout(std::chrono::milliseconds(timeout)));
driver_.cluster(), lightstep::CollectorServiceFullName(), lightstep::CollectorMethodName(),
absl::optional<std::chrono::milliseconds>(timeout));
message->body() = serializeGrpcMessage(*report);

if (collector_cluster_.exists()) {
active_report_ = std::move(report);
active_callback_ = &callback;
active_cluster_ = collector_cluster_.info();
active_request_ = driver_.clusterManager()
.httpAsyncClientForCluster(collector_cluster_.info()->name())
.send(std::move(message), *this,
Http::AsyncClient::RequestOptions().setTimeout(
std::chrono::milliseconds(timeout)));
} else {
ENVOY_LOG(debug, "collector cluster '{}' does not exist", driver_.cluster());
driver_.tracerStats().reports_skipped_no_cluster_.inc();
}
}

void LightStepDriver::LightStepTransporter::reset() {
active_cluster_ = nullptr;
active_request_ = nullptr;
active_callback_ = nullptr;
active_report_ = nullptr;
Expand Down Expand Up @@ -164,12 +171,12 @@ LightStepDriver::LightStepDriver(const envoy::config::trace::v3::LightstepConfig
pool_.add(lightstep::CollectorMethodName())} {

Config::Utility::checkCluster(TracerNames::get().Lightstep, lightstep_config.collector_cluster(),
cm_);
cluster_ = cm_.get(lightstep_config.collector_cluster())->info();
cm_, /* allow_added_via_api */ true);
cluster_ = lightstep_config.collector_cluster();

if (!(cluster_->features() & Upstream::ClusterInfo::Features::HTTP2)) {
if (!(cm_.get(cluster_)->info()->features() & Upstream::ClusterInfo::Features::HTTP2)) {
throw EnvoyException(
fmt::format("{} collector cluster must support http2 for gRPC calls", cluster_->name()));
fmt::format("{} collector cluster must support http2 for gRPC calls", cluster_));
}

tls_->set([this](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr {
Expand Down
14 changes: 10 additions & 4 deletions source/extensions/tracers/lightstep/lightstep_tracer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "common/json/json_loader.h"
#include "common/protobuf/protobuf.h"
#include "common/stats/symbol_table_impl.h"
#include "common/upstream/cluster_update_tracker.h"

#include "extensions/tracers/common/ot/opentracing_driver_impl.h"

Expand All @@ -33,7 +34,8 @@ namespace Lightstep {
#define LIGHTSTEP_TRACER_STATS(COUNTER) \
COUNTER(spans_sent) \
COUNTER(spans_dropped) \
COUNTER(timer_flushed)
COUNTER(timer_flushed) \
COUNTER(reports_skipped_no_cluster)

struct LightstepTracerStats {
LIGHTSTEP_TRACER_STATS(GENERATE_COUNTER_STRUCT)
Expand Down Expand Up @@ -62,7 +64,7 @@ class LightStepDriver : public Common::Ot::OpenTracingDriver {
PropagationMode propagation_mode, Grpc::Context& grpc_context);

Upstream::ClusterManager& clusterManager() { return cm_; }
Upstream::ClusterInfoConstSharedPtr cluster() { return cluster_; }
const std::string& cluster() { return cluster_; }
Runtime::Loader& runtime() { return runtime_; }
LightstepTracerStats& tracerStats() { return tracer_stats_; }

Expand All @@ -75,7 +77,9 @@ class LightStepDriver : public Common::Ot::OpenTracingDriver {
PropagationMode propagationMode() const override { return propagation_mode_; }

private:
class LightStepTransporter : public lightstep::AsyncTransporter, Http::AsyncClient::Callbacks {
class LightStepTransporter : Logger::Loggable<Logger::Id::tracing>,
public lightstep::AsyncTransporter,
public Http::AsyncClient::Callbacks {
public:
explicit LightStepTransporter(LightStepDriver& driver);

Expand All @@ -95,8 +99,10 @@ class LightStepDriver : public Common::Ot::OpenTracingDriver {
private:
std::unique_ptr<lightstep::BufferChain> active_report_;
Callback* active_callback_ = nullptr;
Upstream::ClusterInfoConstSharedPtr active_cluster_;
Http::AsyncClient::Request* active_request_ = nullptr;
LightStepDriver& driver_;
Upstream::ClusterUpdateTracker collector_cluster_;

void reset();
};
Expand Down Expand Up @@ -129,7 +135,7 @@ class LightStepDriver : public Common::Ot::OpenTracingDriver {
};

Upstream::ClusterManager& cm_;
Upstream::ClusterInfoConstSharedPtr cluster_;
std::string cluster_;
LightstepTracerStats tracer_stats_;
ThreadLocal::SlotPtr tls_;
Runtime::Loader& runtime_;
Expand Down
1 change: 1 addition & 0 deletions source/extensions/tracers/zipkin/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ envoy_cc_library(
"//source/common/network:address_lib",
"//source/common/singleton:const_singleton",
"//source/common/tracing:http_tracer_lib",
"//source/common/upstream:cluster_update_tracker_lib",
"//source/extensions/tracers:well_known_names",
"@com_github_openzipkin_zipkinapi//:zipkin_cc_proto",
"@envoy_api//envoy/config/trace/v3:pkg_cc_proto",
Expand Down
Loading