
Commit de7ef0e

(#51) additional packager settings

This supports Retry Policy and End State Discard settings for packages.
Did some refactors and updates to make supporting that easier, and also
made the CLI generation a bit more dynamic. Logging in the generated
main.go is more detailed.

Signed-off-by: R.I.Pienaar <[email protected]>

1 parent a562aeb · commit de7ef0e
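As a sketch of what the new settings look like in practice, the following unmarshals a hypothetical package definition into the generators.Package struct extended by this commit. The `retry` and `discard` YAML keys come from generators/package.go below; the file contents, the yaml.v2 library choice and the state names "complete" and "expired" are illustrative assumptions, not taken from this commit.

package main

import (
	"fmt"

	"github.com/choria-io/asyncjobs/generators"
	yaml "gopkg.in/yaml.v2" // assumed YAML library; any tag-compatible one works
)

func main() {
	// hypothetical package file as consumed by `ajc package`
	spec := []byte(`
name: choria.io/asyncjobs/handlers
retry: 10m                   # one of RetryPolicyNames()
discard: [complete, expired] # assumed terminal state names
`)

	var p generators.Package
	if err := yaml.Unmarshal(spec, &p); err != nil {
		panic(err)
	}

	fmt.Println(p.RetryPolicy)   // 10m
	fmt.Println(p.DiscardStates) // [complete expired]
}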

11 files changed: +161 −38 lines

ajc/package_command.go (+10)

@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"strings"

 	"github.com/choria-io/asyncjobs/generators"
 	"gopkg.in/alecthomas/kingpin.v2"
@@ -58,6 +59,9 @@ func (c *packageCommand) dockerAction(_ *kingpin.ParseContext) error {
 	if h.Name == "" {
 		h.Name = "choria.io/asyncjobs/handlers"
 	}
+	if h.RetryPolicy == "" {
+		h.RetryPolicy = "default"
+	}

 	if len(h.TaskHandlers) == 0 {
 		return fmt.Errorf("no task handlers specified in %s", c.file)
@@ -68,6 +72,12 @@ func (c *packageCommand) dockerAction(_ *kingpin.ParseContext) error {
 	table.AddRow("NATS Context Name", h.ContextName)
 	table.AddRow("Work Queue", h.WorkQueue)
 	table.AddRow("Task Handlers", len(h.TaskHandlers))
+	table.AddRow("Retry Backoff Policy", h.RetryPolicy)
+	if len(h.DiscardStates) > 0 {
+		table.AddRow("End State Discard", strings.Join(h.DiscardStates, ", "))
+	} else {
+		table.AddRow("End State Discard", "none")
+	}
 	table.AddRow("github.com/choria-io/asyncjobs", h.AJVersion)
 	fmt.Println(table.Render())

ajc/task_command.go (+22 −28)

@@ -10,9 +10,10 @@ import (
 	"fmt"
 	"os"
 	"os/exec"
+	"strings"
 	"time"

-	"github.com/choria-io/asyncjobs"
+	aj "github.com/choria-io/asyncjobs"
 	"github.com/dustin/go-humanize"
 	"github.com/nats-io/jsm.go"
 	"github.com/xlab/tablewriter"
@@ -79,18 +80,20 @@ func configureTaskCommand(app *kingpin.Application) {
 	watch := tasks.Command("watch", "Watch job updates in real time").Action(c.watchAction)
 	watch.Flag("task", "Watch for updates related to a specific task ID").StringVar(&c.id)

+	policies := aj.RetryPolicyNames()
+
 	process := tasks.Command("process", "Process Tasks from a given queue").Action(c.processAction)
 	process.Arg("type", "Types of Tasks to process").Required().Envar("AJC_TYPE").StringVar(&c.ttype)
 	process.Arg("queue", "The Queue to consume Tasks from").Required().Envar("AJC_QUEUE").StringVar(&c.queue)
 	process.Arg("concurrency", "How many concurrent Tasks to process").Required().Envar("AJC_CONCURRENCY").IntVar(&c.concurrency)
 	process.Arg("command", "The command to invoke for each Task").Envar("AJC_COMMAND").ExistingFileVar(&c.command)
 	process.Flag("remote", "Process tasks using a remote request-reply callout").BoolVar(&c.remote)
-	process.Flag("monitor", "Runs monitoring on the given port").IntVar(&c.promPort)
-	process.Flag("backoff", "Selects a backoff policy to apply (1m, 10m, 1h)").EnumVar(&c.retry, "1m", "10m", "1h")
+	process.Flag("monitor", "Runs monitoring on the given port").PlaceHolder("PORT").IntVar(&c.promPort)
+	process.Flag("backoff", fmt.Sprintf("Selects a backoff policy to apply (%s)", strings.Join(policies, ", "))).Default("default").EnumVar(&c.retry, policies...)
 }

 func (c *taskCommand) retryAction(_ *kingpin.ParseContext) error {
-	err := prepare(asyncjobs.BindWorkQueue(c.queue))
+	err := prepare(aj.BindWorkQueue(c.queue))
 	if err != nil {
 		return err
 	}
@@ -104,7 +107,7 @@ func (c *taskCommand) retryAction(_ *kingpin.ParseContext) error {
 }

 func (c *taskCommand) initAction(_ *kingpin.ParseContext) error {
-	err := prepare(asyncjobs.NoStorageInit())
+	err := prepare(aj.NoStorageInit())
 	if err != nil {
 		return err
 	}
@@ -135,9 +138,9 @@ func (c *taskCommand) watchAction(_ *kingpin.ParseContext) error {
 		return err
 	}

-	target := asyncjobs.EventsSubjectWildcard
+	target := aj.EventsSubjectWildcard
 	if c.id != "" {
-		target = fmt.Sprintf(asyncjobs.TaskStateChangeEventSubjectPattern, c.id)
+		target = fmt.Sprintf(aj.TaskStateChangeEventSubjectPattern, c.id)
 	}

 	sub, err := mgr.NatsConn().SubscribeSync(target)
@@ -151,13 +154,13 @@ func (c *taskCommand) watchAction(_ *kingpin.ParseContext) error {
 			return err
 		}

-		event, kind, err := asyncjobs.ParseEventJSON(msg.Data)
+		event, kind, err := aj.ParseEventJSON(msg.Data)
 		if err != nil {
 			fmt.Printf("Could not parse event: %v\n", err)
 		}

 		switch e := event.(type) {
-		case asyncjobs.TaskStateChangeEvent:
+		case aj.TaskStateChangeEvent:
 			if e.LastErr == "" {
 				fmt.Printf("[%s] %s: queue: %s type: %s tries: %d state: %s\n", e.TimeStamp.Format("15:04:05"), e.TaskID, e.Queue, e.TaskType, e.Tries, e.State)
 			} else {
@@ -170,7 +173,7 @@ func (c *taskCommand) watchAction(_ *kingpin.ParseContext) error {
 	}
 }

-func (c *taskCommand) commandHandlerFunc(ctx context.Context, log asyncjobs.Logger, task *asyncjobs.Task) (interface{}, error) {
+func (c *taskCommand) commandHandlerFunc(ctx context.Context, log aj.Logger, task *aj.Task) (interface{}, error) {
 	tj, err := json.Marshal(task)
 	if err != nil {
 		return nil, err
@@ -210,25 +213,16 @@ func (c *taskCommand) processAction(_ *kingpin.ParseContext) error {
 		return fmt.Errorf("either a command or --remote is required")
 	}

-	retryPolicy := asyncjobs.RetryDefault
-	switch c.retry {
-	case "1m":
-		retryPolicy = asyncjobs.RetryLinearOneMinute
-	case "10m":
-		retryPolicy = asyncjobs.RetryLinearTenMinutes
-	case "1h":
-		retryPolicy = asyncjobs.RetryLinearOneHour
-	}
-
 	err := prepare(
-		asyncjobs.BindWorkQueue(c.queue),
-		asyncjobs.PrometheusListenPort(c.promPort),
-		asyncjobs.RetryBackoffPolicy(retryPolicy))
+		aj.BindWorkQueue(c.queue),
+		aj.PrometheusListenPort(c.promPort),
+		aj.RetryBackoffPolicyName(c.retry),
+		aj.ClientConcurrency(c.concurrency))
 	if err != nil {
 		return err
 	}

-	router := asyncjobs.NewTaskRouter()
+	router := aj.NewTaskRouter()
 	if c.remote {
 		err = router.RequestReply(c.ttype, client)
 	} else {
@@ -401,17 +395,17 @@ func (c *taskCommand) viewAction(_ *kingpin.ParseContext) error {
 }

 func (c *taskCommand) addAction(_ *kingpin.ParseContext) error {
-	err := prepare(asyncjobs.BindWorkQueue(c.queue))
+	err := prepare(aj.BindWorkQueue(c.queue))
 	if err != nil {
 		return err
 	}

-	var opts []asyncjobs.TaskOpt
+	var opts []aj.TaskOpt
 	if c.deadline > 0 {
-		opts = append(opts, asyncjobs.TaskDeadline(time.Now().UTC().Add(c.deadline)))
+		opts = append(opts, aj.TaskDeadline(time.Now().UTC().Add(c.deadline)))
 	}

-	task, err := asyncjobs.NewTask(c.ttype, c.payload, opts...)
+	task, err := aj.NewTask(c.ttype, c.payload, opts...)
 	if err != nil {
 		return err
 	}

client_options.go (+13 −1)

@@ -138,6 +138,18 @@ func RetryBackoffPolicy(p RetryPolicyProvider) ClientOpt {
 	}
 }

+// RetryBackoffPolicyName uses the policy named to schedule job retries by using RetryPolicyLookup(name)
+func RetryBackoffPolicyName(name string) ClientOpt {
+	return func(opts *ClientOpts) error {
+		p, err := RetryPolicyLookup(name)
+		if err != nil {
+			return err
+		}
+
+		return RetryBackoffPolicy(p)(opts)
+	}
+}
+
 // ClientConcurrency sets the concurrency to use when executing tasks within this client for horizontal scaling.
 // This is capped by the per-queue maximum concurrency set using the queue setting MaxConcurrent. Generally a
 // queue would have a larger concurrency like 100 (DefaultQueueMaxConcurrent) and an individual task processor
@@ -195,7 +207,7 @@ func BindWorkQueue(queue string) ClientOpt {
 	}
 }

-// TaskRetention is the time tasks will be kept with.
+// TaskRetention is the time tasks will be kept for in the task storage
 //
 // Used only when initially creating the underlying streams.
 func TaskRetention(r time.Duration) ClientOpt {
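Since the name is resolved when the client is built, a typo in a policy name fails at startup rather than at retry time. A minimal sketch of the new option, assuming a local NATS context named "AJC" exists:

package main

import (
	aj "github.com/choria-io/asyncjobs"
)

func main() {
	// "10m" is resolved via RetryPolicyLookup(); an unknown name makes
	// NewClient return an error wrapping ErrUnknownRetryPolicy
	client, err := aj.NewClient(
		aj.NatsContext("AJC"), // assumed context name
		aj.BindWorkQueue("DEFAULT"),
		aj.RetryBackoffPolicyName("10m"))
	if err != nil {
		panic(err)
	}

	_ = client // ready to enqueue or process tasks
}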

errors.go (+3)

@@ -63,6 +63,9 @@ var (
 	// ErrUnknownEventType indicates that while parsing an event an unknown type of event was encountered
 	ErrUnknownEventType = fmt.Errorf("unknown event type")

+	// ErrUnknownRetryPolicy indicates the requested retry policy does not exist
+	ErrUnknownRetryPolicy = fmt.Errorf("unknown retry policy")
+
 	// ErrRequestReplyFailed indicates a callout to a remote handler failed due to a timeout, lack of listerners or network error
 	ErrRequestReplyFailed = fmt.Errorf("request-reply callout failed")
 	// ErrRequestReplyNoDeadline indicates a request-reply handler was called without a deadline
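Because RetryPolicyLookup() wraps this sentinel with %w (see retrypolicy.go below), callers can detect it with errors.Is. A small sketch:

package main

import (
	"errors"
	"fmt"

	aj "github.com/choria-io/asyncjobs"
)

func main() {
	_, err := aj.RetryPolicyLookup("5m") // "5m" is not a registered policy

	fmt.Println(err)                                      // unknown retry policy: 5m
	fmt.Println(errors.Is(err, aj.ErrUnknownRetryPolicy)) // true
}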

generators/fs/godocker/Dockerfile.templ (+2)

@@ -4,7 +4,9 @@ WORKDIR /usr/src/app

 RUN go mod init "{{ .Package.Name }}" && \
 {{- range $handler := .Package.TaskHandlers }}
+{{- if not $handler.RequestReply }}
     go get "{{ $handler.Package }}@{{ $handler.Version }}" && \
+{{- end }}
 {{- end }}
     go get github.com/choria-io/asyncjobs@{{ .Package.AJVersion }}

generators/fs/godocker/main.go.templ (+22 −3)

@@ -26,9 +26,9 @@ var usage = `This is a generated Handler service for the Choria Async Jobs Project
 It hosts the following handlers:
 {{ range $handler := .Package.TaskHandlers }}
 {{- if $handler.RequestReply }}
- - {{ $handler.TaskType }}: {{ $handler.Package }}@{{ $handler.Version }}
-{{- else }}
  - {{ $handler.TaskType}}: Remote Request-Reply Service
+{{- else }}
+ - {{ $handler.TaskType }}: {{ $handler.Package }}@{{ $handler.Version }}
 {{- end }}
 {{- end }}
@@ -37,6 +37,7 @@ The following Environment variables are supported:
  - AJ_WORK_QUEUE: The Work Queue to consume from, defaults to DEFAULT
  - AJ_NATS_CONTEXT: The name of a NATS Context to use for connections
  - AJ_CONCURRENCY: The number of concurrent handlers that can be run
+ - AJ_RETRY_POLICY: The retry policy to use [{{ RetryNamesList }}]

 Prometheus statistics are Exposed on port http://0.0.0.0:8080/metrics
@@ -95,15 +96,33 @@ func main() {
 		TimestampFormat: "15:04:05",
 	})

+	retryPolicy := os.Getenv("AJ_RETRY_POLICY")
+	if retryPolicy == "" {
+		retryPolicy = "{{ .Package.RetryPolicy }}"
+	}
+
 	log := logrus.NewEntry(logger)

-	log.Printf("Connecting using Context %s consuming work queue %s with concurrency %d", nctx, wq, concurrency)
+	log.Printf("Choria Async Jobs Handler Service {{.Package.Name}} build settings")
+	log.Printf("NATS Context: %s", nctx)
+	log.Printf("Work Queue: %s", wq)
+	log.Printf("Concurrency: %d", concurrency)
+	log.Printf("Retry Policy: %s", retryPolicy)
+{{- range $state := .Package.DiscardStates }}
+	log.Printf("Discard State: {{$state}}")
+{{- end }}
+	log.Printf("Prometheus Port: 8080")
+	log = log.WithField("queue", wq)

 	client, err := aj.NewClient(
 		aj.NatsContext(nctx),
 		aj.BindWorkQueue(wq),
 		aj.ClientConcurrency(concurrency),
 		aj.CustomLogger(log),
+		aj.RetryBackoffPolicyName(retryPolicy),
+{{- range $state := .Package.DiscardStates }}
+		aj.DiscardTaskStates("{{$state}}"),
+{{- end }}
 		aj.PrometheusListenPort(8080))
 	usageIfError(err)
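For orientation, this is roughly what the template above renders for a package that sets retry: 10m and discards the complete and expired states; the state names are illustrative, and nctx, wq, concurrency and log are defined earlier in the generated file.

	client, err := aj.NewClient(
		aj.NatsContext(nctx),
		aj.BindWorkQueue(wq),
		aj.ClientConcurrency(concurrency),
		aj.CustomLogger(log),
		aj.RetryBackoffPolicyName(retryPolicy), // AJ_RETRY_POLICY if set, else the baked-in "10m"
		aj.DiscardTaskStates("complete"),       // one option rendered per configured discard state
		aj.DiscardTaskStates("expired"),
		aj.PrometheusListenPort(8080))
	usageIfError(err)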

generators/godocker.go (+9)

@@ -13,6 +13,8 @@ import (
 	"strings"
 	"text/template"
 	"time"
+
+	aj "github.com/choria-io/asyncjobs"
 )

 // GoContainer builds docker containers based on the package spec
@@ -43,6 +45,10 @@ func (g *GoContainer) RenderToDirectory(target string) error {
 		return err
 	}

+	if g.Package.RetryPolicy == "" {
+		g.Package.RetryPolicy = "default"
+	}
+
 	for _, p := range g.Package.TaskHandlers {
 		if p.RequestReply {
 			continue
@@ -59,6 +65,9 @@ func (g *GoContainer) RenderToDirectory(target string) error {
 	}

 	funcs := map[string]interface{}{
+		"RetryNamesList": func() string {
+			return strings.Join(aj.RetryPolicyNames(), ", ")
+		},
 		"TypeToPackageName": func(t string) string {
 			remove := []string{"_", "-", ":", "/", "\\"}
 			res := t

generators/package.go (+4)

@@ -22,6 +22,10 @@ type Package struct {
 	Name string `yaml:"name"`
 	// AJVersion is an optional version to use for the choria-io/asyncjobs dependency
 	AJVersion string `yaml:"asyncjobs"`
+	// RetryPolicy is the name of a retry policy, see RetryPolicyNames()
+	RetryPolicy string `yaml:"retry"`
+	// DiscardStates indicates what termination states to discard
+	DiscardStates []string `yaml:"discard"`
 }

 // TaskHandler is an individual Task Handler

processor_test.go (+1)

@@ -321,6 +321,7 @@ var _ = Describe("Processor", func() {

 		router := NewTaskRouter()
 		router.HandleFunc("ginkgo", func(ctx context.Context, log Logger, task *Task) (interface{}, error) {
+			// these will panic as its in a different routine, but they are supposed to pass so thats fine
 			t, err := client.LoadTaskByID(task.ID)
 			Expect(err).ToNot(HaveOccurred())
 			Expect(t.State).To(Equal(TaskStateActive))

retrypolicy.go (+42)

@@ -6,7 +6,9 @@ package asyncjobs

 import (
 	"context"
+	"fmt"
 	"math/rand"
+	"sort"
 	"time"
 )

@@ -40,8 +42,48 @@ var (

 	retryLinearTenSeconds = linearPolicy(20, 0.1, 500*time.Millisecond, 10*time.Second)
 	retryForTesting       = linearPolicy(1, 0.1, time.Millisecond, 10*time.Millisecond)
+
+	policies = map[string]RetryPolicyProvider{
+		"default": RetryDefault,
+		"1m":      RetryLinearOneMinute,
+		"10m":     RetryLinearTenMinutes,
+		"1h":      RetryLinearOneHour,
+	}
 )

+// RetryPolicyNames returns a list of pre-generated retry policies
+func RetryPolicyNames() []string {
+	var names []string
+	for k := range policies {
+		names = append(names, k)
+	}
+
+	sort.Strings(names)
+
+	return names
+}
+
+// RetryPolicyLookup loads a policy by name
+func RetryPolicyLookup(name string) (RetryPolicyProvider, error) {
+	policy, ok := policies[name]
+	if !ok {
+		return nil, fmt.Errorf("%w: %s", ErrUnknownRetryPolicy, name)
+	}
+
+	return policy, nil
+}
+
+// IsRetryPolicyKnown determines if the named policy exist
+func IsRetryPolicyKnown(name string) bool {
+	for _, p := range RetryPolicyNames() {
+		if p == name {
+			return true
+		}
+	}
+
+	return false
+}
+
 // Duration is the period to sleep for try n, it includes a jitter
 func (p RetryPolicy) Duration(n int) time.Duration {
 	if n >= len(p.Intervals) {
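A short sketch exercising the three new helpers; the printed order is deterministic because RetryPolicyNames() sorts its result:

package main

import (
	"fmt"

	aj "github.com/choria-io/asyncjobs"
)

func main() {
	fmt.Println(aj.RetryPolicyNames())       // [10m 1h 1m default]
	fmt.Println(aj.IsRetryPolicyKnown("1h")) // true
	fmt.Println(aj.IsRetryPolicyKnown("5m")) // false

	policy, err := aj.RetryPolicyLookup("1h")
	if err != nil {
		panic(err)
	}

	_ = policy // a RetryPolicyProvider, also usable with RetryBackoffPolicy()
}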
