Skip to content

Commit

Permalink
- [chg]logger redesigned: introduce LogEventInterface and several p…
Browse files Browse the repository at this point in the history
…redefined events.
  • Loading branch information
vistart committed Jan 12, 2024
1 parent 968d46a commit 3f5373b
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 89 deletions.
76 changes: 50 additions & 26 deletions workflow/simple/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,27 @@ func (d *Context) Cancel(cause error) {
}

type Transits struct {
muTransits sync.RWMutex
workflowTransits []*Transit
muTransits sync.RWMutex
transits []*Transit
}

type Loggers struct {
muLoggers sync.RWMutex
loggers []LoggerInterface
}

func (d *Loggers) Log(ctx context.Context, events ...LogEventInterface) {
d.muLoggers.RLock()
defer d.muLoggers.RUnlock()
if d.loggers == nil || len(d.loggers) == 0 {
return
}
for _, logger := range d.loggers {
if logger == nil {
continue
}
logger.Log(ctx, events...)
}
}

// DAG defines a generic directed acyclic graph of proposals.
Expand All @@ -233,10 +252,23 @@ type DAG[TInput, TOutput any] struct {
context *Context
muTransits sync.RWMutex
transits *Transits
logger LoggerInterface
muLoggers sync.RWMutex
loggers *Loggers
DAGInterface[TInput, TOutput]
}

func (d *DAG[TInput, TOutput]) Log(ctx context.Context, events ...LogEventInterface) {
if events == nil || len(events) == 0 {
return
}
d.muLoggers.RLock()
defer d.muLoggers.RUnlock()
if d.loggers == nil {
return
}
d.loggers.Log(ctx, events...)
}

// AttachChannels attaches the channels to the workflow.
// The parameter `channels` is the name list of the channels to be initialized for the first time, and the order of
// each element is irrelevant. If you don't pass any parameter, it will have no effect.
Expand All @@ -256,7 +288,7 @@ func (d *DAG[TInput, TOutput]) AttachWorkflowTransit(transits ...*Transit) {
if d.transits == nil {
d.transits = &Transits{}
}
d.transits.workflowTransits = append(d.transits.workflowTransits, transits...)
d.transits.transits = append(d.transits.transits, transits...)
}

// BuildWorkflowInput feeds the result to each input channel in turn.
Expand Down Expand Up @@ -375,10 +407,10 @@ func (d *DAG[TInput, TOutput]) BuildWorkflow(ctx context.Context) error {
d.transits.muTransits.RLock()
defer d.transits.muTransits.RUnlock()
// Checks the Transits
if len(d.transits.workflowTransits) == 0 {
if len(d.transits.transits) == 0 {
return nil
}
for _, t := range d.transits.workflowTransits {
for _, t := range d.transits.transits {
for _, name := range t.channelInputs {
if _, existed := d.channels.channels[name]; !existed {
return &ErrChannelNotExist{name: name}
Expand All @@ -391,17 +423,15 @@ func (d *DAG[TInput, TOutput]) BuildWorkflow(ctx context.Context) error {
}
}

for _, t := range d.transits.workflowTransits {
for _, t := range d.transits.transits {
go func(ctx context.Context, t *Transit) {
// Waiting for the results of input channels to be ready.
// If there is a channel with no output, the current coroutine will be blocked here.
// If done notification is received, return immediately and no longer wait for the channel.
results := d.BuildWorkflowOutput(ctx, t.channelInputs...)
select {
case <-ctx.Done(): // If the cancellation notification has been received, it will exit directly.
if d.logger != nil {
d.logger.Trace(ctx, LevelWarning, t, "cancellation notified.")
}
d.Log(ctx, &LogEventTransitCanceled{transit: t})
return
default:
//log.Println("build workflow:", t.channelInputs, " selected.")
Expand All @@ -412,28 +442,20 @@ func (d *DAG[TInput, TOutput]) BuildWorkflow(ctx context.Context) error {
var work = func(t *Transit) (any, error) {
return t.worker(workerCtx, *results...)
}
if d.logger != nil {
d.logger.Trace(ctx, LevelDebug, t, "is starting...")
}
d.Log(ctx, &LogEventTransitStart{transit: t})
var result, err = func(t *Transit) (any, error) {
defer func() {
if err := recover(); err != nil {
e := ErrWorkerPanicked{}
if d.logger != nil {
d.logger.Trace(ctx, LevelError, t, e.Error(), err)
}
e := ErrWorkerPanicked{panic: err}
d.Log(ctx, &LogEventTransitWorkerPanicked{transit: t, err: e})
d.context.Cancel(&e)
}
}()
return work(t)
}(t)
if d.logger != nil {
d.logger.Trace(ctx, LevelDebug, t, "ended.")
}
d.Log(ctx, &LogEventTransitEnd{transit: t})
if err != nil {
if d.logger != nil {
d.logger.Trace(ctx, LevelWarning, t, "worker error(s) reported.", err)
}
d.Log(ctx, &LogEventTransitReportedError{transit: t, err: err})
d.context.Cancel(err)
return
}
Expand All @@ -451,6 +473,10 @@ func (d *DAG[TInput, TOutput]) CloseWorkflow() {
d.channels = nil
}

const (
DAGContextLogger = "logger"
)

// Execute the workflow.
//
// root is the highest-level context.Context for this execution. Each transit worker will receive its child context.
Expand Down Expand Up @@ -490,9 +516,7 @@ func (d *DAG[TInput, TOutput]) Execute(root context.Context, input *TInput) *TOu
} else {
var a = new(TOutput)
var e = ErrValueType{actual: (*r)[0], expect: *a}
if d.logger != nil {
d.logger.Log(ctx, LevelError, e.Error(), e, e.actual, e.expect)
}
d.Log(ctx, &LogEventErrValueType{err: e})
results = nil
}
}(ctx)
Expand Down
1 change: 1 addition & 0 deletions workflow/simple/dag_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var ErrChannelOutputEmpty = errors.New("the output channel is empty")

// ErrWorkerPanicked reports when the worker is panicked.
type ErrWorkerPanicked struct {
panic any
error
}

Expand Down
198 changes: 141 additions & 57 deletions workflow/simple/dag_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,10 @@ package simple

import (
"context"
"encoding/json"
"fmt"
"runtime"
"time"
)

// LoggerInterface defines the logging method and the parameters required by the logger.
// For specific usage, please refer to Logger.
type LoggerInterface interface {
Log(ctx context.Context, level LogLevel, message string, args ...any)
Trace(ctx context.Context, level LogLevel, transit *Transit, message string, args ...any)

SetFlags(uint)
}

type LogLevel int

const (
Expand All @@ -30,51 +19,133 @@ const (
LevelError
)

type LoggerParams struct {
TimestampFormat string
Caller bool
logDebugEnabled bool
ExtraParams map[string]any
type LogEventInterface interface {
Transit() *Transit
Level() LogLevel
Message() string
}

type Logger struct {
params LoggerParams
LoggerInterface
type LogEventErrValueType struct {
LogEventInterface
err ErrValueType
}

const (
LDebugEnabled = 2
)
func (l *LogEventErrValueType) Message() string {
return l.err.Error()
}

func (l *Logger) SetFlags(flags uint) {
l.params.logDebugEnabled = flags&LDebugEnabled > 0
func (l *LogEventErrValueType) Level() LogLevel {
return LevelError
}

func (l *Logger) Log(ctx context.Context, level LogLevel, message string, args ...any) {
if !l.params.logDebugEnabled && (level == LevelDebug) {
return
}
data := map[string]any{
"timestamp": time.Now().Format(l.params.TimestampFormat),
"message": message,
}
func (l *LogEventErrValueType) Transit() *Transit {
return nil
}

if len(args) > 0 {
data["args"] = args
}
type LogEventTransitReportedError struct {
LogEventInterface
err error
transit *Transit
}

if l.params.Caller {
if pc, _, _, ok := runtime.Caller(1); ok {
fn := runtime.FuncForPC(pc)
data["caller"] = fn.Name()
}
}
func (l *LogEventTransitReportedError) Message() string {
return l.err.Error()
}

if b, err := json.Marshal(data); err != nil {
panic(err)
} else {
fmt.Print(string(b))
}
func (l *LogEventTransitReportedError) Level() LogLevel {
return LevelWarning
}

func (l *LogEventTransitReportedError) Transit() *Transit {
return l.transit
}

type LogEventTransitStart struct {
LogEventInterface
transit *Transit
}

func (l *LogEventTransitStart) Message() string {
return "starting..."
}

func (l *LogEventTransitStart) Transit() *Transit {
return l.transit
}

func (l *LogEventTransitStart) Level() LogLevel {
return LevelDebug
}

type LogEventTransitEnd struct {
LogEventInterface
transit *Transit
}

func (l *LogEventTransitEnd) Message() string {
return "ended."
}

func (l *LogEventTransitEnd) Transit() *Transit {
return l.transit
}

func (l *LogEventTransitEnd) Level() LogLevel {
return LevelDebug
}

type LogEventTransitCanceled struct {
LogEventInterface
transit *Transit
}

func (l *LogEventTransitCanceled) Message() string {
return "cancellation notified."
}

func (l *LogEventTransitCanceled) Transit() *Transit {
return l.transit
}

func (l *LogEventTransitCanceled) Level() LogLevel {
return LevelWarning
}

type LogEventTransitWorkerPanicked struct {
LogEventInterface
transit *Transit
err ErrWorkerPanicked
}

func (l *LogEventTransitWorkerPanicked) Message() string {
return "worker panicked."
}

func (l *LogEventTransitWorkerPanicked) Transit() *Transit {
return l.transit
}

func (l *LogEventTransitWorkerPanicked) Level() LogLevel {
return LevelError
}

// LoggerInterface defines the logging method and the parameters required by the logger.
// For specific usage, please refer to Logger.
type LoggerInterface interface {
Log(ctx context.Context, events ...LogEventInterface)

SetFlags(uint)
}

type LoggerParams struct {
TimestampFormat string
Caller bool
logDebugEnabled bool
}

type Logger struct {
params LoggerParams
LoggerInterface
}

const (
Expand All @@ -88,21 +159,34 @@ const (
reset = "\033[0m"
)

// Trace output trace information.
// level refers to the log information level.
// transit refers to the tracking transit.
// message refers to the tracking message.
// args refers to other parameters.
// By default, LevelDebug logs are not displayed. If you want to display, call SetFlags(LDebugEnabled)
func (l *Logger) Trace(ctx context.Context, level LogLevel, transit *Transit, message string, args ...any) {
if !l.params.logDebugEnabled && (level == LevelDebug) {
const (
LDebugEnabled = 2
)

func (l *Logger) SetFlags(flags uint) {
l.params.logDebugEnabled = flags&LDebugEnabled > 0
}

func (l *Logger) logEvent(ctx context.Context, event LogEventInterface) {
if !l.params.logDebugEnabled && (event.Level() == LevelDebug) {
return
}
color := green
if level == LevelWarning {
if event.Level() == LevelWarning {
color = yellow
} else if level == LevelError {
} else if event.Level() == LevelError {
color = red
}
fmt.Printf("[GO-DAG] %v |%s %10s %s| %s\n", time.Now().Format(l.params.TimestampFormat), color, transit.name, reset, message)
transit := event.Transit()
name := "<nil>"
if transit != nil {
name = transit.name
}
fmt.Printf("[GO-DAG] %v |%s %10s %s| %s\n", time.Now().Format(l.params.TimestampFormat), color, name, reset, event.Message())
}

func (l *Logger) Log(ctx context.Context, events ...LogEventInterface) {
for _, event := range events {
l.logEvent(ctx, event)
}
}
Loading

0 comments on commit 3f5373b

Please sign in to comment.