Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: Add Name method to API interface #71

Merged
merged 3 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions _examples/schedule/schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestExampleWorkflow(t *testing.T) {
// Give time for go routine to spin up
time.Sleep(200 * time.Millisecond)

_, err := recordStore.Latest(ctx, wf.Name, foreignID)
_, err := recordStore.Latest(ctx, wf.Name(), foreignID)
// Expect there to be no entries yet
require.True(t, errors.Is(err, workflow.ErrRecordNotFound))

Expand All @@ -52,7 +52,7 @@ func TestExampleWorkflow(t *testing.T) {
// Allow scheduling to take place
time.Sleep(200 * time.Millisecond)

firstScheduled, err := recordStore.Latest(ctx, wf.Name, foreignID)
firstScheduled, err := recordStore.Latest(ctx, wf.Name(), foreignID)
require.Nil(t, err)

require.Equal(t, "schedule trigger example", firstScheduled.WorkflowName)
Expand All @@ -63,7 +63,7 @@ func TestExampleWorkflow(t *testing.T) {
// Allow scheduling to take place
time.Sleep(200 * time.Millisecond)

secondScheduled, err := recordStore.Latest(ctx, wf.Name, foreignID)
secondScheduled, err := recordStore.Latest(ctx, wf.Name(), foreignID)
require.Nil(t, err)

require.Equal(t, "schedule trigger example", secondScheduled.WorkflowName)
Expand Down
2 changes: 1 addition & 1 deletion adapters/webui/internal/frontend/home.html
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<h2 class="text-xl font-bold text-gray-800 dark:text-gray-200 mb-4">Search Filters</h2>

<div class="grid grid-cols-1 sm:grid-cols-2 lg:grid-cols-4 gap-6">
<!-- Workflow Name Input -->
<!-- Workflow name Input -->
<div class="flex flex-col">
<label class="text-gray-600 dark:text-gray-400 font-medium mb-2">Workflow Name</label>
<input type="text" id="workflowName" placeholder="Enter Workflow Name" class="border border-gray-300 dark:border-gray-700 rounded-lg w-full p-2 focus:outline-none focus:ring-2 focus:ring-blue-400 dark:bg-gray-800 dark:text-gray-300">
Expand Down
22 changes: 17 additions & 5 deletions await.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ import (
"time"
)

func (w *Workflow[Type, Status]) Await(ctx context.Context, foreignID, runID string, status Status, opts ...AwaitOption) (*Run[Type, Status], error) {
func (w *Workflow[Type, Status]) Await(
ctx context.Context,
foreignID, runID string,
status Status,
opts ...AwaitOption,
) (*Run[Type, Status], error) {
var opt awaitOpts
for _, option := range opts {
option(&opt)
Expand All @@ -18,16 +23,23 @@ func (w *Workflow[Type, Status]) Await(ctx context.Context, foreignID, runID str
pollFrequency = opt.pollFrequency
}

role := makeRole("await", w.Name, strconv.FormatInt(int64(status), 10), foreignID)
role := makeRole("await", w.Name(), strconv.FormatInt(int64(status), 10), foreignID)
return awaitWorkflowStatusByForeignID[Type, Status](ctx, w, status, foreignID, runID, role, pollFrequency)
}

func awaitWorkflowStatusByForeignID[Type any, Status StatusType](ctx context.Context, w *Workflow[Type, Status], status Status, foreignID, runID string, role string, pollFrequency time.Duration) (*Run[Type, Status], error) {
topic := Topic(w.Name, int(status))
func awaitWorkflowStatusByForeignID[Type any, Status StatusType](
ctx context.Context,
w *Workflow[Type, Status],
status Status,
foreignID, runID string,
role string,
pollFrequency time.Duration,
) (*Run[Type, Status], error) {
topic := Topic(w.Name(), int(status))
// Terminal statuses result in the RunState changing to Completed and are stored in the RunStateChangeTopic
// as it is a key event in the Workflow Run's lifecycle.
if w.statusGraph.IsTerminal(int(status)) {
topic = RunStateChangeTopic(w.Name)
topic = RunStateChangeTopic(w.Name())
}

stream, err := w.eventStreamer.NewConsumer(
Expand Down
28 changes: 23 additions & 5 deletions builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
func NewBuilder[Type any, Status StatusType](name string) *Builder[Type, Status] {
return &Builder[Type, Status]{
workflow: &Workflow[Type, Status]{
Name: name,
name: name,
clock: clock.RealClock{},
consumers: make(map[Status]consumerConfig[Type, Status]),
callback: make(map[Status][]callback[Type, Status]),
Expand All @@ -46,7 +46,11 @@ type Builder[Type any, Status StatusType] struct {
workflow *Workflow[Type, Status]
}

func (b *Builder[Type, Status]) AddStep(from Status, c ConsumerFunc[Type, Status], allowedDestinations ...Status) *stepUpdater[Type, Status] {
func (b *Builder[Type, Status]) AddStep(
from Status,
c ConsumerFunc[Type, Status],
allowedDestinations ...Status,
) *stepUpdater[Type, Status] {
if _, exists := b.workflow.consumers[from]; exists {
panic("'AddStep(" + from.String() + ",' already exists. Only one Step can be configured to consume the status")
}
Expand Down Expand Up @@ -101,7 +105,12 @@ func (b *Builder[Type, Status]) AddCallback(from Status, fn CallbackFunc[Type, S
b.workflow.callback[from] = append(b.workflow.callback[from], c)
}

func (b *Builder[Type, Status]) AddTimeout(from Status, timer TimerFunc[Type, Status], tf TimeoutFunc[Type, Status], allowedDestinations ...Status) *timeoutUpdater[Type, Status] {
func (b *Builder[Type, Status]) AddTimeout(
from Status,
timer TimerFunc[Type, Status],
tf TimeoutFunc[Type, Status],
allowedDestinations ...Status,
) *timeoutUpdater[Type, Status] {
timeouts := b.workflow.timeouts[from]

t := timeout[Type, Status]{
Expand Down Expand Up @@ -150,7 +159,11 @@ func (s *timeoutUpdater[Type, Status]) WithOptions(opts ...Option) {
s.workflow.timeouts[s.from] = timeout
}

func (b *Builder[Type, Status]) AddConnector(name string, csc ConnectorConstructor, cf ConnectorFunc[Type, Status]) *connectorUpdater[Type, Status] {
func (b *Builder[Type, Status]) AddConnector(
name string,
csc ConnectorConstructor,
cf ConnectorFunc[Type, Status],
) *connectorUpdater[Type, Status] {
for _, config := range b.workflow.connectorConfigs {
if config.name == name {
panic("connector names need to be unique")
Expand Down Expand Up @@ -199,7 +212,12 @@ func (b *Builder[Type, Status]) OnComplete(hook RunStateChangeHookFunc[Type, Sta
b.workflow.runStateChangeHooks[RunStateCompleted] = hook
}

func (b *Builder[Type, Status]) Build(eventStreamer EventStreamer, recordStore RecordStore, roleScheduler RoleScheduler, opts ...BuildOption) *Workflow[Type, Status] {
func (b *Builder[Type, Status]) Build(
eventStreamer EventStreamer,
recordStore RecordStore,
roleScheduler RoleScheduler,
opts ...BuildOption,
) *Workflow[Type, Status] {
b.workflow.eventStreamer = eventStreamer
b.workflow.recordStore = recordStore
b.workflow.scheduler = roleScheduler
Expand Down
23 changes: 19 additions & 4 deletions callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,26 @@ type callback[Type any, Status StatusType] struct {

type CallbackFunc[Type any, Status StatusType] func(ctx context.Context, r *Run[Type, Status], reader io.Reader) (Status, error)

func (w *Workflow[Type, Status]) Callback(ctx context.Context, foreignID string, status Status, payload io.Reader) error {
func (w *Workflow[Type, Status]) Callback(
ctx context.Context,
foreignID string,
status Status,
payload io.Reader,
) error {
updateFn := newUpdater[Type, Status](w.recordStore.Lookup, w.recordStore.Store, w.statusGraph, w.clock)

for _, s := range w.callback[status] {
err := processCallback(ctx, w, status, s.CallbackFunc, foreignID, payload, w.recordStore.Latest, w.recordStore.Store, updateFn)
err := processCallback(
ctx,
w,
status,
s.CallbackFunc,
foreignID,
payload,
w.recordStore.Latest,
w.recordStore.Store,
updateFn,
)
if err != nil {
return err
}
Expand All @@ -38,7 +53,7 @@ func processCallback[Type any, Status StatusType](
store storeFunc,
updater updater[Type, Status],
) error {
wr, err := latest(ctx, w.Name, foreignID)
wr, err := latest(ctx, w.Name(), foreignID)
if err != nil {
return err
}
Expand Down Expand Up @@ -67,7 +82,7 @@ func processCallback[Type any, Status StatusType](
if skipUpdate(next) {
w.logger.maybeDebug(ctx, "skipping update", map[string]string{
"description": skipUpdateDescription(next),
"workflow_name": w.Name,
"workflow_name": w.Name(),
"foreign_id": run.ForeignID,
"run_id": run.RunID,
"run_state": run.RunState.String(),
Expand Down
2 changes: 1 addition & 1 deletion callback_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func TestProcessCallback(t *testing.T) {
ctx := context.Background()
w := &Workflow[string, testStatus]{
Name: "example",
name: "example",
ctx: ctx,
clock: clock_testing.NewFakeClock(time.Date(2024, time.April, 19, 0, 0, 0, 0, time.UTC)),
statusGraph: graph.New(),
Expand Down
12 changes: 8 additions & 4 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ type connectorConfig[Type any, Status StatusType] struct {
lagAlert time.Duration
}

func connectorConsumer[Type any, Status StatusType](w *Workflow[Type, Status], config *connectorConfig[Type, Status], shard, totalShards int) {
func connectorConsumer[Type any, Status StatusType](
w *Workflow[Type, Status],
config *connectorConfig[Type, Status],
shard, totalShards int,
) {
role := makeRole(
config.name,
"connector",
"to",
w.Name,
w.Name(),
"consumer",
strconv.FormatInt(int64(shard), 10),
"of",
Expand Down Expand Up @@ -105,7 +109,7 @@ func connectForever[Type any, Status StatusType](
}

// Push metrics and alerting around the age of the event being processed.
pushLagMetricAndAlerting(w.Name, processName, e.CreatedAt, lagAlert, w.clock)
pushLagMetricAndAlerting(w.Name(), processName, e.CreatedAt, lagAlert, w.clock)

shouldFilter := FilterConnectorEventUsing(e,
shardConnectorEventFilter(shard, totalShards),
Expand All @@ -125,7 +129,7 @@ func connectForever[Type any, Status StatusType](
return err
}

metrics.ProcessLatency.WithLabelValues(w.Name, processName).Observe(w.clock.Since(t2).Seconds())
metrics.ProcessLatency.WithLabelValues(w.Name(), processName).Observe(w.clock.Since(t2).Seconds())
return ack()
}
}
42 changes: 30 additions & 12 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ type consumerConfig[Type any, Status StatusType] struct {
pauseAfterErrCount int
}

func consumer[Type any, Status StatusType](w *Workflow[Type, Status], currentStatus Status, p consumerConfig[Type, Status], shard, totalShards int) {
func consumer[Type any, Status StatusType](
w *Workflow[Type, Status],
currentStatus Status,
p consumerConfig[Type, Status],
shard, totalShards int,
) {
role := makeRole(
w.Name,
w.Name(),
strconv.FormatInt(int64(currentStatus), 10),
"consumer",
strconv.FormatInt(int64(shard), 10),
Expand All @@ -47,7 +52,7 @@ func consumer[Type any, Status StatusType](w *Workflow[Type, Status], currentSta
strconv.FormatInt(int64(totalShards), 10),
)

topic := Topic(w.Name, int(currentStatus))
topic := Topic(w.Name(), int(currentStatus))

errBackOff := w.defaultOpts.errBackOff
if p.errBackOff > 0 {
Expand Down Expand Up @@ -86,7 +91,19 @@ func consumer[Type any, Status StatusType](w *Workflow[Type, Status], currentSta
}
defer streamConsumer.Close()

return consumeForever[Type, Status](ctx, w, p.consumer, lag, lagAlert, pauseAfterErrCount, streamConsumer, currentStatus, processName, shard, totalShards)
return consumeForever[Type, Status](
ctx,
w,
p.consumer,
lag,
lagAlert,
pauseAfterErrCount,
streamConsumer,
currentStatus,
processName,
shard,
totalShards,
)
}, errBackOff)
}

Expand Down Expand Up @@ -127,7 +144,7 @@ func consumeForever[Type any, Status StatusType](
}

// Push metrics and alerting around the age of the event being processed.
pushLagMetricAndAlerting(w.Name, processName, e.CreatedAt, lagAlert, w.clock)
pushLagMetricAndAlerting(w.Name(), processName, e.CreatedAt, lagAlert, w.clock)

shouldFilter := FilterUsing(e,
shardFilter(shard, totalShards),
Expand All @@ -138,7 +155,7 @@ func consumeForever[Type any, Status StatusType](
return err
}

metrics.ProcessSkippedEvents.WithLabelValues(w.Name, processName, "filtered out").Inc()
metrics.ProcessSkippedEvents.WithLabelValues(w.Name(), processName, "filtered out").Inc()
continue
}

Expand All @@ -149,7 +166,7 @@ func consumeForever[Type any, Status StatusType](
return err
}

metrics.ProcessSkippedEvents.WithLabelValues(w.Name, processName, "record not found").Inc()
metrics.ProcessSkippedEvents.WithLabelValues(w.Name(), processName, "record not found").Inc()
continue
} else if err != nil {
return err
Expand All @@ -163,7 +180,8 @@ func consumeForever[Type any, Status StatusType](
return err
}

metrics.ProcessSkippedEvents.WithLabelValues(w.Name, processName, "record status not in expected state").Inc()
metrics.ProcessSkippedEvents.WithLabelValues(w.Name(), processName, "record status not in expected state").
Inc()
continue
}

Expand All @@ -183,7 +201,7 @@ func consumeForever[Type any, Status StatusType](
return err
}

metrics.ProcessSkippedEvents.WithLabelValues(w.Name, processName, "record stopped").Inc()
metrics.ProcessSkippedEvents.WithLabelValues(w.Name(), processName, "record stopped").Inc()
continue
}

Expand All @@ -193,7 +211,7 @@ func consumeForever[Type any, Status StatusType](
return err
}

metrics.ProcessLatency.WithLabelValues(w.Name, processName).Observe(w.clock.Since(t2).Seconds())
metrics.ProcessLatency.WithLabelValues(w.Name(), processName).Observe(w.clock.Since(t2).Seconds())
}
}

Expand Down Expand Up @@ -254,14 +272,14 @@ func consume[Type any, Status StatusType](
if skipUpdate(next) {
w.logger.maybeDebug(ctx, "skipping update", map[string]string{
"description": skipUpdateDescription(next),
"workflow_name": w.Name,
"workflow_name": w.Name(),
"foreign_id": run.ForeignID,
"run_id": run.RunID,
"run_state": run.RunState.String(),
"record_status": run.Status.String(),
})

metrics.ProcessSkippedEvents.WithLabelValues(w.Name, processName, "next value specified skip").Inc()
metrics.ProcessSkippedEvents.WithLabelValues(w.Name(), processName, "next value specified skip").Inc()
return ack()
}

Expand Down
2 changes: 1 addition & 1 deletion consumer_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestConsume(t *testing.T) {
processName := "processName"
testErr := errors.New("test error")
w := &Workflow[string, testStatus]{
Name: "example",
name: "example",
ctx: ctx,
clock: clock_testing.NewFakeClock(time.Date(2024, time.April, 19, 0, 0, 0, 0, time.UTC)),
errorCounter: counter,
Expand Down
16 changes: 13 additions & 3 deletions delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (

func deleteConsumer[Type any, Status StatusType](w *Workflow[Type, Status]) {
role := makeRole(
w.Name,
w.Name(),
"delete",
"consumer",
)

processName := role
w.run(role, processName, func(ctx context.Context) error {
topic := DeleteTopic(w.Name)
topic := DeleteTopic(w.Name())
consumerStream, err := w.eventStreamer.NewConsumer(
ctx,
topic,
Expand All @@ -31,7 +31,17 @@ func deleteConsumer[Type any, Status StatusType](w *Workflow[Type, Status]) {
}
defer consumerStream.Close()

return DeleteForever(ctx, w.Name, processName, consumerStream, w.recordStore.Store, w.recordStore.Lookup, w.customDelete, w.defaultOpts.lagAlert, w.clock)
return DeleteForever(
ctx,
w.Name(),
processName,
consumerStream,
w.recordStore.Store,
w.recordStore.Lookup,
w.customDelete,
w.defaultOpts.lagAlert,
w.clock,
)
}, w.defaultOpts.errBackOff)
}

Expand Down
Loading
Loading