Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 52 additions & 37 deletions library/common/http/dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,54 +9,53 @@ namespace Http {
Dispatcher::DirectStreamCallbacks::DirectStreamCallbacks(envoy_stream_t stream,
envoy_observer observer,
Dispatcher& http_dispatcher)
: stream_(stream), observer_(observer), http_dispatcher_(http_dispatcher) {}
: stream_handle_(stream), observer_(observer), http_dispatcher_(http_dispatcher) {}

void Dispatcher::DirectStreamCallbacks::onHeaders(HeaderMapPtr&& headers, bool end_stream) {
ENVOY_LOG(debug, "response headers for stream (end_stream={}):\n{}", end_stream, *headers);
if (end_stream) {
http_dispatcher_.removeStream(stream_);
}
ENVOY_LOG(debug, "[S{}] response headers for stream (end_stream={}):\n{}", stream_handle_,
end_stream, *headers);
observer_.on_headers(Utility::transformHeaders(*headers), end_stream, observer_.context);
}

void Dispatcher::DirectStreamCallbacks::onData(Buffer::Instance& data, bool end_stream) {
ENVOY_LOG(debug, "response data for stream (length={} end_stream={})", data.length(), end_stream);
if (end_stream) {
http_dispatcher_.removeStream(stream_);
}
ENVOY_LOG(debug, "[S{}] response data for stream (length={} end_stream={})", stream_handle_,
data.length(), end_stream);
observer_.on_data(Envoy::Buffer::Utility::transformData(data), end_stream, observer_.context);
}

void Dispatcher::DirectStreamCallbacks::onTrailers(HeaderMapPtr&& trailers) {
ENVOY_LOG(debug, "response trailers for stream:\n{}", *trailers);
http_dispatcher_.removeStream(stream_);
ENVOY_LOG(debug, "[S{}] response trailers for stream:\n{}", stream_handle_, *trailers);
observer_.on_trailers(Utility::transformHeaders(*trailers), observer_.context);
}

void Dispatcher::DirectStreamCallbacks::onComplete() {
// TODO: implement me
ENVOY_LOG(debug, "[S{}] complete stream", stream_handle_);
observer_.on_complete(observer_.context);
http_dispatcher_.cleanup(stream_handle_);
}

void Dispatcher::DirectStreamCallbacks::onReset() {
http_dispatcher_.removeStream(stream_);
ENVOY_LOG(debug, "[S{}] remote reset stream", stream_handle_);
observer_.on_error({ENVOY_STREAM_RESET, envoy_nodata}, observer_.context);
}

Dispatcher::DirectStream::DirectStream(AsyncClient::Stream& underlying_stream,
Dispatcher::DirectStream::DirectStream(envoy_stream_t stream_handle,
AsyncClient::Stream& underlying_stream,
DirectStreamCallbacksPtr&& callbacks)
: underlying_stream_(underlying_stream), callbacks_(std::move(callbacks)) {}
: stream_handle_(stream_handle), underlying_stream_(underlying_stream),
callbacks_(std::move(callbacks)) {}

Dispatcher::Dispatcher(Event::Dispatcher& event_dispatcher,
Upstream::ClusterManager& cluster_manager)
: current_stream_id_(0), event_dispatcher_(event_dispatcher),
: current_stream_handle_(0), event_dispatcher_(event_dispatcher),
cluster_manager_(cluster_manager) {}

envoy_stream_t Dispatcher::startStream(envoy_observer observer) {
envoy_stream_t new_stream_id = current_stream_id_++;
envoy_stream_t new_stream_handle = current_stream_handle_++;

event_dispatcher_.post([this, observer, new_stream_id]() -> void {
event_dispatcher_.post([this, observer, new_stream_handle]() -> void {
DirectStreamCallbacksPtr callbacks =
std::make_unique<DirectStreamCallbacks>(new_stream_id, observer, *this);
std::make_unique<DirectStreamCallbacks>(new_stream_handle, observer, *this);
AsyncClient& async_client = cluster_manager_.httpAsyncClientForCluster("egress_cluster");
AsyncClient::Stream* underlying_stream = async_client.start(*callbacks, {});

Expand All @@ -65,31 +64,31 @@ envoy_stream_t Dispatcher::startStream(envoy_observer observer) {
// Take this into account when thinking about stream cancellation.
callbacks->onReset();
} else {
DirectStreamPtr direct_stream =
std::make_unique<DirectStream>(*underlying_stream, std::move(callbacks));
streams_.emplace(new_stream_id, std::move(direct_stream));
ENVOY_LOG(debug, "started stream [{}]", new_stream_id);
DirectStreamPtr direct_stream = std::make_unique<DirectStream>(
new_stream_handle, *underlying_stream, std::move(callbacks));
streams_.emplace(new_stream_handle, std::move(direct_stream));
ENVOY_LOG(debug, "[S{}] start stream", new_stream_handle);
}
});

return new_stream_id;
return new_stream_handle;
}

envoy_status_t Dispatcher::sendHeaders(envoy_stream_t stream_id, envoy_headers headers,
envoy_status_t Dispatcher::sendHeaders(envoy_stream_t stream, envoy_headers headers,
bool end_stream) {
event_dispatcher_.post([this, stream_id, headers, end_stream]() -> void {
DirectStream* direct_stream = getStream(stream_id);
event_dispatcher_.post([this, stream, headers, end_stream]() -> void {
DirectStream* direct_stream = getStream(stream);
// If direct_stream is not found, it means the stream has already closed or been reset
// and the appropriate callback has been issued to the caller. There's nothing to do here
// except silently swallow this.
// TODO: handle potential race condition with cancellation or failure get a stream in the
// first place. Additionally it is possible to get a nullptr due to bogus stream_id
// first place. Additionally it is possible to get a nullptr due to bogus envoy_stream_t
// from the caller.
// https://github.com/lyft/envoy-mobile/issues/301
if (direct_stream != nullptr) {
direct_stream->headers_ = Utility::transformHeaders(headers);
ENVOY_LOG(debug, "request headers for stream [{}] (end_stream={}):\n{}", stream_id,
end_stream, *direct_stream->headers_);
ENVOY_LOG(debug, "[S{}] request headers for stream (end_stream={}):\n{}", stream, end_stream,
*direct_stream->headers_);
direct_stream->underlying_stream_.sendHeaders(*direct_stream->headers_, end_stream);
}
});
Expand All @@ -103,19 +102,35 @@ envoy_status_t Dispatcher::sendMetadata(envoy_stream_t, envoy_headers, bool) {
return ENVOY_FAILURE;
}
envoy_status_t Dispatcher::sendTrailers(envoy_stream_t, envoy_headers) { return ENVOY_FAILURE; }
envoy_status_t Dispatcher::locallyCloseStream(envoy_stream_t) { return ENVOY_FAILURE; }
envoy_status_t Dispatcher::resetStream(envoy_stream_t) { return ENVOY_FAILURE; }

Dispatcher::DirectStream* Dispatcher::getStream(envoy_stream_t stream_id) {
envoy_status_t Dispatcher::resetStream(envoy_stream_t stream) {
event_dispatcher_.post([this, stream]() -> void {
DirectStream* direct_stream = getStream(stream);
if (direct_stream) {
direct_stream->underlying_stream_.reset();
}
});
return ENVOY_SUCCESS;
}

Dispatcher::DirectStream* Dispatcher::getStream(envoy_stream_t stream) {
ASSERT(event_dispatcher_.isThreadSafe(),
"stream interaction must be performed on the event_dispatcher_'s thread.");
auto direct_stream_pair_it = streams_.find(stream_id);
auto direct_stream_pair_it = streams_.find(stream);
return (direct_stream_pair_it != streams_.end()) ? direct_stream_pair_it->second.get() : nullptr;
}

// TODO: implement. Note: the stream might not be in the map if for example startStream called
// onReset due to its inability to get an underlying stream.
envoy_status_t Dispatcher::removeStream(envoy_stream_t) { return ENVOY_FAILURE; }
void Dispatcher::cleanup(envoy_stream_t stream_handle) {
DirectStream* direct_stream = getStream(stream_handle);

RELEASE_ASSERT(direct_stream,
"cleanup is a private method that is only called with stream ids that exist");

// TODO: think about thread safety of deleting the DirectStream immediately.
size_t erased = streams_.erase(stream_handle);
ASSERT(erased == 1, "cleanup should always remove one entry from the streams map");
ENVOY_LOG(debug, "[S{}] remove stream", stream_handle);
}

} // namespace Http
} // namespace Envoy
19 changes: 8 additions & 11 deletions library/common/http/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,11 @@ class Dispatcher : public Logger::Loggable<Logger::Id::http> {
envoy_status_t sendData(envoy_stream_t stream, envoy_headers headers, bool end_stream);
envoy_status_t sendMetadata(envoy_stream_t stream, envoy_headers headers, bool end_stream);
envoy_status_t sendTrailers(envoy_stream_t stream, envoy_headers headers);
envoy_status_t locallyCloseStream(envoy_stream_t stream);
// TODO: when implementing this function we have to make sure to prevent races with already
// scheduled and potentially scheduled callbacks. In order to do so the platform callbacks need to
// check for atomic state (boolean most likely) that will be updated here to mark the stream as
// closed.
envoy_status_t resetStream(envoy_stream_t stream);
envoy_status_t removeStream(envoy_stream_t stream);

private:
/**
Expand All @@ -55,7 +53,7 @@ class Dispatcher : public Logger::Loggable<Logger::Id::http> {
class DirectStreamCallbacks : public AsyncClient::StreamCallbacks,
public Logger::Loggable<Logger::Id::http> {
public:
DirectStreamCallbacks(envoy_stream_t stream, envoy_observer observer,
DirectStreamCallbacks(envoy_stream_t stream_handle, envoy_observer observer,
Dispatcher& http_dispatcher);

// AsyncClient::StreamCallbacks
Expand All @@ -66,7 +64,7 @@ class Dispatcher : public Logger::Loggable<Logger::Id::http> {
void onReset() override;

private:
const envoy_stream_t stream_;
const envoy_stream_t stream_handle_;
const envoy_observer observer_;
Dispatcher& http_dispatcher_;
};
Expand All @@ -78,13 +76,11 @@ class Dispatcher : public Logger::Loggable<Logger::Id::http> {
* AsyncClient::Stream and in the incoming direction via DirectStreamCallbacks.
*/
class DirectStream {
// TODO: Bookkeeping for this class is insufficient to fully cover all cases necessary to
// track the lifecycle of the underlying_stream_. One way or another, we must fix this
// to prevent bugs in the future. (Enhanced internal bookkeeping is probably good enough,
// but other options include upstream modifications to AsyncClient and friends.
public:
DirectStream(AsyncClient::Stream& underlying_stream, DirectStreamCallbacksPtr&& callbacks);
DirectStream(envoy_stream_t stream_handle, AsyncClient::Stream& underlying_stream,
DirectStreamCallbacksPtr&& callbacks);

const envoy_stream_t stream_handle_;
// Used to issue outgoing HTTP stream operations.
AsyncClient::Stream& underlying_stream_;
// Used to receive incoming HTTP stream operations.
Expand All @@ -102,10 +98,11 @@ class Dispatcher : public Logger::Loggable<Logger::Id::http> {

// Everything in the below interface must only be accessed from the event_dispatcher's thread.
// This allows us to generally avoid synchronization.
DirectStream* getStream(envoy_stream_t stream_id);
DirectStream* getStream(envoy_stream_t stream_handle);
void cleanup(envoy_stream_t stream_handle);

std::unordered_map<envoy_stream_t, DirectStreamPtr> streams_;
std::atomic<envoy_stream_t> current_stream_id_;
std::atomic<envoy_stream_t> current_stream_handle_;
// The event_dispatcher is the only member state that may be accessed from a thread other than
// the event_dispatcher's own thread.
Event::Dispatcher& event_dispatcher_;
Expand Down
8 changes: 6 additions & 2 deletions library/common/include/c_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,11 @@ typedef void (*envoy_on_trailers_f)(envoy_headers trailers, void* context);
*/
typedef void (*envoy_on_error_f)(envoy_error error, void* context);

// FIXME comments
/**
* Called when the async HTTP stream has completed without an error bi-directionally.
* @param context, contains the necessary state to carry out platform-specific dispatch and
* execution.
*/
typedef void (*envoy_on_complete_f)(void* context);

#ifdef __cplusplus
Expand All @@ -167,7 +171,7 @@ typedef struct {
envoy_on_data_f on_data;
envoy_on_metadata_f on_metadata;
envoy_on_trailers_f on_trailers;
envoy_on_complete_f on_complete;
envoy_on_error_f on_error;
envoy_on_complete_f on_complete;
void* context; // Will be passed through to callbacks to provide dispatch and execution state.
} envoy_observer;
8 changes: 4 additions & 4 deletions library/common/main_interface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ envoy_stream start_stream(envoy_observer observer) {
return {ENVOY_SUCCESS, http_dispatcher_->startStream(observer)};
}

envoy_status_t send_headers(envoy_stream_t stream_id, envoy_headers headers, bool end_stream) {
return http_dispatcher_->sendHeaders(stream_id, headers, end_stream);
envoy_status_t send_headers(envoy_stream_t stream, envoy_headers headers, bool end_stream) {
return http_dispatcher_->sendHeaders(stream, headers, end_stream);
}

// TODO: implement.
envoy_status_t send_data(envoy_stream_t, envoy_data, bool) { return ENVOY_FAILURE; }
envoy_status_t send_metadata(envoy_stream_t, envoy_headers) { return ENVOY_FAILURE; }
envoy_status_t send_trailers(envoy_stream_t, envoy_headers) { return ENVOY_FAILURE; }
envoy_status_t locally_close_stream(envoy_stream_t) { return ENVOY_FAILURE; }
envoy_status_t reset_stream(envoy_stream_t) { return ENVOY_FAILURE; }

envoy_status_t reset_stream(envoy_stream_t stream) { return http_dispatcher_->resetStream(stream); }

/*
* Setup envoy for interaction via the main interface.
Expand Down
8 changes: 0 additions & 8 deletions library/common/main_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,6 @@ envoy_status_t send_metadata(envoy_stream_t stream, envoy_headers metadata);
*/
envoy_status_t send_trailers(envoy_stream_t stream, envoy_headers trailers);

/**
* Half-close an HTTP stream. The stream will be observable and may return further data
* via the observer callbacks. However, nothing further may be sent.
* @param stream, the stream to close.
* @return envoy_status_t, the resulting status of the operation.
*/
envoy_status_t locally_close_stream(envoy_stream_t stream);

/**
* Detach all observers from a stream and send an interrupt upstream if supported by transport.
* @param stream, the stream to evict.
Expand Down
17 changes: 17 additions & 0 deletions test/common/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,23 @@ load("@envoy//bazel:envoy_build_system.bzl", "envoy_cc_test", "envoy_package")

envoy_package()

envoy_cc_test(
name = "dispatcher_test",
srcs = ["dispatcher_test.cc"],
repository = "@envoy",
deps = [
"//library/common/http:dispatcher_lib",
"//library/common/http:header_utility_lib",
"//library/common/include:c_types_interface",
"@envoy//source/common/http:async_client_lib",
"@envoy//source/common/http:context_lib",
"@envoy//test/common/http:common_lib",
"@envoy//test/mocks/event:event_mocks",
"@envoy//test/mocks/local_info:local_info_mocks",
"@envoy//test/mocks/upstream:upstream_mocks",
],
)

envoy_cc_test(
name = "header_utility_test",
srcs = ["header_utility_test.cc"],
Expand Down
Loading