Skip to content

Commit

Permalink
[standard]provide worker for channel
Browse files Browse the repository at this point in the history
  • Loading branch information
vistart committed Jun 13, 2024
1 parent 13215db commit 254e702
Show file tree
Hide file tree
Showing 4 changed files with 1,294 additions and 0 deletions.
163 changes: 163 additions & 0 deletions workflow/standard/worker/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# worker

`worker` is a package providing a robust and efficient worker pool implementation.
It supports dynamic resizing of the pool, task submission, worker metrics tracking, and graceful shutdown.

## Features

- Dynamic resizing of the worker pool
- Task submission with results
- Worker-specific operations (stop, exit, rename)
- Metrics tracking and retrieval
- Graceful shutdown of the pool

## Installation

To install the package, run:

```shell
go get github.com/rhosocial/go-dag/workflow/standard/worker
```

## Usage

### Creating a Worker Pool

To create a new worker pool, use the `NewPool` function with optional configuration options:

```go
package main

import (
"github.com/rhosocial/go-dag/workflow/standard/worker"
)

func main() {
pool := worker.NewPool(
worker.WithMaxWorkers(5),
worker.WithWorkerMetrics(worker.NewWorkerMetrics()),
)
defer pool.Shutdown()
}

```

> Note that while `WithMaxWorkers` is optional, it must be provided at least once to ensure the pool has workers.
> If not provided, the pool will be created with zero workers.
> `WithWorkerMetrics` is also optional. If not provided, metrics will not be recorded.
### Defining a Task

Tasks need to implement the `Task` interface:

```go
package main

import "context"

type MyTask struct{}

func (t *MyTask) Execute(ctx context.Context, args ...interface{}) (interface{}, error) {
// Task execution logic
return nil, nil
}
```

### Submitting a Task

Submit a task to the pool using the `Submit` method:

```go
task := &MyTask{}
resultChan := pool.Submit(context.Background(), task, "arg1", "arg2")

result := <-resultChan
if result.Err != nil {
fmt.Println("Task failed:", result.Err)
} else {
fmt.Println("Task succeeded:", result.Result)
}
```

### Resizing the Pool

Adjust the number of workers in the pool:

```go
err := pool.Resize(10)
if err != nil {
fmt.Println("Resize error:", err)
}
```

> Note that if you want to decrease the pool capacity and some of the workers being removed are busy,
> those workers will not stop immediately.
> The resizing will complete only after all such busy workers have finished their tasks.
### Listing Workers

Retrieve the list of worker IDs:

```go
workerIDs := pool.ListWorkers()
fmt.Println("Worker IDs:", workerIDs)
```

### Stopping a Worker by ID

Stop a worker by its ID without removing it from the pool:

```go
err := pool.StopWorkerByID("worker-1")
if err != nil {
fmt.Println("StopWorkerByID error:", err)
}
```

### Exiting a Worker by ID

```go
err := pool.ExitWorkerByID("worker-1", true)
if err != nil {
fmt.Println("Exit error:", err)
}
```

This method terminates and exits a worker. If `stopWorker` is true, the task's context will be cancelled;
otherwise, it will not be notified. The method immediately removes the worker, even if the task is still running.
Ensure that tasks can handle context's `Done` signal or complete within a reasonable time.

### Metrics

If you specified `WithWorkerMetrics`, you can retrieve metrics:

```go
metrics := metricsProvider.GetMetrics()
fmt.Printf("Current Capacity: %d\n", metrics.CurrentCapacity)
fmt.Printf("Working Workers: %d\n", metrics.WorkingWorkers)
fmt.Printf("Idle Workers: %d\n", metrics.IdleWorkers)
fmt.Printf("Waiting Tasks: %d\n", metrics.WaitingTasks)
fmt.Printf("Total Submitted Tasks: %d\n", metrics.TotalSubmittedTasks)
fmt.Printf("Total Completed Tasks: %d\n", metrics.TotalCompletedTasks)
fmt.Printf("Successful Tasks: %d\n", metrics.SuccessfulTasks)
fmt.Printf("Failed Tasks: %d\n", metrics.FailedTasks)
fmt.Printf("Canceled Tasks: %d\n", metrics.CanceledTasks)
```

> Note: There may be a delay in metrics updates. Immediately calling `GetMetrics` after performing operations
> on the pool may not reflect the latest state.
To reset metrics:

```go
metricsProvider.ResetAllMetrics()

// Or reset specific metrics
err := metricsProvider.ResetMetric(worker.TotalSubmittedTasks)
if err != nil {
fmt.Println("Reset metric error:", err)
}
```

> Note:
> The `Metrics` struct should be used for a single worker pool only and is not recommended for reuse across multiple pools.
107 changes: 107 additions & 0 deletions workflow/standard/worker/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright (c) 2023 - 2024. vistart. All rights reserved.
// Use of this source code is governed by Apache-2.0 license
// that can be found in the LICENSE file.

package worker

import (
"errors"
"sync"
)

// Metrics represents the various metrics for the worker pool.
//
// Note that the instance should be used for a single worker pool only and is not recommended for reuse across multiple pools.
type Metrics struct {
CurrentCapacity int // Current number of workerWg in the pool
WorkingWorkers int // Number of workerWg currently working
IdleWorkers int // Number of idle workerWg
WaitingTasks int // Number of tasks waiting to be executed
TotalSubmittedTasks int // Total number of submitted tasks
TotalCompletedTasks int // Total number of completed tasks
SuccessfulTasks int // Number of successfully completed tasks
FailedTasks int // Number of failed tasks
CanceledTasks int // Number of canceled tasks
}

// Constants for ResetMetric method
const (
CurrentCapacity = iota
WorkingWorkers
IdleWorkers
WaitingTasks
TotalSubmittedTasks
TotalCompletedTasks
SuccessfulTasks
FailedTasks
CanceledTasks
)

// metricsUpdate represents a request to update metrics.
type metricsUpdate struct {
deltaCapacity int
deltaSubmitted int
deltaSuccessful int
deltaFailed int
deltaCanceled int
deltaWorking int
deltaWaiting int
}

// metrics implements MetricsProvider.
type metrics struct {
sync.Mutex
data Metrics
}

// GetMetrics returns the current metrics.
func (m *metrics) GetMetrics() Metrics {
m.Lock()
defer m.Unlock()

idleWorkers := m.data.CurrentCapacity - m.data.WorkingWorkers
m.data.IdleWorkers = idleWorkers

return m.data
}

// ResetAllMetrics resets all the metrics to zero.
func (m *metrics) ResetAllMetrics() {
m.Lock()
defer m.Unlock()
m.data = Metrics{}
}

// ResetMetric resets a specific metric by name.
func (m *metrics) ResetMetric(metric int) error {
m.Lock()
defer m.Unlock()
switch metric {
case CurrentCapacity:
m.data.CurrentCapacity = 0
case WorkingWorkers:
m.data.WorkingWorkers = 0
case IdleWorkers:
m.data.IdleWorkers = 0
case WaitingTasks:
m.data.WaitingTasks = 0
case TotalSubmittedTasks:
m.data.TotalSubmittedTasks = 0
case TotalCompletedTasks:
m.data.TotalCompletedTasks = 0
case SuccessfulTasks:
m.data.SuccessfulTasks = 0
case FailedTasks:
m.data.FailedTasks = 0
case CanceledTasks:
m.data.CanceledTasks = 0
default:
return errors.New("unknown metric name")
}
return nil
}

// NewWorkerMetrics instantiate a worker metrics.
func NewWorkerMetrics() MetricsProvider {
return &metrics{}
}
Loading

0 comments on commit 254e702

Please sign in to comment.