Skip to content

Commit

Permalink
add Options.DontSkipTask()
Browse files Browse the repository at this point in the history
  • Loading branch information
nextzhou committed May 5, 2022
1 parent 3c05cf7 commit 93199ea
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 11 deletions.
17 changes: 9 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ err := wp.Wait() // 在这里等待所有任务完成,并处理错误与 panic

`Option` 可在 `New()` 传入,例如 `wp := New(ctx, Options.TaskTimeout(time.Second), Options.Chain(Wraps.PanicAsErr))`

| Option | 功能 |
|:---------------------------------------|:-----------------------------------------|
| Options.TaskTimeout(time.Duration) | 为每个任务设置独立的超时 |
| Options.ParallelLimit(uint) | 子任务最大并发限制 |
| Options.ExitTogether() | 当有任意子任务完成时通知其他子任务退出,一般在启动多个常驻服务时使用 |
| Options.WrapsChain(...wpcore.TaskWrap) | 为每个`Task`添加传入的`wpcore.TaskWrap`,作用顺序从做至右 |
| Options.Recover(wpcore.Recover) | 自定义当子任务panic时如何处理 |
| Options.IgnoreSkippingPendingErr() | 跳过了部分未执行任务不视为错误 |
| Option | 功能 |
|:---------------------------------------|:-----------------------------------------------------|
| Options.TaskTimeout(time.Duration) | 为每个任务设置独立的超时 |
| Options.ParallelLimit(uint) | 子任务最大并发限制 |
| Options.ExitTogether() | 当有任意子任务完成时通知其他子任务退出,一般在启动多个常驻服务时使用 |
| Options.WrapsChain(...wpcore.TaskWrap) | 为每个`Task`添加传入的`wpcore.TaskWrap`,作用顺序从左至右 |
| Options.Recover(wpcore.Recover) | 自定义当子任务panic时如何处理 |
| Options.IgnoreSkippingPendingErr() | 跳过了部分未执行任务不视为错误 |
| Options.DontSkipTask() | 默认情况下若`ctx`结束了,则后续添加的 `Task` 会直接跳过。添加该选项后则任何情况下都不会跳过 |

### Wraps

Expand Down
4 changes: 4 additions & 0 deletions export.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,7 @@ func (options) IgnoreSkippingPendingErr() wpcore.Option {
func (options) ParallelLimit(limit uint) wpcore.Option {
return wpcore.WithParallelLimit(limit)
}

func (options) DontSkipTask() wpcore.Option {
return wpcore.WithDontSkipTask()
}
19 changes: 19 additions & 0 deletions workpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,25 @@ func TestOptions_WrapsChain(t *testing.T) {
})
}

func TestOptions_DontSkipTask(t *testing.T) {
Convey("Options.DontSkipTask", t, func() {
ctx, cancel := context.WithCancel(context.Background())
cancel()

Convey("skip padding task when ctx done", func() {
wp := New(ctx)
wp.Go(emptyTask)
So(wp.Wait(), ShouldBeError, wpcore.ErrSkipPendingTask{SKippingTaskCount: 1})
})

Convey("dont skip any task even though ctx done", func() {
wp := New(ctx, Options.DontSkipTask())
wp.Go(emptyTask)
So(wp.Wait(), ShouldBeNil)
})
})
}

func TestTask_Go(t *testing.T) {
Convey("go single task without pool", t, func() {
Convey("just go", func() {
Expand Down
8 changes: 8 additions & 0 deletions wpcore/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,11 @@ func WithParallelLimit(limit uint) Option {
},
}
}

func WithDontSkipTask() Option {
return commonOption{
f: func(w *Workpool) {
w.conf.dontSkip = true
},
}
}
7 changes: 4 additions & 3 deletions wpcore/workpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type conf struct {
recover Recover
exitTogether bool
ignoreSkipping bool
dontSkip bool
}

func New(ctx context.Context, opts ...Option) *Workpool {
Expand All @@ -49,7 +50,7 @@ func New(ctx context.Context, opts ...Option) *Workpool {
}

func (w *Workpool) Go(task Task) {
if w.ctx.Err() != nil {
if w.ctx.Err() != nil && !w.conf.dontSkip {
atomic.AddUint64(&w.skippingNum, 1)
return
}
Expand Down Expand Up @@ -130,9 +131,9 @@ func (w *Workpool) setErr(err error) {

func (w *Workpool) runTask(task Task) error {
ctx := w.ctx
if ctx.Err() != nil {
if ctx.Err() != nil && !w.conf.dontSkip {
atomic.AddUint64(&w.skippingNum, 1)
return nil //nolint:nilerr
return nil
}
if timeout := w.conf.taskTimeout; timeout > 0 {
var cancel context.CancelFunc
Expand Down

0 comments on commit 93199ea

Please sign in to comment.