Skip to content

Commit

Permalink
- [chg]logger redesigned: introduce LogEventErrorInterface and `Log…
Browse files Browse the repository at this point in the history
…EventTransitInterface`, and several structs.

- [chg]introduce `ErrorCollectorInterface` and its implementation `ErrorCollector`.
  • Loading branch information
vistart committed Jan 12, 2024
1 parent 4cee38a commit c5548a7
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 14 deletions.
6 changes: 3 additions & 3 deletions workflow/simple/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func (d *DAG[TInput, TOutput]) BuildWorkflow(ctx context.Context) error {
}(t)
d.Log(ctx, &LogEventTransitEnd{transit: t})
if err != nil {
d.Log(ctx, &LogEventTransitReportedError{transit: t, err: err})
d.Log(ctx, NewLogEventTransitReportedError(t, err))
d.context.Cancel(err)
return
}
Expand Down Expand Up @@ -518,8 +518,8 @@ func (d *DAG[TInput, TOutput]) Execute(root context.Context, input *TInput) *TOu
results = &ra
} else {
var a = new(TOutput)
var e = ErrValueType{actual: (*r)[0], expect: *a}
d.Log(ctx, &LogEventFinalErrValueType{err: e})
var e = ErrValueTypeMismatch{actual: (*r)[0], expect: *a}
d.Log(ctx, &LogEventErrorValueTypeMismatch{err: e})
results = nil
}
}(ctx)
Expand Down
6 changes: 3 additions & 3 deletions workflow/simple/dag_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ func (e *ErrChannelNameExisted) Error() string {
return fmt.Sprintf("the channel[%s] has existed.", e.name)
}

// ErrValueType defines that the data type output by the node is inconsistent with expectation.
type ErrValueType struct {
// ErrValueTypeMismatch defines that the data type output by the node is inconsistent with expectation.
type ErrValueTypeMismatch struct {
expect any
actual any
error
}

func (e ErrValueType) Error() string {
func (e ErrValueTypeMismatch) Error() string {
return fmt.Sprintf("The type of the value [%s] is inconsistent with expectation [%s].",
reflect.TypeOf(e.actual), reflect.TypeOf(e.expect))
}
Expand Down
97 changes: 91 additions & 6 deletions workflow/simple/dag_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package simple
import (
"context"
"fmt"
"sync"
"time"
)

Expand All @@ -25,34 +26,56 @@ type LogEventInterface interface {
Message() string
}

type LogEventErrorInterface interface {
Error() error
}

type LogEventTransitInterface interface {
Transit() *Transit
}

type LogEventFinalErrValueType struct {
type LogEventError struct {
LogEventErrorInterface
err error
}

func NewLogEventError(err error) *LogEventError {
return &LogEventError{err: err}
}

func (l *LogEventError) Error() error {
return l.err
}

type LogEventErrorValueTypeMismatch struct {
LogEventInterface
err ErrValueType
LogEventError
err ErrValueTypeMismatch
}

func (l *LogEventFinalErrValueType) Message() string {
func (l *LogEventErrorValueTypeMismatch) Message() string {
return l.err.Error()
}

func (l *LogEventFinalErrValueType) Level() LogLevel {
func (l *LogEventErrorValueTypeMismatch) Level() LogLevel {
return LevelError
}

func (l *LogEventFinalErrValueType) Name() string {
func (l *LogEventErrorValueTypeMismatch) Name() string {
return "final chn"
}

type LogEventTransitReportedError struct {
LogEventInterface
LogEventTransitInterface
err error
LogEventError
transit *Transit
}

func NewLogEventTransitReportedError(transit *Transit, err error) *LogEventTransitReportedError {
return &LogEventTransitReportedError{transit: transit, LogEventError: *NewLogEventError(err)}
}

func (l *LogEventTransitReportedError) Message() string {
return l.err.Error()
}
Expand Down Expand Up @@ -125,6 +148,7 @@ func (l *LogEventTransitEnd) Name() string {
type LogEventTransitCanceled struct {
LogEventInterface
LogEventTransitInterface
LogEventError
transit *Transit
}

Expand All @@ -150,6 +174,7 @@ func (l *LogEventTransitCanceled) Name() string {
type LogEventTransitWorkerPanicked struct {
LogEventInterface
LogEventTransitInterface
LogEventError
transit *Transit
err ErrWorkerPanicked
}
Expand All @@ -175,6 +200,7 @@ func (l *LogEventTransitWorkerPanicked) Name() string {
// LoggerInterface defines the logging method and the parameters required by the logger.
// For specific usage, please refer to Logger.
type LoggerInterface interface {
// Log an event.
Log(ctx context.Context, events ...LogEventInterface)

SetFlags(uint)
Expand Down Expand Up @@ -228,3 +254,62 @@ func (l *Logger) Log(ctx context.Context, events ...LogEventInterface) {
l.logEvent(ctx, event)
}
}

// ErrorCollectorInterface defines the methods that error collectors should implement.
// It is also a logger, so it also needs to implement all methods specified by the LoggerInterface.
type ErrorCollectorInterface interface {
LoggerInterface
Listen(ctx context.Context)
Get() []LogEventErrorInterface

Append(event *LogEventErrorInterface)
}

type ErrorCollector struct {
ErrorCollectorInterface
mu sync.RWMutex
errors []LogEventErrorInterface
listener chan LogEventErrorInterface
}

func NewErrorCollector() *ErrorCollector {
return &ErrorCollector{
errors: make([]LogEventErrorInterface, 0),
listener: make(chan LogEventErrorInterface),
}
}

func (l *ErrorCollector) Listen(ctx context.Context) {
var e LogEventErrorInterface
for {
select {
case <-ctx.Done():
return
case e = <-l.listener:
l.Append(&e)
default:
}
}
}

func (l *ErrorCollector) Get() []LogEventErrorInterface {
l.mu.RLock()
defer l.mu.RUnlock()
return l.errors
}

func (l *ErrorCollector) Append(event *LogEventErrorInterface) {
l.mu.Lock()
defer l.mu.Unlock()
l.errors = append(l.errors, *event)
}

func (l *ErrorCollector) Log(ctx context.Context, events ...LogEventInterface) {
for _, event := range events {
if e, ok := event.(LogEventErrorInterface); ok && event != nil {
l.listener <- e
}
}
}

func (l *ErrorCollector) SetFlags(uint) {}
77 changes: 75 additions & 2 deletions workflow/simple/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func NewDAGTwoParallelTransitsWithLogger() *DAG[string, string] {
channelInputs: []string{"t12"},
channelOutputs: []string{"t22"},
worker: func(ctx context.Context, a ...any) (any, error) {
//var e = ErrValueType{actual: new(int), expect: new(string)}
//var t = ErrValueType{actual: new(int), expect: new(string)}
//var e = ErrValueTypeMismatch{actual: new(int), expect: new(string)}
//var t = ErrValueTypeMismatch{actual: new(int), expect: new(string)}
//log.Println(errors.Is(&e, &t))
return a[0], nil
},
Expand Down Expand Up @@ -810,3 +810,76 @@ func TestCancelWorkflowWithNestedWorkflow(t *testing.T) {
time.Sleep(time.Millisecond)
})
}

func TestListenErrorReported(t *testing.T) {
logger := NewLogger()
logger.SetFlags(LDebugEnabled)
errorCollector := NewErrorCollector()
go errorCollector.Listen(context.Background())
worker1 := func(ctx context.Context, a ...any) (any, error) {
log.Println("started at", time.Now())
time.Sleep(time.Duration(a[0].(int)) * time.Second)
log.Println("ended at", time.Now())
return a[0], nil
}
worker2 := func(ctx context.Context, a ...any) (any, error) {
channelInputs1 := []string{"input"}
channelOutputs1 := []string{"t11"}
channelOutputs2 := []string{"output"}
transits := []*Transit{
NewTransit("i:input", WithInputs(channelInputs1...), WithOutputs(channelOutputs1...), WithWorker(worker1)),
NewTransit("i:output", WithInputs(channelOutputs1...), WithOutputs(channelOutputs2...), WithWorker(worker1)),
}
f1, _ := NewDAG[int, int](
WithDefaultChannels[int, int](),
WithChannels[int, int]("t11"),
WithTransits[int, int](transits...),
WithLoggers[int, int](logger))
input := 1
output := f1.Execute(ctx, &input)
if output == nil {
return nil, nil
}
return *output, nil
}
channelInputs1 := []string{"input"}
channelOutputs1 := []string{"t11"}
channelOutputs2 := []string{"t12"}
channelOutputs3 := []string{"output"}
transits := []*Transit{
NewTransit("input",
WithInputs(channelInputs1...),
WithOutputs(channelOutputs1...),
WithWorker(worker1)),
NewTransit("transit",
WithInputs(channelOutputs1...),
WithOutputs(channelOutputs2...),
WithWorker(worker2)),
NewTransit("output", WithInputs(channelOutputs2...), WithOutputs(channelOutputs3...), WithWorker(worker1)),
}
f, _ := NewDAG[int, int](
WithDefaultChannels[int, int](),
WithChannels[int, int]("t11", "t12"),
WithTransits[int, int](transits...),
WithLoggers[int, int](logger, errorCollector))
input := 1
t.Run("cancel before run", func(t *testing.T) {
f.Cancel(errors.New("cancel before run"))
})
t.Run("cancel when running", func(t *testing.T) {
ch1 := make(chan struct{})
go func() {
output := f.Execute(context.Background(), &input)
assert.Nil(t, output)
ch1 <- struct{}{}
}()
go func() {
time.Sleep(time.Millisecond * 1500)
f.Cancel(errors.New("canceled"))
}()
<-ch1
time.Sleep(time.Millisecond)
})
assert.Len(t, errorCollector.Get(), 1)
log.Println("finished")
}

0 comments on commit c5548a7

Please sign in to comment.