Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 5 additions & 12 deletions pkg/kbagent/service/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"sync"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"golang.org/x/exp/maps"

"github.com/apecloud/kubeblocks/pkg/kbagent/proto"
Expand Down Expand Up @@ -85,17 +84,14 @@ func (s *actionService) HandleRequest(ctx context.Context, payload []byte) ([]by
}
resp, err := s.handleRequest(ctx, req)
result := string(resp)
if err != nil {
result = err.Error()
}
s.logger.Info("Action Executed", "action", req.Action, "result", result)
s.logger.Info("Action Executed", "action", req.Action, "result", result, "err", err)
return s.encode(resp, err), nil
}

func (s *actionService) decode(payload []byte) (*proto.ActionRequest, error) {
req := &proto.ActionRequest{}
if err := json.Unmarshal(payload, req); err != nil {
return nil, errors.Wrapf(proto.ErrBadRequest, "unmarshal action request error: %s", err.Error())
return nil, fmt.Errorf("%w: unmarshal action request error: %w", proto.ErrBadRequest, err)
}
return req, nil
}
Expand All @@ -114,11 +110,11 @@ func (s *actionService) encode(out []byte, err error) []byte {

func (s *actionService) handleRequest(ctx context.Context, req *proto.ActionRequest) ([]byte, error) {
if _, ok := s.actions[req.Action]; !ok {
return nil, errors.Wrapf(proto.ErrNotDefined, "%s is not defined", req.Action)
return nil, fmt.Errorf("%w: %s is not defined", proto.ErrNotDefined, req.Action)
}
action := s.actions[req.Action]
if action.Exec == nil {
return nil, errors.Wrap(proto.ErrNotImplemented, "only exec action is supported")
return nil, fmt.Errorf("%w: only exec action is supported", proto.ErrNotImplemented)
}
// HACK: pre-check for the reconfigure action
if err := checkReconfigure(ctx, req); err != nil {
Expand Down Expand Up @@ -154,8 +150,5 @@ func (s *actionService) handleExecActionNonBlocking(ctx context.Context, req *pr
return nil, proto.ErrInProgress
}
delete(s.runningActions, req.Action)
if (*result).err != nil {
return nil, (*result).err
}
return (*result).stdout.Bytes(), nil
return (*result).stdout.Bytes(), wrapExecError((*result).err, (*result).stderr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should not operate on stdout when a failure occurs. This is the behavior defined by the API.

}
31 changes: 16 additions & 15 deletions pkg/kbagent/service/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ package service
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"strings"
"syscall"
"time"

"github.com/pkg/errors"

"github.com/apecloud/kubeblocks/pkg/kbagent/proto"
"github.com/apecloud/kubeblocks/pkg/kbagent/util"
)
Expand Down Expand Up @@ -63,19 +63,15 @@ func runCommand(ctx context.Context, action *proto.ExecAction, parameters map[st
return nil, err
}
result := <-resultChan
err = result.err
if err != nil {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
errMsg := fmt.Sprintf("exit code: %d", exitErr.ExitCode())
if stderrMsg := result.stderr.String(); len(stderrMsg) > 0 {
errMsg += fmt.Sprintf(", stderr: %s", stderrMsg)
}
return nil, errors.Wrapf(proto.ErrFailed, errMsg)
}
return nil, err
return result.stdout.Bytes(), wrapExecError(result.err, result.stderr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

}

func wrapExecError(err error, stderr *bytes.Buffer) error {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
return fmt.Errorf("%w: exit code: %d, stderr: %s", proto.ErrFailed, exitErr.ExitCode(), stderr.String())
}
return result.stdout.Bytes(), nil
return err
}

func runCommandNonBlocking(ctx context.Context, action *proto.ExecAction, parameters map[string]string, timeout *int32) (chan *commandResult, error) {
Expand Down Expand Up @@ -149,6 +145,11 @@ func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[s
cmd.Stdin = stdinReader
cmd.Stdout = stdoutWriter
cmd.Stderr = stderrWriter
cmd.WaitDelay = time.Second * 1
// gracefully terminate, go will kill it after waitDelay
cmd.Cancel = func() error {
return cmd.Process.Signal(syscall.SIGTERM)
}

errChan := make(chan error, 1)
go func() {
Expand All @@ -159,7 +160,7 @@ func runCommandX(ctx context.Context, action *proto.ExecAction, parameters map[s
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
errChan <- proto.ErrTimedOut
} else {
errChan <- errors.Wrapf(proto.ErrFailed, "failed to start command: %v", err)
errChan <- fmt.Errorf("%w: failed to start command: %w", proto.ErrFailed, err)
}
return
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/kbagent/service/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,22 +271,20 @@ var _ = Describe("command", func() {
action := &proto.ExecAction{
Commands: []string{"/bin/bash", "-c", "command-not-found"},
}
output, err := runCommand(ctx, action, nil, nil)
_, err := runCommand(ctx, action, nil, nil)
Expect(err).ShouldNot(BeNil())
Expect(errors.Is(err, proto.ErrFailed)).Should(BeTrue())
Expect(err.Error()).Should(ContainSubstring("command not found"))
Expect(output).Should(BeNil())
})

It("timeout", func() {
action := &proto.ExecAction{
Commands: []string{"/bin/bash", "-c", "sleep 60"},
}
timeout := int32(1)
output, err := runCommand(ctx, action, nil, &timeout)
_, err := runCommand(ctx, action, nil, &timeout)
Expect(err).ShouldNot(BeNil())
Expect(errors.Is(err, proto.ErrTimedOut)).Should(BeTrue())
Expect(output).Should(BeNil())
})
})
})