Skip to content

Commit

Permalink
harmony: Put max counter Limiter behind an interface
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Sep 10, 2024
1 parent 2fb1a34 commit b438cc4
Show file tree
Hide file tree
Showing 33 changed files with 128 additions and 74 deletions.
3 changes: 2 additions & 1 deletion alertmanager/task_alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package alertmanager
import (
"context"
"fmt"
"github.com/filecoin-project/curio/harmony/taskhelp"
"time"

logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -161,7 +162,7 @@ func (a *AlertTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.Task

func (a *AlertTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: harmonytask.Max(1),
Max: taskhelp.Max(1),
Name: "AlertManager",
Cost: resources.Resources{
Cpu: 1,
Expand Down
3 changes: 2 additions & 1 deletion cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tasks

import (
"context"
"github.com/filecoin-project/curio/harmony/taskhelp"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -275,7 +276,7 @@ func addSealingTasks(
}

if cfg.Subsystems.EnableSealSDR {
sdrMax := harmonytask.Max(cfg.Subsystems.SealSDRMaxTasks)
sdrMax := taskhelp.Max(cfg.Subsystems.SealSDRMaxTasks)

sdrTask := seal.NewSDRTask(full, db, sp, slr, sdrMax, cfg.Subsystems.SealSDRMinTasks)
keyTask := unseal.NewTaskUnsealSDR(slr, db, sdrMax, full)
Expand Down
57 changes: 21 additions & 36 deletions harmony/harmonytask/harmonytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/resources"
"github.com/filecoin-project/curio/harmony/taskhelp"
)

// Consts (except for unit test)
Expand All @@ -24,7 +25,7 @@ type TaskTypeDetails struct {
// Max returns how many tasks this machine can run of this type.
// Nil (default)/Zero or less means unrestricted.
// Counters can either be independent when created with Max, or shared between tasks with SharedMax.Make()
Max *MaxCounter
Max Limiter

// Name is the task name to be added to the task list.
Name string
Expand Down Expand Up @@ -98,37 +99,21 @@ type TaskInterface interface {
Adder(AddTaskFunc)
}

type MaxCounter struct {
// maximum number of tasks of this type that can be run
N int
type Limiter interface {
// Active returns the number of tasks of this type that are currently running
// in this limiter / limiter group.
Active() int

// current number of tasks of this type that are running (shared)
current *atomic.Int32
// ActiveThis returns the number of tasks of this type that are currently running
// in this limiter (e.g. per-task-type count).
ActiveThis() int

// current number of tasks of this type that are running (per task)
currentThis *atomic.Int32
}

func (m *MaxCounter) max() int {
return m.N
}

// note: cur can't be called on counters for which max is 0
func (m *MaxCounter) cur() int {
return int(m.current.Load())
}

func (m *MaxCounter) curThis() int {
return int(m.currentThis.Load())
}

func (m *MaxCounter) add(n int) {
m.current.Add(int32(n))
m.currentThis.Add(int32(n))
}
// AtMax returns whether this limiter has reached its limit, i.e. whether no more tasks may be started.
AtMax() bool

func Max(n int) *MaxCounter {
return &MaxCounter{N: n, current: new(atomic.Int32), currentThis: new(atomic.Int32)}
// Add increments / decrements the active task counters by delta. This call
// is atomic
Add(delta int)
}

// AddTaskFunc is responsible for adding a task's details "extra info" to the DB.
Expand Down Expand Up @@ -195,7 +180,7 @@ func New(
TaskEngine: e,
}
if h.Max == nil {
h.Max = Max(0)
h.Max = taskhelp.Max(0)
}

if Registry[h.TaskTypeDetails.Name] == nil {
Expand Down Expand Up @@ -256,31 +241,31 @@ func (e *TaskEngine) GracefullyTerminate() {
for {
timeout := time.Millisecond
for _, h := range e.handlers {
if h.TaskTypeDetails.Name == "WinPost" && h.Max.cur() > 0 {
if h.TaskTypeDetails.Name == "WinPost" && h.Max.Active() > 0 {
timeout = time.Second
log.Infof("node shutdown deferred for %f seconds", timeout.Seconds())
continue
}
if h.TaskTypeDetails.Name == "WdPost" && h.Max.cur() > 0 {
if h.TaskTypeDetails.Name == "WdPost" && h.Max.Active() > 0 {
timeout = time.Second * 3
log.Infof("node shutdown deferred for %f seconds due to running WdPost task", timeout.Seconds())
continue
}

if h.TaskTypeDetails.Name == "WdPostSubmit" && h.Max.cur() > 0 {
if h.TaskTypeDetails.Name == "WdPostSubmit" && h.Max.Active() > 0 {
timeout = time.Second
log.Infof("node shutdown deferred for %f seconds due to running WdPostSubmit task", timeout.Seconds())
continue
}

if h.TaskTypeDetails.Name == "WdPostRecover" && h.Max.cur() > 0 {
if h.TaskTypeDetails.Name == "WdPostRecover" && h.Max.Active() > 0 {
timeout = time.Second
log.Infof("node shutdown deferred for %f seconds due to running WdPostRecover task", timeout.Seconds())
continue
}

// Test tasks for itest
if h.TaskTypeDetails.Name == "ThingOne" && h.Max.cur() > 0 {
if h.TaskTypeDetails.Name == "ThingOne" && h.Max.Active() > 0 {
timeout = time.Second
log.Infof("node shutdown deferred for %f seconds due to running itest task", timeout.Seconds())
continue
Expand Down Expand Up @@ -435,7 +420,7 @@ func (e *TaskEngine) pollerTryAllWork() bool {
func (e *TaskEngine) ResourcesAvailable() resources.Resources {
tmp := e.reg.Resources
for _, t := range e.handlers {
ct := t.Max.curThis()
ct := t.Max.ActiveThis()
tmp.Cpu -= ct * t.Cost.Cpu
tmp.Gpu -= float64(ct) * t.Cost.Gpu
tmp.Ram -= uint64(ct) * t.Cost.Ram
Expand Down
12 changes: 6 additions & 6 deletions harmony/harmonytask/task_type_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ top:
// 1. Can we do any more of this task type?
// NOTE: 0 is the default value, so this way people don't need to worry about
// this setting unless they want to limit the number of tasks of this type.
if h.Max.max() > 0 && h.Max.cur() >= h.Max.max() {
if h.Max.AtMax() {
log.Debugw("did not accept task", "name", h.Name, "reason", "at max already")
return false
}
Expand Down Expand Up @@ -172,10 +172,10 @@ canAcceptAgain:
tag.Upsert(sourceTag, from),
}, TaskMeasures.TasksStarted.M(1))

h.Max.add(1)
h.Max.Add(1)
_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.ActiveTasks.M(int64(h.Max.cur())))
}, TaskMeasures.ActiveTasks.M(int64(h.Max.ActiveThis())))

go func() {
log.Infow("Beginning work on Task", "id", *tID, "from", from, "name", h.Name)
Expand All @@ -200,7 +200,7 @@ canAcceptAgain:
"while processing "+h.Name+" task "+strconv.Itoa(int(*tID))+": ", r,
" Stack: ", string(stackSlice[:sz]))
}
h.Max.add(-1)
h.Max.Add(-1)

releaseStorage()
h.recordCompletion(*tID, sectorID, workStart, done, doErr)
Expand Down Expand Up @@ -240,7 +240,7 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, sectorID *abi.SectorID, w

_ = stats.RecordWithTags(context.Background(), []tag.Mutator{
tag.Upsert(taskNameTag, h.Name),
}, TaskMeasures.ActiveTasks.M(int64(h.Max.cur())))
}, TaskMeasures.ActiveTasks.M(int64(h.Max.ActiveThis())))

duration := workEnd.Sub(workStart).Seconds()
TaskMeasures.TaskDuration.Observe(duration)
Expand Down Expand Up @@ -338,7 +338,7 @@ VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id`, tID, h.Name, postedTime.U
func (h *taskTypeHandler) AssertMachineHasCapacity() error {
r := h.TaskEngine.ResourcesAvailable()

if h.Max.max() > 0 && h.Max.cur() >= h.Max.max() {
if h.Max.AtMax() {
return errors.New("Did not accept " + h.Name + " task: at max already")
}

Expand Down
42 changes: 42 additions & 0 deletions harmony/taskhelp/max.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package taskhelp

import (
"sync/atomic"
)

// MaxCounter limits how many tasks may run at once by comparing a
// running-task counter against a fixed maximum.
type MaxCounter struct {
	// N is the maximum number of tasks of this type that can be run.
	// Zero or less means the counter never reports being at max.
	N int

	// current is the number of running tasks (shared); this is the value
	// AtMax compares against N.
	current *atomic.Int32

	// currentThis is the number of running tasks (per task).
	currentThis *atomic.Int32
}

// Max returns a MaxCounter with limit n whose two counters are freshly
// allocated and start at zero.
func Max(n int) *MaxCounter {
	counter := &MaxCounter{N: n}
	counter.current = new(atomic.Int32)
	counter.currentThis = new(atomic.Int32)
	return counter
}

// Max reports the configured limit N.
func (c *MaxCounter) Max() int {
	return c.N
}

// Active reports the current value of the shared running-task counter.
func (c *MaxCounter) Active() int {
	running := c.current.Load()
	return int(running)
}

// ActiveThis reports the current value of the per-task running-task counter.
func (c *MaxCounter) ActiveThis() int {
	running := c.currentThis.Load()
	return int(running)
}

// AtMax reports whether the limit has been reached: the limit is positive
// and the shared counter is at or above it. A limit of zero or less never
// reports true, and the counters are not consulted in that case.
func (c *MaxCounter) AtMax() bool {
	if c.N <= 0 {
		return false
	}
	return c.Active() >= c.N
}

// Add adjusts both counters by n; pass a negative n to decrement.
// Each counter is updated with its own atomic add.
func (c *MaxCounter) Add(n int) {
	delta := int32(n)
	c.current.Add(delta)
	c.currentThis.Add(delta)
}
3 changes: 2 additions & 1 deletion tasks/gc/pipeline_meta_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gc

import (
"context"
"github.com/filecoin-project/curio/harmony/taskhelp"
"time"

"golang.org/x/xerrors"
Expand Down Expand Up @@ -44,7 +45,7 @@ func (s *PipelineGC) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.Tas

func (s *PipelineGC) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: harmonytask.Max(1),
Max: taskhelp.Max(1),
Name: "PipelineGC",
Cost: resources.Resources{
Cpu: 1,
Expand Down
3 changes: 2 additions & 1 deletion tasks/gc/storage_endpoint_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gc

import (
"context"
"github.com/filecoin-project/curio/harmony/taskhelp"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -270,7 +271,7 @@ func (s *StorageEndpointGC) CanAccept(ids []harmonytask.TaskID, engine *harmonyt

func (s *StorageEndpointGC) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: harmonytask.Max(1),
Max: taskhelp.Max(1),
Name: "StorageMetaGC",
Cost: resources.Resources{
Cpu: 1,
Expand Down
3 changes: 2 additions & 1 deletion tasks/gc/storage_gc_mark.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gc

import (
"context"
"github.com/filecoin-project/curio/harmony/taskhelp"
"time"

cbor "github.com/ipfs/go-ipld-cbor"
Expand Down Expand Up @@ -291,7 +292,7 @@ func (s *StorageGCMark) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.

func (s *StorageGCMark) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: harmonytask.Max(1),
Max: taskhelp.Max(1),
Name: "StorageGCMark",
Cost: resources.Resources{
Cpu: 1,
Expand Down
3 changes: 2 additions & 1 deletion tasks/gc/storage_gc_sweep.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gc

import (
"context"
"github.com/filecoin-project/curio/harmony/taskhelp"
"time"

"github.com/samber/lo"
Expand Down Expand Up @@ -109,7 +110,7 @@ func (s *StorageGCSweep) CanAccept(ids []harmonytask.TaskID, engine *harmonytask

func (s *StorageGCSweep) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: harmonytask.Max(1),
Max: taskhelp.Max(1),
Name: "StorageGCSweep",
Cost: resources.Resources{
Cpu: 1,
Expand Down
3 changes: 2 additions & 1 deletion tasks/message/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package message
import (
"bytes"
"context"
"github.com/filecoin-project/curio/harmony/taskhelp"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -236,7 +237,7 @@ func (s *SendTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskE

func (s *SendTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: harmonytask.Max(1024),
Max: taskhelp.Max(1024),
Name: "SendMessage",
Cost: resources.Resources{
Cpu: 0,
Expand Down
3 changes: 2 additions & 1 deletion tasks/metadata/task_sector_expirations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metadata

import (
"context"
"github.com/filecoin-project/curio/harmony/taskhelp"
"time"

cbor "github.com/ipfs/go-ipld-cbor"
Expand Down Expand Up @@ -182,7 +183,7 @@ func (s *SectorMetadata) CanAccept(ids []harmonytask.TaskID, engine *harmonytask

func (s *SectorMetadata) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: harmonytask.Max(1),
Max: taskhelp.Max(1),
Name: "SectorMetadata",
Cost: resources.Resources{
Cpu: 1,
Expand Down
3 changes: 2 additions & 1 deletion tasks/piece/task_cleanup_piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package piece

import (
"context"
"github.com/filecoin-project/curio/harmony/taskhelp"
"time"

"golang.org/x/xerrors"
Expand Down Expand Up @@ -116,7 +117,7 @@ func (c *CleanupPieceTask) CanAccept(ids []harmonytask.TaskID, engine *harmonyta

func (c *CleanupPieceTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: harmonytask.Max(c.max),
Max: taskhelp.Max(c.max),
Name: "DropPiece",
Cost: resources.Resources{
Cpu: 1,
Expand Down
3 changes: 2 additions & 1 deletion tasks/piece/task_park_piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package piece
import (
"context"
"encoding/json"
"github.com/filecoin-project/curio/harmony/taskhelp"
"strconv"
"time"

Expand Down Expand Up @@ -193,7 +194,7 @@ func (p *ParkPieceTask) TypeDetails() harmonytask.TaskTypeDetails {
const maxSizePiece = 64 << 30

return harmonytask.TaskTypeDetails{
Max: harmonytask.Max(p.max),
Max: taskhelp.Max(p.max),
Name: "ParkPiece",
Cost: resources.Resources{
Cpu: 1,
Expand Down
Loading

0 comments on commit b438cc4

Please sign in to comment.