Skip to content

Commit

Permalink
feat(controller): configurable terminationGracePeriodSeconds (#4940)
Browse files Browse the repository at this point in the history
Signed-off-by: Tianchu Zhao <[email protected]>
  • Loading branch information
tczhao authored Feb 8, 2021
1 parent 5824fc6 commit 5915a21
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 33 deletions.
33 changes: 33 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,39 @@ func (s *FunctionalSuite) TestK8SJSONPatch() {
})
}

func (s *FunctionalSuite) TestWorkflowPodSpecPatch() {
s.Given().
Workflow(`apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: basic
labels:
argo-e2e: true
spec:
entrypoint: main
templates:
- name: main
container:
image: argoproj/argosay:v2
args:
- echo
- ":) Hello Argo!"
podSpecPatch: '{"terminationGracePeriodSeconds":5, "containers":[{"name":"main", "resources":{"limits":{"cpu": "100m"}}}]}'
`).
When().
SubmitWorkflow().
WaitForWorkflow().
Then().
ExpectWorkflowNode(wfv1.SucceededPodNode, func(t *testing.T, n *wfv1.NodeStatus, p *corev1.Pod) {
assert.Equal(t, *p.Spec.TerminationGracePeriodSeconds, int64(5))
for _, c := range p.Spec.Containers {
if c.Name == "main" {
assert.Equal(t, c.Resources.Limits.Cpu().String(), "100m")
}
}
})
}

func TestFunctionalSuite(t *testing.T) {
suite.Run(t, new(FunctionalSuite))
}
10 changes: 3 additions & 7 deletions workflow/executor/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ const (
containerShimPrefix = "://"
)

// killGracePeriod is the time in seconds after sending SIGTERM before
// forcefully killing the sidecar with SIGKILL (value matches k8s)
const KillGracePeriod = 30

// GetContainerID returns container ID of a ContainerStatus resource
func GetContainerID(container *v1.ContainerStatus) string {
i := strings.Index(container.ContainerID, containerShimPrefix)
Expand Down Expand Up @@ -94,13 +90,13 @@ func TerminatePodWithContainerID(ctx context.Context, c KubernetesClientInterfac
}

// KillGracefully kills a container gracefully.
func KillGracefully(ctx context.Context, c KubernetesClientInterface, containerID string) error {
func KillGracefully(ctx context.Context, c KubernetesClientInterface, containerID string, terminationGracePeriodDuration time.Duration) error {
log.Infof("SIGTERM containerID %q: %s", containerID, syscall.SIGTERM.String())
err := TerminatePodWithContainerID(ctx, c, containerID, syscall.SIGTERM)
if err != nil {
return err
}
err = WaitForTermination(ctx, c, containerID, time.Second*KillGracePeriod)
err = WaitForTermination(ctx, c, containerID, terminationGracePeriodDuration*time.Second)
if err == nil {
log.Infof("ContainerID %q successfully killed", containerID)
return nil
Expand All @@ -110,7 +106,7 @@ func KillGracefully(ctx context.Context, c KubernetesClientInterface, containerI
if err != nil {
return err
}
err = WaitForTermination(ctx, c, containerID, time.Second*KillGracePeriod)
err = WaitForTermination(ctx, c, containerID, terminationGracePeriodDuration*time.Second)
if err != nil {
return err
}
Expand Down
50 changes: 50 additions & 0 deletions workflow/executor/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"syscall"
"testing"
"time"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -121,3 +122,52 @@ func TestTerminatePodWithContainerID(t *testing.T) {
err = TerminatePodWithContainerID(ctx, mock, "container-id", syscall.SIGTERM)
assert.NoError(t, err)
}

// TestWaitForTermination ensure we SIGTERM container with input wait time
func TestWaitForTermination(t *testing.T) {
// Successfully SIGTERM Container
mock := &MockKC{
getContainerStatusContainerStatus: &v1.ContainerStatus{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{},
},
},
}
ctx := context.Background()
err := WaitForTermination(ctx, mock, "container-id", time.Duration(2)*time.Second)
assert.NoError(t, err)

// Fail SIGTERM Container
mock = &MockKC{
getContainerStatusContainerStatus: &v1.ContainerStatus{
State: v1.ContainerState{
Terminated: nil,
},
},
}
err = WaitForTermination(ctx, mock, "container-id", time.Duration(1)*time.Second)
assert.EqualError(t, err, "timeout after 1s")
}

// TestKillGracefully ensure we kill container gracefully with input wait time
func TestKillGracefully(t *testing.T) {
// Graceful SIGTERM Container
mock := &MockKC{
getContainerStatusPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},
Spec: v1.PodSpec{
RestartPolicy: "Never",
},
},
getContainerStatusContainerStatus: &v1.ContainerStatus{
State: v1.ContainerState{
Terminated: nil,
},
},
}
ctx := context.Background()
err := KillGracefully(ctx, mock, "container-id", 1)
assert.EqualError(t, err, "timeout after 1s")
}
9 changes: 4 additions & 5 deletions workflow/executor/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/argoproj/argo/v3/util"
"github.com/argoproj/argo/v3/util/file"
"github.com/argoproj/argo/v3/workflow/common"
execcommon "github.com/argoproj/argo/v3/workflow/executor/common"
)

type DockerExecutor struct{}
Expand Down Expand Up @@ -173,7 +172,7 @@ func (d *DockerExecutor) Wait(ctx context.Context, containerID string) error {
}

// killContainers kills a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period
func (d *DockerExecutor) Kill(ctx context.Context, containerIDs []string) error {
func (d *DockerExecutor) Kill(ctx context.Context, containerIDs []string, terminationGracePeriodDuration time.Duration) error {
killArgs := append([]string{"kill", "--signal", "TERM"}, containerIDs...)
// docker kill will return with an error if a container has terminated already, which is not an error in this case.
// We therefore ignore any error. docker wait that follows will re-raise any other error with the container.
Expand All @@ -188,7 +187,7 @@ func (d *DockerExecutor) Kill(ctx context.Context, containerIDs []string) error
return errors.InternalWrapError(err)
}
// waitCh needs buffer of 1 so it can always send the result of waitCmd.Wait() without blocking.
// Otherwise, if the KillGracePeriod elapses and the forced kill branch is run, there would
// Otherwise, if the terminationGracePeriodSeconds elapses and the forced kill branch is run, there would
// be no receiver for waitCh and the goroutine would block forever
waitCh := make(chan error, 1)
go func() {
Expand All @@ -198,8 +197,8 @@ func (d *DockerExecutor) Kill(ctx context.Context, containerIDs []string) error
select {
case err = <-waitCh:
// waitCmd completed
case <-time.After(execcommon.KillGracePeriod * time.Second):
log.Infof("Timed out (%ds) for containers to terminate gracefully. Killing forcefully", execcommon.KillGracePeriod)
case <-time.After(terminationGracePeriodDuration * time.Second):
log.Infof("Timed out (%ds) for containers to terminate gracefully. Killing forcefully", terminationGracePeriodDuration)
forceKillArgs := append([]string{"kill", "--signal", "KILL"}, containerIDs...)
forceKillCmd := exec.Command("docker", forceKillArgs...)
log.Info(forceKillCmd.Args)
Expand Down
18 changes: 15 additions & 3 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ type ContainerRuntimeExecutor interface {
Wait(ctx context.Context, containerID string) error

// Kill a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period
Kill(ctx context.Context, containerIDs []string) error
Kill(ctx context.Context, containerIDs []string, terminationGracePeriodDuration time.Duration) error
}

// NewExecutor instantiates a new workflow executor
Expand Down Expand Up @@ -679,6 +679,16 @@ func (we *WorkflowExecutor) GetSecrets(ctx context.Context, namespace, name, key
return val, nil
}

// GetTerminationGracePeriodDuration returns the terminationGracePeriodSeconds of podSpec in Time.Duration format
func (we *WorkflowExecutor) GetTerminationGracePeriodDuration(ctx context.Context) (time.Duration, error) {
pod, err := we.getPod(ctx)
if err != nil {
return time.Duration(0), err
}
terminationGracePeriodDuration := time.Duration(*pod.Spec.TerminationGracePeriodSeconds)
return terminationGracePeriodDuration, nil
}

// GetMainContainerStatus returns the container status of the main container, nil if the main container does not exist
func (we *WorkflowExecutor) GetMainContainerStatus(ctx context.Context) (*apiv1.ContainerStatus, error) {
pod, err := we.getPod(ctx)
Expand Down Expand Up @@ -1180,7 +1190,8 @@ func (we *WorkflowExecutor) monitorDeadline(ctx context.Context, annotationsUpda
_ = we.AddAnnotation(ctx, common.AnnotationKeyNodeMessage, message)
log.Infof("Killing main container")
mainContainerID, _ := we.GetMainContainerID(ctx)
err := we.RuntimeExecutor.Kill(ctx, []string{mainContainerID})
terminationGracePeriodDuration, _ := we.GetTerminationGracePeriodDuration(ctx)
err := we.RuntimeExecutor.Kill(ctx, []string{mainContainerID}, terminationGracePeriodDuration)
if err != nil {
log.Warnf("Failed to kill main container: %v", err)
}
Expand Down Expand Up @@ -1214,7 +1225,8 @@ func (we *WorkflowExecutor) KillSidecars(ctx context.Context) error {
if len(sidecarIDs) == 0 {
return nil
}
return we.RuntimeExecutor.Kill(ctx, sidecarIDs)
terminationGracePeriodDuration, _ := we.GetTerminationGracePeriodDuration(ctx)
return we.RuntimeExecutor.Kill(ctx, sidecarIDs, terminationGracePeriodDuration)
}

// LoadExecutionControl reads the execution control definition from the the Kubernetes downward api annotations volume file
Expand Down
5 changes: 3 additions & 2 deletions workflow/executor/k8sapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"syscall"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -89,6 +90,6 @@ func (c *k8sAPIClient) KillContainer(pod *corev1.Pod, container *corev1.Containe
return err
}

func (c *k8sAPIClient) killGracefully(ctx context.Context, containerID string) error {
return execcommon.KillGracefully(ctx, c, containerID)
func (c *k8sAPIClient) killGracefully(ctx context.Context, containerID string, terminationGracePeriodDuration time.Duration) error {
return execcommon.KillGracefully(ctx, c, containerID, terminationGracePeriodDuration)
}
5 changes: 3 additions & 2 deletions workflow/executor/k8sapi/k8sapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -66,10 +67,10 @@ func (k *K8sAPIExecutor) Wait(ctx context.Context, containerID string) error {
}

// Kill kills a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period
func (k *K8sAPIExecutor) Kill(ctx context.Context, containerIDs []string) error {
func (k *K8sAPIExecutor) Kill(ctx context.Context, containerIDs []string, terminationGracePeriodDuration time.Duration) error {
log.Infof("Killing containers %s", containerIDs)
for _, containerID := range containerIDs {
err := k.client.killGracefully(ctx, containerID)
err := k.client.killGracefully(ctx, containerID, terminationGracePeriodDuration)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions workflow/executor/kubelet/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,8 @@ func (k *kubeletClient) KillContainer(pod *corev1.Pod, container *corev1.Contain
return err
}

func (k *kubeletClient) KillGracefully(ctx context.Context, containerID string) error {
return execcommon.KillGracefully(ctx, k, containerID)
func (k *kubeletClient) KillGracefully(ctx context.Context, containerID string, terminationGracePeriodDuration time.Duration) error {
return execcommon.KillGracefully(ctx, k, containerID, terminationGracePeriodDuration)
}

func (k *kubeletClient) CopyArchive(ctx context.Context, containerID, sourcePath, destPath string) error {
Expand Down
5 changes: 3 additions & 2 deletions workflow/executor/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"time"

log "github.com/sirupsen/logrus"

Expand Down Expand Up @@ -62,9 +63,9 @@ func (k *KubeletExecutor) Wait(ctx context.Context, containerID string) error {
}

// Kill kills a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period
func (k *KubeletExecutor) Kill(ctx context.Context, containerIDs []string) error {
func (k *KubeletExecutor) Kill(ctx context.Context, containerIDs []string, terminationGracePeriodDuration time.Duration) error {
for _, containerID := range containerIDs {
err := k.cli.KillGracefully(ctx, containerID)
err := k.cli.KillGracefully(ctx, containerID, terminationGracePeriodDuration)
if err != nil {
return err
}
Expand Down
12 changes: 7 additions & 5 deletions workflow/executor/mocks/ContainerRuntimeExecutor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 4 additions & 5 deletions workflow/executor/pns/pns.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,13 @@ func (p *PNSExecutor) GetExitCode(ctx context.Context, containerID string) (stri
}

// Kill a list of containerIDs first with a SIGTERM then with a SIGKILL after a grace period
func (p *PNSExecutor) Kill(ctx context.Context, containerIDs []string) error {
func (p *PNSExecutor) Kill(ctx context.Context, containerIDs []string, terminationGracePeriodDuration time.Duration) error {
var asyncErr error
wg := sync.WaitGroup{}
for _, cid := range containerIDs {
wg.Add(1)
go func(containerID string) {
err := p.killContainer(containerID)
err := p.killContainer(containerID, terminationGracePeriodDuration)
if err != nil && asyncErr != nil {
asyncErr = err
}
Expand All @@ -260,7 +260,7 @@ func (p *PNSExecutor) Kill(ctx context.Context, containerIDs []string) error {
return asyncErr
}

func (p *PNSExecutor) killContainer(containerID string) error {
func (p *PNSExecutor) killContainer(containerID string, terminationGracePeriodDuration time.Duration) error {
pid, err := p.getContainerPID(containerID)
if err != nil {
log.Warnf("Ignoring kill container failure of %s: %v. Process assumed to have completed", containerID, err)
Expand All @@ -274,8 +274,7 @@ func (p *PNSExecutor) killContainer(containerID string) error {
if err != nil {
log.Warnf("Failed to SIGTERM pid %d: %v", pid, err)
}

waitPIDOpts := executil.WaitPIDOpts{Timeout: execcommon.KillGracePeriod * time.Second}
waitPIDOpts := executil.WaitPIDOpts{Timeout: terminationGracePeriodDuration * time.Second}
err = executil.WaitPID(pid, waitPIDOpts)
if err == nil {
log.Infof("PID %d completed", pid)
Expand Down

0 comments on commit 5915a21

Please sign in to comment.