Skip to content
Merged
42 changes: 27 additions & 15 deletions include/envoy/http/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ namespace Http {
*/
class AsyncClient {
public:
/**
* An in-flight HTTP request.
*/
class Request {
public:
virtual ~Request() = default;

/**
* Signals that the request should be cancelled.
*/
virtual void cancel() PURE;
};

/**
* Async Client failure reasons.
*/
Expand All @@ -30,21 +43,33 @@ class AsyncClient {

/**
* Notifies caller of async HTTP request status.
*
* To support a use case where a caller makes multiple requests in parallel,
* individual callback methods provide request context corresponding to that response.
*/
class Callbacks {
public:
virtual ~Callbacks() = default;

/**
* Called when the async HTTP request succeeds.
* @param request request handle.
* NOTE: request handle is passed for correlation purposes only, e.g.
* for client code to be able to exclude that handle from a list of
* requests in progress.
* @param response the HTTP response
*/
virtual void onSuccess(ResponseMessagePtr&& response) PURE;
virtual void onSuccess(const Request& request, ResponseMessagePtr&& response) PURE;

/**
* Called when the async HTTP request fails.
* @param request request handle.
* NOTE: request handle is passed for correlation purposes only, e.g.
* for client code to be able to exclude that handle from a list of
* requests in progress.
* @param reason failure reason
*/
virtual void onFailure(FailureReason reason) PURE;
virtual void onFailure(const Request& request, FailureReason reason) PURE;
};

/**
Expand Down Expand Up @@ -92,19 +117,6 @@ class AsyncClient {
virtual void onReset() PURE;
};

/**
* An in-flight HTTP request.
*/
class Request {
public:
virtual ~Request() = default;

/**
* Signals that the request should be cancelled.
*/
virtual void cancel() PURE;
};

/**
* An in-flight HTTP stream.
*/
Expand Down
6 changes: 4 additions & 2 deletions source/common/config/remote_data_fetcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ void RemoteDataFetcher::fetch() {
DurationUtil::durationToMilliseconds(uri_.timeout()))));
}

void RemoteDataFetcher::onSuccess(Http::ResponseMessagePtr&& response) {
void RemoteDataFetcher::onSuccess(const Http::AsyncClient::Request&,
Http::ResponseMessagePtr&& response) {
const uint64_t status_code = Http::Utility::getResponseStatus(response->headers());
if (status_code == enumToInt(Http::Code::OK)) {
ENVOY_LOG(debug, "fetch remote data [uri = {}]: success", uri_.uri());
Expand All @@ -66,7 +67,8 @@ void RemoteDataFetcher::onSuccess(Http::ResponseMessagePtr&& response) {
request_ = nullptr;
}

void RemoteDataFetcher::onFailure(Http::AsyncClient::FailureReason reason) {
void RemoteDataFetcher::onFailure(const Http::AsyncClient::Request&,
Http::AsyncClient::FailureReason reason) {
ENVOY_LOG(debug, "fetch remote data [uri = {}]: network error {}", uri_.uri(), enumToInt(reason));
request_ = nullptr;
callback_.onFailure(FailureReason::Network);
Expand Down
5 changes: 3 additions & 2 deletions source/common/config/remote_data_fetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ class RemoteDataFetcher : public Logger::Loggable<Logger::Id::config>,
~RemoteDataFetcher() override;

// Http::AsyncClient::Callbacks
void onSuccess(Http::ResponseMessagePtr&& response) override;
void onFailure(Http::AsyncClient::FailureReason reason) override;
void onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&& response) override;
void onFailure(const Http::AsyncClient::Request&,
Http::AsyncClient::FailureReason reason) override;

/**
* Fetch data from remote.
Expand Down
4 changes: 2 additions & 2 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ void AsyncRequestImpl::onComplete() {
response_->trailers(), streamInfo(),
Tracing::EgressConfig::get());

callbacks_.onSuccess(std::move(response_));
callbacks_.onSuccess(*this, std::move(response_));
}

void AsyncRequestImpl::onHeaders(ResponseHeaderMapPtr&& headers, bool) {
Expand Down Expand Up @@ -302,7 +302,7 @@ void AsyncRequestImpl::onReset() {

if (!cancelled_) {
// In this case we don't have a valid response so we do need to raise a failure.
callbacks_.onFailure(AsyncClient::FailureReason::Reset);
callbacks_.onFailure(*this, AsyncClient::FailureReason::Reset);
}
}

Expand Down
8 changes: 5 additions & 3 deletions source/common/http/rest_api_fetcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ RestApiFetcher::~RestApiFetcher() {

void RestApiFetcher::initialize() { refresh(); }

void RestApiFetcher::onSuccess(Http::ResponseMessagePtr&& response) {
void RestApiFetcher::onSuccess(const Http::AsyncClient::Request& request,
Http::ResponseMessagePtr&& response) {
uint64_t response_code = Http::Utility::getResponseStatus(response->headers());
if (response_code == enumToInt(Http::Code::NotModified)) {
requestComplete();
return;
} else if (response_code != enumToInt(Http::Code::OK)) {
onFailure(Http::AsyncClient::FailureReason::Reset);
onFailure(request, Http::AsyncClient::FailureReason::Reset);
return;
}

Expand All @@ -47,7 +48,8 @@ void RestApiFetcher::onSuccess(Http::ResponseMessagePtr&& response) {
requestComplete();
}

void RestApiFetcher::onFailure(Http::AsyncClient::FailureReason reason) {
void RestApiFetcher::onFailure(const Http::AsyncClient::Request&,
Http::AsyncClient::FailureReason reason) {
// Currently Http::AsyncClient::FailureReason only has one value: "Reset".
ASSERT(reason == Http::AsyncClient::FailureReason::Reset);
onFetchFailure(Config::ConfigUpdateFailureReason::ConnectionFailure, nullptr);
Expand Down
5 changes: 3 additions & 2 deletions source/common/http/rest_api_fetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ class RestApiFetcher : public Http::AsyncClient::Callbacks {
void requestComplete();

// Http::AsyncClient::Callbacks
void onSuccess(Http::ResponseMessagePtr&& response) override;
void onFailure(Http::AsyncClient::FailureReason reason) override;
void onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&& response) override;
void onFailure(const Http::AsyncClient::Request&,
Http::AsyncClient::FailureReason reason) override;

Runtime::RandomGenerator& random_;
const std::chrono::milliseconds refresh_interval_;
Expand Down
4 changes: 2 additions & 2 deletions source/common/router/shadow_writer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ class ShadowWriterImpl : Logger::Loggable<Logger::Id::router>,
const Http::AsyncClient::RequestOptions& options) override;

// Http::AsyncClient::Callbacks
void onSuccess(Http::ResponseMessagePtr&&) override {}
void onFailure(Http::AsyncClient::FailureReason) override {}
void onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&&) override {}
void onFailure(const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason) override {}

private:
Upstream::ClusterManager& cm_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,16 @@ void RawHttpClientImpl::check(RequestCallbacks& callbacks,
}
}

void RawHttpClientImpl::onSuccess(Http::ResponseMessagePtr&& message) {
void RawHttpClientImpl::onSuccess(const Http::AsyncClient::Request&,
Http::ResponseMessagePtr&& message) {
callbacks_->onComplete(toResponse(std::move(message)));
span_->finishSpan();
callbacks_ = nullptr;
span_ = nullptr;
}

void RawHttpClientImpl::onFailure(Http::AsyncClient::FailureReason reason) {
void RawHttpClientImpl::onFailure(const Http::AsyncClient::Request&,
Http::AsyncClient::FailureReason reason) {
ASSERT(reason == Http::AsyncClient::FailureReason::Reset);
callbacks_->onComplete(std::make_unique<Response>(errorResponse()));
span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ class RawHttpClientImpl : public Client,
Tracing::Span&) override;

// Http::AsyncClient::Callbacks
void onSuccess(Http::ResponseMessagePtr&& message) override;
void onFailure(Http::AsyncClient::FailureReason reason) override;
void onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&& message) override;
void onFailure(const Http::AsyncClient::Request&,
Http::AsyncClient::FailureReason reason) override;

private:
ResponsePtr toResponse(Http::ResponseMessagePtr message);
Expand Down
5 changes: 3 additions & 2 deletions source/extensions/filters/http/common/jwks_fetcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class JwksFetcherImpl : public JwksFetcher,
}

// HTTP async receive methods
void onSuccess(Http::ResponseMessagePtr&& response) override {
void onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&& response) override {
ENVOY_LOG(trace, "{}", __func__);
complete_ = true;
const uint64_t status_code = Http::Utility::getResponseStatus(response->headers());
Expand Down Expand Up @@ -93,7 +93,8 @@ class JwksFetcherImpl : public JwksFetcher,
reset();
}

void onFailure(Http::AsyncClient::FailureReason reason) override {
void onFailure(const Http::AsyncClient::Request&,
Http::AsyncClient::FailureReason reason) override {
ENVOY_LOG(debug, "{}: fetch pubkey [uri = {}]: network error {}", __func__, uri_->uri(),
enumToInt(reason));
complete_ = true;
Expand Down
8 changes: 5 additions & 3 deletions source/extensions/filters/http/lua/lua_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ int StreamHandleWrapper::luaHttpCallAsynchronous(lua_State* state) {
return 0;
}

void StreamHandleWrapper::onSuccess(Http::ResponseMessagePtr&& response) {
void StreamHandleWrapper::onSuccess(const Http::AsyncClient::Request&,
Http::ResponseMessagePtr&& response) {
ASSERT(state_ == State::HttpCall || state_ == State::Running);
ENVOY_LOG(debug, "async HTTP response complete");
http_request_ = nullptr;
Expand Down Expand Up @@ -341,7 +342,8 @@ void StreamHandleWrapper::onSuccess(Http::ResponseMessagePtr&& response) {
}
}

void StreamHandleWrapper::onFailure(Http::AsyncClient::FailureReason) {
void StreamHandleWrapper::onFailure(const Http::AsyncClient::Request& request,
Http::AsyncClient::FailureReason) {
ASSERT(state_ == State::HttpCall || state_ == State::Running);
ENVOY_LOG(debug, "async HTTP failure");

Expand All @@ -351,7 +353,7 @@ void StreamHandleWrapper::onFailure(Http::AsyncClient::FailureReason) {
{{Http::Headers::get().Status,
std::to_string(enumToInt(Http::Code::ServiceUnavailable))}})));
response_message->body() = std::make_unique<Buffer::OwnedImpl>("upstream failure");
onSuccess(std::move(response_message));
onSuccess(request, std::move(response_message));
}

int StreamHandleWrapper::luaHeaders(lua_State* state) {
Expand Down
8 changes: 4 additions & 4 deletions source/extensions/filters/http/lua/lua_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ class StreamHandleWrapper : public Filters::Common::Lua::BaseLuaObject<StreamHan
}

// Http::AsyncClient::Callbacks
void onSuccess(Http::ResponseMessagePtr&&) override;
void onFailure(Http::AsyncClient::FailureReason) override;
void onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&&) override;
void onFailure(const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason) override;

Filters::Common::Lua::Coroutine& coroutine_;
Http::HeaderMap& headers_;
Expand Down Expand Up @@ -283,8 +283,8 @@ class StreamHandleWrapper : public Filters::Common::Lua::BaseLuaObject<StreamHan
class NoopCallbacks : public Http::AsyncClient::Callbacks {
public:
// Http::AsyncClient::Callbacks
void onSuccess(Http::ResponseMessagePtr&&) override {}
void onFailure(Http::AsyncClient::FailureReason) override {}
void onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&&) override {}
void onFailure(const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason) override {}
};

/**
Expand Down
6 changes: 4 additions & 2 deletions source/extensions/filters/http/squash/squash_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ class AsyncClientCallbackShim : public Http::AsyncClient::Callbacks {
std::function<void(Http::AsyncClient::FailureReason)>&& on_fail)
: on_success_(on_success), on_fail_(on_fail) {}
// Http::AsyncClient::Callbacks
void onSuccess(Http::ResponseMessagePtr&& m) override {
void onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&& m) override {
on_success_(std::forward<Http::ResponseMessagePtr>(m));
}
void onFailure(Http::AsyncClient::FailureReason f) override { on_fail_(f); }
void onFailure(const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason f) override {
on_fail_(f);
}

private:
const std::function<void(Http::ResponseMessagePtr&&)> on_success_;
Expand Down
5 changes: 3 additions & 2 deletions source/extensions/tracers/datadog/datadog_tracer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,13 @@ void TraceReporter::flushTraces() {
}
}

void TraceReporter::onFailure(Http::AsyncClient::FailureReason) {
void TraceReporter::onFailure(const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason) {
ENVOY_LOG(debug, "failure submitting traces to datadog agent");
driver_.tracerStats().reports_failed_.inc();
}

void TraceReporter::onSuccess(Http::ResponseMessagePtr&& http_response) {
void TraceReporter::onSuccess(const Http::AsyncClient::Request&,
Http::ResponseMessagePtr&& http_response) {
uint64_t responseStatus = Http::Utility::getResponseStatus(http_response->headers());
if (responseStatus != enumToInt(Http::Code::OK)) {
// TODO: Consider adding retries for failed submissions.
Expand Down
4 changes: 2 additions & 2 deletions source/extensions/tracers/datadog/datadog_tracer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ class TraceReporter : public Http::AsyncClient::Callbacks,
TraceReporter(TraceEncoderSharedPtr encoder, Driver& driver, Event::Dispatcher& dispatcher);

// Http::AsyncClient::Callbacks.
void onSuccess(Http::ResponseMessagePtr&&) override;
void onFailure(Http::AsyncClient::FailureReason) override;
void onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&&) override;
void onFailure(const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason) override;

private:
/**
Expand Down
5 changes: 3 additions & 2 deletions source/extensions/tracers/lightstep/lightstep_tracer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,15 @@ LightStepDriver::LightStepTransporter::~LightStepTransporter() {
}
}

void LightStepDriver::LightStepTransporter::onSuccess(Http::ResponseMessagePtr&& /*response*/) {
void LightStepDriver::LightStepTransporter::onSuccess(const Http::AsyncClient::Request&,
Http::ResponseMessagePtr&& /*response*/) {
driver_.grpc_context_.chargeStat(*driver_.cluster(), driver_.request_names_, true);
active_callback_->OnSuccess(*active_report_);
reset();
}

void LightStepDriver::LightStepTransporter::onFailure(
Http::AsyncClient::FailureReason /*failure_reason*/) {
const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason /*failure_reason*/) {
driver_.grpc_context_.chargeStat(*driver_.cluster(), driver_.request_names_, false);
active_callback_->OnFailure(*active_report_);
reset();
Expand Down
5 changes: 3 additions & 2 deletions source/extensions/tracers/lightstep/lightstep_tracer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ class LightStepDriver : public Common::Ot::OpenTracingDriver {
Callback& callback) noexcept override;

// Http::AsyncClient::Callbacks
void onSuccess(Http::ResponseMessagePtr&& response) override;
void onFailure(Http::AsyncClient::FailureReason failure_reason) override;
void onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&& response) override;
void onFailure(const Http::AsyncClient::Request&,
Http::AsyncClient::FailureReason failure_reason) override;

private:
std::unique_ptr<lightstep::BufferChain> active_report_;
Expand Down
5 changes: 3 additions & 2 deletions source/extensions/tracers/zipkin/zipkin_tracer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,12 @@ void ReporterImpl::flushSpans() {
}
}

void ReporterImpl::onFailure(Http::AsyncClient::FailureReason) {
void ReporterImpl::onFailure(const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason) {
driver_.tracerStats().reports_failed_.inc();
}

void ReporterImpl::onSuccess(Http::ResponseMessagePtr&& http_response) {
void ReporterImpl::onSuccess(const Http::AsyncClient::Request&,
Http::ResponseMessagePtr&& http_response) {
if (Http::Utility::getResponseStatus(http_response->headers()) !=
enumToInt(Http::Code::Accepted)) {
driver_.tracerStats().reports_dropped_.inc();
Expand Down
4 changes: 2 additions & 2 deletions source/extensions/tracers/zipkin/zipkin_tracer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ class ReporterImpl : public Reporter, Http::AsyncClient::Callbacks {

// Http::AsyncClient::Callbacks.
// The callbacks below record Zipkin-span-related stats.
void onSuccess(Http::ResponseMessagePtr&&) override;
void onFailure(Http::AsyncClient::FailureReason) override;
void onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&&) override;
void onFailure(const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason) override;

/**
* Creates a heap-allocated ZipkinReporter.
Expand Down
Loading