Skip to content

Commit

Permalink
phased task
Browse files Browse the repository at this point in the history
  • Loading branch information
nextzhou committed Mar 23, 2022
1 parent 5f11c0a commit 952571b
Show file tree
Hide file tree
Showing 4 changed files with 347 additions and 6 deletions.
76 changes: 73 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ err := wp.Wait() // 在这里等待所有任务完成,并处理错误与 panic
- [x] 收集子任务的错误与 panic,并在 `Workpool.Wait()` 函数中汇总。
- [x] 通过`Context`控制子任务生命周期,使得所有工作协程能保证在 `Workpool.Wait()` 都被即时释放。
- [x] 脱离 `Workpool` 的单个 `Task` 也可以安全地异步执行。
- [x] 分阶段任务,可交互式地从异步任务中获取阶段性结果
- [ ] 支持基于 `channel` 生产-消费者 的任务,生产者任务全部完成后自动通知消费者任务(依赖泛型)

## 设计
Expand Down Expand Up @@ -59,9 +60,10 @@ wp := New(ctx, WithChain(PanicAsErr)) // 配合 WithChain() 使用
wp.Go(PanicAsErr(task)) // 单独对某个 Task 使用
```

|TaskWrapper|功能|
|:----------|:---|
|PanicAsError|子任务 panic 会转换成错误|
|TaskWrapper| 功能 |
|:----------|:-----------------|
|PanicAsError| 子任务 panic 会转换成错误 |
|Phased| 将分阶段任务转成普通任务 |

## 单任务

Expand All @@ -85,3 +87,71 @@ if err := waitCoffee(); err == nil {
```

与在 `Workpool` 中执行 `Task` 一致,`Task` 中的所有错误或 `panic` 都会收集到 `wait()` 中抛出。同时你也可以使用 `PanicAsError` 包装需要异步执行的单任务。

## 分阶段任务

分阶段任务提供一种与异步任务交互的手段,通过一个例子我们就很容易理解:

> 我们有一个异步执行的定时更新数据任务,但在启动时第一次更新必须成功。
在没有分阶段任务时常规的解决方法时将第一次更新单独执行,剩下的部分作为一个`Task`异步执行。
```go
// construct wp、ctx ...

err := initTask(ctx)
if err != nil {
return err
}

wp.Go(func(ctx context.Context) error {
// task balabala
})
```
但这样的问题是初始化部分就无法也异步处理了(如果有多个这样的任务时是很有必要的),
而且单个任务的逻辑被拆散,不方便维护。

如果有了分阶段任务,这个问题就很好解决了:

```go
// construct wp、ctx ...

task, supervisor := Phased(func(ctx context.Context, helper PhasedTaskHelper) error {
err := taskInit(ctx)
if err != nil {
return err
}

// task initialization done, mark a milestone
helper.MarkAMilestone(taskInitOk)

// task balabala
})

wp.Go(task)

initResult, status := supervisor.WaitMilestoneOrCancel(ctx)
```

在分阶段任务中,我们可以通过调用 0 或多次 `helper.MarkAMilestone(interface{})`
来记录阶段性成果。
这有点类似于其他语言中 Generator 中的 yield 操作,
但区别在于分阶段任务在 `MarkAMileston` 之后并不会挂起,而是会继续执行。

在任务外,我们可以通过 `Parsed()` 返回的 `PhasedTaskSupervisor` 来与任务交互,
达到确认阶段性成果、或者设置阶段性成果的 Deadline 超过则取消等操作:


|函数| 功能 |
|:---|:------------------|
|WaitMilestone| 等待一个里程碑 |
|WaitMilestoneOrCancel| 等待一个里程碑,若超时了则取消任务 |

另外,通过 `WaitMilestone` 系列函数中,除了返回里程碑还会返回一个 `PhasedTaskStatus`
通过该值可以判断函数返回时的状态:

|状态|说明| 备注 |
|:---|:---|:--------------------------|
|IsOK()| 成功取到里程碑| |
|IsContextDone()|ctx done 并且未能取到里程碑| 可能与 IsTaskNotRunning() 共存 |
|IsTaskDone()|任务结束了,但并没有产生里程碑||
|IsTaskNotRunning()| ctx done 时还任务还为开始运行| 一定会与 IsContextDone() 共存 |
154 changes: 154 additions & 0 deletions phased_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package workpool

import (
"container/list"
"context"
"sync"
)

type PhasedTask func(ctx context.Context, helper PhasedTaskHelper) error

type PhasedTaskHelper struct {
mu *sync.Mutex
milestones *list.List
milestoneNotify chan struct{}
canceled bool
taskRunning bool
}

func (h *PhasedTaskHelper) getMilestoneNotify() <-chan struct{} {
h.mu.Lock()
defer h.mu.Unlock()

return h.milestoneNotify
}

func (h *PhasedTaskHelper) MarkAMilestone(milestone interface{}) {
h.mu.Lock()
defer h.mu.Unlock()

h.milestones.PushBack(milestone)
close(h.milestoneNotify)
h.milestoneNotify = make(chan struct{})
}

type PhasedTaskSupervisor struct {
handler *PhasedTaskHelper
cancel *context.CancelFunc
taskDone chan struct{}
}

type PhasedTaskStatus uint8

const (
phasedTaskStatusOK PhasedTaskStatus = iota
phasedTaskStatusContextDone
phasedTaskStatusTaskDone
phasedTaskStatusContextDoneAndTaskNotRunning
)

func (s PhasedTaskStatus) IsOK() bool {
return s == phasedTaskStatusOK
}

func (s PhasedTaskStatus) IsContextDone() bool {
return s == phasedTaskStatusContextDone || s == phasedTaskStatusContextDoneAndTaskNotRunning
}

func (s PhasedTaskStatus) IsTaskDone() bool {
return s == phasedTaskStatusTaskDone
}

func (s PhasedTaskStatus) IsTaskNotRunning() bool {
return s == phasedTaskStatusContextDoneAndTaskNotRunning
}

func (o *PhasedTaskSupervisor) WaitMilestone(ctx context.Context) (interface{}, PhasedTaskStatus) {
return o.waitMs(ctx, false)
}

func (o *PhasedTaskSupervisor) WaitMilestoneOrCancel(ctx context.Context) (interface{}, PhasedTaskStatus) {
return o.waitMs(ctx, true)
}

func (o *PhasedTaskSupervisor) waitMs(ctx context.Context, cancelOnCtxDone bool) (interface{}, PhasedTaskStatus) {
for {
if elem := o.handler.popFrontMilestone(); elem != nil {
return elem.Value, phasedTaskStatusOK
}

select {
case <-ctx.Done():
if elem := o.handler.popFrontMilestone(); elem != nil {
return elem.Value, phasedTaskStatusOK
}
return o.onCtxDone(cancelOnCtxDone)
case <-o.taskDone:
if elem := o.handler.popFrontMilestone(); elem != nil {
return elem.Value, phasedTaskStatusOK
}
return nil, phasedTaskStatusTaskDone
case <-o.handler.getMilestoneNotify():
continue
}
}
}

func (o *PhasedTaskSupervisor) onCtxDone(cancelOnCtxDone bool) (interface{}, PhasedTaskStatus) {
o.handler.mu.Lock()
running := o.handler.taskRunning
if cancelOnCtxDone {
o.handler.canceled = true
if *o.cancel != nil {
(*o.cancel)()
}
}
o.handler.mu.Unlock()
if running {
return nil, phasedTaskStatusContextDone
}
return nil, phasedTaskStatusContextDoneAndTaskNotRunning
}

func (h *PhasedTaskHelper) popFrontMilestone() *list.Element {
h.mu.Lock()
defer h.mu.Unlock()

front := h.milestones.Front()
if front != nil {
h.milestones.Remove(front)
}

return front
}

func Phased(task PhasedTask) (Task, PhasedTaskSupervisor) {
handler := PhasedTaskHelper{
mu: new(sync.Mutex),
milestones: list.New(),
milestoneNotify: make(chan struct{}),
}
supervisor := PhasedTaskSupervisor{
handler: &handler,
taskDone: make(chan struct{}),
cancel: new(context.CancelFunc),
}

commonTask := func(ctx context.Context) error {
defer func() { close(supervisor.taskDone) }()

handler.mu.Lock()
if handler.canceled {
handler.mu.Unlock()
return ErrSkipPendingTask{SKippingTaskCount: 1}
}
newCtx, cancel := context.WithCancel(ctx)
*supervisor.cancel = cancel
handler.taskRunning = true
handler.mu.Unlock()

return task(newCtx, handler)
}

return commonTask, supervisor
}
2 changes: 1 addition & 1 deletion task.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (t Task) Go(ctx context.Context) TaskWait {
}
}()

//nolint: errorlint
//nolint: errorlint,forcetypeassert
return func() error {
err := <-errC
if err == nil {
Expand Down
Loading

0 comments on commit 952571b

Please sign in to comment.