-
Notifications
You must be signed in to change notification settings - Fork 5.5k
lua: add fire-and-forget functionality to http call #10145
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
871ee1b
df9df09
541bc4b
8b5b84d
323d728
e892eaa
29464ba
b566eab
4cb0ecc
27a0f3c
11c1301
21aa51e
bf9998d
3e0f6c4
38a8a99
67aee6e
e5bee45
ac562dd
aa04eb8
d838492
7b234f3
f4cd746
d6e452b
d022924
5ad50d1
55d1682
1ebaac1
f279ad2
1594207
9945ac5
1d4ba85
6dcdcf5
8459416
d190e36
9a1dbd7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -244,6 +244,23 @@ data to send. *timeout* is an integer that specifies the call timeout in millise | |
| Returns *headers* which is a table of response headers. Returns *body* which is the string response | ||
| body. May be nil if there is no body. | ||
|
|
||
| httpCallAsync() | ||
| ^^^^^^^^^^^^^^^ | ||
|
|
||
| .. code-block:: lua | ||
|
|
||
| headers, body = handle:httpCallAsync(cluster, headers, body, timeout) | ||
|
|
||
| Makes an HTTP call to an upstream host. Same behavior as httpCall, except that Envoy will fire and forget. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would suggest spelling out what fire and forget means here wrt continuation of the filter chain |
||
|
|
||
| *cluster* is a string which maps to a configured cluster manager cluster. *headers* | ||
| is a table of key/value pairs to send (the value can be a string or table of strings). Note that | ||
| the *:method*, *:path*, and *:authority* headers must be set. *body* is an optional string of body | ||
| data to send. *timeout* is an integer that specifies the call timeout in milliseconds. | ||
|
|
||
| Returns *headers* which is a table of response headers. Returns *body* which is the string response | ||
| body. May be nil if there is no body. | ||
|
|
||
| respond() | ||
| ^^^^^^^^^^ | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,7 +23,10 @@ StreamHandleWrapper::StreamHandleWrapper(Filters::Common::Lua::Coroutine& corout | |
| if (state_ == State::Running) { | ||
| throw Filters::Common::Lua::LuaException("script performed an unexpected yield"); | ||
| } | ||
| }) {} | ||
| }), | ||
| fireAndForgetWriter_(new FireAndForgetWriter(filter)) {} | ||
|
|
||
| StreamHandleWrapper::~StreamHandleWrapper() { delete fireAndForgetWriter_; } | ||
|
|
||
| Http::FilterHeadersStatus StreamHandleWrapper::start(int function_ref) { | ||
| // We are on the top of the stack. | ||
|
|
@@ -116,7 +119,7 @@ int StreamHandleWrapper::luaRespond(lua_State* state) { | |
| size_t body_size; | ||
| const char* raw_body = luaL_optlstring(state, 3, nullptr, &body_size); | ||
| auto headers = std::make_unique<Http::ResponseHeaderMapImpl>(); | ||
| buildHeadersFromTable(*headers, state, 2); | ||
| LuaFilterUtil::buildHeadersFromTable(*headers, state, 2); | ||
|
|
||
| uint64_t status; | ||
| if (headers->Status() == nullptr || | ||
|
|
@@ -138,71 +141,9 @@ int StreamHandleWrapper::luaRespond(lua_State* state) { | |
| return lua_yield(state, 0); | ||
| } | ||
|
|
||
| void StreamHandleWrapper::buildHeadersFromTable(Http::HeaderMap& headers, lua_State* state, | ||
| int table_index) { | ||
| // Build a header map to make the request. We iterate through the provided table to do this and | ||
| // check that we are getting strings. | ||
| lua_pushnil(state); | ||
| while (lua_next(state, table_index) != 0) { | ||
| // Uses 'key' (at index -2) and 'value' (at index -1). | ||
| const char* key = luaL_checkstring(state, -2); | ||
| // Check if the current value is a table, we iterate through the table and add each element of | ||
| // it as a header entry value for the current key. | ||
| if (lua_istable(state, -1)) { | ||
| lua_pushnil(state); | ||
| while (lua_next(state, -2) != 0) { | ||
| const char* value = luaL_checkstring(state, -1); | ||
| headers.addCopy(Http::LowerCaseString(key), value); | ||
| lua_pop(state, 1); | ||
| } | ||
| } else { | ||
| const char* value = luaL_checkstring(state, -1); | ||
| headers.addCopy(Http::LowerCaseString(key), value); | ||
| } | ||
| // Removes 'value'; keeps 'key' for next iteration. This is the input for lua_next() so that | ||
| // it can push the next key/value pair onto the stack. | ||
| lua_pop(state, 1); | ||
| } | ||
| } | ||
|
|
||
| int StreamHandleWrapper::luaHttpCall(lua_State* state) { | ||
| ASSERT(state_ == State::Running); | ||
|
|
||
| const std::string cluster = luaL_checkstring(state, 2); | ||
| luaL_checktype(state, 3, LUA_TTABLE); | ||
| size_t body_size; | ||
| const char* body = luaL_optlstring(state, 4, nullptr, &body_size); | ||
| int timeout_ms = luaL_checkint(state, 5); | ||
| if (timeout_ms < 0) { | ||
| return luaL_error(state, "http call timeout must be >= 0"); | ||
| } | ||
|
|
||
| if (filter_.clusterManager().get(cluster) == nullptr) { | ||
| return luaL_error(state, "http call cluster invalid. Must be configured"); | ||
| } | ||
|
|
||
| auto headers = std::make_unique<Http::RequestHeaderMapImpl>(); | ||
| buildHeadersFromTable(*headers, state, 3); | ||
| Http::RequestMessagePtr message(new Http::RequestMessageImpl(std::move(headers))); | ||
|
|
||
| // Check that we were provided certain headers. | ||
| if (message->headers().Path() == nullptr || message->headers().Method() == nullptr || | ||
| message->headers().Host() == nullptr) { | ||
| return luaL_error(state, "http call headers must include ':path', ':method', and ':authority'"); | ||
| } | ||
|
|
||
| if (body != nullptr) { | ||
| message->body() = std::make_unique<Buffer::OwnedImpl>(body, body_size); | ||
| message->headers().setContentLength(body_size); | ||
| } | ||
|
|
||
| absl::optional<std::chrono::milliseconds> timeout; | ||
| if (timeout_ms > 0) { | ||
| timeout = std::chrono::milliseconds(timeout_ms); | ||
| } | ||
|
|
||
| http_request_ = filter_.clusterManager().httpAsyncClientForCluster(cluster).send( | ||
| std::move(message), *this, Http::AsyncClient::RequestOptions().setTimeout(timeout)); | ||
| http_request_ = LuaFilterUtil::makeHttpCall(state, filter_, *this); | ||
| if (http_request_) { | ||
| state_ = State::HttpCall; | ||
| return lua_yield(state, 0); | ||
|
|
@@ -213,6 +154,10 @@ int StreamHandleWrapper::luaHttpCall(lua_State* state) { | |
| } | ||
| } | ||
|
|
||
| int StreamHandleWrapper::luaHttpCallAsync(lua_State* state) { | ||
| return fireAndForgetWriter_->luaHttpCallAsync(state); | ||
| } | ||
|
|
||
| void StreamHandleWrapper::onSuccess(Http::ResponseMessagePtr&& response) { | ||
| ASSERT(state_ == State::HttpCall || state_ == State::Running); | ||
| ENVOY_LOG(debug, "async HTTP response complete"); | ||
|
|
@@ -485,6 +430,84 @@ int StreamHandleWrapper::luaImportPublicKey(lua_State* state) { | |
| return 1; | ||
| } | ||
|
|
||
| FireAndForgetWriter::FireAndForgetWriter(Filter& filter) : filter_(filter) {} | ||
|
|
||
| int FireAndForgetWriter::luaHttpCallAsync(lua_State* state) { | ||
| if (LuaFilterUtil::makeHttpCall(state, filter_, *this)) { | ||
| return 0; | ||
| } else { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Superfluous |
||
| return 2; | ||
| } | ||
| } | ||
|
|
||
| Http::AsyncClient::Request* | ||
| LuaFilterUtil::makeHttpCall(lua_State* state, Filter& filter, | ||
| Http::AsyncClient::Callbacks& callbacksListener) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: callbacksListener -> callbacks (or at the very least snake case parameter names) |
||
| const std::string cluster = luaL_checkstring(state, 2); | ||
| luaL_checktype(state, 3, LUA_TTABLE); | ||
| size_t body_size; | ||
| const char* body = luaL_optlstring(state, 4, nullptr, &body_size); | ||
| int timeout_ms = luaL_checkint(state, 5); | ||
| if (timeout_ms < 0) { | ||
| luaL_error(state, "http call timeout must be >= 0"); | ||
| } | ||
|
|
||
| if (filter.clusterManager().get(cluster) == nullptr) { | ||
| luaL_error(state, "http call cluster invalid. Must be configured"); | ||
| } | ||
|
|
||
| auto headers = std::make_unique<Http::RequestHeaderMapImpl>(); | ||
| LuaFilterUtil::buildHeadersFromTable(*headers, state, 3); | ||
| Http::RequestMessagePtr message(new Http::RequestMessageImpl(std::move(headers))); | ||
|
|
||
| // Check that we were provided certain headers. | ||
| if (message->headers().Path() == nullptr || message->headers().Method() == nullptr || | ||
| message->headers().Host() == nullptr) { | ||
| luaL_error(state, "http call headers must include ':path', ':method', and ':authority'"); | ||
| } | ||
|
|
||
| if (body != nullptr) { | ||
| message->body() = std::make_unique<Buffer::OwnedImpl>(body, body_size); | ||
| message->headers().setContentLength(body_size); | ||
| } | ||
|
|
||
| absl::optional<std::chrono::milliseconds> timeout; | ||
| if (timeout_ms > 0) { | ||
| timeout = std::chrono::milliseconds(timeout_ms); | ||
| } | ||
|
|
||
| return filter.clusterManager().httpAsyncClientForCluster(cluster).send( | ||
| std::move(message), callbacksListener, | ||
| Http::AsyncClient::RequestOptions().setTimeout(timeout)); | ||
| } | ||
|
|
||
| void LuaFilterUtil::buildHeadersFromTable(Http::HeaderMap& headers, lua_State* state, | ||
| int table_index) { | ||
| // Build a header map to make the request. We iterate through the provided table to do this and | ||
| // check that we are getting strings. | ||
| lua_pushnil(state); | ||
| while (lua_next(state, table_index) != 0) { | ||
| // Uses 'key' (at index -2) and 'value' (at index -1). | ||
| const char* key = luaL_checkstring(state, -2); | ||
| // Check if the current value is a table, we iterate through the table and add each element of | ||
| // it as a header entry value for the current key. | ||
| if (lua_istable(state, -1)) { | ||
| lua_pushnil(state); | ||
| while (lua_next(state, -2) != 0) { | ||
| const char* value = luaL_checkstring(state, -1); | ||
| headers.addCopy(Http::LowerCaseString(key), value); | ||
| lua_pop(state, 1); | ||
| } | ||
| } else { | ||
| const char* value = luaL_checkstring(state, -1); | ||
| headers.addCopy(Http::LowerCaseString(key), value); | ||
| } | ||
| // Removes 'value'; keeps 'key' for next iteration. This is the input for lua_next() so that | ||
| // it can push the next key/value pair onto the stack. | ||
| lua_pop(state, 1); | ||
| } | ||
| } | ||
|
|
||
| FilterConfig::FilterConfig(const std::string& lua_code, ThreadLocal::SlotAllocator& tls, | ||
| Upstream::ClusterManager& cluster_manager) | ||
| : cluster_manager_(cluster_manager), lua_state_(lua_code, tls) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -86,6 +86,8 @@ class FilterCallbacks { | |||||
|
|
||||||
| class Filter; | ||||||
|
|
||||||
| class FireAndForgetWriter; | ||||||
|
|
||||||
| /** | ||||||
| * A wrapper for a currently running request/response. This is the primary handle passed to Lua. | ||||||
| * The script interacts with Envoy entirely through this handle. | ||||||
|
|
@@ -117,6 +119,7 @@ class StreamHandleWrapper : public Filters::Common::Lua::BaseLuaObject<StreamHan | |||||
|
|
||||||
| StreamHandleWrapper(Filters::Common::Lua::Coroutine& coroutine, Http::HeaderMap& headers, | ||||||
| bool end_stream, Filter& filter, FilterCallbacks& callbacks); | ||||||
| ~StreamHandleWrapper(); | ||||||
|
|
||||||
| Http::FilterHeadersStatus start(int function_ref); | ||||||
| Http::FilterDataStatus onData(Buffer::Instance& data, bool end_stream); | ||||||
|
|
@@ -142,6 +145,7 @@ class StreamHandleWrapper : public Filters::Common::Lua::BaseLuaObject<StreamHan | |||||
| {"logErr", static_luaLogErr}, | ||||||
| {"logCritical", static_luaLogCritical}, | ||||||
| {"httpCall", static_luaHttpCall}, | ||||||
| {"httpCallAsync", static_luaHttpCallAsync}, | ||||||
| {"respond", static_luaRespond}, | ||||||
| {"streamInfo", static_luaStreamInfo}, | ||||||
| {"connection", static_luaConnection}, | ||||||
|
|
@@ -160,6 +164,16 @@ class StreamHandleWrapper : public Filters::Common::Lua::BaseLuaObject<StreamHan | |||||
| */ | ||||||
| DECLARE_LUA_FUNCTION(StreamHandleWrapper, luaHttpCall); | ||||||
|
|
||||||
| /** | ||||||
| * Perform an asynchronous HTTP call to an upstream host. Fires and forgets. | ||||||
| * @param 1 (string): The name of the upstream cluster to call. This cluster must be configured. | ||||||
| * @param 2 (table): A table of HTTP headers. :method, :path, and :authority must be defined. | ||||||
| * @param 3 (string): Body. Can be nil. | ||||||
| * @param 4 (int): Timeout in milliseconds for the call. | ||||||
| * @return headers (table), body (string/nil) | ||||||
| */ | ||||||
| DECLARE_LUA_FUNCTION(StreamHandleWrapper, luaHttpCallAsync); | ||||||
|
|
||||||
| /** | ||||||
| * Perform an inline response. This call is currently only valid on the request path. Further | ||||||
| * filter iteration will stop. No further script code will run after this call. | ||||||
|
|
@@ -247,8 +261,6 @@ class StreamHandleWrapper : public Filters::Common::Lua::BaseLuaObject<StreamHan | |||||
| */ | ||||||
| DECLARE_LUA_CLOSURE(StreamHandleWrapper, luaBodyIterator); | ||||||
|
|
||||||
| static void buildHeadersFromTable(Http::HeaderMap& headers, lua_State* state, int table_index); | ||||||
|
|
||||||
| // Filters::Common::Lua::BaseLuaObject | ||||||
| void onMarkDead() override { | ||||||
| // Headers/body/trailers wrappers do not survive any yields. The user can request them | ||||||
|
|
@@ -285,6 +297,38 @@ class StreamHandleWrapper : public Filters::Common::Lua::BaseLuaObject<StreamHan | |||||
| State state_{State::Running}; | ||||||
| std::function<void()> yield_callback_; | ||||||
| Http::AsyncClient::Request* http_request_{}; | ||||||
| FireAndForgetWriter* fireAndForgetWriter_; | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. WDYT of making this a |
||||||
| }; | ||||||
|
|
||||||
| /** | ||||||
| * Implementation that takes incoming requests and implements | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, could you expand this a little bit? I'm a bit confused by "takes incoming requests". |
||||||
| * "fire and forget" behavior using an async client. | ||||||
| */ | ||||||
| class FireAndForgetWriter : public Filters::Common::Lua::BaseLuaObject<FireAndForgetWriter>, | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this writer currently is an HTTP async client "writer". Can we name it as |
||||||
| public Http::AsyncClient::Callbacks { | ||||||
| public: | ||||||
| FireAndForgetWriter(Filter& filter); | ||||||
|
|
||||||
| int luaHttpCallAsync(lua_State* state); | ||||||
|
|
||||||
| // Http::AsyncClient::Callbacks | ||||||
| void onSuccess(Http::ResponseMessagePtr&&) override {} | ||||||
|
|
||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: del newline |
||||||
| void onFailure(Http::AsyncClient::FailureReason) override {} | ||||||
|
|
||||||
| private: | ||||||
| Filter& filter_; | ||||||
| }; | ||||||
|
|
||||||
| /** | ||||||
| * A class with shared code for building and making http calls | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A super nit,
Suggested change
|
||||||
| */ | ||||||
| class LuaFilterUtil : public Filters::Common::Lua::BaseLuaObject<LuaFilterUtil> { | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of making these utility functions exposed outside of this class/translation unit i'd suggest putting them in the anonymous namespace in the .cc file. This tends to be the preferred way of doing helper functions that are only used in one .cc file and that doesn't benefit from being a member function you can look for |
||||||
| public: | ||||||
| static Http::AsyncClient::Request* makeHttpCall(lua_State* state, Filter& filter, | ||||||
| Http::AsyncClient::Callbacks& callbacksListener); | ||||||
|
|
||||||
| static void buildHeadersFromTable(Http::HeaderMap& headers, lua_State* state, int table_index); | ||||||
| }; | ||||||
|
|
||||||
| /** | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -799,6 +799,59 @@ TEST_F(LuaHttpFilterTest, HttpCall) { | |||||
| callbacks->onSuccess(std::move(response_message)); | ||||||
| } | ||||||
|
|
||||||
| // Basic Asynchronous, Fire and Forget HTTP request flow. | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| TEST_F(LuaHttpFilterTest, HttpCallAsync) { | ||||||
| const std::string SCRIPT{R"EOF( | ||||||
| function envoy_on_request(request_handle) | ||||||
| local headers, body = request_handle:httpCallAsync( | ||||||
| "cluster", | ||||||
| { | ||||||
| [":method"] = "POST", | ||||||
| [":path"] = "/", | ||||||
| [":authority"] = "foo", | ||||||
| ["set-cookie"] = { "flavor=chocolate; Path=/", "variant=chewy; Path=/" } | ||||||
| }, | ||||||
| "hello world", | ||||||
| 5000) | ||||||
| end | ||||||
| )EOF"}; | ||||||
|
|
||||||
| InSequence s; | ||||||
| setup(SCRIPT); | ||||||
|
|
||||||
| Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}}; | ||||||
| Http::MockAsyncClientRequest request(&cluster_manager_.async_client_); | ||||||
| Http::AsyncClient::Callbacks* callbacks; | ||||||
| EXPECT_CALL(cluster_manager_, get(Eq("cluster"))); | ||||||
| EXPECT_CALL(cluster_manager_, httpAsyncClientForCluster("cluster")); | ||||||
| EXPECT_CALL(cluster_manager_.async_client_, send_(_, _, _)) | ||||||
| .WillOnce( | ||||||
| Invoke([&](Http::RequestMessagePtr& message, Http::AsyncClient::Callbacks& cb, | ||||||
| const Http::AsyncClient::RequestOptions&) -> Http::AsyncClient::Request* { | ||||||
| EXPECT_EQ((Http::TestHeaderMapImpl{{":path", "/"}, | ||||||
| {":method", "POST"}, | ||||||
| {":authority", "foo"}, | ||||||
| {"set-cookie", "flavor=chocolate; Path=/"}, | ||||||
| {"set-cookie", "variant=chewy; Path=/"}, | ||||||
| {"content-length", "11"}}), | ||||||
| message->headers()); | ||||||
| callbacks = &cb; | ||||||
| return &request; | ||||||
| })); | ||||||
|
|
||||||
| EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(request_headers, false)); | ||||||
|
|
||||||
| Buffer::OwnedImpl data("hello"); | ||||||
| EXPECT_EQ(Http::FilterDataStatus::Continue, filter_->decodeData(data, false)); | ||||||
|
|
||||||
| Http::TestRequestTrailerMapImpl request_trailers{{"foo", "bar"}}; | ||||||
| EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->decodeTrailers(request_trailers)); | ||||||
|
|
||||||
| Http::ResponseMessagePtr response_message(new Http::ResponseMessageImpl( | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this seems unused? |
||||||
| Http::ResponseHeaderMapPtr{new Http::TestResponseHeaderMapImpl{{":status", "200"}}})); | ||||||
| response_message->body() = std::make_unique<Buffer::OwnedImpl>("response"); | ||||||
| } | ||||||
|
|
||||||
| // Double HTTP call. Responses before request body. | ||||||
| TEST_F(LuaHttpFilterTest, DoubleHttpCall) { | ||||||
| const std::string SCRIPT{R"EOF( | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
drive-by comment; I know nothing about Lua but I would've expected, and would hope you would not return headers/body from an async function, but would instead have the user supply another optional argument, which would be some sort of closure or lambda to run when the async fetch was complete, passing the headers and body to that function.
Even better, I think 2 lambdas should be supplied: one that is passed the headers when available. The second is called with body-chunks, until there are no more. It would receive a bool param that would be 'true' when the request is complete.
Actually I haven't looked deeply at the Envoy infrastructure for HTTP Fetches, but modeling the Lua interface based on that pattern (which might differ from my suggestion above) would be even better.
The phrase 'fire and forget' doesn't seem good to me from a resource usage perspective.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, you're correct, this does not return headers/body. Pushed a change. Thanks for noticing that!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resource/memory wise, it looks like there are checks in the pipeline for this (I ran into it in previous pushes, and resolved them).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jmarantz and I talked about this offline. We will rename this
httpCallNonblockingand proceed without callbacks.