Skip to content

Commit

Permalink
Merge pull request #1 from n0rdy/fix/timeouts_as_durations
Browse files Browse the repository at this point in the history
Switched timeouts data type from int64 to time.Duration
  • Loading branch information
n0rdy authored Nov 19, 2023
2 parents 75bd0a8 + 65be3ac commit 5d87754
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 14 deletions.
6 changes: 4 additions & 2 deletions configs/pipeline.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package configs

import "time"

// PipelineConfig is a struct that contains the configuration for a pipeline
//
// [PipelineConfig.ManualStart] is a boolean that indicates whether the pipeline should be started manually.
Expand All @@ -15,11 +17,11 @@ package configs
// If it is passed as 0 or less, then there is no limit.
// It is possible to change the limit for each stage individually - see [StageConfig.MaxGoroutines].
//
// [PipelineConfig.TimeoutInMillis] is an integer that indicates the timeout (in milliseconds) for the pipeline.
// [PipelineConfig.Timeout] indicates the timeout for the pipeline.
// If it is passed as 0 or less, then there is no timeout.
type PipelineConfig struct {
ManualStart bool
MaxGoroutinesTotal int
MaxGoroutinesPerStage int
TimeoutInMillis int64
Timeout time.Duration
}
10 changes: 6 additions & 4 deletions configs/stage.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package configs

import "time"

// StageConfig is a struct that holds the configuration for a stage.
//
// [StageConfig.MaxGoroutines] is the maximum number of goroutines that can be spawned within the stage.
// If it is passed as 0 or less, then there is no limit.
// This config option can be used to change the limit for each stage that comes from the [PipelineConfig.MaxGoroutinesPerStage] option (if provided).
//
// [StageConfig.TimeoutInMillis] is the timeout (in milliseconds) for the stage.
// [StageConfig.Timeout] is the timeout for the stage.
// If it is passed as 0 or less, then there is no timeout.
//
// [StageConfig.CustomId] is a custom ID for the stage.
Expand All @@ -15,7 +17,7 @@ package configs
// The initial stage (the one that is created first) has an ID of 1.
// It is recommended to either rely on the auto-generated IDs or to provide a custom ID for each stage, otherwise the IDs might be messed up due to the (1 + the ID of the previous stage) logic mentioned above.
type StageConfig struct {
MaxGoroutines int
TimeoutInMillis int64
CustomId int64
MaxGoroutines int
Timeout time.Duration
CustomId int64
}
4 changes: 2 additions & 2 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ func parseConfigs(confs ...configs.PipelineConfig) *parsedConfigs {
if conf.MaxGoroutinesPerStage > 0 {
stageRateLimiter = ratelimiter.NewRateLimiter(conf.MaxGoroutinesPerStage)
}
if conf.TimeoutInMillis > 0 {
timeout = time.Duration(conf.TimeoutInMillis) * time.Millisecond
if conf.Timeout > 0 {
timeout = conf.Timeout
}
}

Expand Down
6 changes: 3 additions & 3 deletions pippin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func TestFromSlice_AllPossibleTransformations_Sum_PipelineTimeoutReached_Success
p := pipeline.FromSlice(
[]string{"1", "a", "2", "-3", "4", "5", "b"},
configs.PipelineConfig{
TimeoutInMillis: 1000,
Timeout: time.Duration(1000) * time.Millisecond,
},
)

Expand Down Expand Up @@ -590,7 +590,7 @@ func TestFromSlice_AllPossibleTransformations_Sum_StageTimeoutReached_Success(t
p := pipeline.FromSlice(
[]string{"1", "a", "2", "-3", "4", "5", "b"},
configs.PipelineConfig{
TimeoutInMillis: 100000,
Timeout: time.Duration(100000) * time.Millisecond,
},
)

Expand Down Expand Up @@ -637,7 +637,7 @@ func TestFromSlice_AllPossibleTransformations_Sum_StageTimeoutReached_Success(t
return []int{42}
},
configs.StageConfig{
TimeoutInMillis: 1000,
Timeout: time.Duration(1000) * time.Millisecond,
},
)
// [0, 2], [42], [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
Expand Down
2 changes: 1 addition & 1 deletion stages/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ func localTimeout(confs ...configs.StageConfig) time.Duration {
}

conf := confs[0]
return time.Duration(conf.TimeoutInMillis) * time.Millisecond
return conf.Timeout
}

func customStageId(confs ...configs.StageConfig) int64 {
Expand Down
2 changes: 1 addition & 1 deletion stages/asyncaggregate/asyncaggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ func localTimeout(confs ...configs.StageConfig) time.Duration {
}

conf := confs[0]
return time.Duration(conf.TimeoutInMillis) * time.Millisecond
return conf.Timeout
}

func customStageId(confs ...configs.StageConfig) int64 {
Expand Down
2 changes: 1 addition & 1 deletion stages/transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func localTimeout(confs ...configs.StageConfig) time.Duration {
}

conf := confs[0]
return time.Duration(conf.TimeoutInMillis) * time.Millisecond
return conf.Timeout
}

func customStageId(confs ...configs.StageConfig) int64 {
Expand Down

0 comments on commit 5d87754

Please sign in to comment.