Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(portable): plugin error message for func #3305

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/plugin/portable/runtime/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
}
}
if err != nil {
return nil, fmt.Errorf("can't send message on function rep socket: %s", err.Error())
return nil, err

Check warning on line 154 in internal/plugin/portable/runtime/connection.go

View check run for this annotation

Codecov / codecov/patch

internal/plugin/portable/runtime/connection.go#L154

Added line #L154 was not covered by tests
}
result, e := r.sock.Recv()
if e != nil {
Expand Down
21 changes: 19 additions & 2 deletions internal/plugin/portable/runtime/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

import (
"encoding/json"
"errors"
"fmt"

"github.com/lf-edge/ekuiper/contract/v2/api"
nerrors "go.nanomsg.org/mangos/v3/errors"

"github.com/lf-edge/ekuiper/v2/internal/conf"
kctx "github.com/lf-edge/ekuiper/v2/internal/topo/context"
Expand Down Expand Up @@ -86,7 +88,8 @@
}
res, err := f.dataCh.Req(jsonArg)
if err != nil {
return err
e := handleTimeout(err, f.reg.Name)
return e

Check warning on line 92 in internal/plugin/portable/runtime/function.go

View check run for this annotation

Codecov / codecov/patch

internal/plugin/portable/runtime/function.go#L91-L92

Added lines #L91 - L92 were not covered by tests
}
fr := &FuncReply{}
err = json.Unmarshal(res, fr)
Expand All @@ -112,7 +115,8 @@
}
res, err := f.dataCh.Req(jsonArg)
if err != nil {
return err, false
e := handleTimeout(err, f.reg.Name)
return e, false

Check warning on line 119 in internal/plugin/portable/runtime/function.go

View check run for this annotation

Codecov / codecov/patch

internal/plugin/portable/runtime/function.go#L118-L119

Added lines #L118 - L119 were not covered by tests
}
fr := &FuncReply{}
err = json.Unmarshal(res, fr)
Expand All @@ -129,6 +133,19 @@
return fr.Result, fr.State
}

func handleTimeout(err error, pname string) error {
if errors.Is(err, nerrors.ErrRecvTimeout) {
pm := GetPluginInsManager()
status, ok := pm.GetPluginInsStatus(pname)
if !ok {
return fmt.Errorf("plugin %s was removed", pname)
} else {
return fmt.Errorf("time out, plugin %s status %s, message: %s", pname, status.Status, status.ErrMsg)
}

Check warning on line 144 in internal/plugin/portable/runtime/function.go

View check run for this annotation

Codecov / codecov/patch

internal/plugin/portable/runtime/function.go#L136-L144

Added lines #L136 - L144 were not covered by tests
}
return err

Check warning on line 146 in internal/plugin/portable/runtime/function.go

View check run for this annotation

Codecov / codecov/patch

internal/plugin/portable/runtime/function.go#L146

Added line #L146 was not covered by tests
}

func (f *PortableFunc) IsAggregate() bool {
if f.isAgg > 0 {
return f.isAgg > 1
Expand Down
1 change: 0 additions & 1 deletion internal/plugin/portable/runtime/plugin_ins_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,6 @@ func (p *pluginInsManager) GetOrStartProcess(pluginMeta *PluginMeta, pconf *Port
cmd.Stdout = conf.Log.Out
cmd.Stderr = conf.Log.Out
cmd.Dir = filepath.Dir(pluginMeta.Executable)

conf.Log.Println("plugin starting")
err = cmd.Start()
failpoint.Inject("cmdStartErr", func() {
Expand Down
Loading