Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/envoy/api/v2/rds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ message RouteConfiguration {
string name = 1;

// An array of virtual hosts that make up the route table.
repeated route.VirtualHost virtual_hosts = 2 [(gogoproto.nullable) = false];
repeated route.VirtualHost virtual_hosts = 2; // [(gogoproto.nullable) = false];
Vhds vhds = 9;

// An array of virtual hosts will be dynamically loaded via the VHDS API.
// Both *virtual_hosts* and *vhds* fields will be used when present. *virtual_hosts* can be used
Expand Down
3 changes: 3 additions & 0 deletions include/envoy/config/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "envoy/stats/stats_macros.h"

#include "common/protobuf/protobuf.h"
#include "common/common/assert.h"

namespace Envoy {
namespace Config {
Expand Down Expand Up @@ -81,6 +82,8 @@ class Subscription {
* @param resources vector of resource names to fetch.
*/
virtual void updateResources(const std::vector<std::string>& resources) PURE;

virtual void updateResourcesViaAliases(const std::vector<std::string>&) { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
};

/**
Expand Down
14 changes: 8 additions & 6 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,14 @@ class StreamFilterCallbacks {
*/
virtual Router::RouteConstSharedPtr route() PURE;

/**
* Returns the clusterInfo for the cached route.
* This method is to avoid multiple look ups in the filter chain, it also provides a consistent
* view of clusterInfo after a route is picked/repicked.
* NOTE: Cached clusterInfo and route will be updated the same time.
*/
virtual bool requestRouteConfigUpdate(std::function<void()> cb) PURE;

/**
* Returns the clusterInfo for the cached route.
* This method is to avoid multiple look ups in the filter chain, it also provides a consistent
* view of clusterInfo after a route is picked/repicked.
* NOTE: Cached clusterInfo and route will be updated the same time.
*/
virtual Upstream::ClusterInfoConstSharedPtr clusterInfo() PURE;

/**
Expand Down
2 changes: 2 additions & 0 deletions include/envoy/router/rds.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class RouteConfigProvider {
* @return the last time this RouteConfigProvider was updated. Used for config dumps.
*/
virtual SystemTime lastUpdated() const PURE;

virtual bool requestConfigUpdate(const std::string for_domain, std::function<void()> cb) PURE;
};

typedef std::unique_ptr<RouteConfigProvider> RouteConfigProviderPtr;
Expand Down
2 changes: 2 additions & 0 deletions include/envoy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,8 @@ class Config {
* @return const std::string the RouteConfiguration name.
*/
virtual const std::string& name() const PURE;

virtual bool usesVhds() const PURE;
};

typedef std::shared_ptr<const Config> ConfigConstSharedPtr;
Expand Down
10 changes: 10 additions & 0 deletions source/common/config/delta_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class DeltaSubscriptionImpl
return; // The unpause will send this request.
}

request_.set_type_url(type_url_);
request_.mutable_node()->MergeFrom(local_info_.node());
request_.clear_resource_names_subscribe();
request_.clear_resource_names_unsubscribe();
std::copy(diff.added_.begin(), diff.added_.end(),
Expand Down Expand Up @@ -215,6 +217,14 @@ class DeltaSubscriptionImpl
stats_.update_attempt_.inc();
}

void updateResourcesViaAliases(const std::vector<std::string>& aliases) override {
ResourceNameDiff diff;
std::copy(aliases.begin(), aliases.end(), std::inserter(diff.added_, diff.added_.begin()));
queueDiscoveryRequest(diff);
//sendDiscoveryRequest(diff);
stats_.update_attempt_.inc();
}

private:
void disableInitFetchTimeoutTimer() {
if (init_fetch_timeout_timer_) {
Expand Down
1 change: 1 addition & 0 deletions source/common/config/resources.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class TypeUrlValues {
const std::string ClusterLoadAssignment{"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"};
const std::string Secret{"type.googleapis.com/envoy.api.v2.auth.Secret"};
const std::string RouteConfiguration{"type.googleapis.com/envoy.api.v2.RouteConfiguration"};
const std::string VirtualHost{"type.googleapis.com/envoy.api.v2.route.VirtualHost"};
};

typedef ConstSingleton<TypeUrlValues> TypeUrl;
Expand Down
5 changes: 5 additions & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class AsyncStreamImpl : public AsyncClient::Stream,
AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
const AsyncClient::StreamOptions& options);

bool requestRouteConfigUpdate(std::function<void()>) override { return false; }

// Http::AsyncClient::Stream
void sendHeaders(HeaderMap& headers, bool end_stream) override;
void sendData(Buffer::Instance& data, bool end_stream) override;
Expand Down Expand Up @@ -172,6 +174,7 @@ class AsyncStreamImpl : public AsyncClient::Stream,
}

const std::string& name() const override { return EMPTY_STRING; }
bool usesVhds() const { return false; }

static const std::list<LowerCaseString> internal_only_headers_;
};
Expand Down Expand Up @@ -370,6 +373,8 @@ class AsyncRequestImpl final : public AsyncClient::Request,
AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent, AsyncClient::Callbacks& callbacks,
const AsyncClient::RequestOptions& options);

bool requestRouteConfigUpdate(std::function<void()>) override { return false; }

// AsyncClient::Request
virtual void cancel() override;

Expand Down
15 changes: 13 additions & 2 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ void ConnectionManagerImpl::chargeTracingStats(const Tracing::Reason& tracing_re

ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connection_manager)
: connection_manager_(connection_manager),
route_config_provider_(connection_manager.config_.routeConfigProvider()),
snapped_route_config_(connection_manager.config_.routeConfigProvider().config()),
stream_id_(connection_manager.random_generator_.random()),
request_response_timespan_(new Stats::Timespan(
Expand Down Expand Up @@ -1063,7 +1064,7 @@ void ConnectionManagerImpl::startDrainSequence() {
void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() {
Router::RouteConstSharedPtr route;
if (request_headers_ != nullptr) {
route = snapped_route_config_->route(*request_headers_, stream_id_);
route = route_config_provider_.config()->route(*request_headers_, stream_id_);
}
stream_info_.route_entry_ = route ? route->routeEntry() : nullptr;
cached_route_ = std::move(route);
Expand All @@ -1076,6 +1077,12 @@ void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() {
}
}

bool ConnectionManagerImpl::ActiveStream::requestRouteConfigUpdate(std::function<void()> cb) {
// TODO check for an empty header?
auto host_header = Http::LowerCaseString(request_headers_->Host()->value().c_str()).get();
return route_config_provider_.requestConfigUpdate(host_header, cb);
}

void ConnectionManagerImpl::ActiveStream::sendLocalReply(
bool is_grpc_request, Code code, absl::string_view body,
const std::function<void(HeaderMap& headers)>& modify_headers, bool is_head_request,
Expand Down Expand Up @@ -1710,13 +1717,17 @@ Upstream::ClusterInfoConstSharedPtr ConnectionManagerImpl::ActiveStreamFilterBas
}

Router::RouteConstSharedPtr ConnectionManagerImpl::ActiveStreamFilterBase::route() {
if (!parent_.cached_route_.has_value()) {
if (!parent_.cached_route_.has_value() || parent_.cached_route_.value() == nullptr) {
parent_.refreshCachedRoute();
}

return parent_.cached_route_.value();
}

bool ConnectionManagerImpl::ActiveStreamFilterBase::requestRouteConfigUpdate(std::function<void()> cb) {
return parent_.requestRouteConfigUpdate(cb);
}

void ConnectionManagerImpl::ActiveStreamFilterBase::clearRouteCache() {
parent_.cached_route_ = absl::optional<Router::RouteConstSharedPtr>();
parent_.cached_cluster_info_ = absl::optional<Upstream::ClusterInfoConstSharedPtr>();
Expand Down
3 changes: 3 additions & 0 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
Event::Dispatcher& dispatcher() override;
void resetStream() override;
Router::RouteConstSharedPtr route() override;
bool requestRouteConfigUpdate(std::function<void()> cb) override;
Upstream::ClusterInfoConstSharedPtr clusterInfo() override;
void clearRouteCache() override;
uint64_t streamId() override;
Expand Down Expand Up @@ -351,6 +352,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
void traceRequest();

void refreshCachedRoute();
bool requestRouteConfigUpdate(std::function<void()> cb);

// Pass on watermark callbacks to watermark subscribers. This boils down to passing watermark
// events for this stream and the downstream connection to the router filter.
Expand Down Expand Up @@ -420,6 +422,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
void onRequestTimeout();

ConnectionManagerImpl& connection_manager_;
Router::RouteConfigProvider& route_config_provider_;
Router::ConfigConstSharedPtr snapped_route_config_;
Tracing::SpanPtr active_span_;
const uint64_t stream_id_;
Expand Down
28 changes: 28 additions & 0 deletions source/common/router/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,34 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "on_demand_update_lib",
srcs = ["on_demand_update.cc"],
hdrs = ["on_demand_update.h"],
deps = [
"//include/envoy/event:dispatcher_interface",
"//include/envoy/event:timer_interface",
"//include/envoy/http:filter_interface",
"//include/envoy/server:filter_config_interface",
"//source/common/access_log:access_log_lib",
"//source/common/buffer:watermark_buffer_lib",
"//source/common/common:assert_lib",
"//source/common/common:empty_string",
"//source/common/common:enum_to_int",
"//source/common/common:hash_lib",
"//source/common/common:hex_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/common:utility_lib",
"//source/common/grpc:common_lib",
"//source/common/http:codes_lib",
"//source/common/http:header_map_lib",
"//source/common/http:headers_lib",
"//source/common/http:message_lib",
"//source/common/http:utility_lib",
"@envoy_api//envoy/config/filter/http/router/v2:router_cc",
],
)

envoy_cc_library(
name = "router_ratelimit_lib",
srcs = ["router_ratelimit.cc"],
Expand Down
2 changes: 1 addition & 1 deletion source/common/router/config_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ VirtualHostImpl::virtualClusterFromEntries(const Http::HeaderMap& headers) const
ConfigImpl::ConfigImpl(const envoy::api::v2::RouteConfiguration& config,
Server::Configuration::FactoryContext& factory_context,
bool validate_clusters_default)
: name_(config.name()) {
: name_(config.name()), uses_vhds_(config.has_vhds()) {
route_matcher_ = std::make_unique<RouteMatcher>(
config, *this, factory_context,
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, validate_clusters, validate_clusters_default));
Expand Down
5 changes: 4 additions & 1 deletion source/common/router/config_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -768,12 +768,15 @@ class ConfigImpl : public Config {

const std::string& name() const override { return name_; }

bool usesVhds() const override { return uses_vhds_; }

private:
std::unique_ptr<RouteMatcher> route_matcher_;
std::list<Http::LowerCaseString> internal_only_headers_;
HeaderParserPtr request_headers_parser_;
HeaderParserPtr response_headers_parser_;
const std::string name_;
const bool uses_vhds_;
};

/**
Expand All @@ -789,7 +792,7 @@ class NullConfigImpl : public Config {
}

const std::string& name() const override { return name_; }

bool usesVhds() const override { return false; }
private:
std::list<Http::LowerCaseString> internal_only_headers_;
const std::string name_;
Expand Down
49 changes: 49 additions & 0 deletions source/common/router/on_demand_update.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include "common/router/on_demand_update.h"

#include "common/common/assert.h"
#include "common/common/enum_to_int.h"
#include "common/http/codes.h"

#include "extensions/filters/http/well_known_names.h"

namespace Envoy {
namespace Router {

void OnDemandRouteUpdate::requestRouteConfigUpdate() {
if (callbacks_->route() != nullptr) {
filter_return_ = FilterReturn::ContinueDecoding;
} else {
auto configUpdateScheduled = callbacks_->requestRouteConfigUpdate([this]() -> void { onComplete(); });
filter_return_ = configUpdateScheduled ? FilterReturn::StopDecoding : FilterReturn::ContinueDecoding;
}
}

Http::FilterHeadersStatus OnDemandRouteUpdate::decodeHeaders(Http::HeaderMap&, bool) {
requestRouteConfigUpdate();
return filter_return_ == FilterReturn::StopDecoding ? Http::FilterHeadersStatus::StopIteration
: Http::FilterHeadersStatus::Continue;
}

Http::FilterDataStatus OnDemandRouteUpdate::decodeData(Buffer::Instance&, bool) {
return filter_return_ == FilterReturn::StopDecoding
? Http::FilterDataStatus::StopIterationAndWatermark
: Http::FilterDataStatus::Continue;
}

Http::FilterTrailersStatus OnDemandRouteUpdate::decodeTrailers(Http::HeaderMap&) {
return filter_return_ == FilterReturn::StopDecoding ? Http::FilterTrailersStatus::StopIteration
: Http::FilterTrailersStatus::Continue;
}

void OnDemandRouteUpdate::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) {
callbacks_ = &callbacks;
}

void OnDemandRouteUpdate::onComplete() {
filter_return_ = FilterReturn::ContinueDecoding;
// We got completion async. Let the filter chain continue.
callbacks_->continueDecoding();
}

} // namespace Router
} // namespace Envoy
54 changes: 54 additions & 0 deletions source/common/router/on_demand_update.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#pragma once

#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "envoy/http/filter.h"
#include "envoy/local_info/local_info.h"
#include "envoy/runtime/runtime.h"
#include "envoy/stats/scope.h"
#include "envoy/upstream/cluster_manager.h"

#include "common/common/assert.h"
#include "common/common/logger.h"
#include "common/common/matchers.h"
#include "common/http/header_map_impl.h"

namespace Envoy {
namespace Router {

class OnDemandRouteUpdate : public Logger::Loggable<Logger::Id::filter>,
public Http::StreamDecoderFilter {
public:
OnDemandRouteUpdate() {}

void requestRouteConfigUpdate();
void onComplete();

// Http::StreamDecoderFilter
Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap& headers, bool end_stream) override;
Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override;
Http::FilterTrailersStatus decodeTrailers(Http::HeaderMap& trailers) override;
void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override;
void onDestroy() override {}

private:
// State of this filter's communication with the external authorization service.
// The filter has either not started calling the external service, in the middle of calling
// it or has completed.
enum class State { NotStarted, Calling, Complete };

// FilterReturn is used to capture what the return code should be to the filter chain.
// if this filter is either in the middle of calling the service or the result is denied then
// the filter chain should stop. Otherwise the filter chain can continue to the next filter.
enum class FilterReturn { ContinueDecoding, StopDecoding };

Http::StreamDecoderFilterCallbacks* callbacks_{};
State state_{State::NotStarted};
FilterReturn filter_return_{FilterReturn::ContinueDecoding};
};

} // namespace Router
} // namespace Envoy
Loading