Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add comment of wasm #370

Merged
merged 9 commits into from
Dec 20, 2021
8 changes: 7 additions & 1 deletion pkg/wasm/abi.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (

const AbiV2 = "proxy_abi_version_0_2_0"

// registers an abi
func init() {
abi.RegisterABI(AbiV2, abiImplFactory)
}

// initialization an abi
func abiImplFactory(instance types.WasmInstance) types.ABI {
abi := &AbiV2Impl{}
abi.SetInstance(instance)
Expand All @@ -45,15 +47,19 @@ var (
_ Exports = &AbiV2Impl{}
)

// Get abi name
func (a *AbiV2Impl) Name() string {
return AbiV2
}

// Get abi
func (a *AbiV2Impl) GetABIExports() interface{} {
return a
}

// Get id
func (a *AbiV2Impl) ProxyGetID() (string, error) {
// store the funcName and common.WasmFunction, then return the common.WasmFunction
ff, err := a.Instance.GetExportsFunc("proxy_get_id")
if err != nil {
return "", err
Expand All @@ -64,7 +70,7 @@ func (a *AbiV2Impl) ProxyGetID() (string, error) {
return "", err
}
a.Imports.Wait()

// generate functionId
functionId := string(a.Imports.GetFuncCallData().Bytes())
if functionId == "" {
return "", errors.New("")
Expand Down
4 changes: 4 additions & 0 deletions pkg/wasm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type filterConfigItem struct {
PluginName string `json:"-"`
}

// Parse filterConfigItem
func parseFilterConfigItem(cfg map[string]interface{}) (*filterConfigItem, error) {
config := filterConfigItem{
UserData: make(map[string]string),
Expand Down Expand Up @@ -64,6 +65,7 @@ func parseFilterConfigItem(cfg map[string]interface{}) (*filterConfigItem, error
return &config, nil
}

// Check VMconfig of filterConfigItem
func checkVmConfig(config *filterConfigItem) error {
if config.FromWasmPlugin != "" {
config.VmConfig = nil
Expand All @@ -82,6 +84,7 @@ func checkVmConfig(config *filterConfigItem) error {
return nil
}

// Parse user data
func parseUserData(rawConfigBytes []byte, config *filterConfigItem) error {
if len(rawConfigBytes) == 0 || config == nil {
log.DefaultLogger.Errorf("[proxywasm][config] fail to parse user data, invalid param, raw: %v, config: %v",
Expand All @@ -91,6 +94,7 @@ func parseUserData(rawConfigBytes []byte, config *filterConfigItem) error {

m := make(map[string]interface{})

// check rawConfigBytes
if err := json.Unmarshal(rawConfigBytes, &m); err != nil {
log.DefaultLogger.Errorf("[proxywasm][config] fail to unmarshal user data, err: %v", err)
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/wasm/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (route *Router) RegisterRoute(id string, plugin *WasmPlugin) {
}
}

// Get random plugin with rand id
func (route *Router) GetRandomPluginByID(id string) (*WasmPlugin, error) {
group, ok := route.routes[id]
if !ok {
Expand Down
8 changes: 7 additions & 1 deletion pkg/wasm/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type FilterConfigFactory struct {

var _ api.StreamFilterChainFactory = &FilterConfigFactory{}

// Create a proxy factory for WasmFilter
func createProxyWasmFilterFactory(confs map[string]interface{}) (api.StreamFilterChainFactory, error) {
factory := &FilterConfigFactory{
config: make([]*filterConfigItem, 0, len(confs)),
Expand All @@ -64,7 +65,7 @@ func createProxyWasmFilterFactory(confs map[string]interface{}) (api.StreamFilte
log.DefaultLogger.Errorf("[proxywasm][factory] createProxyWasmFilterFactory config not a map, configID: %s", configID)
return nil, errors.New("config not a map")
}

// parse filter config
config, err := parseFilterConfigItem(conf)
if err != nil {
log.DefaultLogger.Errorf("[proxywasm][factory] createProxyWasmFilterFactory fail to parse config, configID: %s, err: %v", configID, err)
Expand Down Expand Up @@ -116,6 +117,7 @@ func createProxyWasmFilterFactory(confs map[string]interface{}) (api.StreamFilte
return factory, nil
}

// Create the FilterChain
func (f *FilterConfigFactory) CreateFilterChain(context context.Context, callbacks api.StreamFilterChainFactoryCallbacks) {
filter := NewFilter(context, f)
if filter == nil {
Expand All @@ -126,6 +128,7 @@ func (f *FilterConfigFactory) CreateFilterChain(context context.Context, callbac
callbacks.AddStreamSenderFilter(filter, api.BeforeSend)
}

// Get RootContext's ID
func (f *FilterConfigFactory) GetRootContextID() int32 {
return f.RootContextID
}
Expand All @@ -134,6 +137,7 @@ func (f *FilterConfigFactory) GetRootContextID() int32 {
// for `pw.RegisterPluginHandler(factory)`
var _ types.WasmPluginHandler = &FilterConfigFactory{}

// update config of FilterConfigFactory
func (f *FilterConfigFactory) OnConfigUpdate(config v2.WasmPluginConfig) {
for _, plugin := range f.config {
if plugin.PluginName == config.PluginName {
Expand All @@ -143,6 +147,7 @@ func (f *FilterConfigFactory) OnConfigUpdate(config v2.WasmPluginConfig) {
}
}

// Execute the plugin of FilterConfigFactory
func (f *FilterConfigFactory) OnPluginStart(plugin types.WasmPlugin) {
plugin.Exec(func(instance types.WasmInstance) bool {
wasmPlugin, ok := f.plugins[plugin.PluginName()]
Expand Down Expand Up @@ -201,4 +206,5 @@ func (f *FilterConfigFactory) OnPluginStart(plugin types.WasmPlugin) {
})
}

// Destroy the plugin of FilterConfigFactory
func (f *FilterConfigFactory) OnPluginDestroy(plugin types.WasmPlugin) {}
18 changes: 18 additions & 0 deletions pkg/wasm/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type WasmPlugin struct {
pluginConfigBytes buffer.IoBuffer
}

// Get the VmConfig of WasmPlugin
func (p *WasmPlugin) GetVmConfig() common.IoBuffer {
if p.vmConfigBytes != nil {
return p.vmConfigBytes
Expand Down Expand Up @@ -96,6 +97,7 @@ func (p *WasmPlugin) GetVmConfig() common.IoBuffer {
return p.vmConfigBytes
}

// Get the plugin config of WasmPlugin
func (p *WasmPlugin) GetPluginConfig() common.IoBuffer {
if p.pluginConfigBytes != nil {
return p.pluginConfigBytes
Expand All @@ -112,6 +114,7 @@ func (p *WasmPlugin) GetPluginConfig() common.IoBuffer {

var contextIDGenerator int32

// new context's ID
func newContextID(rootContextID int32) int32 {
for {
id := atomic.AddInt32(&contextIDGenerator, 1)
Expand All @@ -137,6 +140,7 @@ func NewFilter(ctx context.Context, factory *FilterConfigFactory) *Filter {
return filter
}

// Destruction of filters
func (f *Filter) OnDestroy() {
f.destroyOnce.Do(func() {
if f.pluginUsed == nil || f.instance == nil {
Expand All @@ -161,14 +165,17 @@ func (f *Filter) OnDestroy() {
})
}

// Set ReceiveFilterHandler of filter
func (f *Filter) SetReceiveFilterHandler(handler api.StreamReceiverFilterHandler) {
f.receiverFilterHandler = handler
}

// Set SenderFilterHandler of filter
func (f *Filter) SetSenderFilterHandler(handler api.StreamSenderFilterHandler) {
f.senderFilterHandler = handler
}

// Calculate the size of headerMap
func headerMapSize(headers api.HeaderMap) int {
size := 0

Expand All @@ -182,6 +189,7 @@ func headerMapSize(headers api.HeaderMap) int {
return size
}

// Reset the filter when receiving then return StreamFilter status
func (f *Filter) OnReceive(ctx context.Context, headers api.HeaderMap, buf buffer.IoBuffer, trailers api.HeaderMap) api.StreamFilterStatus {
id, ok := headers.Get("id")
if !ok {
Expand Down Expand Up @@ -263,23 +271,28 @@ func (f *Filter) OnReceive(ctx context.Context, headers api.HeaderMap, buf buffe
return api.StreamFilterContinue
}

// Append ResponseData of filter
func (f *Filter) Append(ctx context.Context, headers api.HeaderMap, buf buffer.IoBuffer, trailers api.HeaderMap) api.StreamFilterStatus {
f.senderFilterHandler.SetResponseData(f.responseBuffer)
return api.StreamFilterContinue
}

// Get RootContext ID of filter's FilterConfigFactory
func (f *Filter) GetRootContextID() int32 {
return f.factory.RootContextID
}

// Get the used WasmPlugin VmConfig of filter
func (f *Filter) GetVmConfig() common.IoBuffer {
return f.pluginUsed.GetVmConfig()
}

// Get the used WasmPlugin config of filter
func (f *Filter) GetPluginConfig() common.IoBuffer {
return f.pluginUsed.GetPluginConfig()
}

// Get the HttpRequest header of proxy-wasm
func (f *Filter) GetHttpRequestHeader() common.HeaderMap {
if f.receiverFilterHandler == nil {
return nil
Expand All @@ -288,6 +301,7 @@ func (f *Filter) GetHttpRequestHeader() common.HeaderMap {
return &proxywasm010.HeaderMapWrapper{HeaderMap: f.receiverFilterHandler.GetRequestHeaders()}
}

// Get the HttpRequest body of proxy-wasm
func (f *Filter) GetHttpRequestBody() common.IoBuffer {
if f.receiverFilterHandler == nil {
return nil
Expand All @@ -296,6 +310,7 @@ func (f *Filter) GetHttpRequestBody() common.IoBuffer {
return &proxywasm010.IoBufferWrapper{IoBuffer: f.requestBuffer}
}

// Get the HttpRequest trailer of proxy-wasm
func (f *Filter) GetHttpRequestTrailer() common.HeaderMap {
if f.receiverFilterHandler == nil {
return nil
Expand All @@ -304,6 +319,7 @@ func (f *Filter) GetHttpRequestTrailer() common.HeaderMap {
return &proxywasm010.HeaderMapWrapper{HeaderMap: f.receiverFilterHandler.GetRequestTrailers()}
}

// Get the HttpResponse header of proxy-wasm
func (f *Filter) GetHttpResponseHeader() common.HeaderMap {
if f.senderFilterHandler == nil {
return nil
Expand All @@ -312,6 +328,7 @@ func (f *Filter) GetHttpResponseHeader() common.HeaderMap {
return &proxywasm010.HeaderMapWrapper{HeaderMap: f.senderFilterHandler.GetResponseHeaders()}
}

// Get the HttpResponse body of proxy-wasm
func (f *Filter) GetHttpResponseBody() common.IoBuffer {
if f.senderFilterHandler == nil {
return nil
Expand All @@ -320,6 +337,7 @@ func (f *Filter) GetHttpResponseBody() common.IoBuffer {
return &proxywasm010.IoBufferWrapper{IoBuffer: f.responseBuffer}
}

// Get the HttpResponse trailer of proxy-wasm
func (f *Filter) GetHttpResponseTrailer() common.HeaderMap {
if f.senderFilterHandler == nil {
return nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/wasm/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type LayottoHandler struct {

var _ proxywasm.ImportsHandler = &LayottoHandler{}

// Obtains the state for a specific key
func (d *LayottoHandler) GetState(storeName string, key string) (string, proxywasm.WasmResult) {
req := &runtimev1pb.GetStateRequest{
StoreName: storeName,
Expand All @@ -47,6 +48,7 @@ func (d *LayottoHandler) GetState(storeName string, key string) (string, proxywa
return string(resp.Data), proxywasm.WasmResultOk
}

// Do rpc calls
func (d *LayottoHandler) InvokeService(id string, method string, param string) (string, proxywasm.WasmResult) {
req := &runtimev1pb.InvokeServiceRequest{
Id: id,
Expand All @@ -62,6 +64,7 @@ func (d *LayottoHandler) InvokeService(id string, method string, param string) (
return string(resp.Data.Value), proxywasm.WasmResultOk
}

// Get the IoBuffer of LayottoHandler
func (d *LayottoHandler) GetFuncCallData() common.IoBuffer {
if d.IoBuffer == nil {
d.IoBuffer = common.NewIoBufferBytes(make([]byte, 0))
Expand Down
11 changes: 9 additions & 2 deletions pkg/wasm/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var (
factories = make(map[string]*FilterConfigFactory)
)

// Init watcher
func init() {
var err error
watcher, err = fsnotify.NewWatcher()
Expand All @@ -44,6 +45,7 @@ func init() {
utils.GoWithRecover(runWatcher, nil)
}

// Watching wasm
func runWatcher() {
for {
select {
Expand Down Expand Up @@ -82,8 +84,10 @@ func runWatcher() {
}
}

// Add watching file
func addWatchFile(cfg *filterConfigItem, factory *FilterConfigFactory) {
path := cfg.VmConfig.Path
// Add starts watching the named file or directory (non-recursively).
if err := watcher.Add(path); err != nil {
log.DefaultLogger.Errorf("[proxywasm] [watcher] addWatchFile fail to watch wasm file, err: %v", err)
}
Expand All @@ -99,6 +103,7 @@ func addWatchFile(cfg *filterConfigItem, factory *FilterConfigFactory) {
log.DefaultLogger.Infof("[proxywasm] [watcher] addWatchFile start to watch wasm file and its dir: %s", path)
}

// Reload Wasm's configuration file
func reloadWasm(fullPath string) {
found := false

Expand All @@ -118,7 +123,7 @@ func reloadWasm(fullPath string) {
log.DefaultLogger.Errorf("[proxywasm] [watcher] reloadWasm fail to add plugin, err: %v", err)
return
}

// get WasmPluginWrapper
pw := wasm.GetWasmManager().GetWasmPluginWrapperByName(config.PluginName)
if pw == nil {
log.DefaultLogger.Errorf("[proxywasm] [watcher] reloadWasm plugin not found")
Expand All @@ -136,7 +141,7 @@ func reloadWasm(fullPath string) {
config: config,
}
factory.plugins[config.PluginName] = wasmPlugin

// register plugin
pw.RegisterPluginHandler(factory)

for _, plugin := range factory.plugins {
Expand All @@ -153,6 +158,7 @@ func reloadWasm(fullPath string) {
}
}

// Check if the file exists
func fileExist(file string) bool {
_, err := os.Stat(file)
if err != nil && !os.IsExist(err) {
Expand All @@ -161,6 +167,7 @@ func fileExist(file string) bool {
return true
}

// Check the file suffix of wasm
func pathIsWasmFile(fullPath string) bool {
for path, _ := range configs {
if strings.HasSuffix(fullPath, path) {
Expand Down