Skip to content

Commit

Permalink
- [chg]instantiating a workflow does not require first specifying all…
Browse files Browse the repository at this point in the history
… channels involved in the transit.
  • Loading branch information
vistart committed Jan 27, 2024
1 parent 498e525 commit a5d0f7f
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 6 deletions.
34 changes: 29 additions & 5 deletions workflow/simple/dag_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package simple

// Option defines the option used to instantiate a Workflow.
// If an error occurs during instantiation, it needs to be reported. If no errors occurred, `nil` is returned.
type Option[TInput, TOutput any] func(d *Workflow[TInput, TOutput]) error

// NewWorkflow instantiates a workflow.
Expand All @@ -24,6 +25,9 @@ func NewWorkflow[TInput, TOutput any](options ...Option[TInput, TOutput]) (*Work
// Note that names cannot be repeated, and repeated names are counted once.
// All specified names must be used in the transit and may appear only once on the input and once on the output.
// At least two channel names must be specified, for input and output.
//
// This method is not required. Because WithTransits() will automatically add the channels mentioned by each transit
// to the channel list.
func WithChannels[TInput, TOutput any](names ...string) Option[TInput, TOutput] {
return func(d *Workflow[TInput, TOutput]) error {
if d.channels == nil {
Expand All @@ -41,9 +45,6 @@ func WithChannels[TInput, TOutput any](names ...string) Option[TInput, TOutput]
// The channel with the same name will be reinitialized.
func WithChannelInput[TInput, TOutput any](name string) Option[TInput, TOutput] {
return func(d *Workflow[TInput, TOutput]) error {
//if !d.channels.exists(name) {
// return &ErrChannelNotExist{name: name}
//}
if d.channels == nil {
d.channels = NewWorkflowChannels()
}
Expand Down Expand Up @@ -88,8 +89,15 @@ func WithDefaultChannels[TInput, TOutput any]() Option[TInput, TOutput] {

// WithTransits specifies specific nodes for the entire workflow.
//
// You can just call this method without calling WithChannels() to specify the input channel name.
// This method will automatically add unregistered channel names to the channel list.
//
// This method can be executed multiple times. Those executed later will be merged with those executed earlier.
// The channel with the same name will be reinitialized.
//
// You need to ensure that the input channel name list involved in each transit does not intersect with
// the input channel name list of other transits, otherwise unpredictable consequences will occur during execution.
//
// So as the output channel list of every transit.
func WithTransits[TInput, TOutput any](transits ...TransitInterface) Option[TInput, TOutput] {
return func(d *Workflow[TInput, TOutput]) error {
lenTransits := len(transits)
Expand All @@ -99,7 +107,23 @@ func WithTransits[TInput, TOutput any](transits ...TransitInterface) Option[TInp
if d.transits == nil {
d.transits = &Transits{transits: make([]TransitInterface, 0)}
}
d.transits.transits = append(d.transits.transits, transits...)
for _, t := range transits {
for _, c := range t.GetChannelInputs() {
if !d.channels.exists(c) {
if err := d.channels.add(c); err != nil {
return err
}
}
}
for _, c := range t.GetChannelOutputs() {
if !d.channels.exists(c) {
if err := d.channels.add(c); err != nil {
return err
}
}
}
d.transits.transits = append(d.transits.transits, t)
}
return nil
}
}
Expand Down
116 changes: 115 additions & 1 deletion workflow/simple/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestSimpleWorkflowChannel_Exists(t *testing.T) {
func TestSimpleWorkflowValueTypeError_Error(t *testing.T) {
logger := NewLogger()
logger.SetFlags(LDebugEnabled)
f, _ := NewWorkflow[string, string](
f, err := NewWorkflow[string, string](
WithDefaultChannels[string, string](),
WithChannels[string, string]("t11"),
WithTransits[string, string](), // Testing with empty input will have no effect.
Expand All @@ -54,6 +54,7 @@ func TestSimpleWorkflowValueTypeError_Error(t *testing.T) {
},
}),
WithLoggers[string, string](logger))
assert.Nil(t, err)
input := "input"
assert.Nil(t, f.Execute(context.Background(), &input))
assert.Nil(t, f.RunOnce(context.Background(), &input))
Expand Down Expand Up @@ -1205,3 +1206,116 @@ func TestTransitAllowFailure(t *testing.T) {
})
log.Println("finished")
}

func TestNewWorkflowNotWithChannels(t *testing.T) {
logger := NewLogger()
logger.SetFlags(LDebugEnabled)
now := time.Now()
errorCollector := NewErrorCollector()
go errorCollector.Listen(context.Background())
worker1 := func(ctx context.Context, a ...any) (any, error) {
log.Println("started at", time.Now())
s := 1
if a[0] != nil {
s = a[0].(int)
}
time.Sleep(time.Duration(s) * time.Second)
log.Println("ended at", time.Now())
since := time.Since(now)
if since.Milliseconds() > 4000 {
panic(fmt.Errorf("panicked at %v", time.Since(now)))
}
return s, fmt.Errorf("error occurred at %v", time.Since(now))
}
worker2 := func(ctx context.Context, a ...any) (any, error) {
channelInputs1 := []string{"i:input"}
channelOutputs1 := []string{"i:t11"}
channelOutputs2 := []string{"i:output"}
transits := []TransitInterface{
NewTransit("i:input", WithInputs(channelInputs1...), WithOutputs(channelOutputs1...), WithWorker(worker1),
WithAllowFailure(true)),
NewTransit("i:output", WithInputs(channelOutputs1...), WithOutputs(channelOutputs2...), WithWorker(worker1),
WithAllowFailure(true)),
}
f1, _ := NewWorkflow[int, int](
WithChannelInput[int, int]("i:input"),
WithChannelOutput[int, int]("i:output"),
WithTransits[int, int](transits...),
WithLoggers[int, int](logger, errorCollector))
input := 1
if a[0] != nil {
input = a[0].(int)
}
output := f1.Execute(ctx, &input)
if output == nil {
return nil, nil
}
return *output, nil
}
channelInputs1 := []string{"input"}
channelOutputs1 := []string{"t11"}
channelOutputs2 := []string{"t12"}
channelOutputs3 := []string{"output"}
transits := []TransitInterface{
NewTransit("input",
WithInputs(channelInputs1...),
WithOutputs(channelOutputs1...),
WithWorker(worker1), WithAllowFailure(true)),
NewTransit("transit",
WithInputs(channelOutputs1...),
WithOutputs(channelOutputs2...),
WithWorker(worker2), WithAllowFailure(true)),
NewTransit("output", WithInputs(channelOutputs2...), WithOutputs(channelOutputs3...), WithWorker(worker1),
WithAllowFailure(true)),
}
f, _ := NewWorkflow[int, int](
WithDefaultChannels[int, int](),
WithTransits[int, int](transits...),
WithLoggers[int, int](logger, errorCollector))
input := 1
t.Run("cancel before run", func(t *testing.T) {
f.Cancel(errors.New("cancel before run"))
})
t.Run("cancel when running", func(t *testing.T) {
ch1 := make(chan struct{})
var output = new(int)
go func() {
output = f.Execute(context.Background(), &input)
assert.Nil(t, output)
ch1 <- struct{}{}
}()
<-ch1
time.Sleep(time.Millisecond)

errors1 := errorCollector.Get()
assert.Len(t, errors1, 5)
})
log.Println("finished")
}

func TestNewWorkflowReportedError(t *testing.T) {
t.Run("empty parameters without error(s)", func(t *testing.T) {
f, err := NewWorkflow[int, int]()
assert.NotNil(t, f)
assert.IsType(t, &Workflow[int, int]{}, f)
assert.Nil(t, err)
})
t.Run("with error", func(t *testing.T) {
f, err := NewWorkflow[int, int](func(d *Workflow[int, int]) error {
return errors.New("initializes a workflow with error")
})
assert.Nil(t, f)
assert.Errorf(t, err, "initializes a workflow with error")
})
t.Run("with channel output", func(t *testing.T) {
f, err := NewWorkflow[int, int](WithChannelOutput[int, int]("output"))
assert.NotNil(t, f)
assert.IsType(t, &Workflow[int, int]{}, f)
assert.Nil(t, err)
})
t.Run("with default channels after channel input specified", func(t *testing.T) {
f, err := NewWorkflow[int, int](WithChannels[int, int]("output"), WithDefaultChannels[int, int]())
assert.Nil(t, f)
assert.Errorf(t, err, "the channel[input] has existed")
})
}

0 comments on commit a5d0f7f

Please sign in to comment.