diff --git a/library/cc/BUILD b/library/cc/BUILD index 0bdc992981..92eb4209e7 100644 --- a/library/cc/BUILD +++ b/library/cc/BUILD @@ -10,6 +10,7 @@ envoy_cc_library( "bridge_utility.cc", "engine.cc", "engine_builder.cc", + "engine_callbacks.cc", "headers.cc", "headers_builder.cc", "log_level.cc", @@ -33,6 +34,7 @@ envoy_cc_library( "bridge_utility.h", "engine.h", "engine_builder.h", + "engine_callbacks.h", "envoy_error.h", "headers.h", "headers_builder.h", diff --git a/library/cc/engine.cc b/library/cc/engine.cc index cb368d4146..17007cf6c2 100644 --- a/library/cc/engine.cc +++ b/library/cc/engine.cc @@ -6,22 +6,17 @@ namespace Envoy { namespace Platform { -Engine::Engine(envoy_engine_t engine, const std::string& configuration, LogLevel log_level) - : engine_(engine), terminated_(false) { - run_engine(this->engine_, configuration.c_str(), logLevelToString(log_level).c_str()); - - this->stream_client_ = std::make_shared(this->engine_); - this->pulse_client_ = std::make_shared(); -} - -Engine::~Engine() { - if (!this->terminated_) { - terminate_engine(this->engine_); - } +Engine::Engine(envoy_engine_t engine) : engine_(engine), terminated_(false) {} + +// we lazily construct the stream and pulse clients +// because they either require or will require a weak ptr +// which can't be provided from inside of the constructor +// because of how std::enable_shared_from_this works +StreamClientSharedPtr Engine::streamClient() { + return std::make_shared(this->shared_from_this()); } -StreamClientSharedPtr Engine::streamClient() { return this->stream_client_; } -PulseClientSharedPtr Engine::pulseClient() { return this->pulse_client_; } +PulseClientSharedPtr Engine::pulseClient() { return std::make_shared(); } void Engine::terminate() { if (this->terminated_) { diff --git a/library/cc/engine.h b/library/cc/engine.h index f352515e39..e1c6768eb0 100644 --- a/library/cc/engine.h +++ b/library/cc/engine.h @@ -10,30 +10,25 @@ namespace Envoy { namespace Platform { -struct EngineCallbacks { - std::function on_engine_running; - // unused: - // std::function on_exit; -}; - -using EngineCallbacksSharedPtr = std::shared_ptr; +class StreamClient; +using StreamClientSharedPtr = std::shared_ptr; -class Engine { +class Engine : public std::enable_shared_from_this { public: - ~Engine(); - StreamClientSharedPtr streamClient(); PulseClientSharedPtr pulseClient(); void terminate(); private: - Engine(envoy_engine_t engine, const std::string& configuration, LogLevel log_level); + Engine(envoy_engine_t engine); + // required to access private constructor friend class EngineBuilder; + // required to use envoy_engine_t without exposing it publicly + friend class StreamPrototype; envoy_engine_t engine_; - EngineCallbacksSharedPtr callbacks_; StreamClientSharedPtr stream_client_; PulseClientSharedPtr pulse_client_; bool terminated_; diff --git a/library/cc/engine_builder.cc b/library/cc/engine_builder.cc index fbfa60c112..d04c97e503 100644 --- a/library/cc/engine_builder.cc +++ b/library/cc/engine_builder.cc @@ -5,21 +5,6 @@ namespace Envoy { namespace Platform { -namespace { - -void c_on_engine_running(void* context) { - EngineCallbacks* engine_callbacks = static_cast(context); - engine_callbacks->on_engine_running(); -} - -void c_on_exit(void* context) { - // NOTE: this function is intentionally empty - // as we don't actually do any post-processing on exit. - (void)context; -} - -} // namespace - EngineBuilder::EngineBuilder(std::string config_template) : config_template_(config_template) {} EngineBuilder::EngineBuilder() : EngineBuilder(std::string(config_template)) {} @@ -120,16 +105,14 @@ EngineSharedPtr EngineBuilder::build() { .release = envoy_noop_const_release, .context = nullptr, }; - - envoy_engine_callbacks envoy_callbacks{ - .on_engine_running = &c_on_engine_running, - .on_exit = &c_on_exit, - .context = this->callbacks_.get(), - }; - - Engine* engine = - new Engine(init_engine(envoy_callbacks, null_logger), config_str, this->log_level_); - return EngineSharedPtr(engine); + auto envoy_engine = init_engine(this->callbacks_->asEnvoyEngineCallbacks(), null_logger); + run_engine(envoy_engine, config_str.c_str(), logLevelToString(this->log_level_).c_str()); + + // we can't construct via std::make_shared + // because Engine is only constructible as a friend + Engine* engine = new Engine(envoy_engine); + auto engine_ptr = EngineSharedPtr(engine); + return engine_ptr; } } // namespace Platform diff --git a/library/cc/engine_builder.h b/library/cc/engine_builder.h index d352e5231d..ea5eb45458 100644 --- a/library/cc/engine_builder.h +++ b/library/cc/engine_builder.h @@ -4,6 +4,7 @@ #include #include "engine.h" +#include "engine_callbacks.h" #include "log_level.h" namespace Envoy { diff --git a/library/cc/engine_callbacks.cc b/library/cc/engine_callbacks.cc new file mode 100644 index 0000000000..04288d8bd5 --- /dev/null +++ b/library/cc/engine_callbacks.cc @@ -0,0 +1,29 @@ +#include "engine_callbacks.h" + +namespace Envoy { +namespace Platform { + +namespace { + +void c_on_engine_running(void* context) { + auto engine_callbacks = *static_cast(context); + engine_callbacks->on_engine_running(); +} + +void c_on_exit(void* context) { + auto engine_callbacks_ptr = static_cast(context); + delete engine_callbacks_ptr; +} + +} // namespace + +envoy_engine_callbacks EngineCallbacks::asEnvoyEngineCallbacks() { + return envoy_engine_callbacks{ + .on_engine_running = &c_on_engine_running, + .on_exit = &c_on_exit, + .context = new EngineCallbacksSharedPtr(this->shared_from_this()), + }; +} + +} // namespace Platform +} // namespace Envoy diff --git a/library/cc/engine_callbacks.h b/library/cc/engine_callbacks.h new file mode 100644 index 0000000000..61c1d196f4 --- /dev/null +++ b/library/cc/engine_callbacks.h @@ -0,0 +1,23 @@ +#pragma once + +#include +#include + +#include "engine.h" +#include "library/common/types/c_types.h" + +namespace Envoy { +namespace Platform { + +struct EngineCallbacks : public std::enable_shared_from_this { + std::function on_engine_running; + // unused: + // std::function on_exit; + + envoy_engine_callbacks asEnvoyEngineCallbacks(); +}; + +using EngineCallbacksSharedPtr = std::shared_ptr; + +} // namespace Platform +} // namespace Envoy diff --git a/library/cc/stream.cc b/library/cc/stream.cc index a4151624a5..0fa930f1e1 100644 --- a/library/cc/stream.cc +++ b/library/cc/stream.cc @@ -1,7 +1,5 @@ #include "stream.h" -#include - #include "bridge_utility.h" #include "library/common/main_interface.h" #include "library/common/types/c_types.h" @@ -9,8 +7,7 @@ namespace Envoy { namespace Platform { -Stream::Stream(envoy_stream_t handle, StreamCallbacksSharedPtr callbacks) - : handle_(handle), callbacks_(callbacks) {} +Stream::Stream(envoy_stream_t handle) : handle_(handle) {} Stream& Stream::sendHeaders(RequestHeadersSharedPtr headers, bool end_stream) { envoy_headers raw_headers = rawHeaderMapAsEnvoyHeaders(headers->allHeaders()); diff --git a/library/cc/stream.h b/library/cc/stream.h index d85aba5a15..a407a592d1 100644 --- a/library/cc/stream.h +++ b/library/cc/stream.h @@ -10,9 +10,12 @@ namespace Envoy { namespace Platform { +class Engine; +using EngineSharedPtr = std::shared_ptr; + class Stream { public: - Stream(envoy_stream_t handle, StreamCallbacksSharedPtr callbacks); + Stream(envoy_stream_t handle); Stream& sendHeaders(RequestHeadersSharedPtr headers, bool end_stream); Stream& sendData(envoy_data data); @@ -22,7 +25,6 @@ class Stream { private: envoy_stream_t handle_; - StreamCallbacksSharedPtr callbacks_; }; using StreamSharedPtr = std::shared_ptr; diff --git a/library/cc/stream_callbacks.cc b/library/cc/stream_callbacks.cc index 1ffcce1e2a..e251d0c680 100644 --- a/library/cc/stream_callbacks.cc +++ b/library/cc/stream_callbacks.cc @@ -10,7 +10,7 @@ namespace Platform { namespace { void* c_on_headers(envoy_headers headers, bool end_stream, void* context) { - auto stream_callbacks = static_cast(context); + auto stream_callbacks = *static_cast(context); if (stream_callbacks->on_headers.has_value()) { auto raw_headers = envoyHeadersAsRawHeaderMap(headers); ResponseHeadersBuilder builder; @@ -27,7 +27,7 @@ void* c_on_headers(envoy_headers headers, bool end_stream, void* context) { } void* c_on_data(envoy_data data, bool end_stream, void* context) { - auto stream_callbacks = static_cast(context); + auto stream_callbacks = *static_cast(context); if (stream_callbacks->on_data.has_value()) { auto on_data = stream_callbacks->on_data.value(); on_data(data, end_stream); @@ -36,7 +36,7 @@ void* c_on_data(envoy_data data, bool end_stream, void* context) { } void* c_on_trailers(envoy_headers metadata, void* context) { - auto stream_callbacks = static_cast(context); + auto stream_callbacks = *static_cast(context); if (stream_callbacks->on_trailers.has_value()) { auto raw_headers = envoyHeadersAsRawHeaderMap(metadata); ResponseTrailersBuilder builder; @@ -50,7 +50,8 @@ void* c_on_trailers(envoy_headers metadata, void* context) { } void* c_on_error(envoy_error raw_error, void* context) { - auto stream_callbacks = static_cast(context); + auto stream_callbacks_ptr = static_cast(context); + auto stream_callbacks = *stream_callbacks_ptr; if (stream_callbacks->on_error.has_value()) { EnvoyErrorSharedPtr error = std::make_shared(); error->error_code = raw_error.error_code; @@ -61,25 +62,30 @@ void* c_on_error(envoy_error raw_error, void* context) { auto on_error = stream_callbacks->on_error.value(); on_error(error); } - return context; + delete stream_callbacks_ptr; + return nullptr; } void* c_on_complete(void* context) { - auto stream_callbacks = static_cast(context); + auto stream_callbacks_ptr = static_cast(context); + auto stream_callbacks = *stream_callbacks_ptr; if (stream_callbacks->on_complete.has_value()) { auto on_complete = stream_callbacks->on_complete.value(); on_complete(); } - return context; + delete stream_callbacks_ptr; + return nullptr; } void* c_on_cancel(void* context) { - auto stream_callbacks = static_cast(context); + auto stream_callbacks_ptr = static_cast(context); + auto stream_callbacks = *stream_callbacks_ptr; if (stream_callbacks->on_cancel.has_value()) { auto on_cancel = stream_callbacks->on_cancel.value(); on_cancel(); } - return context; + delete stream_callbacks_ptr; + return nullptr; } } // namespace @@ -93,7 +99,7 @@ envoy_http_callbacks StreamCallbacks::asEnvoyHttpCallbacks() { .on_error = &c_on_error, .on_complete = &c_on_complete, .on_cancel = &c_on_cancel, - .context = this, + .context = new StreamCallbacksSharedPtr(this->shared_from_this()), }; } diff --git a/library/cc/stream_callbacks.h b/library/cc/stream_callbacks.h index bbe097fba4..8c33330eae 100644 --- a/library/cc/stream_callbacks.h +++ b/library/cc/stream_callbacks.h @@ -9,10 +9,14 @@ #include "library/common/types/c_types.h" #include "response_headers.h" #include "response_trailers.h" +#include "stream.h" namespace Envoy { namespace Platform { +class Stream; +using StreamSharedPtr = std::shared_ptr; + using OnHeadersCallback = std::function; using OnDataCallback = std::function; using OnTrailersCallback = std::function; @@ -20,7 +24,7 @@ using OnErrorCallback = std::function; using OnCompleteCallback = std::function; using OnCancelCallback = std::function; -struct StreamCallbacks { +struct StreamCallbacks : public std::enable_shared_from_this { absl::optional on_headers; absl::optional on_data; absl::optional on_trailers; diff --git a/library/cc/stream_client.cc b/library/cc/stream_client.cc index 6837992617..3f5991b628 100644 --- a/library/cc/stream_client.cc +++ b/library/cc/stream_client.cc @@ -3,7 +3,7 @@ namespace Envoy { namespace Platform { -StreamClient::StreamClient(envoy_engine_t engine) : engine_(engine) {} +StreamClient::StreamClient(EngineSharedPtr engine) : engine_(engine) {} StreamPrototypeSharedPtr StreamClient::newStreamPrototype() { return std::make_shared(this->engine_); diff --git a/library/cc/stream_client.h b/library/cc/stream_client.h index 52c119f0f4..ad3e8a4eec 100644 --- a/library/cc/stream_client.h +++ b/library/cc/stream_client.h @@ -2,19 +2,26 @@ #include +#include "engine.h" #include "stream_prototype.h" namespace Envoy { namespace Platform { +class Engine; +using EngineSharedPtr = std::shared_ptr; + +class StreamPrototype; +using StreamPrototypeSharedPtr = std::shared_ptr; + class StreamClient { public: - StreamClient(envoy_engine_t engine); + StreamClient(EngineSharedPtr engine); StreamPrototypeSharedPtr newStreamPrototype(); private: - envoy_engine_t engine_; + EngineSharedPtr engine_; }; using StreamClientSharedPtr = std::shared_ptr; diff --git a/library/cc/stream_prototype.cc b/library/cc/stream_prototype.cc index 2e74225609..52daa8023b 100644 --- a/library/cc/stream_prototype.cc +++ b/library/cc/stream_prototype.cc @@ -5,15 +5,14 @@ namespace Envoy { namespace Platform { -StreamPrototype::StreamPrototype(envoy_engine_t engine) : engine_(engine) { +StreamPrototype::StreamPrototype(EngineSharedPtr engine) : engine_(engine) { this->callbacks_ = std::make_shared(); } StreamSharedPtr StreamPrototype::start() { - auto stream = init_stream(this->engine_); - start_stream(stream, this->callbacks_->asEnvoyHttpCallbacks()); - - return std::make_shared(stream, this->callbacks_); + auto envoy_stream = init_stream(this->engine_->engine_); + start_stream(envoy_stream, this->callbacks_->asEnvoyHttpCallbacks()); + return std::make_shared(envoy_stream); } StreamPrototype& StreamPrototype::setOnHeaders(OnHeadersCallback closure) { diff --git a/library/cc/stream_prototype.h b/library/cc/stream_prototype.h index a75f6fd22c..5642cf4b90 100644 --- a/library/cc/stream_prototype.h +++ b/library/cc/stream_prototype.h @@ -2,6 +2,7 @@ #include +#include "engine.h" #include "envoy_error.h" #include "library/common/types/c_types.h" #include "response_headers.h" @@ -12,9 +13,12 @@ namespace Envoy { namespace Platform { +class Engine; +using EngineSharedPtr = std::shared_ptr; + class StreamPrototype { public: - StreamPrototype(envoy_engine_t engine); + StreamPrototype(EngineSharedPtr engine); StreamSharedPtr start(); @@ -26,7 +30,7 @@ class StreamPrototype { StreamPrototype& setOnCancel(OnCancelCallback closure); private: - envoy_engine_t engine_; + EngineSharedPtr engine_; StreamCallbacksSharedPtr callbacks_; }; diff --git a/library/python/BUILD b/library/python/BUILD index a7516aac31..55cebd4f0a 100644 --- a/library/python/BUILD +++ b/library/python/BUILD @@ -76,7 +76,7 @@ py_wheel( }), python_tag = python_tag(), strip_path_prefixes = ["library/python"], - version = "0.0.1a0", + version = "0.0.1a1", deps = [ ":envoy_engine.so", ":envoy_requests", diff --git a/library/python/module_definition.cc b/library/python/module_definition.cc index 92bbcaef9a..37b8bab69f 100644 --- a/library/python/module_definition.cc +++ b/library/python/module_definition.cc @@ -44,7 +44,7 @@ PYBIND11_MODULE(envoy_engine, m) { py::class_(m, "Engine") .def("stream_client", &Engine::streamClient) .def("pulse_client", &Engine::pulseClient) - .def("terminate", &Engine::terminate); + .def("terminate", &Engine::terminate, py::call_guard()); py::class_(m, "EngineBuilder") .def(py::init()) diff --git a/test/cc/integration/BUILD b/test/cc/integration/BUILD index 0ab3e7d997..58e66f3aba 100644 --- a/test/cc/integration/BUILD +++ b/test/cc/integration/BUILD @@ -12,3 +12,12 @@ envoy_cc_test( "//library/cc:envoy_engine_cc_lib_no_stamp", ], ) + +envoy_cc_test( + name = "lifetimes_test", + srcs = ["lifetimes_test.cc"], + repository = "@envoy", + deps = [ + "//library/cc:envoy_engine_cc_lib_no_stamp", + ], +) diff --git a/test/cc/integration/lifetimes_test.cc b/test/cc/integration/lifetimes_test.cc new file mode 100644 index 0000000000..e6d95f3fc3 --- /dev/null +++ b/test/cc/integration/lifetimes_test.cc @@ -0,0 +1,108 @@ +#include "absl/synchronization/notification.h" +#include "gtest/gtest.h" +#include "library/cc/engine.h" +#include "library/cc/engine_builder.h" +#include "library/cc/envoy_error.h" +#include "library/cc/log_level.h" +#include "library/cc/request_headers_builder.h" +#include "library/cc/request_method.h" +#include "library/cc/response_headers.h" + +namespace Envoy { +namespace { + +const static std::string CONFIG_TEMPLATE = "\ +static_resources:\n\ + listeners:\n\ + - name: base_api_listener\n\ + address:\n\ + socket_address:\n\ + protocol: TCP\n\ + address: 0.0.0.0\n\ + port_value: 10000\n\ + api_listener:\n\ + api_listener:\n\ + \"@type\": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager\n\ + stat_prefix: hcm\n\ + route_config:\n\ + name: api_router\n\ + virtual_hosts:\n\ + - name: api\n\ + domains:\n\ + - \"*\"\n\ + routes:\n\ + - match:\n\ + prefix: \"/\"\n\ + direct_response:\n\ + status: 200\n\ + http_filters:\n\ + - name: envoy.filters.http.assertion\n\ + typed_config:\n\ + \"@type\": type.googleapis.com/envoymobile.extensions.filters.http.assertion.Assertion\n\ + match_config:\n\ + http_request_headers_match:\n\ + headers:\n\ + - name: \":authority\"\n\ + exact_match: example.com\n\ + - name: envoy.router\n\ + typed_config:\n\ + \"@type\": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router\n\ +"; + +struct Status { + int status_code; + bool end_stream; +}; + +void sendRequest(Platform::EngineSharedPtr engine, Status& status, + absl::Notification& stream_complete) { + auto stream_prototype = engine->streamClient()->newStreamPrototype(); + auto stream = (*stream_prototype) + .setOnHeaders([&](Platform::ResponseHeadersSharedPtr headers, bool end_stream) { + status.status_code = headers->httpStatus(); + status.end_stream = end_stream; + }) + .setOnComplete([&]() { stream_complete.Notify(); }) + .setOnError([&](Platform::EnvoyErrorSharedPtr envoy_error) { + (void)envoy_error; + stream_complete.Notify(); + }) + .setOnCancel([&]() { stream_complete.Notify(); }) + .start(); + + auto request_headers = + Platform::RequestHeadersBuilder(Platform::RequestMethod::GET, "https", "example.com", "/") + .build(); + stream->sendHeaders(std::make_shared(request_headers), true); +} + +void sendRequestEndToEnd() { + Platform::EngineSharedPtr engine; + absl::Notification engine_running; + auto engine_builder = Platform::EngineBuilder(CONFIG_TEMPLATE); + engine = engine_builder.addLogLevel(Platform::LogLevel::debug) + .setOnEngineRunning([&]() { engine_running.Notify(); }) + .build(); + engine_running.WaitForNotification(); + + Status status; + absl::Notification stream_complete; + sendRequest(engine, status, stream_complete); + stream_complete.WaitForNotification(); + + EXPECT_EQ(status.status_code, 200); + EXPECT_EQ(status.end_stream, true); + + engine->terminate(); +} + +// this test attempts to elicit race conditions deriving from +// the semantics of ownership on StreamCallbacks +TEST(TestLifetimes, CallbacksStayAlive) { + for (size_t i = 0; i < 10; i++) { + sendRequestEndToEnd(); + } +} + +} // namespace +} // namespace Envoy