Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bazel/external/cargo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ name = "http_metadata_rust"
path = "../../../test/extensions/filters/http/wasm/test_data/metadata_rust.rs"
crate-type = ["cdylib"]

[[example]]
name = "http_resume_call_rust"
path = "../../../test/extensions/filters/http/wasm/test_data/resume_call_rust.rs"
crate-type = ["cdylib"]

[[example]]
name = "http_shared_data_rust"
path = "../../../test/extensions/filters/http/wasm/test_data/shared_data_rust.rs"
Expand Down
6 changes: 3 additions & 3 deletions bazel/repository_locations.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -870,8 +870,8 @@ REPOSITORY_LOCATIONS_SPEC = dict(
project_name = "WebAssembly for Proxies (C++ host implementation)",
project_desc = "WebAssembly for Proxies (C++ host implementation)",
project_url = "https://github.com/proxy-wasm/proxy-wasm-cpp-host",
version = "eceb02d5b7772ec1cd78a4d35356e57d2e6d59bb",
sha256 = "ae9d9b87d21d95647ebda197d130b37bddc5c6ee3e6630909a231fd55fcc9069",
version = "15827110ac35fdac9abdb6b05d04ee7ee2044dae",
sha256 = "77a2671205eb0973bee375a1bee4099edef991350433981f6e3508780318117d",
strip_prefix = "proxy-wasm-cpp-host-{version}",
urls = ["https://github.com/proxy-wasm/proxy-wasm-cpp-host/archive/{version}.tar.gz"],
use_category = ["dataplane_ext"],
Expand All @@ -882,7 +882,7 @@ REPOSITORY_LOCATIONS_SPEC = dict(
"envoy.filters.network.wasm",
"envoy.stat_sinks.wasm",
],
release_date = "2020-11-10",
release_date = "2020-11-12",
cpe = "N/A",
),
emscripten_toolchain = dict(
Expand Down
6 changes: 4 additions & 2 deletions source/extensions/common/wasm/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1490,12 +1490,14 @@ WasmResult Context::continueStream(WasmStreamType stream_type) {
switch (stream_type) {
case WasmStreamType::Request:
if (decoder_callbacks_) {
decoder_callbacks_->continueDecoding();
// We are in a reentrant call, so defer.
wasm()->addAfterVmCallAction([this] { decoder_callbacks_->continueDecoding(); });
}
break;
case WasmStreamType::Response:
if (encoder_callbacks_) {
encoder_callbacks_->continueEncoding();
// We are in a reentrant call, so defer.
wasm()->addAfterVmCallAction([this] { encoder_callbacks_->continueEncoding(); });
}
break;
default:
Expand Down
1 change: 1 addition & 0 deletions test/extensions/filters/http/wasm/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ envoy_extension_cc_test(
"//test/extensions/filters/http/wasm/test_data:body_rust.wasm",
"//test/extensions/filters/http/wasm/test_data:headers_rust.wasm",
"//test/extensions/filters/http/wasm/test_data:metadata_rust.wasm",
"//test/extensions/filters/http/wasm/test_data:resume_call_rust.wasm",
"//test/extensions/filters/http/wasm/test_data:shared_data_rust.wasm",
"//test/extensions/filters/http/wasm/test_data:shared_queue_rust.wasm",
"//test/extensions/filters/http/wasm/test_data:test_cpp.wasm",
Expand Down
11 changes: 11 additions & 0 deletions test/extensions/filters/http/wasm/test_data/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ wasm_rust_binary(
],
)

wasm_rust_binary(
name = "resume_call_rust.wasm",
srcs = ["resume_call_rust.rs"],
deps = [
"//bazel/external/cargo:log",
"//bazel/external/cargo:proxy_wasm",
],
)

wasm_rust_binary(
name = "shared_data_rust.wasm",
srcs = ["shared_data_rust.rs"],
Expand Down Expand Up @@ -72,6 +81,7 @@ envoy_cc_library(
"test_cpp_null_plugin.cc",
"test_grpc_call_cpp.cc",
"test_grpc_stream_cpp.cc",
"test_resume_call_cpp.cc",
"test_shared_data_cpp.cc",
"test_shared_queue_cpp.cc",
],
Expand All @@ -97,6 +107,7 @@ envoy_wasm_cc_binary(
"test_cpp.cc",
"test_grpc_call_cpp.cc",
"test_grpc_stream_cpp.cc",
"test_resume_call_cpp.cc",
"test_shared_data_cpp.cc",
"test_shared_queue_cpp.cc",
],
Expand Down
39 changes: 39 additions & 0 deletions test/extensions/filters/http/wasm/test_data/resume_call_rust.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use log::info;
use proxy_wasm::traits::{Context, HttpContext};
use proxy_wasm::types::*;
use std::time::Duration;

#[no_mangle]
pub fn _start() {
proxy_wasm::set_log_level(LogLevel::Trace);
proxy_wasm::set_http_context(|_, _| -> Box<dyn HttpContext> { Box::new(TestStream) });
}

struct TestStream;

impl HttpContext for TestStream {
fn on_http_request_headers(&mut self, _: usize) -> Action {
self.dispatch_http_call(
"cluster",
vec![(":method", "POST"), (":path", "/"), (":authority", "foo")],
Some(b"resume"),
vec![],
Duration::from_secs(1),
)
.unwrap();
info!("onRequestHeaders");
Action::Pause
}

fn on_http_request_body(&mut self, _: usize, _: bool) -> Action {
info!("onRequestBody");
Action::Continue
}
}

impl Context for TestStream {
fn on_http_call_response(&mut self, _: u32, _: usize, _: usize, _: usize) {
info!("continueRequest");
self.resume_http_request();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// NOLINT(namespace-envoy)
#include <memory>
#include <string>
#include <unordered_map>

#ifndef NULL_PLUGIN
#include "proxy_wasm_intrinsics_lite.h"
#else
#include "extensions/common/wasm/ext/envoy_null_plugin.h"
#endif

START_WASM_PLUGIN(HttpWasmTestCpp)

class ResumeCallContext : public Context {
public:
explicit ResumeCallContext(uint32_t id, RootContext* root) : Context(id, root) {}

FilterHeadersStatus onRequestHeaders(uint32_t, bool) override;
FilterDataStatus onRequestBody(size_t, bool) override;
};

class ResumeCallRootContext : public RootContext {
public:
explicit ResumeCallRootContext(uint32_t id, std::string_view root_id)
: RootContext(id, root_id) {}
};

static RegisterContextFactory register_ResumeCallContext(CONTEXT_FACTORY(ResumeCallContext),
ROOT_FACTORY(ResumeCallRootContext),
"resume_call");

FilterHeadersStatus ResumeCallContext::onRequestHeaders(uint32_t, bool) {
auto context_id = id();
auto resume_callback = [context_id](uint32_t, size_t, uint32_t) {
getContext(context_id)->setEffectiveContext();
logInfo("continueRequest");
continueRequest();
};
if (root()->httpCall("cluster", {{":method", "POST"}, {":path", "/"}, {":authority", "foo"}},
"resume", {}, 1000, resume_callback) != WasmResult::Ok) {
logError("unexpected failure");
return FilterHeadersStatus::StopIteration;
}
logInfo("onRequestHeaders");
return FilterHeadersStatus::StopIteration;
}

FilterDataStatus ResumeCallContext::onRequestBody(size_t, bool) {
logInfo("onRequestBody");
return FilterDataStatus::Continue;
}

END_WASM_PLUGIN
78 changes: 64 additions & 14 deletions test/extensions/filters/http/wasm/wasm_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "test/test_common/wasm_base.h"

using testing::Eq;
using testing::InSequence;
using testing::Invoke;
using testing::Return;
using testing::ReturnRef;
Expand Down Expand Up @@ -214,7 +215,7 @@ TEST_P(WasmHttpFilterTest, HeadersStopAndContinue) {
EXPECT_CALL(filter(), log_(spdlog::level::info, Eq(absl::string_view("header path /"))));
EXPECT_CALL(filter(), log_(spdlog::level::warn, Eq(absl::string_view("onDone 2"))));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}, {"server", "envoy-wasm-pause"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, true));
root_context_->onTick(0);
filter().clearRouteCache();
Expand Down Expand Up @@ -615,7 +616,56 @@ TEST_P(WasmHttpFilterTest, AsyncCall) {
callbacks->onSuccess(request, std::move(response_message));
return proxy_wasm::WasmResult::Ok;
}));
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

EXPECT_NE(callbacks, nullptr);
}

TEST_P(WasmHttpFilterTest, StopAndResumeViaAsyncCall) {
setupTest("resume_call");
setupFilter();

InSequence s;

Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
Http::MockAsyncClientRequest request(&cluster_manager_.async_client_);
Http::AsyncClient::Callbacks* callbacks = nullptr;
EXPECT_CALL(cluster_manager_, get(Eq("cluster")));
EXPECT_CALL(cluster_manager_, httpAsyncClientForCluster("cluster"));
EXPECT_CALL(cluster_manager_.async_client_, send_(_, _, _))
.WillOnce(
Invoke([&](Http::RequestMessagePtr& message, Http::AsyncClient::Callbacks& cb,
const Http::AsyncClient::RequestOptions&) -> Http::AsyncClient::Request* {
EXPECT_EQ((Http::TestRequestHeaderMapImpl{{":method", "POST"},
{":path", "/"},
{":authority", "foo"},
{"content-length", "6"}}),
message->headers());
callbacks = &cb;
return &request;
}));

EXPECT_CALL(filter(), log_(spdlog::level::info, Eq("onRequestHeaders")))
.WillOnce(Invoke([&](uint32_t, absl::string_view) -> proxy_wasm::WasmResult {
Http::ResponseMessagePtr response_message(new Http::ResponseMessageImpl(
Http::ResponseHeaderMapPtr{new Http::TestResponseHeaderMapImpl{{":status", "200"}}}));
NiceMock<Tracing::MockSpan> span;
Http::TestResponseHeaderMapImpl response_header{{":status", "200"}};
callbacks->onBeforeFinalizeUpstreamSpan(span, &response_header);
callbacks->onSuccess(request, std::move(response_message));
return proxy_wasm::WasmResult::Ok;
}));
EXPECT_CALL(filter(), log_(spdlog::level::info, Eq("continueRequest")));

Http::MockStreamDecoderFilterCallbacks decoder_callbacks;
filter().setDecoderFilterCallbacks(decoder_callbacks);
EXPECT_CALL(decoder_callbacks, continueDecoding()).WillOnce(Invoke([&]() {
// Verify that we're not resuming processing from within Wasm callback.
EXPECT_EQ(proxy_wasm::current_context_, nullptr);
}));

EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

EXPECT_NE(callbacks, nullptr);
Expand Down Expand Up @@ -680,7 +730,7 @@ TEST_P(WasmHttpFilterTest, AsyncCallFailure) {
} else {
EXPECT_CALL(rootContext(), log_(spdlog::level::info, Eq("async_call failed")));
}
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

EXPECT_NE(callbacks, nullptr);
Expand Down Expand Up @@ -711,7 +761,7 @@ TEST_P(WasmHttpFilterTest, AsyncCallAfterDestroyed) {
}));

EXPECT_CALL(filter(), log_(spdlog::level::info, Eq("onRequestHeaders")));
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

EXPECT_CALL(request, cancel()).WillOnce([&]() { callbacks = nullptr; });
Expand Down Expand Up @@ -772,7 +822,7 @@ TEST_P(WasmHttpFilterTest, GrpcCall) {
}));
EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("response")));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

ProtobufWkt::Value value;
Expand Down Expand Up @@ -856,7 +906,7 @@ TEST_P(WasmHttpFilterTest, GrpcCallFailure) {
}));
EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("failure bad")));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

// Test some additional error paths.
Expand Down Expand Up @@ -917,7 +967,7 @@ TEST_P(WasmHttpFilterTest, GrpcCallCancel) {
return std::move(client_factory);
}));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

rootContext().onQueueReady(0);
Expand Down Expand Up @@ -961,7 +1011,7 @@ TEST_P(WasmHttpFilterTest, GrpcCallClose) {
return std::move(client_factory);
}));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

rootContext().onQueueReady(1);
Expand Down Expand Up @@ -1006,7 +1056,7 @@ TEST_P(WasmHttpFilterTest, GrpcCallAfterDestroyed) {
}));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};

EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

EXPECT_CALL(request, cancel()).WillOnce([&]() { callbacks = nullptr; });
Expand Down Expand Up @@ -1071,7 +1121,7 @@ TEST_P(WasmHttpFilterTest, GrpcStream) {
EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("response response")));
EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("close done")));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

ProtobufWkt::Value value;
Expand Down Expand Up @@ -1102,7 +1152,7 @@ TEST_P(WasmHttpFilterTest, GrpcStreamCloseLocal) {
EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("response close")));
EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("close ok")));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

ProtobufWkt::Value value;
Expand Down Expand Up @@ -1132,7 +1182,7 @@ TEST_P(WasmHttpFilterTest, GrpcStreamCloseRemote) {
EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("response response")));
EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("close close")));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

ProtobufWkt::Value value;
Expand All @@ -1159,7 +1209,7 @@ TEST_P(WasmHttpFilterTest, GrpcStreamCancel) {
setupGrpcStreamTest(callbacks);

Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

ProtobufWkt::Value value;
Expand Down Expand Up @@ -1187,7 +1237,7 @@ TEST_P(WasmHttpFilterTest, GrpcStreamOpenAtShutdown) {

EXPECT_CALL(rootContext(), log_(spdlog::level::debug, Eq("response response")));
Http::TestRequestHeaderMapImpl request_headers{{":path", "/"}};
EXPECT_EQ(Http::FilterHeadersStatus::StopIteration,
EXPECT_EQ(Http::FilterHeadersStatus::StopAllIterationAndWatermark,
filter().decodeHeaders(request_headers, false));

ProtobufWkt::Value value;
Expand Down