Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
93390b1
WIP: Informer working
simster7 Nov 4, 2019
8e63ce2
WIP: Working on cron infra
simster7 Nov 5, 2019
8fd042d
WIP: Won't work
simster7 Nov 6, 2019
66ab628
WIP: Toy now working
simster7 Nov 7, 2019
5d98393
WIP: Object deletion
simster7 Nov 7, 2019
c6d65fc
WIP: Trying to make API call
simster7 Nov 12, 2019
eb5a0f5
WIP: Can now run WFs
simster7 Nov 12, 2019
30b6acf
WIP: Concurrency policy
simster7 Nov 12, 2019
5febabd
Merge branch 'master' into cron
simster7 Nov 12, 2019
4358994
WIP
simster7 Nov 12, 2019
96790cd
Minor
simster7 Nov 12, 2019
29165bf
WIP: StartingDeadlineSeconds now works
simster7 Nov 13, 2019
8ef1f61
WIP: Fixed startingDeadlineSeconds
simster7 Nov 14, 2019
61df6fc
WIP: Minor rework
simster7 Nov 14, 2019
0ee7573
WIP: Working on WF informer
simster7 Nov 14, 2019
3af4e0e
WIP: Informer working
simster7 Nov 14, 2019
2b65ae9
WIP: Major refactor, should be way cleaner now
simster7 Nov 14, 2019
5cc826b
Merge branch 'master' into cron
simster7 Nov 14, 2019
c1bfe83
WIP: Lint
simster7 Nov 14, 2019
67f4ab8
WIP: First CLI commands
simster7 Nov 15, 2019
acb4188
WIP: Almost done with CLI
simster7 Nov 15, 2019
0d2dce0
Done with CLI
simster7 Nov 15, 2019
d129c99
WIP
simster7 Nov 17, 2019
b55b6a1
Merge branch 'master' into cron
simster7 Nov 18, 2019
2e7b2ab
WIP: Reworked parent/child relationship
simster7 Nov 18, 2019
497a168
WIP: Lint
simster7 Nov 18, 2019
82c692c
WIP: Removing from Active now works
simster7 Nov 18, 2019
9c26d27
WIP: Remove runtimeGenerateName
simster7 Nov 18, 2019
9ac51b4
MVP Done
simster7 Nov 18, 2019
220f2eb
Lint
simster7 Nov 18, 2019
6f52a21
Merge branch 'master' into cron
simster7 Nov 26, 2019
bcc35e8
Move Options under Spec and Spec under WorkflowSpec
simster7 Nov 26, 2019
d45902c
Merge branch 'master' into cron
simster7 Dec 10, 2019
4c4d618
WIP: History limit
simster7 Dec 10, 2019
e89b4bb
{successful,failed}JobsHistoryLimit
simster7 Dec 11, 2019
35bc731
Fix validation bug
simster7 Dec 12, 2019
f6513c6
Merge branch 'master' into cron
alexec Dec 19, 2019
71d073b
make codegen manifests
alexec Dec 19, 2019
134f573
Alex's comments
simster7 Dec 26, 2019
aabdc66
Merge branch 'master' into cron
simster7 Jan 3, 2020
16789f3
WIP
simster7 Jan 3, 2020
ec1d26c
Added tests
simster7 Jan 3, 2020
192ac2d
Enhanced test
simster7 Jan 3, 2020
6c43791
Test fixes
simster7 Jan 6, 2020
98d7905
Rename function
simster7 Jan 6, 2020
cc7af61
Merge branch 'master' into cron
simster7 Jan 6, 2020
c4ec587
Run CronWorkflow test
simster7 Jan 6, 2020
898f6dd
Run tests in parallel
simster7 Jan 6, 2020
b3f244a
No parallel tests
simster7 Jan 7, 2020
25a035f
Potential fix to test
simster7 Jan 7, 2020
2f6bec2
Increase e2e timeout
simster7 Jan 7, 2020
2c4c7a6
Increase e2e timeout
simster7 Jan 7, 2020
43ba51f
Merge branch 'master' into cron
simster7 Jan 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 161 additions & 125 deletions Gopkg.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ endif

.PHONY: test
test:
go test -covermode=count -coverprofile=coverage.out ./...
go test -covermode=count -coverprofile=coverage.out `go list ./... | grep -v 'test/e2e'`

.PHONY: cover
cover:
Expand Down Expand Up @@ -195,7 +195,7 @@ logs-e2e:

.PHONY: test-e2e
test-e2e:
go test -v -count 1 -p 1 ./test/e2e
go test -timeout 20m -v -count 1 -p 1 ./test/e2e/...

.PHONY: clean
clean:
Expand Down
109 changes: 109 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,115 @@
}
}
},
"io.argoproj.workflow.v1alpha1.CronWorkflow": {
"description": "CronWorkflow is the definition of a scheduled workflow resource",
"type": "object",
"required": [
"spec",
"status"
],
"properties": {
"apiVersion": {
"description": "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources",
"type": "string"
},
"kind": {
"description": "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds",
"type": "string"
},
"metadata": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta"
},
"spec": {
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.CronWorkflowSpec"
},
"status": {
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.CronWorkflowStatus"
}
}
},
"io.argoproj.workflow.v1alpha1.CronWorkflowList": {
"description": "CronWorkflowList is list of CronWorkflow resources",
"type": "object",
"required": [
"metadata",
"items"
],
"properties": {
"apiVersion": {
"description": "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources",
"type": "string"
},
"items": {
"type": "array",
"items": {
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.CronWorkflow"
}
},
"kind": {
"description": "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds",
"type": "string"
},
"metadata": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.ListMeta"
}
}
},
"io.argoproj.workflow.v1alpha1.CronWorkflowSpec": {
"type": "object",
"required": [
"schedule"
],
"properties": {
"concurrencyPolicy": {
"description": "ConcurrencyPolicy is the K8s-style concurrency policy that will be used",
"type": "string"
},
"failedJobsHistoryLimit": {
"description": "FailedJobsHistoryLimit is the number of successful jobs to be kept at a time",
"type": "integer",
"format": "int32"
},
"schedule": {
"description": "Schedule is a schedule to run the Workflow in Cron format",
"type": "string"
},
"startingDeadlineSeconds": {
"description": "StartingDeadlineSeconds is the K8s-style deadline that will limit the time a CronWorkflow will be run after its original scheduled time if it is missed.",
"type": "integer",
"format": "int64"
},
"successfulJobsHistoryLimit": {
"description": "SuccessfulJobsHistoryLimit is the number of successful jobs to be kept at a time",
"type": "integer",
"format": "int32"
},
"suspend": {
"description": "Suspend is a flag that will stop new CronWorkflows from running if set to true",
"type": "boolean"
},
"workflowSpec": {
"description": "WorkflowSpec is the spec of the workflow to be run",
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.WorkflowSpec"
}
}
},
"io.argoproj.workflow.v1alpha1.CronWorkflowStatus": {
"type": "object",
"properties": {
"active": {
"description": "Active is a list of active workflows stemming from this CronWorkflow",
"type": "array",
"items": {
"$ref": "#/definitions/io.k8s.api.core.v1.ObjectReference"
}
},
"lastScheduledTime": {
"description": "LastScheduleTime is the last time the CronWorkflow was scheduled",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Time"
}
}
},
"io.argoproj.workflow.v1alpha1.DAGTask": {
"description": "DAGTask represents a node in the graph during DAG execution",
"type": "object",
Expand Down
77 changes: 77 additions & 0 deletions cmd/argo/commands/cron/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package cron

import (
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/templateresolution"
"log"

"github.com/argoproj/argo/pkg/client/clientset/versioned"
"github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

// Global variables
var (
restConfig *rest.Config
clientConfig clientcmd.ClientConfig
clientset *kubernetes.Clientset
wfClientset *versioned.Clientset
cronWfClient v1alpha1.CronWorkflowInterface
wftmplClient v1alpha1.WorkflowTemplateInterface
namespace string
)

func initKubeClient() *kubernetes.Clientset {
if clientset != nil {
return clientset
}
var err error
restConfig, err = clientConfig.ClientConfig()
if err != nil {
log.Fatal(err)
}

// create the clientset
clientset, err = kubernetes.NewForConfig(restConfig)
if err != nil {
log.Fatal(err)
}
return clientset
}

// InitCronWorkflowClient creates a new client for the Kubernetes WorkflowTemplate CRD.
func InitCronWorkflowClient(ns ...string) v1alpha1.CronWorkflowInterface {
if cronWfClient != nil {
return cronWfClient
}
initKubeClient()
var err error
if len(ns) > 0 {
namespace = ns[0]
} else {
namespace, _, err = clientConfig.Namespace()
if err != nil {
log.Fatal(err)
}
}
wfClientset = versioned.NewForConfigOrDie(restConfig)
cronWfClient = wfClientset.ArgoprojV1alpha1().CronWorkflows(namespace)
wftmplClient = wfClientset.ArgoprojV1alpha1().WorkflowTemplates(namespace)
return cronWfClient
}

// LazyWorkflowTemplateGetter is a wrapper of v1alpha1.WorkflowTemplateInterface which
// supports lazy initialization.
type LazyWorkflowTemplateGetter struct{}

// Get initializes it just before it's actually used and returns a retrieved workflow template.
func (c LazyWorkflowTemplateGetter) Get(name string) (*wfv1.WorkflowTemplate, error) {
if wftmplClient == nil {
_ = InitCronWorkflowClient()
}
return templateresolution.WrapWorkflowTemplateInterface(wftmplClient).Get(name)
}

var _ templateresolution.WorkflowTemplateNamespacedGetter = &LazyWorkflowTemplateGetter{}
97 changes: 97 additions & 0 deletions cmd/argo/commands/cron/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package cron

import (
"github.com/argoproj/argo/workflow/templateresolution"
"log"
"os"

"github.com/argoproj/pkg/json"
"github.com/spf13/cobra"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/util"
"github.com/argoproj/argo/workflow/validate"
)

type cliCreateOpts struct {
output string // --output
strict bool // --strict
}

func NewCreateCommand() *cobra.Command {
var (
cliCreateOpts cliCreateOpts
)
var command = &cobra.Command{
Use: "create FILE1 FILE2...",
Short: "create a cron workflow",
Run: func(cmd *cobra.Command, args []string) {
if len(args) == 0 {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}

CreateCronWorkflows(args, &cliCreateOpts)
},
}
command.Flags().StringVarP(&cliCreateOpts.output, "output", "o", "", "Output format. One of: name|json|yaml|wide")
command.Flags().BoolVar(&cliCreateOpts.strict, "strict", true, "perform strict workflow validation")
return command
}

func CreateCronWorkflows(filePaths []string, cliOpts *cliCreateOpts) {
defaultCronWfClient := InitCronWorkflowClient()

fileContents, err := util.ReadManifest(filePaths...)
if err != nil {
log.Fatal(err)
}

var cronWorkflows []wfv1.CronWorkflow
for _, body := range fileContents {
cronWfs := unmarshalCronWorkflows(body, cliOpts.strict)
cronWorkflows = append(cronWorkflows, cronWfs...)
}

if len(cronWorkflows) == 0 {
log.Println("No CronWorkflows found in given files")
os.Exit(1)
}

for _, cronWf := range cronWorkflows {
wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(wftmplClient)
err := validate.ValidateCronWorkflow(wftmplGetter, &cronWf)
if err != nil {
log.Fatalf("Failed to validate cron workflow: %v", err)
}
cronWfClient := defaultCronWfClient
if cronWf.Namespace != "" {
cronWfClient = InitCronWorkflowClient(cronWf.Namespace)
}
created, err := cronWfClient.Create(&cronWf)
if err != nil {
log.Fatalf("Failed to create workflow template: %v", err)
}
printCronWorkflowTemplate(created, cliOpts.output)
}
}

// unmarshalCronWorkflows unmarshals the input bytes as either json or yaml
func unmarshalCronWorkflows(wfBytes []byte, strict bool) []wfv1.CronWorkflow {
var cronWf wfv1.CronWorkflow
var jsonOpts []json.JSONOpt
if strict {
jsonOpts = append(jsonOpts, json.DisallowUnknownFields)
}
err := json.Unmarshal(wfBytes, &cronWf, jsonOpts...)
if err == nil {
return []wfv1.CronWorkflow{cronWf}
}
yamlWfs, err := common.SplitCronWorkflowYAMLFile(wfBytes, strict)
if err == nil {
return yamlWfs
}
log.Fatalf("Failed to parse workflow template: %v", err)
return nil
}
59 changes: 59 additions & 0 deletions cmd/argo/commands/cron/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package cron

import (
"fmt"
"log"
"os"

"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
)

// NewDeleteCommand returns a new instance of an `argo delete` command
func NewDeleteCommand() *cobra.Command {
var (
all bool
)

var command = &cobra.Command{
Use: "delete CRON_WORKFLOW",
Short: "delete a cron workflow",
Run: func(cmd *cobra.Command, args []string) {
cronWfClient := InitCronWorkflowClient()
if all {
deleteCronWorkflows(cronWfClient, metav1.ListOptions{})
} else {
if len(args) == 0 {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}
for _, wftmplName := range args {
deleCronWorkflow(cronWfClient, wftmplName)
}
}
},
}

command.Flags().BoolVar(&all, "all", false, "Delete all workflow templates")
return command
}

func deleCronWorkflow(cronWfClient v1alpha1.CronWorkflowInterface, cronWf string) {
err := cronWfClient.Delete(cronWf, &metav1.DeleteOptions{})
if err != nil {
log.Fatal(err)
}
fmt.Printf("CronWorkflow '%s' deleted\n", cronWf)
}

func deleteCronWorkflows(cronWfClient v1alpha1.CronWorkflowInterface, options metav1.ListOptions) {
cronWfList, err := cronWfClient.List(options)
if err != nil {
log.Fatal(err)
}
for _, cronWf := range cronWfList.Items {
deleCronWorkflow(cronWfClient, cronWf.ObjectMeta.Name)
}
}
Loading