Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[simple]try to solve #13. #14

Merged
merged 1 commit into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions workflow/simple/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ type Workflow[TInput, TOutput any] struct {
transits *Transits
muLoggers sync.RWMutex
loggers *Loggers
ctx context.Context
close context.CancelFunc
WorkflowInterface[TInput, TOutput]
}

Expand Down Expand Up @@ -475,10 +477,7 @@ func (d *Workflow[TInput, TOutput]) BuildWorkflow(ctx context.Context) error {
// After this method is executed, all input, output channels and transits will be deleted.
// Note, please do not call this method during workflow execution, otherwise it will lead to unpredictable consequences.
func (d *Workflow[TInput, TOutput]) CloseWorkflow() {
d.muChannels.Lock()
defer d.muChannels.Unlock()
d.channels.close()
d.channels = nil
d.close()
}

// Execute the workflow.
Expand All @@ -492,6 +491,11 @@ func (d *Workflow[TInput, TOutput]) CloseWorkflow() {
func (d *Workflow[TInput, TOutput]) Execute(root context.Context, input *TInput) *TOutput {
// The sub-context is introduced and has a cancellation handler, making it easy to terminate the entire process
// at any time.
select {
case <-d.ctx.Done():
return nil
default:
}
ctx, cancel := context.WithCancelCause(root)

// Record the context and cancellation handler, so they can be called at the appropriate time.
Expand Down
8 changes: 7 additions & 1 deletion workflow/simple/dag_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@

package simple

import "context"

// Option defines the option used to instantiate a Workflow.
// If an error occurs during instantiation, it needs to be reported. If no errors occurred, `nil` is returned.
type Option[TInput, TOutput any] func(d *Workflow[TInput, TOutput]) error

// NewWorkflow instantiates a workflow.
func NewWorkflow[TInput, TOutput any](options ...Option[TInput, TOutput]) (*Workflow[TInput, TOutput], error) {
dag := &Workflow[TInput, TOutput]{}
ctx, cancel := context.WithCancel(context.Background())
dag := &Workflow[TInput, TOutput]{
ctx: ctx,
close: cancel,
}
for _, option := range options {
err := option(dag)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions workflow/simple/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestSimpleWorkflowValueTypeError_Error(t *testing.T) {
input := "input"
assert.Nil(t, f.Execute(context.Background(), &input))
assert.Nil(t, f.RunOnce(context.Background(), &input))
assert.Nil(t, f.Execute(context.Background(), &input))
}

// NewWorkflowTwoParallelTransitsWithLogger defines a workflow with logger.
Expand Down
Loading