Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions bazel/repository_locations.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ REPOSITORY_LOCATIONS = dict(
urls = ["https://commondatastorage.googleapis.com/chromium-boringssl-docs/fips/boringssl-66005f41fbc3529ffe8d007708756720529da20d.tar.xz"],
),
com_google_absl = dict(
sha256 = "190b0c9e65ef0866b44c54b517b5a3e15b67a1001b34547f03f8f4d8553c2851",
strip_prefix = "abseil-cpp-63ee2f8877915a3565c29707dba8fe4d7822596a",
# 2020-01-08
urls = ["https://github.com/abseil/abseil-cpp/archive/63ee2f8877915a3565c29707dba8fe4d7822596a.tar.gz"],
sha256 = "19391fb4882601a65cb648d638c11aa301ce5f525ef02da1a9eafd22f72d7c59",
strip_prefix = "abseil-cpp-37dd2562ec830d547a1524bb306be313ac3f2556",
# 2020-01-29
urls = ["https://github.com/abseil/abseil-cpp/archive/37dd2562ec830d547a1524bb306be313ac3f2556.tar.gz"],
),
com_github_apache_thrift = dict(
sha256 = "7d59ac4fdcb2c58037ebd4a9da5f9a49e3e034bf75b3f26d9fe48ba3d8806e6b",
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/access_loggers/wasm/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ WasmAccessLogFactory::createAccessLogInstance(const Protobuf::Message& proto_con
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
tls_slot->set([base_wasm, plugin, configuration](Event::Dispatcher& dispatcher) {
return std::static_pointer_cast<ThreadLocal::ThreadLocalObject>(
Common::Wasm::getOrCreateThreadLocalWasm(*base_wasm, plugin, *configuration, dispatcher));
Common::Wasm::getOrCreateThreadLocalWasm(base_wasm, plugin, *configuration, dispatcher));
});
access_log->setTlsSlot(std::move(tls_slot));
};
Expand Down
149 changes: 99 additions & 50 deletions source/extensions/common/wasm/wasm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace {

std::atomic<int64_t> active_wasm_;

std::string base64Sha256(absl::string_view data) {
std::string Sha256(absl::string_view data) {
std::vector<uint8_t> digest(SHA256_DIGEST_LENGTH);
EVP_MD_CTX* ctx(EVP_MD_CTX_new());
auto rc = EVP_DigestInit(ctx, EVP_sha256());
Expand All @@ -65,11 +65,24 @@ std::string base64Sha256(absl::string_view data) {
rc = EVP_DigestFinal(ctx, digest.data(), nullptr);
RELEASE_ASSERT(rc == 1, "Failed to finalize digest");
EVP_MD_CTX_free(ctx);
return Base64::encode(reinterpret_cast<const char*>(&digest[0]), digest.size());
return std::string(reinterpret_cast<const char*>(&digest[0]), digest.size());
}

// Map from Wasm ID to the local Wasm instance.
thread_local absl::flat_hash_map<std::string, std::weak_ptr<WasmHandle>> local_wasms;
std::string Xor(absl::string_view a, absl::string_view b) {
ASSERT(a.size() == b.size());
std::string result;
result.reserve(a.size());
for (size_t i = 0; i < a.size(); i++) {
result.push_back(a[i] ^ b[i]);
}
return result;
}

// Map from Wasm Key to the local Wasm instance.
thread_local absl::flat_hash_map<std::string, std::weak_ptr<WasmHandle>> local_wasms_;
// Map from Wasm Key to the base Wasm instance, using a pointer to avoid the initialization fiasco.
ABSL_CONST_INIT absl::Mutex base_wasms_mutex_(absl::kConstInit);
absl::flat_hash_map<std::string, std::weak_ptr<WasmHandle>>* base_wasms_ = nullptr;

const std::string INLINE_STRING = "<inline>";

Expand All @@ -92,10 +105,11 @@ const uint8_t* decodeVarint(const uint8_t* pos, const uint8_t* end, uint32_t* ou
} // namespace

Wasm::Wasm(absl::string_view runtime, absl::string_view vm_id, absl::string_view vm_configuration,
Stats::ScopeSharedPtr scope, Upstream::ClusterManager& cluster_manager,
Event::Dispatcher& dispatcher)
: vm_id_(std::string(vm_id)), wasm_vm_(Common::Wasm::createWasmVm(runtime, scope)),
scope_(scope), cluster_manager_(cluster_manager), dispatcher_(dispatcher),
absl::string_view vm_key, Stats::ScopeSharedPtr scope,
Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher)
: vm_id_(std::string(vm_id)), vm_key_(std::string(vm_key)),
wasm_vm_(Common::Wasm::createWasmVm(runtime, scope)), scope_(scope),
cluster_manager_(cluster_manager), dispatcher_(dispatcher),
time_source_(dispatcher.timeSource()), vm_configuration_(vm_configuration),
wasm_stats_(WasmStats{
ALL_WASM_STATS(POOL_COUNTER_PREFIX(*scope_, absl::StrCat("wasm.", runtime, ".")),
Expand Down Expand Up @@ -243,18 +257,22 @@ void Wasm::getFunctions() {
}
}

Wasm::Wasm(const Wasm& wasm, Event::Dispatcher& dispatcher)
: std::enable_shared_from_this<Wasm>(wasm), vm_id_(wasm.vm_id_),
vm_id_with_hash_(wasm.vm_id_with_hash_), started_from_(wasm.wasm_vm()->cloneable()),
scope_(wasm.scope_), cluster_manager_(wasm.cluster_manager_), dispatcher_(dispatcher),
time_source_(dispatcher.timeSource()), wasm_stats_(wasm.wasm_stats_),
stat_name_set_(wasm.stat_name_set_) {
Wasm::Wasm(WasmHandleSharedPtr& base_wasm_handle, Event::Dispatcher& dispatcher)
: std::enable_shared_from_this<Wasm>(*base_wasm_handle->wasm()),
vm_id_(base_wasm_handle->wasm()->vm_id_), vm_key_(base_wasm_handle->wasm()->vm_key_),
started_from_(base_wasm_handle->wasm()->wasm_vm()->cloneable()),
scope_(base_wasm_handle->wasm()->scope_),
cluster_manager_(base_wasm_handle->wasm()->cluster_manager_), dispatcher_(dispatcher),
time_source_(dispatcher.timeSource()), base_wasm_handle_(base_wasm_handle),
wasm_stats_(base_wasm_handle->wasm()->wasm_stats_),
stat_name_set_(base_wasm_handle->wasm()->stat_name_set_) {
if (started_from_ != Cloneable::NotCloneable) {
wasm_vm_ = wasm.wasm_vm()->clone();
wasm_vm_ = base_wasm_handle->wasm()->wasm_vm()->clone();
} else {
wasm_vm_ = Common::Wasm::createWasmVm(wasm.wasm_vm()->runtime(), scope_);
wasm_vm_ = Common::Wasm::createWasmVm(base_wasm_handle->wasm()->wasm_vm()->runtime(), scope_);
}
if (!initialize(wasm.code(), wasm.allow_precompiled())) {
if (!initialize(base_wasm_handle->wasm()->code(),
base_wasm_handle->wasm()->allow_precompiled())) {
throw WasmException("Failed to load WASM code");
}
active_wasm_++;
Expand All @@ -280,10 +298,6 @@ bool Wasm::initialize(const std::string& code, bool allow_precompiled) {
}

if (started_from_ == Cloneable::NotCloneable) {
// Construct a unique identifier for the VM based on the provided vm_id and a hash of the
// code.
vm_id_with_hash_ = vm_id_ + ":" + base64Sha256(code);

auto ok = wasm_vm_->load(code, allow_precompiled);
if (!ok) {
return false;
Expand Down Expand Up @@ -338,6 +352,16 @@ bool Wasm::initialize(const std::string& code, bool allow_precompiled) {
return true;
}

Context* Wasm::getOrCreateRootContext(const PluginSharedPtr& plugin) {
auto root_context = getRootContext(plugin->root_id_);
if (!root_context) {
auto context = std::make_unique<Context>(this, plugin);
root_context = context.get();
root_contexts_[plugin->root_id_] = std::move(context);
}
return root_context;
}

void Wasm::startVm(Context* root_context) {
/* Call "_start" function, and fallback to "__wasm_call_ctors" if the former is not available. */
if (_start_) {
Expand Down Expand Up @@ -482,10 +506,6 @@ static void createWasmInternal(const VmConfig& vm_config, PluginSharedPtr plugin
Api::Api& api, std::unique_ptr<Context> root_context_for_testing,
Config::DataSource::RemoteAsyncDataProviderPtr& remote_data_provider,
CreateWasmCallback&& cb) {
auto wasm = std::make_shared<WasmHandle>(
std::make_shared<Wasm>(vm_config.runtime(), vm_config.vm_id(), vm_config.configuration(),
scope, cluster_manager, dispatcher));

std::string source, code;
if (vm_config.code().has_remote()) {
source = vm_config.code().remote().http_uri().uri();
Expand All @@ -495,22 +515,50 @@ static void createWasmInternal(const VmConfig& vm_config, PluginSharedPtr plugin
.value_or(code.empty() ? EMPTY_STRING : INLINE_STRING);
}

auto callback = [wasm, plugin, cb, source, allow_precompiled = vm_config.allow_precompiled(),
auto callback = [vm_config, scope, &cluster_manager, &dispatcher, plugin, cb, source,
context_ptr = root_context_for_testing ? root_context_for_testing.release()
: nullptr](const std::string& code) {
std::unique_ptr<Context> context(context_ptr);
if (code.empty()) {
throw WasmException(fmt::format("Failed to load WASM code from {}", source));
}
if (!wasm->wasm()->initialize(code, allow_precompiled)) {
throw WasmException(fmt::format("Failed to initialize WASM code from {}", source));
}
if (!context) {
wasm->wasm()->start(plugin);
} else {
wasm->wasm()->startForTesting(std::move(context), plugin);
// Construct a unique identifier for the VM based on a hash of the code, vm configuration data
// and vm_id.
std::string vm_key = Sha256(vm_config.vm_id());
vm_key = Xor(vm_key, Sha256(vm_config.configuration()));
vm_key = Xor(vm_key, Sha256(code));
vm_key = Base64::encode(&*vm_key.begin(), vm_key.size());

std::shared_ptr<WasmHandle> wasm;
{
absl::MutexLock l(&base_wasms_mutex_);
if (!base_wasms_) {
base_wasms_ = new std::remove_reference<decltype(*base_wasms_)>::type;
}
auto it = base_wasms_->find(vm_key);
if (it != base_wasms_->end()) {
wasm = it->second.lock();
if (!wasm) {
base_wasms_->erase(it);
}
}
if (!wasm) {
wasm = std::make_shared<WasmHandle>(std::make_shared<Wasm>(
vm_config.runtime(), vm_config.vm_id(), vm_config.configuration(), vm_key, scope,
cluster_manager, dispatcher));
if (!wasm->wasm()->initialize(code, vm_config.allow_precompiled())) {
throw WasmException(fmt::format("Failed to initialize WASM code from {}", source));
}
if (!context) {
wasm->wasm()->start(plugin);
} else {
wasm->wasm()->startForTesting(std::move(context), plugin);
}
(*base_wasms_)[vm_key] = wasm;
}
}
cb(wasm);

cb(std::move(wasm));
};

if (vm_config.code().has_remote()) {
Expand Down Expand Up @@ -543,40 +591,41 @@ void createWasmForTesting(const envoy::extensions::wasm::v3::VmConfig& vm_config
std::move(root_context_for_testing), remote_data_provider, std::move(cb));
}

WasmHandleSharedPtr createThreadLocalWasm(WasmHandle& base_wasm, PluginSharedPtr plugin,
WasmHandleSharedPtr createThreadLocalWasm(WasmHandleSharedPtr& base_wasm, PluginSharedPtr plugin,
absl::string_view configuration,
Event::Dispatcher& dispatcher) {
auto wasm = std::make_shared<WasmHandle>(std::make_shared<Wasm>(*base_wasm.wasm(), dispatcher));
Context* root_context = wasm->wasm()->start(plugin);
if (!wasm->wasm()->configure(root_context, plugin, configuration)) {
auto wasm_handle = std::make_shared<WasmHandle>(std::make_shared<Wasm>(base_wasm, dispatcher));
Context* root_context = wasm_handle->wasm()->start(plugin);
if (!wasm_handle->wasm()->configure(root_context, plugin, configuration)) {
throw WasmException("Failed to configure WASM code");
}
local_wasms[wasm->wasm()->vm_id_with_hash()] = wasm;
return wasm;
local_wasms_[wasm_handle->wasm()->vm_key()] = wasm_handle;
return wasm_handle;
}

WasmHandleSharedPtr getThreadLocalWasmPtr(absl::string_view vm_id) {
auto it = local_wasms.find(vm_id);
if (it == local_wasms.end()) {
WasmHandleSharedPtr getThreadLocalWasmPtr(absl::string_view vm_key) {
auto it = local_wasms_.find(vm_key);
if (it == local_wasms_.end()) {
return nullptr;
}
auto wasm = it->second.lock();
if (!wasm) {
local_wasms.erase(vm_id);
local_wasms_.erase(vm_key);
}
return wasm;
}

WasmHandleSharedPtr getOrCreateThreadLocalWasm(WasmHandle& base_wasm, PluginSharedPtr plugin,
WasmHandleSharedPtr getOrCreateThreadLocalWasm(WasmHandleSharedPtr base_wasm,
PluginSharedPtr plugin,
absl::string_view configuration,
Event::Dispatcher& dispatcher) {
auto wasm = getThreadLocalWasmPtr(base_wasm.wasm()->vm_id_with_hash());
if (wasm) {
auto root_context = wasm->wasm()->start(plugin);
if (!wasm->wasm()->configure(root_context, plugin, configuration)) {
auto wasm_handle = getThreadLocalWasmPtr(base_wasm->wasm()->vm_key());
if (wasm_handle) {
auto root_context = wasm_handle->wasm()->getOrCreateRootContext(plugin);
if (!wasm_handle->wasm()->configure(root_context, plugin, configuration)) {
throw WasmException("Failed to configure WASM code");
}
return wasm;
return wasm_handle;
}
return createThreadLocalWasm(base_wasm, plugin, configuration, dispatcher);
}
Expand Down
32 changes: 17 additions & 15 deletions source/extensions/common/wasm/wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ using VmConfig = envoy::extensions::wasm::v3::VmConfig;
using WasmForeignFunction =
std::function<WasmResult(Wasm&, absl::string_view, std::function<void*(size_t size)>)>;

class WasmHandle;

// Wasm execution instance. Manages the Envoy side of the Wasm interface.
class Wasm : public Logger::Loggable<Logger::Id::wasm>, public std::enable_shared_from_this<Wasm> {
public:
Wasm(absl::string_view runtime, absl::string_view vm_id, absl::string_view vm_configuration,
Stats::ScopeSharedPtr scope, Upstream::ClusterManager& cluster_manager,
Event::Dispatcher& dispatcher);
Wasm(const Wasm& other, Event::Dispatcher& dispatcher);
absl::string_view vm_key, Stats::ScopeSharedPtr scope,
Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher);
Wasm(std::shared_ptr<WasmHandle>& other, Event::Dispatcher& dispatcher);
~Wasm();

bool initialize(const std::string& code, bool allow_precompiled = false);
Expand All @@ -68,11 +70,12 @@ class Wasm : public Logger::Loggable<Logger::Id::wasm>, public std::enable_share
Context* start(PluginSharedPtr plugin); // returns the root Context.

absl::string_view vm_id() const { return vm_id_; }
absl::string_view vm_id_with_hash() const { return vm_id_with_hash_; }
absl::string_view vm_key() const { return vm_key_; }
WasmVm* wasm_vm() const { return wasm_vm_.get(); }
Context* vm_context() const { return vm_context_.get(); }
Stats::StatNameSetSharedPtr stat_name_set() const { return stat_name_set_; }
Context* getRootContext(absl::string_view root_id) { return root_contexts_[root_id].get(); }
Context* getOrCreateRootContext(const PluginSharedPtr& plugin);
Context* getContext(uint32_t id) {
auto it = contexts_.find(id);
if (it != contexts_.end())
Expand Down Expand Up @@ -169,8 +172,8 @@ class Wasm : public Logger::Loggable<Logger::Id::wasm>, public std::enable_share
void establishEnvironment(); // Language specific environments.
void getFunctions(); // Get functions call into WASM.

std::string vm_id_; // User-provided vm_id.
std::string vm_id_with_hash_; // vm_id + hash of code.
std::string vm_id_; // User-provided vm_id.
std::string vm_key_; // Hash(code, vm configuration data, vm_id_)
std::unique_ptr<WasmVm> wasm_vm_;
Cloneable started_from_{Cloneable::NotCloneable};
Stats::ScopeSharedPtr scope_;
Expand Down Expand Up @@ -234,6 +237,8 @@ class Wasm : public Logger::Loggable<Logger::Id::wasm>, public std::enable_share
WasmCallVoid<1> on_log_;
WasmCallVoid<1> on_delete_;

std::shared_ptr<WasmHandle> base_wasm_handle_;

// Used by the base_wasm to enable non-clonable thread local Wasm(s) to be constructed.
std::string code_;
std::string vm_configuration_;
Expand Down Expand Up @@ -269,14 +274,9 @@ class WasmHandle : public Envoy::Server::Wasm,
public std::enable_shared_from_this<WasmHandle> {
public:
explicit WasmHandle(WasmSharedPtr wasm) : wasm_(wasm) {}
~WasmHandle() {
auto wasm = wasm_;
// NB: V8 will stack overflow during the stress test if we shutdown with the call stack in the
// ThreadLocal set call so shift to a fresh call stack.
wasm_->dispatcher().post([wasm] { wasm->shutdown(); });
}
~WasmHandle() { wasm_->shutdown(); }

const WasmSharedPtr& wasm() { return wasm_; }
WasmSharedPtr& wasm() { return wasm_; }

private:
WasmSharedPtr wasm_;
Expand Down Expand Up @@ -308,8 +308,10 @@ void createWasmForTesting(const VmConfig& vm_config, PluginSharedPtr plugin,
// Get an existing ThreadLocal VM matching 'vm_id' or nullptr if there isn't one.
WasmHandleSharedPtr getThreadLocalWasmPtr(absl::string_view vm_id);
// Get an existing ThreadLocal VM matching 'vm_id' or create one using 'base_wavm' by cloning or by
// using it it as a template.
WasmHandleSharedPtr getOrCreateThreadLocalWasm(WasmHandle& base_wasm, PluginSharedPtr plugin,
// using it it as a template. Note that 'base_wasm' typically is a const lambda capture and needs
// to be copied to be passed, hence the pass-by-value interface.
WasmHandleSharedPtr getOrCreateThreadLocalWasm(WasmHandleSharedPtr base_wasm,
PluginSharedPtr plugin,
absl::string_view configuration,
Event::Dispatcher& dispatcher);

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/http/wasm/wasm_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ FilterConfig::FilterConfig(const envoy::extensions::filters::http::wasm::v3::Was
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
tls_slot_->set([base_wasm, plugin, configuration](Event::Dispatcher& dispatcher) {
return std::static_pointer_cast<ThreadLocal::ThreadLocalObject>(
Common::Wasm::getOrCreateThreadLocalWasm(*base_wasm, plugin, *configuration, dispatcher));
Common::Wasm::getOrCreateThreadLocalWasm(base_wasm, plugin, *configuration, dispatcher));
});
};

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/network/wasm/wasm_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ FilterConfig::FilterConfig(const envoy::extensions::filters::network::wasm::v3::
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
tls_slot_->set([base_wasm, plugin, configuration](Event::Dispatcher& dispatcher) {
return std::static_pointer_cast<ThreadLocal::ThreadLocalObject>(
Common::Wasm::getOrCreateThreadLocalWasm(*base_wasm, plugin, *configuration, dispatcher));
Common::Wasm::getOrCreateThreadLocalWasm(base_wasm, plugin, *configuration, dispatcher));
});
};

Expand Down
5 changes: 2 additions & 3 deletions source/extensions/wasm/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ void WasmFactory::createWasm(const envoy::extensions::wasm::v3::WasmService& con
config.config().name(), config.config().root_id(), config.config().vm_config().vm_id(),
envoy::config::core::v3::TrafficDirection::UNSPECIFIED, context.localInfo(), nullptr);

auto callback = [&context, &config, plugin,
cb](std::shared_ptr<Common::Wasm::WasmHandle> base_wasm) {
auto callback = [&context, &config, plugin, cb](Common::Wasm::WasmHandleSharedPtr base_wasm) {
if (config.singleton()) {
// Return the WASM VM which will be stored as a singleton by the Server.
auto root_context = base_wasm->wasm()->start(plugin);
Expand All @@ -40,7 +39,7 @@ void WasmFactory::createWasm(const envoy::extensions::wasm::v3::WasmService& con
context.threadLocal().allocateSlot()->set([base_wasm, plugin,
configuration](Event::Dispatcher& dispatcher) {
return std::static_pointer_cast<ThreadLocal::ThreadLocalObject>(
Common::Wasm::getOrCreateThreadLocalWasm(*base_wasm, plugin, *configuration, dispatcher));
Common::Wasm::getOrCreateThreadLocalWasm(base_wasm, plugin, *configuration, dispatcher));
});
// Do not return this WASM VM since this is per-thread. Returning it would indicate that
// this is a singleton.
Expand Down
2 changes: 1 addition & 1 deletion test/extensions/access_loggers/wasm/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ TEST_P(WasmAccessLogConfigTest, CreateWasmFromEmpty) {
AccessLog::InstanceSharedPtr instance;
EXPECT_THROW_WITH_MESSAGE(
instance = factory->createAccessLogInstance(*message, std::move(filter), context),
Common::Wasm::WasmVmException, "Failed to create WASM VM with unspecified runtime.");
Common::Wasm::WasmException, "Failed to load WASM code from ");
}

TEST_P(WasmAccessLogConfigTest, CreateWasmFromWASM) {
Expand Down
Loading