Skip to content

Commit

Permalink
Refactor async client (#12)
Browse files Browse the repository at this point in the history
This change refactors async client so that it is owned by the cluster
manager. Now users can just send requests and fire and forget if they
don't need to cancel. This makes a lot of the consumers much simpler.
The fire and forget functionality will be used for router request
shadowing (similar to the HTTP tracer).
  • Loading branch information
mattklein123 authored Aug 15, 2016
1 parent 16b266f commit 7654e26
Show file tree
Hide file tree
Showing 33 changed files with 532 additions and 430 deletions.
13 changes: 6 additions & 7 deletions include/envoy/http/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,18 @@ class AsyncClient {
virtual void cancel() PURE;
};

typedef std::unique_ptr<Request> RequestPtr;

virtual ~AsyncClient() {}

/**
* Send an HTTP request asynchronously
* @param request the request to send
* @param callbacks the callbacks to be notified of request status
* @param request the request to send.
* @param callbacks the callbacks to be notified of request status.
* @return a request handle or nullptr if no request could be created. NOTE: In this case
* onFailure() has already been called inline.
* onFailure() has already been called inline. The client owns the request and the
* handle should just be used to cancel.
*/
virtual RequestPtr send(MessagePtr&& request, Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) PURE;
virtual Request* send(MessagePtr&& request, Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) PURE;
};

typedef std::unique_ptr<AsyncClient> AsyncClientPtr;
Expand Down
17 changes: 6 additions & 11 deletions include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,12 @@ class ClusterManager {
*/
virtual const Cluster* get(const std::string& cluster) PURE;

/**
* @return whether the cluster manager knows about a particular cluster by name.
*/
virtual bool has(const std::string& cluster) PURE;

/**
* Allocate a load balanced HTTP connection pool for a cluster. This is *per-thread* so that
* callers do not need to worry about per thread synchronization. The load balancing policy that
* is used is the one defined on the cluster when it was created.
*
* Can return nullptr if there is no host available in the cluster or the cluster name is not
* valid.
* Can return nullptr if there is no host available in the cluster.
*/
virtual Http::ConnectionPool::Instance* httpConnPoolForCluster(const std::string& cluster) PURE;

Expand All @@ -52,15 +46,16 @@ class ClusterManager {
* load balancing policy that is used is the one defined on the cluster when it was created.
*
* Returns both a connection and the host that backs the connection. Both can be nullptr if there
* is no host available in the cluster or the cluster name is not valid.
* is no host available in the cluster.
*/
virtual Host::CreateConnectionData tcpConnForCluster(const std::string& cluster) PURE;

/**
* Returns a client that can be used to make async HTTP calls against the given cluster. The
* client may be backed by a connection pool or by a multiplexed connection.
* Returns a client that can be used to make async HTTP calls against the given cluster. The
* client may be backed by a connection pool or by a multiplexed connection. The cluster manager
* owns the client.
*/
virtual Http::AsyncClientPtr httpAsyncClientForCluster(const std::string& cluster) PURE;
virtual Http::AsyncClient& httpAsyncClientForCluster(const std::string& cluster) PURE;

/**
* Shutdown the cluster prior to destroying connection pools and other thread local data.
Expand Down
3 changes: 1 addition & 2 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,7 @@ class Cluster : public virtual HostSet {
virtual ResourceManager& resourceManager() const PURE;

/**
* Shutdown the cluster manager prior to destroying connection pools and other thread local
* data.
* Shutdown the cluster prior to destroying connection pools and other thread local data.
*/
virtual void shutdown() PURE;

Expand Down
5 changes: 5 additions & 0 deletions source/common/common/linked_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ template <class T> class LinkedObject {
return entry_;
}

/**
* @return whether the object is currently inserted into a list.
*/
bool inserted() { return inserted_; }

/**
* Move a linked item between 2 lists.
* @param list1 supplies the first list.
Expand Down
16 changes: 3 additions & 13 deletions source/common/filter/auth/client_ssl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Config::Config(const Json::Object& config, ThreadLocal::Instance& tls, Upstream:
ip_white_list_(config), stats_(generateStats(stats_store, config.getString("stat_prefix"))),
runtime_(runtime), local_address_(local_address) {

if (!cm_.has(auth_api_cluster_)) {
if (!cm_.get(auth_api_cluster_)) {
throw EnvoyException(
fmt::format("unknown cluster '{}' in client ssl auth config", auth_api_cluster_));
}
Expand Down Expand Up @@ -83,29 +83,19 @@ void Config::onFailure(Http::AsyncClient::FailureReason) {
}

void Config::refreshPrincipals() {
ASSERT(!active_request_);
active_request_.reset(new ActiveRequest());
active_request_->client_ = cm_.httpAsyncClientForCluster(auth_api_cluster_);
if (!active_request_->client_) {
onFailure(Http::AsyncClient::FailureReason::Reset);
return;
}

Http::MessagePtr message(new Http::RequestMessageImpl());
message->headers().addViaMoveValue(Http::Headers::get().Scheme, "http");
message->headers().addViaMoveValue(Http::Headers::get().Method, "GET");
message->headers().addViaMoveValue(Http::Headers::get().Path, "/v1/certs/list/approved");
message->headers().addViaCopy(Http::Headers::get().Host, auth_api_cluster_);
message->headers().addViaCopy(Http::Headers::get().ForwardedFor, local_address_);
active_request_->request_ = active_request_->client_->send(std::move(message), *this,
Optional<std::chrono::milliseconds>());
cm_.httpAsyncClientForCluster(auth_api_cluster_)
.send(std::move(message), *this, Optional<std::chrono::milliseconds>());
}

void Config::requestComplete() {
std::chrono::milliseconds interval(
runtime_.snapshot().getInteger("auth.clientssl.refresh_interval_ms", 60000));

active_request_.reset();
interval_timer_->enableTimer(interval);
}

Expand Down
8 changes: 0 additions & 8 deletions source/common/filter/auth/client_ssl.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,6 @@ class Config : public Http::AsyncClient::Callbacks {
void onFailure(Http::AsyncClient::FailureReason reason) override;

private:
struct ActiveRequest {
Http::AsyncClientPtr client_;
Http::AsyncClient::RequestPtr request_;
};

typedef std::unique_ptr<ActiveRequest> ActiveRequestPtr;

static GlobalStats generateStats(Stats::Store& store, const std::string& prefix);
AllowedPrincipalsPtr parseAuthResponse(Http::Message& message);
void refreshPrincipals();
Expand All @@ -94,7 +87,6 @@ class Config : public Http::AsyncClient::Callbacks {
uint32_t tls_slot_;
Upstream::ClusterManager& cm_;
const std::string auth_api_cluster_;
ActiveRequestPtr active_request_;
Event::TimerPtr interval_timer_;
Network::IpWhiteList ip_white_list_;
GlobalStats stats_;
Expand Down
2 changes: 1 addition & 1 deletion source/common/filter/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ TcpProxyConfig::TcpProxyConfig(const Json::Object& config,
Upstream::ClusterManager& cluster_manager, Stats::Store& stats_store)
: cluster_name_(config.getString("cluster")),
stats_(generateStats(config.getString("stat_prefix"), stats_store)) {
if (!cluster_manager.has(cluster_name_)) {
if (!cluster_manager.get(cluster_name_)) {
throw EnvoyException(fmt::format("tcp proxy: unknown cluster '{}'", cluster_name_));
}
}
Expand Down
11 changes: 1 addition & 10 deletions source/common/grpc/rpc_channel_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ void RpcChannelImpl::CallMethod(const proto::MethodDescriptor* method, proto::Rp
// here for clarity.
ASSERT(cm_.get(cluster_)->features() & Upstream::Cluster::Features::HTTP2);

client_ = cm_.httpAsyncClientForCluster(cluster_);
if (!client_) {
onFailureWorker(Optional<uint64_t>(), "http request failure");
return;
}

Http::MessagePtr message(new Http::RequestMessageImpl());
message->headers().addViaMoveValue(Http::Headers::get().Scheme, "http");
message->headers().addViaMoveValue(Http::Headers::get().Method, "POST");
Expand All @@ -46,10 +40,7 @@ void RpcChannelImpl::CallMethod(const proto::MethodDescriptor* method, proto::Rp
message->headers().addViaCopy(Http::Headers::get().ContentType, Common::GRPC_CONTENT_TYPE);
message->body(serializeBody(*grpc_request));

http_request_ = client_->send(std::move(message), *this, timeout_);
if (!http_request_) {
onFailureWorker(Optional<uint64_t>(), "http request failure");
}
http_request_ = cm_.httpAsyncClientForCluster(cluster_).send(std::move(message), *this, timeout_);
}

void RpcChannelImpl::incStat(bool success) {
Expand Down
3 changes: 1 addition & 2 deletions source/common/grpc/rpc_channel_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ class RpcChannelImpl : public RpcChannel, public Http::AsyncClient::Callbacks {

Upstream::ClusterManager& cm_;
const std::string cluster_;
Http::AsyncClientPtr client_;
Http::AsyncClient::RequestPtr http_request_;
Http::AsyncClient::Request* http_request_{};
const proto::MethodDescriptor* grpc_method_{};
proto::Message* grpc_response_{};
RpcChannelCallbacks& callbacks_;
Expand Down
81 changes: 48 additions & 33 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,47 @@

namespace Http {

const Http::HeaderMapImpl AsyncRequestImpl::SERVICE_UNAVAILABLE_HEADER{
{Http::Headers::get().Status, std::to_string(enumToInt(Http::Code::ServiceUnavailable))}};
const HeaderMapImpl AsyncRequestImpl::SERVICE_UNAVAILABLE_HEADER{
{Headers::get().Status, std::to_string(enumToInt(Code::ServiceUnavailable))}};

const Http::HeaderMapImpl AsyncRequestImpl::REQUEST_TIMEOUT_HEADER{
{Http::Headers::get().Status, std::to_string(enumToInt(Http::Code::GatewayTimeout))}};
const HeaderMapImpl AsyncRequestImpl::REQUEST_TIMEOUT_HEADER{
{Headers::get().Status, std::to_string(enumToInt(Code::GatewayTimeout))}};

AsyncClientImpl::AsyncClientImpl(ConnectionPool::Instance& conn_pool, const std::string& cluster,
Stats::Store& stats_store, Event::Dispatcher& dispatcher)
: conn_pool_(conn_pool), stat_prefix_(fmt::format("cluster.{}.", cluster)),
stats_store_(stats_store), dispatcher_(dispatcher) {}
AsyncClientImpl::AsyncClientImpl(const Upstream::Cluster& cluster,
AsyncClientConnPoolFactory& factory, Stats::Store& stats_store,
Event::Dispatcher& dispatcher)
: cluster_(cluster), factory_(factory), stats_store_(stats_store), dispatcher_(dispatcher),
stat_prefix_(fmt::format("cluster.{}.", cluster.name())) {}

AsyncClientImpl::~AsyncClientImpl() { ASSERT(active_requests_.empty()); }

AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) {
ConnectionPool::Instance* conn_pool = factory_.connPool();
if (!conn_pool) {
callbacks.onFailure(AsyncClient::FailureReason::Reset);
return nullptr;
}

AsyncClient::RequestPtr AsyncClientImpl::send(MessagePtr&& request,
AsyncClient::Callbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) {
std::unique_ptr<AsyncRequestImpl> new_request{
new AsyncRequestImpl(std::move(request), *this, callbacks, dispatcher_, timeout)};
new AsyncRequestImpl(std::move(request), *this, callbacks, dispatcher_, *conn_pool, timeout)};

// The request may get immediately failed. If so, we will return nullptr.
if (new_request->stream_encoder_) {
return std::move(new_request);
new_request->moveIntoList(std::move(new_request), active_requests_);
return active_requests_.front().get();
} else {
return nullptr;
}
}

AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent,
AsyncClient::Callbacks& callbacks, Event::Dispatcher& dispatcher,
ConnectionPool::Instance& conn_pool,
const Optional<std::chrono::milliseconds>& timeout)
: request_(std::move(request)), parent_(parent), callbacks_(callbacks) {

stream_encoder_.reset(new PooledStreamEncoder(parent_.conn_pool_, *this, *this, 0, 0, *this));
stream_encoder_.reset(new PooledStreamEncoder(conn_pool, *this, *this, 0, 0, *this));
stream_encoder_->encodeHeaders(request_->headers(), !request_->body());

// We might have been immediately failed.
Expand Down Expand Up @@ -66,9 +76,9 @@ void AsyncRequestImpl::decodeHeaders(HeaderMapPtr&& headers, bool end_stream) {
-> void { log_debug(" '{}':'{}'", key.get(), value); });
#endif

Http::CodeUtility::ResponseStatInfo info{parent_.stats_store_, parent_.stat_prefix_,
response_->headers(), true, EMPTY_STRING, EMPTY_STRING};
Http::CodeUtility::chargeResponseStat(info);
CodeUtility::ResponseStatInfo info{parent_.stats_store_, parent_.stat_prefix_,
response_->headers(), true, EMPTY_STRING, EMPTY_STRING};
CodeUtility::chargeResponseStat(info);

if (end_stream) {
onComplete();
Expand Down Expand Up @@ -102,40 +112,45 @@ void AsyncRequestImpl::decodeTrailers(HeaderMapPtr&& trailers) {

void AsyncRequestImpl::onComplete() {
// TODO: Check host's canary status in addition to canary header.
Http::CodeUtility::ResponseTimingInfo info{
CodeUtility::ResponseTimingInfo info{
parent_.stats_store_, parent_.stat_prefix_, stream_encoder_->requestCompleteTime(),
response_->headers().get(Http::Headers::get().EnvoyUpstreamCanary) == "true", true,
EMPTY_STRING, EMPTY_STRING};
Http::CodeUtility::chargeResponseTiming(info);
response_->headers().get(Headers::get().EnvoyUpstreamCanary) == "true", true, EMPTY_STRING,
EMPTY_STRING};
CodeUtility::chargeResponseTiming(info);

cleanup();
callbacks_.onSuccess(std::move(response_));
cleanup();
}

void AsyncRequestImpl::onResetStream(StreamResetReason) {
Http::CodeUtility::ResponseStatInfo info{parent_.stats_store_, parent_.stat_prefix_,
SERVICE_UNAVAILABLE_HEADER, true, EMPTY_STRING,
EMPTY_STRING};
Http::CodeUtility::chargeResponseStat(info);
CodeUtility::ResponseStatInfo info{parent_.stats_store_, parent_.stat_prefix_,
SERVICE_UNAVAILABLE_HEADER, true, EMPTY_STRING, EMPTY_STRING};
CodeUtility::chargeResponseStat(info);
callbacks_.onFailure(AsyncClient::FailureReason::Reset);
cleanup();
callbacks_.onFailure(Http::AsyncClient::FailureReason::Reset);
}

void AsyncRequestImpl::onRequestTimeout() {
Http::CodeUtility::ResponseStatInfo info{parent_.stats_store_, parent_.stat_prefix_,
REQUEST_TIMEOUT_HEADER, true, EMPTY_STRING,
EMPTY_STRING};
Http::CodeUtility::chargeResponseStat(info);
parent_.stats_store_.counter(fmt::format("{}upstream_rq_timeout", parent_.stat_prefix_)).inc();
CodeUtility::ResponseStatInfo info{parent_.stats_store_, parent_.stat_prefix_,
REQUEST_TIMEOUT_HEADER, true, EMPTY_STRING, EMPTY_STRING};
CodeUtility::chargeResponseStat(info);
parent_.cluster_.stats().upstream_rq_timeout_.inc();
stream_encoder_->resetStream();
callbacks_.onFailure(AsyncClient::FailureReason::RequestTimemout);
cleanup();
callbacks_.onFailure(Http::AsyncClient::FailureReason::RequestTimemout);
}

void AsyncRequestImpl::cleanup() {
stream_encoder_.reset();
if (request_timeout_) {
request_timeout_->disableTimer();
}

// This will destroy us, but only do so if we are actually in a list. This does not happen in
// the immediate failure case.
if (inserted()) {
removeFromList(parent_.active_requests_);
}
}

} // Http
Loading

0 comments on commit 7654e26

Please sign in to comment.