diff --git a/internal/plugin/portable/runtime/connection.go b/internal/plugin/portable/runtime/connection.go index a1a6029521..529e5dbca9 100644 --- a/internal/plugin/portable/runtime/connection.go +++ b/internal/plugin/portable/runtime/connection.go @@ -151,7 +151,7 @@ func (r *NanomsgReqRepChannel) Req(arg []byte) ([]byte, error) { } } if err != nil { - return nil, fmt.Errorf("can't send message on function rep socket: %s", err.Error()) + return nil, err } result, e := r.sock.Recv() if e != nil { diff --git a/internal/plugin/portable/runtime/function.go b/internal/plugin/portable/runtime/function.go index 47163a2aed..3f7fd3db80 100644 --- a/internal/plugin/portable/runtime/function.go +++ b/internal/plugin/portable/runtime/function.go @@ -16,9 +16,11 @@ package runtime import ( "encoding/json" + "errors" "fmt" "github.com/lf-edge/ekuiper/contract/v2/api" + nerrors "go.nanomsg.org/mangos/v3/errors" "github.com/lf-edge/ekuiper/v2/internal/conf" kctx "github.com/lf-edge/ekuiper/v2/internal/topo/context" @@ -86,7 +88,8 @@ func (f *PortableFunc) Validate(args []interface{}) error { } res, err := f.dataCh.Req(jsonArg) if err != nil { - return err + e := handleTimeout(err, f.reg.Name) + return e } fr := &FuncReply{} err = json.Unmarshal(res, fr) @@ -112,7 +115,8 @@ func (f *PortableFunc) Exec(ctx api.FunctionContext, args []any) (interface{}, b } res, err := f.dataCh.Req(jsonArg) if err != nil { - return err, false + e := handleTimeout(err, f.reg.Name) + return e, false } fr := &FuncReply{} err = json.Unmarshal(res, fr) @@ -129,6 +133,19 @@ func (f *PortableFunc) Exec(ctx api.FunctionContext, args []any) (interface{}, b return fr.Result, fr.State } +func handleTimeout(err error, pname string) error { + if errors.Is(err, nerrors.ErrRecvTimeout) { + pm := GetPluginInsManager() + status, ok := pm.GetPluginInsStatus(pname) + if !ok { + return fmt.Errorf("plugin %s was removed", pname) + } else { + return fmt.Errorf("time out, plugin %s status %s, message: %s", pname, status.Status, status.ErrMsg) + } + } + return err +} + func (f *PortableFunc) IsAggregate() bool { if f.isAgg > 0 { return f.isAgg > 1 diff --git a/internal/plugin/portable/runtime/plugin_ins_manager.go b/internal/plugin/portable/runtime/plugin_ins_manager.go index f29941a432..14f494b2e8 100644 --- a/internal/plugin/portable/runtime/plugin_ins_manager.go +++ b/internal/plugin/portable/runtime/plugin_ins_manager.go @@ -301,7 +301,6 @@ func (p *pluginInsManager) GetOrStartProcess(pluginMeta *PluginMeta, pconf *Port cmd.Stdout = conf.Log.Out cmd.Stderr = conf.Log.Out cmd.Dir = filepath.Dir(pluginMeta.Executable) - conf.Log.Println("plugin starting") err = cmd.Start() failpoint.Inject("cmdStartErr", func() {