Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions contrib/rocketmq_proxy/filters/network/source/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ Network::FilterFactoryCb RocketmqProxyFilterConfigFactory::createFilterFactoryFr
Server::Configuration::FactoryContext& context) {
std::shared_ptr<ConfigImpl> filter_config = std::make_shared<ConfigImpl>(proto_config, context);
return [filter_config, &context](Network::FilterManager& filter_manager) -> void {
filter_manager.addReadFilter(
std::make_shared<ConnectionManager>(*filter_config, context.dispatcher().timeSource()));
filter_manager.addReadFilter(std::make_shared<ConnectionManager>(
*filter_config, context.mainThreadDispatcher().timeSource()));
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class ActiveMessageTest : public testing::Test {
ActiveMessageTest()
: stats_(RocketmqFilterStats::generateStats("test.", store_)),
config_(rocketmq_proxy_config_, factory_context_),
connection_manager_(config_, factory_context_.dispatcher().timeSource()) {
connection_manager_(config_, factory_context_.mainThreadDispatcher().timeSource()) {
connection_manager_.initializeReadFilterCallbacks(filter_callbacks_);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ class RocketmqConnectionManagerTest : public Event::TestUsingSimulatedTime, publ
TestUtility::validate(proto_config_);
}
config_ = std::make_unique<TestConfigImpl>(proto_config_, factory_context_, stats_);
conn_manager_ =
std::make_unique<ConnectionManager>(*config_, factory_context_.dispatcher().timeSource());
conn_manager_ = std::make_unique<ConnectionManager>(
*config_, factory_context_.mainThreadDispatcher().timeSource());
conn_manager_->initializeReadFilterCallbacks(filter_callbacks_);
conn_manager_->onNewConnection();
current_ = factory_context_.dispatcher().timeSource().monotonicTime();
current_ = factory_context_.mainThreadDispatcher().timeSource().monotonicTime();
}

void initializeCluster() {
Expand Down
2 changes: 1 addition & 1 deletion contrib/rocketmq_proxy/filters/network/test/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class RocketmqRouterTestBase {
cluster_info_(std::make_shared<Upstream::MockClusterInfo>()) {
context_.cluster_manager_.initializeThreadLocalClusters({"fake_cluster"});
conn_manager_ =
std::make_unique<ConnectionManager>(config_, context_.dispatcher().timeSource());
std::make_unique<ConnectionManager>(config_, context_.mainThreadDispatcher().timeSource());
conn_manager_->initializeReadFilterCallbacks(filter_callbacks_);
}

Expand Down
2 changes: 1 addition & 1 deletion contrib/sxg/filters/http/test/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ TEST_F(FilterTest, SdsDynamicGenericSecret) {
NiceMock<Event::MockDispatcher> dispatcher;
EXPECT_CALL(secret_context, localInfo()).WillRepeatedly(ReturnRef(local_info));
EXPECT_CALL(secret_context, api()).WillRepeatedly(ReturnRef(*api));
EXPECT_CALL(secret_context, dispatcher()).WillRepeatedly(ReturnRef(dispatcher));
EXPECT_CALL(secret_context, mainThreadDispatcher()).WillRepeatedly(ReturnRef(dispatcher));
EXPECT_CALL(secret_context, stats()).WillRepeatedly(ReturnRef(stats));
EXPECT_CALL(secret_context, initManager()).WillRepeatedly(ReturnRef(init_manager));
EXPECT_CALL(init_manager, add(_))
Expand Down
2 changes: 1 addition & 1 deletion envoy/server/factory_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class FactoryContextBase {
* @return Event::Dispatcher& the main thread's dispatcher. This dispatcher should be used
* for all singleton processing.
*/
virtual Event::Dispatcher& dispatcher() PURE;
virtual Event::Dispatcher& mainThreadDispatcher() PURE;

/**
* @return Api::Api& a reference to the api object.
Expand Down
2 changes: 1 addition & 1 deletion envoy/server/health_checker_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class HealthCheckerFactoryContext {
* @return Event::Dispatcher& the main thread's dispatcher. This dispatcher should be used
* for all singleton processing.
*/
virtual Event::Dispatcher& dispatcher() PURE;
virtual Event::Dispatcher& mainThreadDispatcher() PURE;

/*
* @return Upstream::HealthCheckEventLoggerPtr the health check event logger for the
Expand Down
2 changes: 1 addition & 1 deletion envoy/server/resource_monitor_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class ResourceMonitorFactoryContext {
* @return Event::Dispatcher& the main thread's dispatcher. This dispatcher should be used
* for all singleton processing.
*/
virtual Event::Dispatcher& dispatcher() PURE;
virtual Event::Dispatcher& mainThreadDispatcher() PURE;

/**
* @return Server::Options& the command-line options that Envoy was started with.
Expand Down
2 changes: 1 addition & 1 deletion envoy/server/transport_socket_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class TransportSocketFactoryContext {
/**
* @return Event::Dispatcher& the main thread's dispatcher.
*/
virtual Event::Dispatcher& dispatcher() PURE;
virtual Event::Dispatcher& mainThreadDispatcher() PURE;

/**
* @return Server::Options& the command-line options that Envoy was started with.
Expand Down
2 changes: 1 addition & 1 deletion source/common/router/config_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ RouteEntryImplBase::RouteEntryImplBase(const VirtualHostImpl& vhost,
vhost_.globalRouteConfig().maxDirectResponseBodySizeBytes())),
per_filter_configs_(route.typed_per_filter_config(), optional_http_filters, factory_context,
validator),
route_name_(route.name()), time_source_(factory_context.dispatcher().timeSource()) {
route_name_(route.name()), time_source_(factory_context.mainThreadDispatcher().timeSource()) {
if (route.route().has_metadata_match()) {
const auto filter_it = route.route().metadata_match().filter_metadata().find(
Envoy::Config::MetadataFilters::get().ENVOY_LB);
Expand Down
8 changes: 5 additions & 3 deletions source/common/router/rds_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,11 @@ void RdsRouteConfigProviderImpl::requestVirtualHostsUpdate(
// execute the callback. still_alive shared_ptr will be deallocated when the current instance of
// the RdsRouteConfigProviderImpl is deallocated; we rely on a weak_ptr to still_alive flag to
// determine if the RdsRouteConfigProviderImpl instance is still valid.
factory_context_.dispatcher().post([this, maybe_still_alive = std::weak_ptr<bool>(still_alive_),
alias, &thread_local_dispatcher,
route_config_updated_cb]() -> void {
factory_context_.mainThreadDispatcher().post([this,
maybe_still_alive =
std::weak_ptr<bool>(still_alive_),
alias, &thread_local_dispatcher,
route_config_updated_cb]() -> void {
if (maybe_still_alive.lock()) {
subscription_->updateOnDemand(alias);
config_update_callbacks_.push_back({alias, thread_local_dispatcher, route_config_updated_cb});
Expand Down
4 changes: 2 additions & 2 deletions source/common/router/scoped_rds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,8 @@ void ScopedRdsConfigSubscription::onDemandRdsUpdate(
std::shared_ptr<Router::ScopeKey> scope_key, Event::Dispatcher& thread_local_dispatcher,
Http::RouteConfigUpdatedCallback&& route_config_updated_cb,
std::weak_ptr<Envoy::Config::ConfigSubscriptionCommonBase> weak_subscription) {
factory_context_.dispatcher().post([this, &thread_local_dispatcher, scope_key,
route_config_updated_cb, weak_subscription]() {
factory_context_.mainThreadDispatcher().post([this, &thread_local_dispatcher, scope_key,
route_config_updated_cb, weak_subscription]() {
// If the subscription has been destroyed, return immediately.
if (!weak_subscription.lock()) {
thread_local_dispatcher.post([route_config_updated_cb] { route_config_updated_cb(false); });
Expand Down
20 changes: 12 additions & 8 deletions source/common/secret/sds_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,10 @@ class TlsCertificateSdsApi : public SdsApi, public TlsCertificateConfigProvider
Config::Utility::checkLocalInfo("TlsCertificateSdsApi", secret_provider_context.localInfo());
return std::make_shared<TlsCertificateSdsApi>(
sds_config, sds_config_name, secret_provider_context.clusterManager().subscriptionFactory(),
secret_provider_context.dispatcher().timeSource(),
secret_provider_context.mainThreadDispatcher().timeSource(),
secret_provider_context.messageValidationVisitor(), secret_provider_context.stats(),
destructor_cb, secret_provider_context.dispatcher(), secret_provider_context.api());
destructor_cb, secret_provider_context.mainThreadDispatcher(),
secret_provider_context.api());
}

TlsCertificateSdsApi(const envoy::config::core::v3::ConfigSource& sds_config,
Expand Down Expand Up @@ -226,9 +227,10 @@ class CertificateValidationContextSdsApi : public SdsApi,
secret_provider_context.localInfo());
return std::make_shared<CertificateValidationContextSdsApi>(
sds_config, sds_config_name, secret_provider_context.clusterManager().subscriptionFactory(),
secret_provider_context.dispatcher().timeSource(),
secret_provider_context.mainThreadDispatcher().timeSource(),
secret_provider_context.messageValidationVisitor(), secret_provider_context.stats(),
destructor_cb, secret_provider_context.dispatcher(), secret_provider_context.api());
destructor_cb, secret_provider_context.mainThreadDispatcher(),
secret_provider_context.api());
}
CertificateValidationContextSdsApi(const envoy::config::core::v3::ConfigSource& sds_config,
const std::string& sds_config_name,
Expand Down Expand Up @@ -320,9 +322,10 @@ class TlsSessionTicketKeysSdsApi : public SdsApi, public TlsSessionTicketKeysCon
secret_provider_context.localInfo());
return std::make_shared<TlsSessionTicketKeysSdsApi>(
sds_config, sds_config_name, secret_provider_context.clusterManager().subscriptionFactory(),
secret_provider_context.dispatcher().timeSource(),
secret_provider_context.mainThreadDispatcher().timeSource(),
secret_provider_context.messageValidationVisitor(), secret_provider_context.stats(),
destructor_cb, secret_provider_context.dispatcher(), secret_provider_context.api());
destructor_cb, secret_provider_context.mainThreadDispatcher(),
secret_provider_context.api());
}

TlsSessionTicketKeysSdsApi(const envoy::config::core::v3::ConfigSource& sds_config,
Expand Down Expand Up @@ -392,9 +395,10 @@ class GenericSecretSdsApi : public SdsApi, public GenericSecretConfigProvider {
Config::Utility::checkLocalInfo("GenericSecretSdsApi", secret_provider_context.localInfo());
return std::make_shared<GenericSecretSdsApi>(
sds_config, sds_config_name, secret_provider_context.clusterManager().subscriptionFactory(),
secret_provider_context.dispatcher().timeSource(),
secret_provider_context.mainThreadDispatcher().timeSource(),
secret_provider_context.messageValidationVisitor(), secret_provider_context.stats(),
destructor_cb, secret_provider_context.dispatcher(), secret_provider_context.api());
destructor_cb, secret_provider_context.mainThreadDispatcher(),
secret_provider_context.api());
}

GenericSecretSdsApi(const envoy::config::core::v3::ConfigSource& sds_config,
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ envoy_cc_library(
"//source/common/upstream:priority_conn_pool_map_impl_lib",
"//source/common/upstream:upstream_lib",
"//source/common/quic:quic_stat_names_lib",
"//source/server:factory_context_base_impl_lib",
"@envoy_api//envoy/admin/v3:pkg_cc_proto",
"@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto",
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
Expand Down
8 changes: 4 additions & 4 deletions source/common/upstream/cluster_factory_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ ClusterFactoryImplBase::selectDnsResolver(const envoy::config::cluster::v3::Clus
resolvers.push_back(Network::Address::resolveProtoAddress(resolver_addr));
}
}
return context.dispatcher().createDnsResolver(resolvers, dns_resolver_options);
return context.mainThreadDispatcher().createDnsResolver(resolvers, dns_resolver_options);
}

return context.dnsResolver();
Expand All @@ -127,7 +127,7 @@ ClusterFactoryImplBase::create(const envoy::config::cluster::v3::Cluster& cluste
transport_factory_context =
std::make_unique<Server::Configuration::TransportSocketFactoryContextImpl>(
context.admin(), context.sslContextManager(), *stats_scope, context.clusterManager(),
context.localInfo(), context.dispatcher(), context.stats(),
context.localInfo(), context.mainThreadDispatcher(), context.stats(),
context.singletonManager(), context.threadLocal(), context.messageValidationVisitor(),
context.api(), context.options());

Expand All @@ -141,13 +141,13 @@ ClusterFactoryImplBase::create(const envoy::config::cluster::v3::Cluster& cluste
} else {
new_cluster_pair.first->setHealthChecker(HealthCheckerFactory::create(
cluster.health_checks()[0], *new_cluster_pair.first, context.runtime(),
context.dispatcher(), context.logManager(), context.messageValidationVisitor(),
context.mainThreadDispatcher(), context.logManager(), context.messageValidationVisitor(),
context.api()));
}
}

new_cluster_pair.first->setOutlierDetector(Outlier::DetectorImplFactory::createForCluster(
*new_cluster_pair.first, cluster, context.dispatcher(), context.runtime(),
*new_cluster_pair.first, cluster, context.mainThreadDispatcher(), context.runtime(),
context.outlierEventLogger()));

new_cluster_pair.first->setTransportFactoryContext(std::move(transport_factory_context));
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/cluster_factory_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class ClusterFactoryContextImpl : public ClusterFactoryContext {
Network::DnsResolverSharedPtr dnsResolver() override { return dns_resolver_; }
Ssl::ContextManager& sslContextManager() override { return ssl_context_manager_; }
Runtime::Loader& runtime() override { return runtime_; }
Event::Dispatcher& dispatcher() override { return dispatcher_; }
Event::Dispatcher& mainThreadDispatcher() override { return dispatcher_; }
AccessLog::AccessLogManager& logManager() override { return log_manager_; }
const LocalInfo::LocalInfo& localInfo() const override { return local_info_; }
const Server::Options& options() override { return options_; }
Expand Down
44 changes: 23 additions & 21 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1643,8 +1643,9 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::tcpConnPoolIsIdle(
ClusterManagerPtr ProdClusterManagerFactory::clusterManagerFromProto(
const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
return ClusterManagerPtr{new ClusterManagerImpl(
bootstrap, *this, stats_, tls_, runtime_, local_info_, log_manager_, main_thread_dispatcher_,
admin_, validation_context_, api_, http_context_, grpc_context_, router_context_)};
bootstrap, *this, stats_, tls_, context_.runtime(), context_.localInfo(), log_manager_,
context_.mainThreadDispatcher(), context_.admin(), validation_context_, context_.api(),
http_context_, grpc_context_, router_context_)};
}

Http::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateConnPool(
Expand All @@ -1655,7 +1656,8 @@ Http::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateConnPool(
const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
TimeSource& source, ClusterConnectivityState& state) {
if (protocols.size() == 3 && runtime_.snapshot().featureEnabled("upstream.use_http3", 100)) {
if (protocols.size() == 3 &&
context_.runtime().snapshot().featureEnabled("upstream.use_http3", 100)) {
ASSERT(contains(protocols,
{Http::Protocol::Http11, Http::Protocol::Http2, Http::Protocol::Http3}));
Http::AlternateProtocolsCacheSharedPtr alternate_protocols_cache;
Expand All @@ -1667,30 +1669,30 @@ Http::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateConnPool(
// TODO(RyanTheOptimist): Plumb an actual alternate protocols cache.
Envoy::Http::ConnectivityGrid::ConnectivityOptions coptions{protocols};
return std::make_unique<Http::ConnectivityGrid>(
dispatcher, api_.randomGenerator(), host, priority, options, transport_socket_options,
state, source, alternate_protocols_cache, std::chrono::milliseconds(300), coptions,
quic_stat_names_, stats_);
dispatcher, context_.api().randomGenerator(), host, priority, options,
transport_socket_options, state, source, alternate_protocols_cache,
std::chrono::milliseconds(300), coptions, quic_stat_names_, stats_);
#else
// Should be blocked by configuration checking at an earlier point.
NOT_REACHED_GCOVR_EXCL_LINE;
#endif
}
if (protocols.size() >= 2) {
ASSERT(contains(protocols, {Http::Protocol::Http11, Http::Protocol::Http2}));
return std::make_unique<Http::HttpConnPoolImplMixed>(dispatcher, api_.randomGenerator(), host,
priority, options,
transport_socket_options, state);
return std::make_unique<Http::HttpConnPoolImplMixed>(
dispatcher, context_.api().randomGenerator(), host, priority, options,
transport_socket_options, state);
}
if (protocols.size() == 1 && protocols[0] == Http::Protocol::Http2 &&
runtime_.snapshot().featureEnabled("upstream.use_http2", 100)) {
return Http::Http2::allocateConnPool(dispatcher, api_.randomGenerator(), host, priority,
options, transport_socket_options, state);
context_.runtime().snapshot().featureEnabled("upstream.use_http2", 100)) {
return Http::Http2::allocateConnPool(dispatcher, context_.api().randomGenerator(), host,
priority, options, transport_socket_options, state);
}
if (protocols.size() == 1 && protocols[0] == Http::Protocol::Http3 &&
runtime_.snapshot().featureEnabled("upstream.use_http3", 100)) {
context_.runtime().snapshot().featureEnabled("upstream.use_http3", 100)) {
#ifdef ENVOY_ENABLE_QUIC
return Http::Http3::allocateConnPool(dispatcher, api_.randomGenerator(), host, priority,
options, transport_socket_options, state, source,
return Http::Http3::allocateConnPool(dispatcher, context_.api().randomGenerator(), host,
priority, options, transport_socket_options, state, source,
quic_stat_names_, stats_);
#else
UNREFERENCED_PARAMETER(source);
Expand All @@ -1699,8 +1701,8 @@ Http::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateConnPool(
#endif
}
ASSERT(protocols.size() == 1 && protocols[0] == Http::Protocol::Http11);
return Http::Http1::allocateConnPool(dispatcher, api_.randomGenerator(), host, priority, options,
transport_socket_options, state);
return Http::Http1::allocateConnPool(dispatcher, context_.api().randomGenerator(), host, priority,
options, transport_socket_options, state);
}

Tcp::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateTcpConnPool(
Expand All @@ -1722,12 +1724,12 @@ std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr> ProdClusterManagerFactor
const envoy::config::cluster::v3::Cluster& cluster, ClusterManager& cm,
Outlier::EventLoggerSharedPtr outlier_event_logger, bool added_via_api) {
return ClusterFactoryImplBase::create(
cluster, cm, stats_, tls_, dns_resolver_, ssl_context_manager_, runtime_,
main_thread_dispatcher_, log_manager_, local_info_, admin_, singleton_manager_,
outlier_event_logger, added_via_api,
cluster, cm, stats_, tls_, dns_resolver_, ssl_context_manager_, context_.runtime(),
context_.mainThreadDispatcher(), log_manager_, context_.localInfo(), admin_,
singleton_manager_, outlier_event_logger, added_via_api,
added_via_api ? validation_context_.dynamicValidationVisitor()
: validation_context_.staticValidationVisitor(),
api_, options_);
context_.api(), context_.options());
}

CdsApiPtr
Expand Down
Loading