Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PodSpecPatch functionality #1687

Merged
merged 10 commits into from
Oct 21, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,10 @@
"type": "integer",
"format": "int64"
},
"podSpecPatch": {
"description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of container fields which are not strings (e.g. resource limits).",
"type": "string"
},
"priority": {
"description": "Priority to apply to workflow pods.",
"type": "integer",
Expand Down Expand Up @@ -1489,6 +1493,10 @@
"description": "PriorityClassName to apply to workflow pods.",
"type": "string"
},
"podSpecPatch": {
"description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of container fields which are not strings (e.g. resource limits).",
"type": "string"
},
"priority": {
"description": "Priority is used if controller is configured to process limited number of workflows in parallel. Workflows with higher priority are processed first.",
"type": "integer",
Expand Down
25 changes: 25 additions & 0 deletions examples/pod-spec-patch-wf-tmpl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: pod-spec-patch-
spec:
entrypoint: whalesay
arguments:
parameters:
- name: cpu-limit
value: 100m
- name: mem-limit
value: 100Mi
podSpecPatch: |
containers:
- name: main
resources:
limits:
memory: "{{workflow.parameters.mem-limit}}"
templates:
- name: whalesay
podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "{{workflow.parameters.cpu-limit}}" }}}]}'
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
17 changes: 17 additions & 0 deletions examples/pod-spec-patch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: pod-spec-patch-
spec:
entrypoint: whalesay
arguments:
parameters:
- name: cpu-limit
value: 100m
templates:
- name: whalesay
podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "{{workflow.parameters.cpu-limit}}" }}}]}'
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
22 changes: 22 additions & 0 deletions examples/pod-spec-yaml-patch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: pod-spec-patch-
spec:
entrypoint: whalesay
arguments:
parameters:
- name: mem-limit
value: 100Mi
podSpecPatch: |
containers:
- name: main
resources:
limits:
memory: "{{workflow.parameters.mem-limit}}"
templates:
- name: whalesay
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
14 changes: 14 additions & 0 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,14 @@ type WorkflowSpec struct {
// Optional: Defaults to empty. See type description for default values of each field.
// +optional
SecurityContext *apiv1.PodSecurityContext `json:"securityContext,omitempty"`

// PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of
// container fields which are not strings (e.g. resource limits).
PodSpecPatch string `json:"podSpecPatch,omitempty"`
}

func (wfs *WorkflowSpec) HasPodSpecPatch() bool {
return wfs.PodSpecPatch != ""
}

// Template is a reusable and composable unit of execution in a workflow
Expand Down Expand Up @@ -351,6 +359,10 @@ type Template struct {
// Optional: Defaults to empty. See type description for default values of each field.
// +optional
SecurityContext *apiv1.PodSecurityContext `json:"securityContext,omitempty"`

// PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of
// container fields which are not strings (e.g. resource limits).
PodSpecPatch string `json:"podSpecPatch,omitempty"`
}

var _ TemplateHolder = &Template{}
Expand All @@ -374,6 +386,10 @@ func (tmpl *Template) GetBaseTemplate() *Template {
return baseTemplate
}

func (tmpl *Template) HasPodSpecPatch() bool {
return tmpl.PodSpecPatch != ""
}

// Inputs are the mechanism for passing parameters, artifacts, volumes from one template to another
type Inputs struct {
// Parameters are a list of parameters passed as inputs
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1128,10 +1128,10 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
}

newTmplCtx, basedTmpl, err := woc.getResolvedTemplate(node, orgTmpl, tmplCtx, args)

if err != nil {
return woc.initializeNodeOrMarkError(node, nodeName, wfv1.NodeTypeSkipped, orgTmpl, boundaryID, err), err
}

localParams := make(map[string]string)
if basedTmpl.IsPodType() {
localParams[common.LocalVarPodName] = woc.wf.NodeID(nodeName)
Expand Down
45 changes: 45 additions & 0 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
"github.com/argoproj/argo/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/util"
log "github.com/sirupsen/logrus"
"github.com/valyala/fasttemplate"
apiv1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/utils/pointer"
)

Expand Down Expand Up @@ -79,6 +81,10 @@ func (woc *wfOperationCtx) getVolumeDockerSock() apiv1.Volume {
}
}

func (woc *wfOperationCtx) hasPodSpecPatch(tmpl *wfv1.Template) bool {
return woc.wf.Spec.HasPodSpecPatch() || tmpl.HasPodSpecPatch()
}

func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Container, tmpl *wfv1.Template, includeScriptOutput bool) (*apiv1.Pod, error) {
nodeID := woc.wf.NodeID(nodeName)
woc.log.Debugf("Creating Pod: %s (%s)", nodeName, nodeID)
Expand Down Expand Up @@ -221,6 +227,45 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
}
}

// Apply the patch string from template
if woc.hasPodSpecPatch(tmpl) {
jsonstr, err := json.Marshal(pod.Spec)
sarabala1979 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, errors.Wrap(err, "", "Fail to marshal the Pod spec")
}

tmpl.PodSpecPatch, err = util.PodSpecPatchMerge(woc.wf, tmpl)

if err != nil {
return nil, errors.Wrap(err, "", "Fail to marshal the Pod spec")
}

// Final substitution for workflow level PodSpecPatch
localParams := make(map[string]string)
if tmpl.IsPodType() {
localParams[common.LocalVarPodName] = woc.wf.NodeID(nodeName)
}
tmpl, err := common.ProcessArgs(tmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false)
if err != nil {
return nil, errors.Wrap(err, "", "Fail to substitute the PodSpecPatch variables")
}

var spec apiv1.PodSpec

if !util.ValidateJsonStr(tmpl.PodSpecPatch, spec) {
return nil, errors.New("", "Invalid PodSpecPatch String")
}

modJson, err := strategicpatch.StrategicMergePatch(jsonstr, []byte(tmpl.PodSpecPatch), apiv1.PodSpec{})

if err != nil {
return nil, errors.Wrap(err, "", "Error occurred during strategic merge patch")
}
err = json.Unmarshal(modJson, &pod.Spec)
if err != nil {
return nil, errors.Wrap(err, "", "Error in Unmarshalling after merge the patch")
}
}
created, err := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.ObjectMeta.Namespace).Create(pod)
if err != nil {
if apierr.IsAlreadyExists(err) {
Expand Down
90 changes: 84 additions & 6 deletions workflow/controller/workflowpod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
"sigs.k8s.io/yaml"
"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"
)

func unmarshalTemplate(yamlStr string) *wfv1.Template {
Expand Down Expand Up @@ -106,15 +106,16 @@ script:
source: |
ls -al
`

// TestScriptTemplateWithVolume ensure we can a script pod with input artifacts
func TestScriptTemplateWithoutVolumeOptionalArtifact(t *testing.T) {
volumeMount := apiv1.VolumeMount{
Name: "input-artifacts",
ReadOnly: false,
MountPath: "/manifest",
SubPath: "manifest",
Name: "input-artifacts",
ReadOnly: false,
MountPath: "/manifest",
SubPath: "manifest",
MountPropagation: nil,
SubPathExpr: "",
SubPathExpr: "",
}

// Ensure that volume mount is added when artifact is provided
Expand Down Expand Up @@ -880,3 +881,80 @@ func TestTmplLevelSecurityContext(t *testing.T) {
assert.NotNil(t, pod.Spec.SecurityContext)
assert.Equal(t, runAsUser, *pod.Spec.SecurityContext.RunAsUser)
}

var helloWorldWfWithPatch = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: hello-world
spec:
entrypoint: whalesay
templates:
- name: whalesay
podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "800m"}}}]}'
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
`

var helloWorldWfWithWFPatch = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: hello-world
spec:
entrypoint: whalesay
podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"cpu": "800m"}}}]}'
templates:
- name: whalesay
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
`

var helloWorldWfWithWFYMALPatch = `
sarabala1979 marked this conversation as resolved.
Show resolved Hide resolved
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: hello-world
spec:
entrypoint: whalesay
podSpecPatch: |
containers:
- name: main
resources:
limits:
cpu: "800m"
templates:
- name: whalesay
podSpecPatch: '{"containers":[{"name":"main", "resources":{"limits":{"memory": "100Mi"}}}]}'
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
`

func TestPodSpecPatch(t *testing.T) {
wf := unmarshalWF(helloWorldWfWithPatch)
woc := newWoc(*wf)
mainCtr := woc.wf.Spec.Templates[0].Container
pod, _ := woc.createWorkflowPod(wf.Name, *mainCtr, &wf.Spec.Templates[0], false)
assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())

wf = unmarshalWF(helloWorldWfWithWFPatch)
woc = newWoc(*wf)
mainCtr = woc.wf.Spec.Templates[0].Container
pod, _ = woc.createWorkflowPod(wf.Name, *mainCtr, &wf.Spec.Templates[0], false)
assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())

wf = unmarshalWF(helloWorldWfWithWFYMALPatch)
woc = newWoc(*wf)
mainCtr = woc.wf.Spec.Templates[0].Container
pod, _ = woc.createWorkflowPod(wf.Name, *mainCtr, &wf.Spec.Templates[0], false)

assert.Equal(t, "0.800", pod.Spec.Containers[1].Resources.Limits.Cpu().AsDec().String())
assert.Equal(t, "104857600", pod.Spec.Containers[1].Resources.Limits.Memory().AsDec().String())

}
sarabala1979 marked this conversation as resolved.
Show resolved Hide resolved
Loading