diff --git a/README.md b/README.md index 8636073..334cb1b 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,117 @@ if err != nil { // 12 ``` +### More detailed example + +```go +// create a new pipeline from a slice of integers: +p := pipeline.FromSlice( + []string{"1", "a", "2", "-3", "4", "5", "b"}, +) +// result: +// "1", "a", "2", "-3", "4", "5", "b" + +atoiStage := transform.MapWithError( + p.InitStage, + func(input string) (int, error) { + return strconv.Atoi(input) + }, + func(err error) { + fmt.Println(err) + }, +) +// result: +// 1, 2, -3, 4, 5 +// printed to the console: +// strconv.Atoi: parsing "a": invalid syntax +// strconv.Atoi: parsing "b": invalid syntax + +oddNumsStage := transform.Filter(atoiStage, func(input int) bool { + return input%2 != 0 +}) +// result: +// 1, -3, 5 + +multipliedByTwoStage := transform.Map(oddNumsStage, func(input int) int { + return input * 2 +}) +// result: +// 2, -6, 10 + +toMatrixStage := transform.MapWithErrorMapper( + multipliedByTwoStage, + func(input int) ([]int, error) { + if input < 0 { + return nil, fmt.Errorf("negative number %d", input) + } + + res := make([]int, input) + for i := 0; i < input; i++ { + res[i] = input * i + } + return res, nil + }, + func(err error) []int { + return []int{42} + }, +) +// result: +// [0, 2], [42], [0, 10, 20, 30, 40, 50, 60, 70, 80, 90] + +plusOneStage := transform.FlatMapWithError( + toMatrixStage, + func(input int) ([]int, error) { + if input == 0 { + return nil, fmt.Errorf("zero") + } + + return []int{input + 1}, nil + }, + func(err error) { + fmt.Println(err) + }, +) +// result: +// [3], [43], [11], [21], [31], [41], [51], [61], [71], [81], [91] +// printed to the console: +// zero +// zero + +greaterThan42Stage := transform.FlatMapWithErrorMapper( + plusOneStage, + func(input int) ([]int, error) { + if input <= 42 { + return nil, fmt.Errorf("42") + } + return []int{input}, nil + }, + func(err error) []int { + return []int{0} + }, +) +// result: +// [0], [43], [0], [0], [0], [0], [51], [61], [71], [81], [91] + +flattenedStage := transform.FlatMap(greaterThan42Stage, func(input int) int { + return input +}) +// result: +// [0, 43, 0, 0, 0, 0, 51, 61, 71, 81, 91] + +futureSum := asyncaggregate.Sum(flattenedStage) +// result: +// 398 + +result, err := futureSum.GetWithTimeout(time.Duration(10)*time.Second) +if err != nil { + fmt.Println(err) +} else { + fmt.Println(*result) +} +// printed to the console: +// 398 +``` + ## Documentation Find the full documentation [here](https://pkg.go.dev/github.com/n0rdy/pippin). @@ -300,7 +411,7 @@ This is the way to avoid blocking the execution and a way to early return from a There are two ways to do that: - by calling `Get()` method. This method will block until the value is available. It returns either the pointer to the value or an error. In Pippin the error means that the pipeline was interrupted before it could complete that's why the value is not available. -- by calling `GetWithTimeout(timeoutInMillis int)` method. This method will block until the value is available or the timeout is reached. +- by calling `GetWithTimeout(timeout time.Duration)` method. This method will block until the value is available or the timeout is reached. The recommended way to obtain the value is by calling `GetWithTimeout` method, as otherwise the execution might be blocked forever. diff --git a/types/future.go b/types/future.go index ce9dc5f..c97380f 100644 --- a/types/future.go +++ b/types/future.go @@ -16,7 +16,7 @@ import ( // There are two ways to do that: // 1. By calling Get() method. This method will block until the value is available. It returns either the pointer to the value or an error. // In pippin the error means that the pipeline was interrupted before it could complete that's why the value is not available. -// 2. By calling GetWithTimeout(timeoutInMillis int) method. This method will block until the value is available or the timeout is reached. +// 2. By calling GetWithTimeout(timeout time.Duration) method. This method will block until the value is available or the timeout is reached. // // The recommended way to obtain the value is by calling GetWithTimeout() method, as otherwise the execution might be blocked forever. // @@ -62,9 +62,9 @@ func (f *Future[T]) Get() (*T, error) { // The error means that the pipeline was interrupted before it could complete that's why the value is not available. // // This is the recommended way to obtain the value from the future. -func (f *Future[T]) GetWithTimeout(timeoutInMillis int) (*T, error) { +func (f *Future[T]) GetWithTimeout(timeout time.Duration) (*T, error) { if !f.done { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutInMillis)*time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() err := f.sem.Acquire(ctx, 1) diff --git a/types/future_test.go b/types/future_test.go index c2910b6..4bd0fcf 100644 --- a/types/future_test.go +++ b/types/future_test.go @@ -1,6 +1,9 @@ package types -import "testing" +import ( + "testing" + "time" +) func TestFuture_Success_Get(t *testing.T) { future := NewFuture[int]() @@ -33,7 +36,7 @@ func TestFuture_Success_GetWithTimeout(t *testing.T) { t.Error("Future should be completed") } - result, err := future.GetWithTimeout(1000) + result, err := future.GetWithTimeout(time.Duration(1000) * time.Millisecond) if err != nil { t.Error("Expected no error, got: ", err) } @@ -48,7 +51,7 @@ func TestFuture_Failure_GetWithTimeout(t *testing.T) { t.Error("Future should not be completed") } - result, err := future.GetWithTimeout(1000) + result, err := future.GetWithTimeout(time.Duration(1000) * time.Millisecond) if err == nil { t.Error("Expected error, got: ", err) } @@ -58,7 +61,7 @@ func TestFuture_Failure_GetWithTimeout(t *testing.T) { future.Complete(42) - result, err = future.GetWithTimeout(1000) + result, err = future.GetWithTimeout(time.Duration(1000) * time.Millisecond) if err != nil { t.Error("Expected no error, got: ", err) }