
api: Add Name method to API interface (#71)
* api: Add Name method to API interface

* api: Add Name method to API interface

* undo change for status string value
andrewwormald authored Jan 21, 2025
1 parent 3f5c25c commit f00578e
Showing 22 changed files with 297 additions and 88 deletions.
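In short, this commit replaces the exported Name field on Workflow with an unexported name field and exposes it through a Name() accessor, so every call site moves from w.Name to w.Name(). The accessor body itself is not visible in the loaded portion of the diff; the lines below are only a sketch of what it presumably looks like, assuming a plain getter over the renamed field:

// Name returns the workflow's name.
// Sketch only: this body is assumed, not shown in the loaded diff.
func (w *Workflow[Type, Status]) Name() string {
	return w.name
}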
6 changes: 3 additions & 3 deletions _examples/schedule/schedule_test.go
@@ -43,7 +43,7 @@ func TestExampleWorkflow(t *testing.T) {
// Give time for go routine to spin up
time.Sleep(200 * time.Millisecond)

_, err := recordStore.Latest(ctx, wf.Name, foreignID)
_, err := recordStore.Latest(ctx, wf.Name(), foreignID)
// Expect there to be no entries yet
require.True(t, errors.Is(err, workflow.ErrRecordNotFound))

@@ -52,7 +52,7 @@ func TestExampleWorkflow(t *testing.T) {
// Allow scheduling to take place
time.Sleep(200 * time.Millisecond)

firstScheduled, err := recordStore.Latest(ctx, wf.Name, foreignID)
firstScheduled, err := recordStore.Latest(ctx, wf.Name(), foreignID)
require.Nil(t, err)

require.Equal(t, "schedule trigger example", firstScheduled.WorkflowName)
@@ -63,7 +63,7 @@ func TestExampleWorkflow(t *testing.T) {
// Allow scheduling to take place
time.Sleep(200 * time.Millisecond)

secondScheduled, err := recordStore.Latest(ctx, wf.Name, foreignID)
secondScheduled, err := recordStore.Latest(ctx, wf.Name(), foreignID)
require.Nil(t, err)

require.Equal(t, "schedule trigger example", secondScheduled.WorkflowName)
2 changes: 1 addition & 1 deletion adapters/webui/internal/frontend/home.html
@@ -12,7 +12,7 @@
<h2 class="text-xl font-bold text-gray-800 dark:text-gray-200 mb-4">Search Filters</h2>

<div class="grid grid-cols-1 sm:grid-cols-2 lg:grid-cols-4 gap-6">
<!-- Workflow Name Input -->
<!-- Workflow name Input -->
<div class="flex flex-col">
<label class="text-gray-600 dark:text-gray-400 font-medium mb-2">Workflow Name</label>
<input type="text" id="workflowName" placeholder="Enter Workflow Name" class="border border-gray-300 dark:border-gray-700 rounded-lg w-full p-2 focus:outline-none focus:ring-2 focus:ring-blue-400 dark:bg-gray-800 dark:text-gray-300">
22 changes: 17 additions & 5 deletions await.go
@@ -7,7 +7,12 @@ import (
"time"
)

func (w *Workflow[Type, Status]) Await(ctx context.Context, foreignID, runID string, status Status, opts ...AwaitOption) (*Run[Type, Status], error) {
func (w *Workflow[Type, Status]) Await(
ctx context.Context,
foreignID, runID string,
status Status,
opts ...AwaitOption,
) (*Run[Type, Status], error) {
var opt awaitOpts
for _, option := range opts {
option(&opt)
@@ -18,16 +18,23 @@ func (w *Workflow[Type, Status]) Await(ctx context.Context, foreignID, runID str
pollFrequency = opt.pollFrequency
}

role := makeRole("await", w.Name, strconv.FormatInt(int64(status), 10), foreignID)
role := makeRole("await", w.Name(), strconv.FormatInt(int64(status), 10), foreignID)
return awaitWorkflowStatusByForeignID[Type, Status](ctx, w, status, foreignID, runID, role, pollFrequency)
}

func awaitWorkflowStatusByForeignID[Type any, Status StatusType](ctx context.Context, w *Workflow[Type, Status], status Status, foreignID, runID string, role string, pollFrequency time.Duration) (*Run[Type, Status], error) {
topic := Topic(w.Name, int(status))
func awaitWorkflowStatusByForeignID[Type any, Status StatusType](
ctx context.Context,
w *Workflow[Type, Status],
status Status,
foreignID, runID string,
role string,
pollFrequency time.Duration,
) (*Run[Type, Status], error) {
topic := Topic(w.Name(), int(status))
// Terminal statuses result in the RunState changing to Completed and are stored in the RunStateChangeTopic
// as it is a key event in the Workflow Run's lifecycle.
if w.statusGraph.IsTerminal(int(status)) {
topic = RunStateChangeTopic(w.Name)
topic = RunStateChangeTopic(w.Name())
}

stream, err := w.eventStreamer.NewConsumer(
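Only the internal role and topic naming changes here; callers invoke Await exactly as before, just against the reformatted signature. An illustrative call, using placeholder IDs and a placeholder status constant:

// Illustrative only: "order-1234", "run-5678", and StatusCompleted are placeholders.
run, err := wf.Await(ctx, "order-1234", "run-5678", StatusCompleted)
if err != nil {
	return err // e.g. the context was cancelled before the run reached the status
}
_ = run // the Run snapshot at the awaited status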
28 changes: 23 additions & 5 deletions builder.go
@@ -25,7 +25,7 @@ const (
func NewBuilder[Type any, Status StatusType](name string) *Builder[Type, Status] {
return &Builder[Type, Status]{
workflow: &Workflow[Type, Status]{
Name: name,
name: name,
clock: clock.RealClock{},
consumers: make(map[Status]consumerConfig[Type, Status]),
callback: make(map[Status][]callback[Type, Status]),
@@ -46,7 +46,11 @@ type Builder[Type any, Status StatusType] struct {
workflow *Workflow[Type, Status]
}

func (b *Builder[Type, Status]) AddStep(from Status, c ConsumerFunc[Type, Status], allowedDestinations ...Status) *stepUpdater[Type, Status] {
func (b *Builder[Type, Status]) AddStep(
from Status,
c ConsumerFunc[Type, Status],
allowedDestinations ...Status,
) *stepUpdater[Type, Status] {
if _, exists := b.workflow.consumers[from]; exists {
panic("'AddStep(" + from.String() + ",' already exists. Only one Step can be configured to consume the status")
}
@@ -101,7 +105,12 @@ func (b *Builder[Type, Status]) AddCallback(from Status, fn CallbackFunc[Type, S
b.workflow.callback[from] = append(b.workflow.callback[from], c)
}

func (b *Builder[Type, Status]) AddTimeout(from Status, timer TimerFunc[Type, Status], tf TimeoutFunc[Type, Status], allowedDestinations ...Status) *timeoutUpdater[Type, Status] {
func (b *Builder[Type, Status]) AddTimeout(
from Status,
timer TimerFunc[Type, Status],
tf TimeoutFunc[Type, Status],
allowedDestinations ...Status,
) *timeoutUpdater[Type, Status] {
timeouts := b.workflow.timeouts[from]

t := timeout[Type, Status]{
@@ -150,7 +159,11 @@ func (s *timeoutUpdater[Type, Status]) WithOptions(opts ...Option) {
s.workflow.timeouts[s.from] = timeout
}

func (b *Builder[Type, Status]) AddConnector(name string, csc ConnectorConstructor, cf ConnectorFunc[Type, Status]) *connectorUpdater[Type, Status] {
func (b *Builder[Type, Status]) AddConnector(
name string,
csc ConnectorConstructor,
cf ConnectorFunc[Type, Status],
) *connectorUpdater[Type, Status] {
for _, config := range b.workflow.connectorConfigs {
if config.name == name {
panic("connector names need to be unique")
@@ -199,7 +212,12 @@ func (b *Builder[Type, Status]) OnComplete(hook RunStateChangeHookFunc[Type, Sta
b.workflow.runStateChangeHooks[RunStateCompleted] = hook
}

func (b *Builder[Type, Status]) Build(eventStreamer EventStreamer, recordStore RecordStore, roleScheduler RoleScheduler, opts ...BuildOption) *Workflow[Type, Status] {
func (b *Builder[Type, Status]) Build(
eventStreamer EventStreamer,
recordStore RecordStore,
roleScheduler RoleScheduler,
opts ...BuildOption,
) *Workflow[Type, Status] {
b.workflow.eventStreamer = eventStreamer
b.workflow.recordStore = recordStore
b.workflow.scheduler = roleScheduler
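With the builder now writing the constructor argument into the unexported name field, external packages can read a workflow's name only through the accessor. A hedged usage sketch follows; the payload type, status values, handler, and adapter arguments are illustrative stand-ins, not identifiers from this commit:

// Placeholder wiring: MyPayload, MyStatus, StatusStarted, StatusDone, and
// handleStarted are invented for illustration.
func buildExample(streamer workflow.EventStreamer, store workflow.RecordStore, scheduler workflow.RoleScheduler) {
	b := workflow.NewBuilder[MyPayload, MyStatus]("example workflow")
	b.AddStep(StatusStarted, handleStarted, StatusDone)
	wf := b.Build(streamer, store, scheduler)

	// Before this commit: wf.Name (exported field). After: wf.Name() (accessor method).
	fmt.Println(wf.Name())
}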
23 changes: 19 additions & 4 deletions callback.go
@@ -12,11 +12,26 @@ type callback[Type any, Status StatusType] struct {

type CallbackFunc[Type any, Status StatusType] func(ctx context.Context, r *Run[Type, Status], reader io.Reader) (Status, error)

func (w *Workflow[Type, Status]) Callback(ctx context.Context, foreignID string, status Status, payload io.Reader) error {
func (w *Workflow[Type, Status]) Callback(
ctx context.Context,
foreignID string,
status Status,
payload io.Reader,
) error {
updateFn := newUpdater[Type, Status](w.recordStore.Lookup, w.recordStore.Store, w.statusGraph, w.clock)

for _, s := range w.callback[status] {
err := processCallback(ctx, w, status, s.CallbackFunc, foreignID, payload, w.recordStore.Latest, w.recordStore.Store, updateFn)
err := processCallback(
ctx,
w,
status,
s.CallbackFunc,
foreignID,
payload,
w.recordStore.Latest,
w.recordStore.Store,
updateFn,
)
if err != nil {
return err
}
@@ -38,7 +53,7 @@ func processCallback[Type any, Status StatusType](
store storeFunc,
updater updater[Type, Status],
) error {
wr, err := latest(ctx, w.Name, foreignID)
wr, err := latest(ctx, w.Name(), foreignID)
if err != nil {
return err
}
@@ -67,7 +82,7 @@ func processCallback[Type any, Status StatusType](
if skipUpdate(next) {
w.logger.maybeDebug(ctx, "skipping update", map[string]string{
"description": skipUpdateDescription(next),
"workflow_name": w.Name,
"workflow_name": w.Name(),
"foreign_id": run.ForeignID,
"run_id": run.RunID,
"run_state": run.RunState.String(),
2 changes: 1 addition & 1 deletion callback_internal_test.go
@@ -16,7 +16,7 @@ import (
func TestProcessCallback(t *testing.T) {
ctx := context.Background()
w := &Workflow[string, testStatus]{
Name: "example",
name: "example",
ctx: ctx,
clock: clock_testing.NewFakeClock(time.Date(2024, time.April, 19, 0, 0, 0, 0, time.UTC)),
statusGraph: graph.New(),
12 changes: 8 additions & 4 deletions connector.go
@@ -30,12 +30,16 @@ type connectorConfig[Type any, Status StatusType] struct {
lagAlert time.Duration
}

func connectorConsumer[Type any, Status StatusType](w *Workflow[Type, Status], config *connectorConfig[Type, Status], shard, totalShards int) {
func connectorConsumer[Type any, Status StatusType](
w *Workflow[Type, Status],
config *connectorConfig[Type, Status],
shard, totalShards int,
) {
role := makeRole(
config.name,
"connector",
"to",
w.Name,
w.Name(),
"consumer",
strconv.FormatInt(int64(shard), 10),
"of",
@@ -105,7 +109,7 @@ func connectForever[Type any, Status StatusType](
}

// Push metrics and alerting around the age of the event being processed.
pushLagMetricAndAlerting(w.Name, processName, e.CreatedAt, lagAlert, w.clock)
pushLagMetricAndAlerting(w.Name(), processName, e.CreatedAt, lagAlert, w.clock)

shouldFilter := FilterConnectorEventUsing(e,
shardConnectorEventFilter(shard, totalShards),
@@ -125,7 +129,7 @@
return err
}

metrics.ProcessLatency.WithLabelValues(w.Name, processName).Observe(w.clock.Since(t2).Seconds())
metrics.ProcessLatency.WithLabelValues(w.Name(), processName).Observe(w.clock.Since(t2).Seconds())
return ack()
}
}
42 changes: 30 additions & 12 deletions consumer.go
@@ -27,9 +27,14 @@ type consumerConfig[Type any, Status StatusType] struct {
pauseAfterErrCount int
}

func consumer[Type any, Status StatusType](w *Workflow[Type, Status], currentStatus Status, p consumerConfig[Type, Status], shard, totalShards int) {
func consumer[Type any, Status StatusType](
w *Workflow[Type, Status],
currentStatus Status,
p consumerConfig[Type, Status],
shard, totalShards int,
) {
role := makeRole(
w.Name,
w.Name(),
strconv.FormatInt(int64(currentStatus), 10),
"consumer",
strconv.FormatInt(int64(shard), 10),
@@ -47,7 +52,7 @@
strconv.FormatInt(int64(totalShards), 10),
)

topic := Topic(w.Name, int(currentStatus))
topic := Topic(w.Name(), int(currentStatus))

errBackOff := w.defaultOpts.errBackOff
if p.errBackOff > 0 {
@@ -86,7 +91,19 @@ func consumer[Type any, Status StatusType](w *Workflow[Type, Status], currentSta
}
defer streamConsumer.Close()

return consumeForever[Type, Status](ctx, w, p.consumer, lag, lagAlert, pauseAfterErrCount, streamConsumer, currentStatus, processName, shard, totalShards)
return consumeForever[Type, Status](
ctx,
w,
p.consumer,
lag,
lagAlert,
pauseAfterErrCount,
streamConsumer,
currentStatus,
processName,
shard,
totalShards,
)
}, errBackOff)
}

@@ -127,7 +144,7 @@ func consumeForever[Type any, Status StatusType](
}

// Push metrics and alerting around the age of the event being processed.
pushLagMetricAndAlerting(w.Name, processName, e.CreatedAt, lagAlert, w.clock)
pushLagMetricAndAlerting(w.Name(), processName, e.CreatedAt, lagAlert, w.clock)

shouldFilter := FilterUsing(e,
shardFilter(shard, totalShards),
@@ -138,7 +155,7 @@
return err
}

metrics.ProcessSkippedEvents.WithLabelValues(w.Name, processName, "filtered out").Inc()
metrics.ProcessSkippedEvents.WithLabelValues(w.Name(), processName, "filtered out").Inc()
continue
}

@@ -149,7 +166,7 @@
return err
}

metrics.ProcessSkippedEvents.WithLabelValues(w.Name, processName, "record not found").Inc()
metrics.ProcessSkippedEvents.WithLabelValues(w.Name(), processName, "record not found").Inc()
continue
} else if err != nil {
return err
@@ -163,7 +180,8 @@
return err
}

metrics.ProcessSkippedEvents.WithLabelValues(w.Name, processName, "record status not in expected state").Inc()
metrics.ProcessSkippedEvents.WithLabelValues(w.Name(), processName, "record status not in expected state").
Inc()
continue
}

@@ -183,7 +201,7 @@
return err
}

metrics.ProcessSkippedEvents.WithLabelValues(w.Name, processName, "record stopped").Inc()
metrics.ProcessSkippedEvents.WithLabelValues(w.Name(), processName, "record stopped").Inc()
continue
}

@@ -193,7 +211,7 @@
return err
}

metrics.ProcessLatency.WithLabelValues(w.Name, processName).Observe(w.clock.Since(t2).Seconds())
metrics.ProcessLatency.WithLabelValues(w.Name(), processName).Observe(w.clock.Since(t2).Seconds())
}
}

@@ -254,14 +272,14 @@ func consume[Type any, Status StatusType](
if skipUpdate(next) {
w.logger.maybeDebug(ctx, "skipping update", map[string]string{
"description": skipUpdateDescription(next),
"workflow_name": w.Name,
"workflow_name": w.Name(),
"foreign_id": run.ForeignID,
"run_id": run.RunID,
"run_state": run.RunState.String(),
"record_status": run.Status.String(),
})

metrics.ProcessSkippedEvents.WithLabelValues(w.Name, processName, "next value specified skip").Inc()
metrics.ProcessSkippedEvents.WithLabelValues(w.Name(), processName, "next value specified skip").Inc()
return ack()
}

2 changes: 1 addition & 1 deletion consumer_internal_test.go
@@ -20,7 +20,7 @@ func TestConsume(t *testing.T) {
processName := "processName"
testErr := errors.New("test error")
w := &Workflow[string, testStatus]{
Name: "example",
name: "example",
ctx: ctx,
clock: clock_testing.NewFakeClock(time.Date(2024, time.April, 19, 0, 0, 0, 0, time.UTC)),
errorCounter: counter,
16 changes: 13 additions & 3 deletions delete.go
@@ -12,14 +12,14 @@ import (

func deleteConsumer[Type any, Status StatusType](w *Workflow[Type, Status]) {
role := makeRole(
w.Name,
w.Name(),
"delete",
"consumer",
)

processName := role
w.run(role, processName, func(ctx context.Context) error {
topic := DeleteTopic(w.Name)
topic := DeleteTopic(w.Name())
consumerStream, err := w.eventStreamer.NewConsumer(
ctx,
topic,
@@ -31,7 +31,17 @@
}
defer consumerStream.Close()

return DeleteForever(ctx, w.Name, processName, consumerStream, w.recordStore.Store, w.recordStore.Lookup, w.customDelete, w.defaultOpts.lagAlert, w.clock)
return DeleteForever(
ctx,
w.Name(),
processName,
consumerStream,
w.recordStore.Store,
w.recordStore.Lookup,
w.customDelete,
w.defaultOpts.lagAlert,
w.clock,
)
}, w.defaultOpts.errBackOff)
}

(Diffs for the remaining changed files were not loaded on this page.)
