Skip to content

Commit

Permalink
fix: Fork sub-process. Fixes #8454 (#8906)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec authored Jun 10, 2022
1 parent 750c4e1 commit 416fce7
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 77 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,11 @@ lint: server/static/files.go $(GOPATH)/bin/golangci-lint

# for local we have a faster target that prints to stdout, does not use json, and can cache because it has no coverage
.PHONY: test
test: server/static/files.go dist/argosay
test: server/static/files.go
go build ./...
env KUBECONFIG=/dev/null $(GOTEST) ./...
# marker file, based on it's modification time, we know how long ago this target was run
@mkdir -p dist
touch dist/test

.PHONY: install
Expand Down
51 changes: 22 additions & 29 deletions cmd/argoexec/commands/emissary.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"syscall"
"time"

"github.com/argoproj/argo-workflows/v3/util/errors"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/client-go/util/retry"
Expand Down Expand Up @@ -60,18 +62,6 @@ func NewEmissaryCommand() *cobra.Command {

name, args := args[0], args[1:]

signals := make(chan os.Signal, 1)
defer close(signals)
signal.Notify(signals)
defer signal.Reset()
go func() {
for s := range signals {
if !osspecific.IsSIGCHLD(s) {
_ = osspecific.Kill(-os.Getpid(), s.(syscall.Signal))
}
}
}()

data, err := ioutil.ReadFile(varRunArgo + "/template")
if err != nil {
return fmt.Errorf("failed to read template: %w", err)
Expand Down Expand Up @@ -127,25 +117,28 @@ func NewEmissaryCommand() *cobra.Command {
return fmt.Errorf("failed to get retry strategy: %w", err)
}

var command *exec.Cmd
var stdout *os.File
var combined *os.File
cmdErr := retry.OnError(backoff, func(error) bool { return true }, func() error {
if stdout != nil {
stdout.Close()
}
if combined != nil {
combined.Close()
}
command, stdout, combined, err = createCommand(name, args, template)
command, stdout, combined, err := createCommand(name, args, template)
if err != nil {
return fmt.Errorf("failed to create command: %w", err)
}

defer stdout.Close()
defer combined.Close()
signals := make(chan os.Signal, 1)
defer close(signals)
signal.Notify(signals)
defer signal.Reset()
if err := command.Start(); err != nil {
return err
}

go func() {
for s := range signals {
if !osspecific.IsSIGCHLD(s) {
_ = osspecific.Kill(command.Process.Pid, s.(syscall.Signal))
}
}
}()
pid := command.Process.Pid
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
Expand All @@ -158,16 +151,16 @@ func NewEmissaryCommand() *cobra.Command {
_ = os.Remove(varRunArgo + "/ctr/" + containerName + "/signal")
s, _ := strconv.Atoi(string(data))
if s > 0 {
_ = osspecific.Kill(command.Process.Pid, syscall.Signal(s))
_ = osspecific.Kill(pid, syscall.Signal(s))
}
time.Sleep(2 * time.Second)
}
}
}()
return command.Wait()
return osspecific.Wait(command.Process)

})
defer stdout.Close()
defer combined.Close()
logger.WithError(err).Info("sub-process exited")

if _, ok := os.LookupEnv("ARGO_DEBUG_PAUSE_AFTER"); ok {
for {
Expand All @@ -184,7 +177,7 @@ func NewEmissaryCommand() *cobra.Command {

if cmdErr == nil {
exitCode = 0
} else if exitError, ok := cmdErr.(*exec.ExitError); ok {
} else if exitError, ok := cmdErr.(errors.Exited); ok {
if exitError.ExitCode() >= 0 {
exitCode = exitError.ExitCode()
} else {
Expand Down
44 changes: 19 additions & 25 deletions cmd/argoexec/commands/emissary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package commands
import (
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strconv"
"sync"
"syscall"
"testing"
"time"

"github.com/argoproj/argo-workflows/v3/util/errors"

"github.com/stretchr/testify/assert"
)
Expand All @@ -20,45 +19,40 @@ func TestEmissary(t *testing.T) {
varRunArgo = tmp
includeScriptOutput = true

wd, err := os.Getwd()
assert.NoError(t, err)

x := filepath.Join(wd, "../../../dist/argosay")

err = ioutil.WriteFile(varRunArgo+"/template", []byte(`{}`), 0o600)
err := ioutil.WriteFile(varRunArgo+"/template", []byte(`{}`), 0o600)
assert.NoError(t, err)

t.Run("Exit0", func(t *testing.T) {
err := run(x, []string{"exit"})
err := run("exit")
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/exitcode")
assert.NoError(t, err)
assert.Equal(t, "0", string(data))
})

t.Run("Exit1", func(t *testing.T) {
err := run(x, []string{"exit", "1"})
assert.Equal(t, 1, err.(*exec.ExitError).ExitCode())
err := run("exit 1")
assert.Equal(t, 1, err.(errors.Exited).ExitCode())
data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/exitcode")
assert.NoError(t, err)
assert.Equal(t, "1", string(data))
})
t.Run("Stdout", func(t *testing.T) {
err := run(x, []string{"echo", "hello", "/dev/stdout"})
err := run("echo hello")
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/stdout")
assert.NoError(t, err)
assert.Contains(t, string(data), "hello")
})
t.Run("Comined", func(t *testing.T) {
err := run(x, []string{"echo", "hello", "/dev/stderr"})
err := run("echo hello > /dev/stderr")
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/ctr/main/combined")
assert.NoError(t, err)
assert.Contains(t, string(data), "hello")
})
t.Run("Signal", func(t *testing.T) {
for signal, message := range map[syscall.Signal]string{
for signal := range map[syscall.Signal]string{
syscall.SIGTERM: "terminated",
syscall.SIGKILL: "killed",
} {
Expand All @@ -68,10 +62,10 @@ func TestEmissary(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
err := run(x, []string{"sleep", "5s"})
assert.EqualError(t, err, "signal: "+message)
err := run("sleep 3")
assert.NoError(t, err)
}()
time.Sleep(time.Second)
wg.Wait()
}
})
t.Run("Artifact", func(t *testing.T) {
Expand All @@ -85,7 +79,7 @@ func TestEmissary(t *testing.T) {
}
`), 0o600)
assert.NoError(t, err)
err := run(x, []string{"echo", "hello", "/tmp/artifact"})
err := run("echo hello > /tmp/artifact")
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/outputs/artifacts/tmp/artifact.tgz")
assert.NoError(t, err)
Expand All @@ -102,7 +96,7 @@ func TestEmissary(t *testing.T) {
}
`), 0o600)
assert.NoError(t, err)
err := run(x, []string{"echo", "hello", "/tmp/artifact"})
err := run("echo hello > /tmp/artifact")
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/outputs/artifacts/tmp/artifact.tgz")
assert.NoError(t, err)
Expand All @@ -121,7 +115,7 @@ func TestEmissary(t *testing.T) {
}
`), 0o600)
assert.NoError(t, err)
err := run(x, []string{"echo", "hello", "/tmp/parameter"})
err := run("echo hello > /tmp/parameter")
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/outputs/parameters/tmp/parameter")
assert.NoError(t, err)
Expand Down Expand Up @@ -151,7 +145,7 @@ func TestEmissary(t *testing.T) {
`), 0o600)
assert.NoError(t, err)
_ = os.Remove("test.txt")
err = run(x, []string{"sh", "./test/containerSetRetryTest.sh", "/tmp/artifact"})
err = run("sh ./test/containerSetRetryTest.sh /tmp/artifact")
assert.Error(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/outputs/artifacts/tmp/artifact.tgz")
assert.NoError(t, err)
Expand Down Expand Up @@ -181,16 +175,16 @@ func TestEmissary(t *testing.T) {
`), 0o600)
assert.NoError(t, err)
_ = os.Remove("test.txt")
err = run(x, []string{"sh", "./test/containerSetRetryTest.sh", "/tmp/artifact"})
err = run("sh ./test/containerSetRetryTest.sh /tmp/artifact")
assert.NoError(t, err)
data, err := ioutil.ReadFile(varRunArgo + "/outputs/artifacts/tmp/artifact.tgz")
assert.NoError(t, err)
assert.NotEmpty(t, string(data)) // data is tgz format
})
}

func run(name string, args []string) error {
func run(script string) error {
cmd := NewEmissaryCommand()
containerName = "main"
return cmd.RunE(cmd, append([]string{name}, args...))
return cmd.RunE(cmd, append([]string{"sh", "-c"}, script))
}
5 changes: 3 additions & 2 deletions cmd/argoexec/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package main

import (
"os"
"os/exec"

"github.com/argoproj/argo-workflows/v3/util/errors"

// load authentication plugin for obtaining credentials from cloud providers.
_ "k8s.io/client-go/plugin/pkg/client/auth"
Expand All @@ -14,7 +15,7 @@ import (
func main() {
err := commands.NewRootCommand().Execute()
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
if exitError, ok := err.(errors.Exited); ok {
if exitError.ExitCode() >= 0 {
os.Exit(exitError.ExitCode())
} else {
Expand Down
8 changes: 8 additions & 0 deletions test/e2e/signals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ func (s *SignalsSuite) TestInjectedSidecarKillAnnotation() {
WaitForWorkflow(fixtures.ToBeSucceeded, kill2xDuration)
}

func (s *SignalsSuite) TestSubProcess() {
s.Given().
Workflow("@testdata/subprocess-workflow.yaml").
When().
SubmitWorkflow().
WaitForWorkflow()
}

func TestSignalsSuite(t *testing.T) {
suite.Run(t, new(SignalsSuite))
}
15 changes: 15 additions & 0 deletions test/e2e/testdata/subprocess-workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: subprocess-
spec:
entrypoint: main
templates:
- name: main
container:
image: argoproj/argosay:v1
command: [ sh, -c ]
args:
- |
sleep 60 &
ps -aef
24 changes: 24 additions & 0 deletions util/errors/exec_err.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package errors

import "fmt"

type Exited interface {
ExitCode() int
}

func NewExitErr(exitCode int) error {
if exitCode > 0 {
return execErr(exitCode)
}
return nil
}

type execErr int

func (e execErr) ExitCode() int {
return int(e)
}

func (e execErr) Error() string {
return fmt.Sprintf("exit status %d", e)
}
31 changes: 31 additions & 0 deletions workflow/executor/os-specific/signal_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package os_specific
import (
"os"
"syscall"
"time"

"github.com/argoproj/argo-workflows/v3/util/errors"
)

func IsSIGCHLD(s os.Signal) bool { return s == syscall.SIGCHLD }
Expand All @@ -18,3 +21,31 @@ func Kill(pid int, s syscall.Signal) error {
func Setpgid(a *syscall.SysProcAttr) {
a.Setpgid = true
}

func Wait(process *os.Process) error {
// We must copy the behaviour of Kubernetes in how we handle sub-processes.
// Kubernetes only waits on PID 1, not on any sub-process that process might fork.
// The only way for those forked processes to run in the background is to background the
// sub-process by calling Process.Release.
// Background processes always become zombies when they exit.
// Because the sub-process is now running in the background it will become a zombie,
// so we must wait for it.
// Because we run the process in the background, we cannot Process.Wait for it to get the exit code.
// Instead, we can reap it to get the exit code
pid := process.Pid
if err := process.Release(); err != nil {
return err
}

for {
var s syscall.WaitStatus
wpid, err := syscall.Wait4(-1, &s, syscall.WNOHANG, nil)
if err != nil {
return err
}
if wpid == pid {
return errors.NewExitErr(s.ExitStatus())
}
time.Sleep(time.Second)
}
}
20 changes: 0 additions & 20 deletions workflow/executor/os-specific/signal_linux.go

This file was deleted.

1 change: 1 addition & 0 deletions workflow/executor/os-specific/signal_linux.go
Loading

0 comments on commit 416fce7

Please sign in to comment.