From 2d2e003705faaaf92f44970141a85ddc180742bb Mon Sep 17 00:00:00 2001
From: n0rdy
Date: Sun, 19 Nov 2023 22:43:23 +0100
Subject: [PATCH 1/2] Implemented logging

---
 README.md                               |  21 ++-
 configs/pipeline.go                     |   9 +-
 configs/stage.go                        |  10 +-
 logging/channel.go                      |  68 ++++++++
 logging/console.go                      |  61 +++++++
 logging/logging.go                      |  10 ++
 logging/noops.go                        |  23 +++
 pipeline/pipeline.go                    |  32 +++-
 pippin_test.go                          | 220 ++++++++++++++++++++++++
 stages/aggregate/aggregate.go           |  34 +++-
 stages/asyncaggregate/asyncaggregate.go |  43 ++++-
 stages/stage.go                         |   6 +-
 stages/transform/transform.go           |  70 +++++++-
 types/loglevels/loglevel.go             |  36 ++++
 utils/datetime.go                       |   7 +
 15 files changed, 634 insertions(+), 16 deletions(-)
 create mode 100644 logging/channel.go
 create mode 100644 logging/console.go
 create mode 100644 logging/logging.go
 create mode 100644 logging/noops.go
 create mode 100644 types/loglevels/loglevel.go
 create mode 100644 utils/datetime.go

diff --git a/README.md b/README.md
index 105aef6..5d4d29b 100644
--- a/README.md
+++ b/README.md
@@ -240,18 +240,23 @@ and even if the pipeline rate limiter is full, the program will spawn a new goro
 - `MaxGoroutinesPerStage` - is an integer that indicates the maximum number of goroutines that can be spawned by each stage.
 If it is passed as `0` or less, then there is no limit.
 It is possible to change the limit for each stage individually - see `configs.StageConfig.MaxGoroutines`.
-- `TimeoutInMillis` - is an integer that indicates the timeout in milliseconds for the entire pipeline.
-If it is passed as `0` or less, then there is no timeout.
+- `Timeout` - indicates the timeout for the entire pipeline. If it is passed as `0` or less, then there is no timeout.
+- `Logger` - is a logger that will be used by the pipeline.
+If it is passed as nil, then the `logging.NoOpsLogger` logger will be used, which does nothing.
+Check the `logging` package for more details and predefined loggers.
 
 If your pipeline performs any network calls within its transformation/aggregation logic,
 I'd suggest configuring the maximum number of goroutines to prevent a possible DDoS attack on the target server
 or reaching the maximum number of open files on the client machine.
 
 To create a pipeline with a custom configuration:
 
 ```go
+pipelineConsoleLogger := logging.NewConsoleLogger(loglevels.DEBUG)
+
 p := pipeline.FromSlice[int]([]int{1, 2, 3, 4, 5}, configs.PipelineConfig{
 	ManualStart: true,
 	MaxGoroutinesTotal: 100,
 	MaxGoroutinesPerStage: 10,
-	TimeoutInMillis: 1000,
+	Timeout: time.Duration(1000) * time.Millisecond,
+	Logger: &pipelineConsoleLogger,
 })
 ```
@@ -429,22 +434,28 @@ It is represented by the `configs.StageConfig` struct, which contains the follow
 If it is passed as `0` or less, then there is no limit.
 This config option can be used to change the limit for each stage that comes from the `configs.PipelineConfig.MaxGoroutinesPerStage` option (if provided).
 Please, note that the real number of goroutines might be higher than the number specified here, as the library spawns additional goroutines for internal purposes.
-- `TimeoutInMillis` - is an integer that indicates the timeout in milliseconds for the stage. If it is passed as `0` or less, then there is no timeout.
+- `Timeout` - indicates the timeout for the stage. If it is passed as `0` or less, then there is no timeout.
 - `StageConfig.CustomId` - is a custom ID for the stage. If it is passed as 0, then the stage will be assigned an ID automatically.
 Auto-generated IDs are calculated as follows: 1 + the ID of the previous stage.
 The initial stage (the one that is created first) has an ID of 1.
 It is recommended to either rely on the auto-generated IDs or to provide a custom ID for each stage, otherwise the IDs might be messed up due to the (1 + the ID of the previous stage) logic mentioned above.
+- `Logger` - is a logger that will be used by the stage.
+If it is passed as nil, then the `logging.NoOpsLogger` logger will be used, which does nothing.
+Check the `logging` package for more details and predefined loggers.
+This config option can be used to change the logger for each stage that comes from the `configs.PipelineConfig.Logger` option (if provided).
 
 To create a transformation with a custom configuration:
 
 ```go
 p := pipeline.FromSlice[int]([]int{1, 2, 3, 4, 5})
+stageConsoleLogger := logging.NewConsoleLogger(loglevels.INFO)
 
 // multiplies each number by 2:
 mappingStage := transform.Map[int, int](filteringStage, func(i int) int {
 	return i * 2
 }, configs.StageConfig{
 	MaxGoroutines: 10,
-	TimeoutInMillis: 1000,
+	Timeout: time.Duration(1000) * time.Millisecond,
 	CustomId: 1,
+	Logger: &stageConsoleLogger,
 })
 ```
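To see how the configuration above plays together with the rest of the library, here is a minimal end-to-end sketch. It is an illustration rather than part of the patch: `transform.Map` and `aggregate.Sum` already exist in the library (their usage is taken from the tests further down), and sync aggregation blocks until the pipeline finishes:

```go
package main

import (
	"fmt"

	"github.com/n0rdy/pippin/configs"
	"github.com/n0rdy/pippin/logging"
	"github.com/n0rdy/pippin/pipeline"
	"github.com/n0rdy/pippin/stages/aggregate"
	"github.com/n0rdy/pippin/stages/transform"
	"github.com/n0rdy/pippin/types/loglevels"
)

func main() {
	// Log everything from DEBUG and up to the console.
	p := pipeline.FromSlice(
		[]int{1, 2, 3, 4, 5},
		configs.PipelineConfig{
			Logger: logging.NewConsoleLogger(loglevels.DEBUG),
		},
	)

	// Stages inherit the pipeline logger unless StageConfig.Logger overrides it.
	doubled := transform.Map(p.InitStage, func(i int) int {
		return i * 2
	})

	// Sync aggregation blocks until the pipeline is done: 2+4+6+8+10 = 30.
	sum, err := aggregate.Sum(doubled)
	if err != nil {
		fmt.Println("pipeline failed:", err)
		return
	}
	fmt.Println(*sum) // 30
}
```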
diff --git a/configs/pipeline.go b/configs/pipeline.go
index a31a8b5..d9f94f4 100644
--- a/configs/pipeline.go
+++ b/configs/pipeline.go
@@ -1,6 +1,9 @@
 package configs
 
-import "time"
+import (
+	"github.com/n0rdy/pippin/logging"
+	"time"
+)
 
 // PipelineConfig is a struct that contains the configuration for a pipeline
 //
@@ -19,9 +22,13 @@ import "time"
 //
 // [PipelineConfig.Timeout] indicates the timeout for the pipeline.
 // If it is passed as 0 or less, then there is no timeout.
+//
+// [PipelineConfig.Logger] is a logger that will be used by the pipeline.
+// If it is passed as nil, then the [logging.NoOpsLogger] logger will be used, which does nothing.
 type PipelineConfig struct {
 	ManualStart           bool
 	MaxGoroutinesTotal    int
 	MaxGoroutinesPerStage int
 	Timeout               time.Duration
+	Logger                logging.Logger
 }
diff --git a/configs/stage.go b/configs/stage.go
index c1013c7..8a0bcdb 100644
--- a/configs/stage.go
+++ b/configs/stage.go
@@ -1,6 +1,9 @@
 package configs
 
-import "time"
+import (
+	"github.com/n0rdy/pippin/logging"
+	"time"
+)
 
 // StageConfig is a struct that holds the configuration for a stage.
 //
@@ -16,8 +19,13 @@ import "time"
 // Auto-generated IDs are calculated as follows: 1 + the ID of the previous stage.
 // The initial stage (the one that is created first) has an ID of 1.
 // It is recommended to either rely on the auto-generated IDs or to provide a custom ID for each stage, otherwise the IDs might be messed up due to the (1 + the ID of the previous stage) logic mentioned above.
+//
+// [StageConfig.Logger] is a logger that will be used by the stage.
+// If it is passed as nil, then the [logging.NoOpsLogger] logger will be used, which does nothing.
+// This config option can be used to change the logger for each stage that comes from the [PipelineConfig.Logger] option (if provided).
 type StageConfig struct {
 	MaxGoroutines int
 	Timeout       time.Duration
 	CustomId      int64
+	Logger        logging.Logger
 }
diff --git a/logging/channel.go b/logging/channel.go
new file mode 100644
index 0000000..e198f3c
--- /dev/null
+++ b/logging/channel.go
@@ -0,0 +1,68 @@
+package logging
+
+import (
+	"fmt"
+	"github.com/n0rdy/pippin/types/loglevels"
+	"github.com/n0rdy/pippin/utils"
+)
+
+// ChannelLogger is a logger that writes to a channel.
+// Please, make sure that you read from the channel, otherwise it will block the pipeline execution.
+//
+// ChannelLogger is handy when you want to write logs to a file or do some heavy actions on them, but don't want to affect the performance of the pipeline.
+//
+// ChannelLogger accepts a log level as a parameter.
+// It will forward only those logs that have a level equal to or higher than the specified one.
+// Check [loglevels.LogLevel] for more details.
+//
+// Make sure to call the ChannelLogger.Close() method when you are done with the logger - this will close the channel.
+type ChannelLogger struct {
+	ch    chan<- string
+	level loglevels.LogLevel
+}
+
+func NewChannelLogger(ch chan<- string, level loglevels.LogLevel) Logger {
+	return &ChannelLogger{
+		ch:    ch,
+		level: level,
+	}
+}
+
+func (cl *ChannelLogger) Trace(message string) {
+	if cl.level <= loglevels.TRACE {
+		cl.ch <- utils.TimeNowAsRFC3339NanoString() + loglevels.TracePrefix + message
+	}
+}
+
+func (cl *ChannelLogger) Debug(message string) {
+	if cl.level <= loglevels.DEBUG {
+		cl.ch <- utils.TimeNowAsRFC3339NanoString() + loglevels.DebugPrefix + message
+	}
+}
+
+func (cl *ChannelLogger) Info(message string) {
+	if cl.level <= loglevels.INFO {
+		cl.ch <- utils.TimeNowAsRFC3339NanoString() + loglevels.InfoPrefix + message
+	}
+}
+
+func (cl *ChannelLogger) Warn(message string, errs ...error) {
+	if cl.level <= loglevels.WARN {
+		cl.ch <- utils.TimeNowAsRFC3339NanoString() + loglevels.WarnPrefix + cl.messageWithErrors(message, errs...)
+	}
+}
+
+func (cl *ChannelLogger) Error(message string, errs ...error) {
+	if cl.level <= loglevels.ERROR {
+		cl.ch <- utils.TimeNowAsRFC3339NanoString() + loglevels.ErrorPrefix + cl.messageWithErrors(message, errs...)
+	}
+}
+
+func (cl *ChannelLogger) Close() error {
+	close(cl.ch)
+	return nil
+}
+
+func (cl *ChannelLogger) messageWithErrors(message string, errs ...error) string {
+	return fmt.Sprintf("%s %v", message, errs)
+}
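One practical note on `ChannelLogger`: because every log statement is a blocking send, a consumer has to be running before the pipeline starts producing logs. A minimal consumer sketch (the buffer size and the stdout sink are illustrative choices, not part of this patch):

```go
logCh := make(chan string, 128) // small buffer to absorb short bursts of log lines

// Drain the channel in the background; the loop ends once the pipeline
// closes the logger (and therefore the channel) on a terminal status.
done := make(chan struct{})
go func() {
	defer close(done)
	for line := range logCh {
		fmt.Println(line) // or write to a file / forward to a log shipper
	}
}()

p := pipeline.FromSlice(
	[]int{1, 2, 3},
	configs.PipelineConfig{
		Logger: logging.NewChannelLogger(logCh, loglevels.INFO),
	},
)
// ... build stages and aggregate as usual, then wait on `done` before exiting.
```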
diff --git a/logging/console.go b/logging/console.go
new file mode 100644
index 0000000..7b548dc
--- /dev/null
+++ b/logging/console.go
@@ -0,0 +1,61 @@
+package logging
+
+import (
+	"fmt"
+	"github.com/n0rdy/pippin/types/loglevels"
+	"github.com/n0rdy/pippin/utils"
+)
+
+// ConsoleLogger is a logger that prints logs to the console.
+// Basically, it uses fmt.Println() to print logs under the hood.
+//
+// ConsoleLogger accepts a log level as a parameter.
+// It will print only those logs that have a level equal to or higher than the specified one.
+// Check [loglevels.LogLevel] for more details.
+//
+// The format of logs is: "time [log level] message"
+// Example:
+// 2023-11-19T22:50:59.123456789+01:00 [INFO] some cool info message
+type ConsoleLogger struct {
+	level loglevels.LogLevel
+}
+
+func NewConsoleLogger(level loglevels.LogLevel) Logger {
+	return &ConsoleLogger{
+		level: level,
+	}
+}
+
+func (cl *ConsoleLogger) Trace(message string) {
+	if cl.level <= loglevels.TRACE {
+		fmt.Println(utils.TimeNowAsRFC3339NanoString() + loglevels.TracePrefix + message)
+	}
+}
+
+func (cl *ConsoleLogger) Debug(message string) {
+	if cl.level <= loglevels.DEBUG {
+		fmt.Println(utils.TimeNowAsRFC3339NanoString() + loglevels.DebugPrefix + message)
+	}
+}
+
+func (cl *ConsoleLogger) Info(message string) {
+	if cl.level <= loglevels.INFO {
+		fmt.Println(utils.TimeNowAsRFC3339NanoString() + loglevels.InfoPrefix + message)
+	}
+}
+
+func (cl *ConsoleLogger) Warn(message string, errs ...error) {
+	if cl.level <= loglevels.WARN {
+		fmt.Println(utils.TimeNowAsRFC3339NanoString()+loglevels.WarnPrefix+message, errs)
+	}
+}
+
+func (cl *ConsoleLogger) Error(message string, errs ...error) {
+	if cl.level <= loglevels.ERROR {
+		fmt.Println(utils.TimeNowAsRFC3339NanoString()+loglevels.ErrorPrefix+message, errs)
+	}
+}
+
+func (cl *ConsoleLogger) Close() error {
+	return nil
+}
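For reference, with the pipeline-level messages added later in this patch ("Pipeline: started", "Stage 1: finished", "Pipeline: finished with status ..."), INFO-level console output looks roughly like this (timestamps are illustrative):

```
2023-11-19T22:50:59.100123456+01:00 [INFO] Pipeline: started
2023-11-19T22:50:59.104567890+01:00 [INFO] Stage 1: finished
2023-11-19T22:50:59.108901234+01:00 [INFO] Pipeline: finished with status Done
```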
diff --git a/logging/logging.go b/logging/logging.go
new file mode 100644
index 0000000..7b6fc5b
--- /dev/null
+++ b/logging/logging.go
@@ -0,0 +1,10 @@
+package logging
+
+type Logger interface {
+	Trace(message string)
+	Debug(message string)
+	Info(message string)
+	Warn(message string, errs ...error)
+	Error(message string, errs ...error)
+	Close() error
+}
diff --git a/logging/noops.go b/logging/noops.go
new file mode 100644
index 0000000..b6ebefc
--- /dev/null
+++ b/logging/noops.go
@@ -0,0 +1,23 @@
+package logging
+
+// NoOpsLogger is a logger that does nothing.
+// This is the default logger used if no logger is configured.
+type NoOpsLogger struct{}
+
+func NewNoOpsLogger() Logger {
+	return &NoOpsLogger{}
+}
+
+func (nol *NoOpsLogger) Trace(message string) {}
+
+func (nol *NoOpsLogger) Debug(message string) {}
+
+func (nol *NoOpsLogger) Info(message string) {}
+
+func (nol *NoOpsLogger) Warn(message string, errs ...error) {}
+
+func (nol *NoOpsLogger) Error(message string, errs ...error) {}
+
+func (nol *NoOpsLogger) Close() error {
+	return nil
+}
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index a720401..f2798a0 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -3,6 +3,7 @@ package pipeline
 import (
 	"context"
 	"github.com/n0rdy/pippin/configs"
+	"github.com/n0rdy/pippin/logging"
 	"github.com/n0rdy/pippin/ratelimiter"
 	"github.com/n0rdy/pippin/stages"
 	"github.com/n0rdy/pippin/types"
@@ -34,6 +35,7 @@ type Pipeline[A any] struct {
 	ctx           context.Context
 	ctxCancelFunc context.CancelFunc
 	statusChan    chan statuses.Status
+	logger        logging.Logger
 }
 
 type parsedConfigs struct {
@@ -41,6 +43,7 @@ type parsedConfigs struct {
 	pipelineRateLimiter *ratelimiter.RateLimiter
 	stageRateLimiter    *ratelimiter.RateLimiter
 	timeout             time.Duration
+	logger              logging.Logger
 }
 
 // Start starts the pipeline if it was created with the delayed manual start.
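Since `logging.Logger` is a small interface, callers are not limited to the bundled console, channel and no-ops implementations. As a hedged sketch (the `StdLogger` name and prefixes are illustrative, not part of the patch), an adapter over the standard library `log` package could look like this:

```go
package mylogging

import (
	"fmt"
	"log"

	"github.com/n0rdy/pippin/logging"
)

// StdLogger adapts the standard library *log.Logger to pippin's logging.Logger interface.
type StdLogger struct {
	l *log.Logger
}

func NewStdLogger(l *log.Logger) logging.Logger {
	return &StdLogger{l: l}
}

func (sl *StdLogger) Trace(message string) { sl.l.Println("[TRACE]", message) }
func (sl *StdLogger) Debug(message string) { sl.l.Println("[DEBUG]", message) }
func (sl *StdLogger) Info(message string)  { sl.l.Println("[INFO]", message) }

func (sl *StdLogger) Warn(message string, errs ...error) {
	sl.l.Println("[WARN]", message, fmt.Sprint(errs))
}

func (sl *StdLogger) Error(message string, errs ...error) {
	sl.l.Println("[ERROR]", message, fmt.Sprint(errs))
}

// Close is required by the interface; *log.Logger has nothing to flush or release.
func (sl *StdLogger) Close() error { return nil }
```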
@@ -78,8 +81,10 @@ func (p *Pipeline[A]) listenToStatusUpdates() { switch status { case statuses.Done, statuses.Interrupted, statuses.TimedOut: + p.logger.Info("Pipeline: finished with status " + status.String()) close(p.statusChan) utils.StopSafely(p.timeoutTimer) + p.logger.Close() } } } @@ -133,7 +138,7 @@ func FromChannel[T any](fromCh <-chan T, confs ...configs.PipelineConfig) *Pipel // There is a possibility to change the limit for each stage individually - see [configs.StageConfig.MaxGoroutines]. // If the limit is reached, then the pipeline will wait until the number of goroutines is decreased. // -// The [configs.PipelineConfig.TimeoutInMillis] config can be used to set the timeout for the pipeline. +// The [configs.PipelineConfig.Timeout] config can be used to set the timeout for the pipeline. func from[T any](pipelineInitFunc func(chan<- T), confs ...configs.PipelineConfig) *Pipeline[T] { initChan := make(chan T) pipelineStatusChan := make(chan statuses.Status) @@ -146,6 +151,9 @@ func from[T any](pipelineInitFunc func(chan<- T), confs ...configs.PipelineConfi if starter != nil { stageStarter = make(chan struct{}) } + logger := pc.logger + + logger.Debug("Pipeline: initiating...") var status statuses.Status if starter == nil { @@ -156,8 +164,8 @@ func from[T any](pipelineInitFunc func(chan<- T), confs ...configs.PipelineConfi p := &Pipeline[T]{ InitStage: stages.NewInitStage( - initChan, pc.pipelineRateLimiter, pc.stageRateLimiter, - stageStarter, ctx, ctxCancelFunc, pipelineStatusChan, + initChan, pc.pipelineRateLimiter, pc.stageRateLimiter, stageStarter, + ctx, ctxCancelFunc, pipelineStatusChan, logger, ), Status: status, rateLimiter: pc.pipelineRateLimiter, @@ -165,6 +173,7 @@ func from[T any](pipelineInitFunc func(chan<- T), confs ...configs.PipelineConfi ctx: ctx, ctxCancelFunc: ctxCancelFunc, statusChan: pipelineStatusChan, + logger: logger, } go p.listenToStatusUpdates() @@ -173,22 +182,29 @@ func from[T any](pipelineInitFunc func(chan<- T), confs ...configs.PipelineConfi defer close(initChan) if starter != nil { + logger.Debug("Pipeline: waiting for the start signal...") + select { case _, ok := <-starter: if ok { + logger.Debug("Pipeline: start signal received") stageStarter <- struct{}{} close(starter) } case <-ctx.Done(): + logger.Debug("Pipeline: interrupted before the start signal") // if the pipeline is interrupted before it is started, then return close(starter) return } } + logger.Info("Pipeline: started") + go func() { if pc.timeout > 0 { p.timeoutTimer = time.AfterFunc(pc.timeout, func() { + logger.Info("Pipeline: timeout reached for pipeline - interrupting the pipeline") ctxCancelFunc() pipelineStatusChan <- statuses.TimedOut }) @@ -196,6 +212,7 @@ func from[T any](pipelineInitFunc func(chan<- T), confs ...configs.PipelineConfi }() pipelineInitFunc(initChan) + logger.Info("Stage 1: finished") }() return p @@ -206,6 +223,7 @@ func parseConfigs(confs ...configs.PipelineConfig) *parsedConfigs { var pipelineRateLimiter *ratelimiter.RateLimiter var stageRateLimiter *ratelimiter.RateLimiter var timeout time.Duration + var logger logging.Logger if len(confs) > 0 { conf := confs[0] @@ -221,6 +239,13 @@ func parseConfigs(confs ...configs.PipelineConfig) *parsedConfigs { if conf.Timeout > 0 { timeout = conf.Timeout } + if conf.Logger != nil { + logger = conf.Logger + } + } + + if logger == nil { + logger = logging.NewNoOpsLogger() } return &parsedConfigs{ @@ -228,5 +253,6 @@ func parseConfigs(confs ...configs.PipelineConfig) *parsedConfigs { pipelineRateLimiter: 
pipelineRateLimiter, stageRateLimiter: stageRateLimiter, timeout: timeout, + logger: logger, } } diff --git a/pippin_test.go b/pippin_test.go index b80b37b..d37beb0 100644 --- a/pippin_test.go +++ b/pippin_test.go @@ -3,11 +3,13 @@ package main import ( "fmt" "github.com/n0rdy/pippin/configs" + "github.com/n0rdy/pippin/logging" "github.com/n0rdy/pippin/pipeline" "github.com/n0rdy/pippin/stages/aggregate" "github.com/n0rdy/pippin/stages/asyncaggregate" "github.com/n0rdy/pippin/stages/transform" "github.com/n0rdy/pippin/types" + "github.com/n0rdy/pippin/types/loglevels" "github.com/n0rdy/pippin/types/statuses" "github.com/n0rdy/pippin/utils" "go.uber.org/goleak" @@ -1432,6 +1434,224 @@ func TestFromSlice_AllPossibleTransformations_Sum_StageRateLimiting_PerStage_Ove } } +func TestFromSlice_AllPossibleTransformations_Sum_PipelineConsoleLogger_Success(t *testing.T) { + pipelineConsoleLogger := logging.NewConsoleLogger(loglevels.TRACE) + + p := pipeline.FromSlice( + []string{"1", "a", "2", "-3", "4", "5", "b"}, + configs.PipelineConfig{ + Logger: pipelineConsoleLogger, + }, + ) + + if p.Status != statuses.Running { + t.Errorf("expected status to be Running, got %s", p.Status.String()) + } + + atoiStage := transform.MapWithError( + p.InitStage, + func(input string) (int, error) { + return strconv.Atoi(input) + }, + func(err error) { + fmt.Println(err) + }, + ) + // 1, 2, -3, 4, 5 + + oddNumsStage := transform.Filter(atoiStage, func(input int) bool { + return input%2 != 0 + }) + // 1, -3, 5 + + multipliedByTwoStage := transform.Map(oddNumsStage, func(input int) int { + return input * 2 + }) + // 2, -6, 10 + + toMatrixStage := transform.MapWithErrorMapper( + multipliedByTwoStage, + func(input int) ([]int, error) { + if input < 0 { + return nil, fmt.Errorf("negative number %d", input) + } + + res := make([]int, input) + for i := 0; i < input; i++ { + res[i] = input * i + } + return res, nil + }, + func(err error) []int { + return []int{42} + }, + ) + // [0, 2], [42], [0, 10, 20, 30, 40, 50, 60, 70, 80, 90] + + plusOneStage := transform.FlatMapWithError( + toMatrixStage, + func(input int) ([]int, error) { + if input == 0 { + return nil, fmt.Errorf("zero") + } + + return []int{input + 1}, nil + }, + func(err error) { + fmt.Println(err) + }, + ) + // [3], [43], [11], [21], [31], [41], [51], [61], [71], [81], [91] + + greaterThan42Stage := transform.FlatMapWithErrorMapper( + plusOneStage, + func(input int) ([]int, error) { + if input <= 42 { + return nil, fmt.Errorf("42") + } + return []int{input}, nil + }, + func(err error) []int { + return []int{0} + }, + ) + // [0], [43], [0], [0], [0], [0], [51], [61], [71], [81], [91] + + flattenedStage := transform.FlatMap(greaterThan42Stage, func(input int) int { + return input + }) + // [0, 43, 0, 0, 0, 0, 51, 61, 71, 81, 91] + + sum, err := aggregate.Sum(flattenedStage) + // 398 + + if err != nil { + t.Errorf("expected no error, got %v", err) + } else { + if *sum != 398 { + t.Errorf("expected sum to be 398, got %d", *sum) + } + } + + // to sync with the pipeline + time.Sleep(1 * time.Second) + + if p.Status != statuses.Done { + t.Errorf("expected status to be done, got %s", p.Status.String()) + } +} + +func TestFromSlice_AllPossibleTransformations_Sum_PipelineAndStageConsoleLogger_Success(t *testing.T) { + pipelineConsoleLogger := logging.NewConsoleLogger(loglevels.TRACE) + stageConsoleLogger := logging.NewConsoleLogger(loglevels.INFO) + + p := pipeline.FromSlice( + []string{"1", "a", "2", "-3", "4", "5", "b"}, + configs.PipelineConfig{ + Logger: 
pipelineConsoleLogger, + }, + ) + + if p.Status != statuses.Running { + t.Errorf("expected status to be Running, got %s", p.Status.String()) + } + + atoiStage := transform.MapWithError( + p.InitStage, + func(input string) (int, error) { + return strconv.Atoi(input) + }, + func(err error) { + fmt.Println(err) + }, + ) + // 1, 2, -3, 4, 5 + + oddNumsStage := transform.Filter(atoiStage, func(input int) bool { + return input%2 != 0 + }) + // 1, -3, 5 + + multipliedByTwoStage := transform.Map(oddNumsStage, func(input int) int { + return input * 2 + }) + // 2, -6, 10 + + toMatrixStage := transform.MapWithErrorMapper( + multipliedByTwoStage, + func(input int) ([]int, error) { + if input < 0 { + return nil, fmt.Errorf("negative number %d", input) + } + + res := make([]int, input) + for i := 0; i < input; i++ { + res[i] = input * i + } + return res, nil + }, + func(err error) []int { + return []int{42} + }, + configs.StageConfig{ + Logger: stageConsoleLogger, + }, + ) + // [0, 2], [42], [0, 10, 20, 30, 40, 50, 60, 70, 80, 90] + + plusOneStage := transform.FlatMapWithError( + toMatrixStage, + func(input int) ([]int, error) { + if input == 0 { + return nil, fmt.Errorf("zero") + } + + return []int{input + 1}, nil + }, + func(err error) { + fmt.Println(err) + }, + ) + // [3], [43], [11], [21], [31], [41], [51], [61], [71], [81], [91] + + greaterThan42Stage := transform.FlatMapWithErrorMapper( + plusOneStage, + func(input int) ([]int, error) { + if input <= 42 { + return nil, fmt.Errorf("42") + } + return []int{input}, nil + }, + func(err error) []int { + return []int{0} + }, + ) + // [0], [43], [0], [0], [0], [0], [51], [61], [71], [81], [91] + + flattenedStage := transform.FlatMap(greaterThan42Stage, func(input int) int { + return input + }) + // [0, 43, 0, 0, 0, 0, 51, 61, 71, 81, 91] + + sum, err := aggregate.Sum(flattenedStage) + // 398 + + if err != nil { + t.Errorf("expected no error, got %v", err) + } else { + if *sum != 398 { + t.Errorf("expected sum to be 398, got %d", *sum) + } + } + + // to sync with the pipeline + time.Sleep(1 * time.Second) + + if p.Status != statuses.Done { + t.Errorf("expected status to be done, got %s", p.Status.String()) + } +} + func TestFromSlice_Map_Sync_SumComplexType_Success(t *testing.T) { p := pipeline.FromSlice( []string{"1", "2", "-3", "4", "5"}, diff --git a/stages/aggregate/aggregate.go b/stages/aggregate/aggregate.go index 5701031..0a1af58 100644 --- a/stages/aggregate/aggregate.go +++ b/stages/aggregate/aggregate.go @@ -4,11 +4,13 @@ import ( "cmp" "github.com/n0rdy/pippin/configs" "github.com/n0rdy/pippin/functions" + "github.com/n0rdy/pippin/logging" "github.com/n0rdy/pippin/stages" "github.com/n0rdy/pippin/types" "github.com/n0rdy/pippin/types/statuses" "github.com/n0rdy/pippin/utils" "sort" + "strconv" "sync" "time" ) @@ -511,17 +513,27 @@ func aggregate[In, Aggr, Res any](prevStage stages.Stage[In], aggFunc func(aggrR validate(prevStage) inChan := prevStage.Chan + localLogger, stageSpecific := localLogger(prevStage.Logger, confs...) + if stageSpecific { + defer localLogger.Close() + } localTimeout := localTimeout(confs...) + var stageIdAsString string customStageId := customStageId(confs...) 
if customStageId != 0 { - prevStage.Id = customStageId + stageIdAsString = "stage " + strconv.FormatInt(customStageId, 10) + ": " + } else { + stageIdAsString = "stage " + strconv.FormatInt(prevStage.Id+1, 10) + ": " } + localLogger.Debug(stageIdAsString + "initiating...") + var timeoutTimer *time.Timer go func() { if localTimeout > 0 { timeoutTimer = time.AfterFunc(localTimeout, func() { + localLogger.Info(stageIdAsString + "timeout reached for stage - interrupting the pipeline") prevStage.InterruptPipeline() prevStage.SetPipelineStatus(statuses.TimedOut) }) @@ -535,11 +547,15 @@ func aggregate[In, Aggr, Res any](prevStage stages.Stage[In], aggFunc func(aggrR select { case in, ok := <-inChan: if ok { + localLogger.Debug(stageIdAsString + "input received") result = aggFunc(result, in) + localLogger.Debug(stageIdAsString + "input processed") } else { + localLogger.Debug(stageIdAsString + "input channel closed") running = false } case <-prevStage.Context().Done(): + localLogger.Debug(stageIdAsString + "context done signal received") utils.StopSafely(timeoutTimer) return nil, prevStage.Context().Err() } @@ -550,15 +566,31 @@ func aggregate[In, Aggr, Res any](prevStage stages.Stage[In], aggFunc func(aggrR res := resFunc(result) prevStage.SetPipelineStatus(statuses.Done) + localLogger.Info(stageIdAsString + "finished") + return &res, nil } func validate[In any](prevStage stages.Stage[In]) { if prevStage.Starter != nil { + prevStage.Logger.Error("Sync aggregation doesn't support manual delayed start - use async aggregation from the [asyncaggregate] package instead") panic("Sync aggregation doesn't support manual delayed start - use async aggregation from the [asyncaggregate] package instead") } } +func localLogger(stageLogger logging.Logger, confs ...configs.StageConfig) (logging.Logger, bool) { + if len(confs) == 0 { + return stageLogger, false + } + + conf := confs[0] + if conf.Logger != nil { + // stage configs overrides pipeline configs for logger + return conf.Logger, true + } + return stageLogger, false +} + func localTimeout(confs ...configs.StageConfig) time.Duration { if len(confs) == 0 { return 0 diff --git a/stages/asyncaggregate/asyncaggregate.go b/stages/asyncaggregate/asyncaggregate.go index 65c5aec..66056c4 100644 --- a/stages/asyncaggregate/asyncaggregate.go +++ b/stages/asyncaggregate/asyncaggregate.go @@ -4,12 +4,14 @@ import ( "cmp" "github.com/n0rdy/pippin/configs" "github.com/n0rdy/pippin/functions" + "github.com/n0rdy/pippin/logging" "github.com/n0rdy/pippin/ratelimiter" "github.com/n0rdy/pippin/stages" "github.com/n0rdy/pippin/types" "github.com/n0rdy/pippin/types/statuses" "github.com/n0rdy/pippin/utils" "sort" + "strconv" "sync" "sync/atomic" "time" @@ -532,23 +534,36 @@ func aggregate[In, Aggr, Res any](prevStage stages.Stage[In], aggFunc func(aggrR errChan := make(chan error) localRateLimiter := localRateLimiter(prevStage.StageRateLimiter, confs...) + localLogger, stageSpecific := localLogger(prevStage.Logger, confs...) + if stageSpecific { + defer localLogger.Close() + } localTimeout := localTimeout(confs...) + var stageIdAsString string customStageId := customStageId(confs...) 
if customStageId != 0 { - prevStage.Id = customStageId + stageIdAsString = "stage " + strconv.FormatInt(customStageId, 10) + ": " + } else { + stageIdAsString = "stage " + strconv.FormatInt(prevStage.Id+1, 10) + ": " } + localLogger.Debug(stageIdAsString + "initiating...") + go func() { defer ratelimiter.CloseSafely(localRateLimiter) if prevStage.Starter != nil { + localLogger.Debug(stageIdAsString + "waiting for the start signal...") + select { case _, ok := <-prevStage.Starter: if ok { + localLogger.Debug(stageIdAsString + "start signal received") close(prevStage.Starter) } case <-prevStage.Context().Done(): - utils.DrainChan(inChan) + localLogger.Debug(stageIdAsString + "context done signal received before the start signal") + errChan <- prevStage.Context().Err() close(prevStage.Starter) // if the pipeline is interrupted before it is started, then return @@ -556,10 +571,13 @@ func aggregate[In, Aggr, Res any](prevStage stages.Stage[In], aggFunc func(aggrR } } + localLogger.Info(stageIdAsString + "started") + var timeoutTimer *time.Timer go func() { if localTimeout > 0 { timeoutTimer = time.AfterFunc(localTimeout, func() { + localLogger.Info(stageIdAsString + "timeout reached for stage - interrupting the pipeline") prevStage.InterruptPipeline() prevStage.SetPipelineStatus(statuses.TimedOut) }) @@ -573,6 +591,7 @@ func aggregate[In, Aggr, Res any](prevStage stages.Stage[In], aggFunc func(aggrR select { case in, ok := <-inChan: if ok { + localLogger.Debug(stageIdAsString + "input received") // to make sure that at least 1 goroutine is running regardless of the pipeline rate limiter (if configured) acquired := ratelimiter.AcquireSafelyIfRunning(prevStage.PipelineRateLimiter, numOfWorkers) ratelimiter.AcquireSafely(localRateLimiter) @@ -586,14 +605,18 @@ func aggregate[In, Aggr, Res any](prevStage stages.Stage[In], aggFunc func(aggrR m.Lock() defer m.Unlock() aggrRes = aggFunc(aggrRes, inArg, localWg) + localLogger.Debug(stageIdAsString + "input processed") }(in, acquired) } else { + localLogger.Debug(stageIdAsString + "input channel closed") // without this wait, the future might be completed before all the goroutines are finished localWg.Wait() doneChan <- struct{}{} running = false } case <-prevStage.Context().Done(): + localLogger.Debug(stageIdAsString + "context done signal received") + utils.DrainChan(inChan) errChan <- prevStage.Context().Err() running = false } @@ -610,11 +633,14 @@ func aggregate[In, Aggr, Res any](prevStage stages.Stage[In], aggFunc func(aggrR select { case <-doneChan: + localLogger.Debug(stageIdAsString + "future completed successfully") futureRes.Complete(resFunc(aggrRes)) prevStage.SetPipelineStatus(statuses.Done) case err := <-errChan: + localLogger.Error(stageIdAsString + "future completed with error") futureRes.Fail(err) } + localLogger.Info(stageIdAsString + "finished") }() return &futureRes @@ -634,6 +660,19 @@ func localRateLimiter(stageRateLimiter *ratelimiter.RateLimiter, confs ...config return stageRateLimiter } +func localLogger(stageLogger logging.Logger, confs ...configs.StageConfig) (logging.Logger, bool) { + if len(confs) == 0 { + return stageLogger, false + } + + conf := confs[0] + if conf.Logger != nil { + // stage configs overrides pipeline configs for logger + return conf.Logger, true + } + return stageLogger, false +} + func localTimeout(confs ...configs.StageConfig) time.Duration { if len(confs) == 0 { return 0 diff --git a/stages/stage.go b/stages/stage.go index 2312c0c..38c406b 100644 --- a/stages/stage.go +++ b/stages/stage.go @@ -2,6 +2,7 
@@ package stages import ( "context" + "github.com/n0rdy/pippin/logging" "github.com/n0rdy/pippin/ratelimiter" "github.com/n0rdy/pippin/types/statuses" ) @@ -52,6 +53,7 @@ type Stage[T any] struct { PipelineRateLimiter *ratelimiter.RateLimiter StageRateLimiter *ratelimiter.RateLimiter Starter chan struct{} + Logger logging.Logger stageCtx context.Context stageCtxCancelFunc context.CancelFunc pipelineCtxCancelFunc context.CancelFunc @@ -59,7 +61,7 @@ type Stage[T any] struct { } // NewInitStage is a function that creates a new stage based on the provided parameters. -func NewInitStage[T any](ch <-chan T, pipelineRateLimiter *ratelimiter.RateLimiter, stageRateLimiter *ratelimiter.RateLimiter, starter chan struct{}, pipelineCtx context.Context, pipelineCtxCancelFunc context.CancelFunc, pipelineStatusChan chan<- statuses.Status) Stage[T] { +func NewInitStage[T any](ch <-chan T, pipelineRateLimiter *ratelimiter.RateLimiter, stageRateLimiter *ratelimiter.RateLimiter, starter chan struct{}, pipelineCtx context.Context, pipelineCtxCancelFunc context.CancelFunc, pipelineStatusChan chan<- statuses.Status, logger logging.Logger) Stage[T] { stageCtx, stageCtxCancelFunc := context.WithCancel(pipelineCtx) return Stage[T]{ @@ -68,6 +70,7 @@ func NewInitStage[T any](ch <-chan T, pipelineRateLimiter *ratelimiter.RateLimit PipelineRateLimiter: pipelineRateLimiter, StageRateLimiter: stageRateLimiter, Starter: starter, + Logger: logger, stageCtx: stageCtx, stageCtxCancelFunc: stageCtxCancelFunc, pipelineCtxCancelFunc: pipelineCtxCancelFunc, @@ -91,6 +94,7 @@ func FromStage[In, Out any](stage Stage[In], ch <-chan Out, starter chan struct{ PipelineRateLimiter: stage.PipelineRateLimiter, StageRateLimiter: ratelimiter.Copy(stage.StageRateLimiter), Starter: starter, + Logger: stage.Logger, stageCtx: stageCtx, stageCtxCancelFunc: stageCtxCancelFunc, pipelineCtxCancelFunc: stage.pipelineCtxCancelFunc, diff --git a/stages/transform/transform.go b/stages/transform/transform.go index dde9a3b..a61d352 100644 --- a/stages/transform/transform.go +++ b/stages/transform/transform.go @@ -3,10 +3,12 @@ package transform import ( "github.com/n0rdy/pippin/configs" "github.com/n0rdy/pippin/functions" + "github.com/n0rdy/pippin/logging" "github.com/n0rdy/pippin/ratelimiter" "github.com/n0rdy/pippin/stages" "github.com/n0rdy/pippin/types/statuses" "github.com/n0rdy/pippin/utils" + "strconv" "sync" "sync/atomic" "time" @@ -242,35 +244,52 @@ func transform[In, Out any](prevStage stages.Stage[In], transformFunc func(inArg } localRateLimiter := localRateLimiter(prevStage.StageRateLimiter, confs...) + localLogger, stageSpecific := localLogger(prevStage.Logger, confs...) + if stageSpecific { + defer localLogger.Close() + } localTimeout := localTimeout(confs...) + var stageIdAsString string customStageId := customStageId(confs...) 
if customStageId != 0 { - prevStage.Id = customStageId + stageIdAsString = "stage " + strconv.FormatInt(customStageId, 10) + ": " + } else { + stageIdAsString = "stage " + strconv.FormatInt(prevStage.Id+1, 10) + ": " } + localLogger.Debug(stageIdAsString + "initiating...") + go func() { defer ratelimiter.CloseSafely(localRateLimiter) defer close(outChan) if prevStage.Starter != nil { + localLogger.Debug(stageIdAsString + "waiting for the start signal...") + select { case _, ok := <-prevStage.Starter: if ok { + localLogger.Debug(stageIdAsString + "start signal received") nextStageStarter <- struct{}{} close(prevStage.Starter) } case <-prevStage.Context().Done(): + localLogger.Debug(stageIdAsString + "context done signal received before the start signal") + close(prevStage.Starter) // if the pipeline is interrupted before it is started, then return return } } + localLogger.Info(stageIdAsString + "started") + var timeoutTimer *time.Timer go func() { if localTimeout > 0 { timeoutTimer = time.AfterFunc(localTimeout, func() { + localLogger.Info(stageIdAsString + "timeout reached for stage - interrupting the pipeline") prevStage.InterruptPipeline() prevStage.SetPipelineStatus(statuses.TimedOut) }) @@ -284,6 +303,7 @@ func transform[In, Out any](prevStage stages.Stage[In], transformFunc func(inArg select { case in, ok := <-inChan: if ok { + localLogger.Debug(stageIdAsString + "input received") // to make sure that at least 1 goroutine is running regardless of the pipeline rate limiter (if configured) acquired := ratelimiter.AcquireSafelyIfRunning(prevStage.PipelineRateLimiter, numOfWorkers) ratelimiter.AcquireSafely(localRateLimiter) @@ -295,11 +315,16 @@ func transform[In, Out any](prevStage stages.Stage[In], transformFunc func(inArg defer ratelimiter.ReleaseSafely(localRateLimiter) transformFunc(inArg, outChan, localWg) + + localLogger.Debug(stageIdAsString + "input processed") }(in, acquired) } else { + localLogger.Debug(stageIdAsString + "input channel closed") running = false } case <-prevStage.Context().Done(): + localLogger.Debug(stageIdAsString + "context done signal received") + utils.DrainChan(inChan) running = false } @@ -307,6 +332,8 @@ func transform[In, Out any](prevStage stages.Stage[In], transformFunc func(inArg localWg.Wait() utils.StopSafely(timeoutTimer) + + localLogger.Info(stageIdAsString + "finished") }() return stages.FromStage(prevStage, outChan, nextStageStarter) @@ -322,35 +349,54 @@ func transformAsync[In, Out any](prevStage stages.Stage[In], transformAsyncFunc } localRateLimiter := localRateLimiter(prevStage.StageRateLimiter, confs...) + localLogger, stageSpecific := localLogger(prevStage.Logger, confs...) + if stageSpecific { + defer localLogger.Close() + } localTimeout := localTimeout(confs...) + var stageIdAsString string customStageId := customStageId(confs...) 
if customStageId != 0 { - prevStage.Id = customStageId + stageIdAsString = "stage " + strconv.FormatInt(customStageId, 10) + ": " + } else { + stageIdAsString = "stage " + strconv.FormatInt(prevStage.Id+1, 10) + ": " } + localLogger.Debug(stageIdAsString + "initiating...") + go func() { defer ratelimiter.CloseSafely(localRateLimiter) defer close(outChan) if prevStage.Starter != nil { + localLogger.Debug(stageIdAsString + "waiting for the start signal...") + select { case _, ok := <-prevStage.Starter: if ok { + localLogger.Debug(stageIdAsString + "start signal received") + nextStageStarter <- struct{}{} close(prevStage.Starter) } case <-prevStage.Context().Done(): + localLogger.Debug(stageIdAsString + "context done signal received before the start signal") + close(prevStage.Starter) // if the pipeline is interrupted before it is started, then return return } } + localLogger.Info(stageIdAsString + "started") + var timeoutTimer *time.Timer go func() { if localTimeout > 0 { timeoutTimer = time.AfterFunc(localTimeout, func() { + localLogger.Info(stageIdAsString + "timeout reached for stage - interrupting the pipeline") + prevStage.InterruptPipeline() prevStage.SetPipelineStatus(statuses.TimedOut) }) @@ -365,6 +411,7 @@ func transformAsync[In, Out any](prevStage stages.Stage[In], transformAsyncFunc select { case in, ok := <-inChan: if ok { + localLogger.Debug(stageIdAsString + "input received") // to make sure that at least 1 goroutine is running regardless of the pipeline rate limiter (if configured) acquired := ratelimiter.AcquireSafelyIfRunning(prevStage.PipelineRateLimiter, numOfWorkers) ratelimiter.AcquireSafely(localRateLimiter) @@ -376,11 +423,15 @@ func transformAsync[In, Out any](prevStage stages.Stage[In], transformAsyncFunc defer ratelimiter.ReleaseSafely(localRateLimiter) transformAsyncFunc(inArg, outChan, localWg, prevStage.PipelineRateLimiter, localRateLimiter, numOfAsyncWorkersArg) + + localLogger.Debug(stageIdAsString + "input processed") }(in, acquired, numOfAsyncWorkers) } else { + localLogger.Debug(stageIdAsString + "input channel closed") running = false } case <-prevStage.Context().Done(): + localLogger.Debug(stageIdAsString + "context done signal received") utils.DrainChan(inChan) running = false } @@ -388,6 +439,8 @@ func transformAsync[In, Out any](prevStage stages.Stage[In], transformAsyncFunc localWg.Wait() utils.StopSafely(timeoutTimer) + + localLogger.Info(stageIdAsString + "finished") }() return stages.FromStage(prevStage, outChan, nextStageStarter) @@ -407,6 +460,19 @@ func localRateLimiter(stageRateLimiter *ratelimiter.RateLimiter, confs ...config return stageRateLimiter } +func localLogger(stageLogger logging.Logger, confs ...configs.StageConfig) (logging.Logger, bool) { + if len(confs) == 0 { + return stageLogger, false + } + + conf := confs[0] + if conf.Logger != nil { + // stage configs overrides pipeline configs for logger + return conf.Logger, true + } + return stageLogger, false +} + func localTimeout(confs ...configs.StageConfig) time.Duration { if len(confs) == 0 { return 0 diff --git a/types/loglevels/loglevel.go b/types/loglevels/loglevel.go new file mode 100644 index 0000000..4404ce4 --- /dev/null +++ b/types/loglevels/loglevel.go @@ -0,0 +1,36 @@ +package loglevels + +const ( + TRACE = iota + DEBUG + INFO + WARN + ERROR + + TracePrefix = " [TRACE] " + DebugPrefix = " [DEBUG] " + InfoPrefix = " [INFO] " + WarnPrefix = " [WARN] " + ErrorPrefix = " [ERROR] " +) + +// LogLevel is a type that represents a log level. 
+// The hierarchy of log levels is the following: TRACE < DEBUG < INFO < WARN < ERROR.
+type LogLevel int
+
+func (ll LogLevel) String() string {
+	switch ll {
+	case TRACE:
+		return "TRACE"
+	case DEBUG:
+		return "DEBUG"
+	case INFO:
+		return "INFO"
+	case WARN:
+		return "WARN"
+	case ERROR:
+		return "ERROR"
+	default:
+		return "UNKNOWN"
+	}
+}
diff --git a/utils/datetime.go b/utils/datetime.go
new file mode 100644
index 0000000..5a56b91
--- /dev/null
+++ b/utils/datetime.go
@@ -0,0 +1,7 @@
+package utils
+
+import "time"
+
+func TimeNowAsRFC3339NanoString() string {
+	return time.Now().Format(time.RFC3339Nano)
+}

From eff01d93ab77bd08da95d897f4dfdc13b175e52b Mon Sep 17 00:00:00 2001
From: n0rdy
Date: Sun, 19 Nov 2023 22:50:59 +0100
Subject: [PATCH 2/2] Fixed README file examples for logging configs

---
 README.md                |  7 ++-----
 pipeline/pipeline.go     |  1 +
 pippin_test.go           | 37 ++++++++++++++++---------------------
 types/statuses/status.go | 12 ++++++------
 4 files changed, 25 insertions(+), 32 deletions(-)

diff --git a/README.md b/README.md
index 5d4d29b..763ee97 100644
--- a/README.md
+++ b/README.md
@@ -249,14 +249,12 @@ If your pipeline performs any network calls within its transformation/aggregation
 To create a pipeline with a custom configuration:
 
 ```go
-pipelineConsoleLogger := logging.NewConsoleLogger(loglevels.DEBUG)
-
 p := pipeline.FromSlice[int]([]int{1, 2, 3, 4, 5}, configs.PipelineConfig{
 	ManualStart: true,
 	MaxGoroutinesTotal: 100,
 	MaxGoroutinesPerStage: 10,
 	Timeout: time.Duration(1000) * time.Millisecond,
-	Logger: &pipelineConsoleLogger,
+	Logger: logging.NewConsoleLogger(loglevels.DEBUG),
 })
 ```
 
@@ -447,7 +445,6 @@ To create a transformation with a custom configuration:
 
 ```go
 p := pipeline.FromSlice[int]([]int{1, 2, 3, 4, 5})
-stageConsoleLogger := logging.NewConsoleLogger(loglevels.INFO)
 
 // multiplies each number by 2:
 mappingStage := transform.Map[int, int](filteringStage, func(i int) int {
 	return i * 2
@@ -455,7 +452,7 @@ mappingStage := transform.Map[int, int](filteringStage, func(i int) int {
 	MaxGoroutines: 10,
 	Timeout: time.Duration(1000) * time.Millisecond,
 	CustomId: 1,
-	Logger: &stageConsoleLogger,
+	Logger: logging.NewConsoleLogger(loglevels.INFO),
 })
 ```
 
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index f2798a0..d947d7d 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -200,6 +200,7 @@ func from[T any](pipelineInitFunc func(chan<- T), confs ...configs.PipelineConfi
 	}
 
 	logger.Info("Pipeline: started")
+	logger.Info("Stage 1: started")
 
 	go func() {
 		if pc.timeout > 0 {
diff --git a/pippin_test.go b/pippin_test.go
index d37beb0..61fbea8 100644
--- a/pippin_test.go
+++ b/pippin_test.go
@@ -119,7 +119,7 @@ func TestFromSlice_AllPossibleTransformations_Sum_NoConfigs_Success(t *testing.T
 	time.Sleep(1 * time.Second)
 
 	if p.Status != statuses.Done {
-		t.Errorf("expected status to be done, got %s", p.Status.String())
+		t.Errorf("expected status to be Done, got %s", p.Status.String())
 	}
 }
 
@@ -235,7 +235,7 @@ func TestFromSlice_AllPossibleTransformations_Sum_ManualStart_Success(t *testing
 	time.Sleep(1 * time.Second)
 
 	if p.Status != statuses.Done {
-		t.Errorf("expected status to be done, got %s", p.Status.String())
+		t.Errorf("expected status to be Done, got %s", p.Status.String())
 	}
 }
 
@@ -350,7 +350,7 @@ func TestFromSlice_AllPossibleTransformations_Sum_ManualStart_InterruptedBeforeS
 	time.Sleep(1 * time.Second)
 
 	if p.Status != statuses.Interrupted {
-		t.Errorf("expected status to be interrupted, got %s", p.Status.String())
+		t.Errorf("expected status to be 
Interrupted, got %s", p.Status.String()) } } @@ -483,7 +483,7 @@ func TestFromSlice_AllPossibleTransformations_Sum_ManualStart_InterruptedAfterSt time.Sleep(1 * time.Second) if p.Status != statuses.Interrupted { - t.Errorf("expected status to be interrupted, got %s", p.Status.String()) + t.Errorf("expected status to be Interrupted, got %s", p.Status.String()) } } @@ -793,7 +793,7 @@ func TestFromSlice_AllPossibleTransformations_Sum_PipelineRateLimiting_TooLowThr time.Sleep(1 * time.Second) if p.Status != statuses.Done { - t.Errorf("expected status to be done, got %s", p.Status.String()) + t.Errorf("expected status to be Done, got %s", p.Status.String()) } } @@ -898,7 +898,7 @@ func TestFromSlice_AllPossibleTransformations_Sum_PipelineRateLimiting_HighThres time.Sleep(1 * time.Second) if p.Status != statuses.Done { - t.Errorf("expected status to be done, got %s", p.Status.String()) + t.Errorf("expected status to be Done, got %s", p.Status.String()) } } @@ -1004,7 +1004,7 @@ func TestFromSlice_AllPossibleTransformations_Sum_PipelineAndStageRateLimiting_S time.Sleep(1 * time.Second) if p.Status != statuses.Done { - t.Errorf("expected status to be done, got %s", p.Status.String()) + t.Errorf("expected status to be Done, got %s", p.Status.String()) } } @@ -1109,7 +1109,7 @@ func TestFromSlice_AllPossibleTransformations_Sum_StageRateLimiting_PerStage_Hig time.Sleep(1 * time.Second) if p.Status != statuses.Done { - t.Errorf("expected status to be done, got %s", p.Status.String()) + t.Errorf("expected status to be Done, got %s", p.Status.String()) } } @@ -1214,7 +1214,7 @@ func TestFromSlice_AllPossibleTransformations_Sum_StageRateLimiting_PerStage_Low time.Sleep(1 * time.Second) if p.Status != statuses.Done { - t.Errorf("expected status to be done, got %s", p.Status.String()) + t.Errorf("expected status to be Done, got %s", p.Status.String()) } } @@ -1322,7 +1322,7 @@ func TestFromSlice_AllPossibleTransformations_Sum_StageRateLimiting_ForStage_Suc time.Sleep(1 * time.Second) if p.Status != statuses.Done { - t.Errorf("expected status to be done, got %s", p.Status.String()) + t.Errorf("expected status to be Done, got %s", p.Status.String()) } } @@ -1430,17 +1430,15 @@ func TestFromSlice_AllPossibleTransformations_Sum_StageRateLimiting_PerStage_Ove time.Sleep(1 * time.Second) if p.Status != statuses.Done { - t.Errorf("expected status to be done, got %s", p.Status.String()) + t.Errorf("expected status to be Done, got %s", p.Status.String()) } } func TestFromSlice_AllPossibleTransformations_Sum_PipelineConsoleLogger_Success(t *testing.T) { - pipelineConsoleLogger := logging.NewConsoleLogger(loglevels.TRACE) - p := pipeline.FromSlice( []string{"1", "a", "2", "-3", "4", "5", "b"}, configs.PipelineConfig{ - Logger: pipelineConsoleLogger, + Logger: logging.NewConsoleLogger(loglevels.TRACE), }, ) @@ -1537,18 +1535,15 @@ func TestFromSlice_AllPossibleTransformations_Sum_PipelineConsoleLogger_Success( time.Sleep(1 * time.Second) if p.Status != statuses.Done { - t.Errorf("expected status to be done, got %s", p.Status.String()) + t.Errorf("expected status to be Done, got %s", p.Status.String()) } } func TestFromSlice_AllPossibleTransformations_Sum_PipelineAndStageConsoleLogger_Success(t *testing.T) { - pipelineConsoleLogger := logging.NewConsoleLogger(loglevels.TRACE) - stageConsoleLogger := logging.NewConsoleLogger(loglevels.INFO) - p := pipeline.FromSlice( []string{"1", "a", "2", "-3", "4", "5", "b"}, configs.PipelineConfig{ - Logger: pipelineConsoleLogger, + Logger: 
logging.NewConsoleLogger(loglevels.TRACE), }, ) @@ -1594,7 +1589,7 @@ func TestFromSlice_AllPossibleTransformations_Sum_PipelineAndStageConsoleLogger_ return []int{42} }, configs.StageConfig{ - Logger: stageConsoleLogger, + Logger: logging.NewConsoleLogger(loglevels.INFO), }, ) // [0, 2], [42], [0, 10, 20, 30, 40, 50, 60, 70, 80, 90] @@ -1648,7 +1643,7 @@ func TestFromSlice_AllPossibleTransformations_Sum_PipelineAndStageConsoleLogger_ time.Sleep(1 * time.Second) if p.Status != statuses.Done { - t.Errorf("expected status to be done, got %s", p.Status.String()) + t.Errorf("expected status to be Done, got %s", p.Status.String()) } } diff --git a/types/statuses/status.go b/types/statuses/status.go index ec69c10..35a02ae 100644 --- a/types/statuses/status.go +++ b/types/statuses/status.go @@ -21,16 +21,16 @@ type Status int func (s Status) String() string { switch s { case Pending: - return "pending" + return "Pending" case Running: - return "running" + return "Running" case Done: - return "done" + return "Done" case Interrupted: - return "interrupted" + return "Interrupted" case TimedOut: - return "timedOut" + return "TimedOut" default: - return "unknown" + return "Unknown" } }
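As a closing illustration of the capitalized status strings above, a caller that polls a pipeline (using the same sleep-then-check pattern as the tests, and assuming `p` is a started pipeline) now reads naturally:

```go
// Give the pipeline a moment to finish, then inspect its final status.
time.Sleep(1 * time.Second)

switch p.Status {
case statuses.Done:
	fmt.Println("pipeline finished:", p.Status.String()) // "pipeline finished: Done"
case statuses.Interrupted, statuses.TimedOut:
	fmt.Println("pipeline stopped early:", p.Status.String())
}
```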