Skip to content

Commit

Permalink
refactor: scheduler supervisor (#655)
Browse files Browse the repository at this point in the history
* refactor: scheduler supervisor

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Sep 24, 2021
1 parent 270d1c5 commit 5234b34
Show file tree
Hide file tree
Showing 32 changed files with 1,518 additions and 1,623 deletions.
4 changes: 2 additions & 2 deletions cdn/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (css *CdnSeedServer) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRe
for piece := range pieceChan {
psc <- &cdnsystem.PieceSeed{
PeerId: peerID,
HostUuid: idgen.CDNUUID(iputils.HostName, int32(css.cfg.ListenPort)),
HostUuid: idgen.CDN(iputils.HostName, int32(css.cfg.ListenPort)),
PieceInfo: &base.PieceInfo{
PieceNum: piece.PieceNum,
RangeStart: piece.PieceRange.StartIndex,
Expand All @@ -177,7 +177,7 @@ func (css *CdnSeedServer) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRe
}
psc <- &cdnsystem.PieceSeed{
PeerId: peerID,
HostUuid: idgen.CDNUUID(iputils.HostName, int32(css.cfg.ListenPort)),
HostUuid: idgen.CDN(iputils.HostName, int32(css.cfg.ListenPort)),
Done: true,
ContentLength: task.SourceFileLength,
TotalPieceCount: task.PieceTotal,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ require (
go.opentelemetry.io/otel/exporters/trace/jaeger v0.20.0
go.opentelemetry.io/otel/sdk v0.20.0
go.opentelemetry.io/otel/trace v0.20.0
go.uber.org/atomic v1.6.0
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.16.0
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e
golang.org/x/oauth2 v0.0.0-20210201163806-010130855d6c
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,8 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
Expand Down
2 changes: 1 addition & 1 deletion internal/idgen/uuid.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ func UUIDString() string {
return uuid.New().String()
}

func CDNUUID(cdnHostname string, serverPort int32) string {
func CDN(cdnHostname string, serverPort int32) string {
return fmt.Sprintf("%s-%d_CDN", cdnHostname, serverPort)
}
118 changes: 43 additions & 75 deletions pkg/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package gc

import (
"context"
"errors"
"sync"
"time"
Expand All @@ -28,7 +27,7 @@ import (
// GC is the interface used for release resource
type GC interface {
// Add adds GC task
Add(string, Task)
Add(Task) error

// Run GC task
Run(string) error
Expand All @@ -45,30 +44,14 @@ type GC interface {

// GC provides task release function
type gc struct {
tasks *sync.Map
interval time.Duration
timeout time.Duration
logger Logger
done chan bool
tasks *sync.Map
logger Logger
done chan bool
}

// Option is a functional option for configuring the GC
type Option func(g *gc)

// WithInterval set the interval for GC collection
func WithInterval(interval time.Duration) Option {
return func(g *gc) {
g.interval = interval
}
}

// WithTimeout set the timeout for GC collection
func WithTimeout(timeout time.Duration) Option {
return func(g *gc) {
g.timeout = timeout
}
}

// WithLogger set the logger for GC
func WithLogger(logger Logger) Option {
return func(g *gc) {
Expand All @@ -77,105 +60,90 @@ func WithLogger(logger Logger) Option {
}

// New returns a new GC instence
func New(options ...Option) (GC, error) {
func New(options ...Option) GC {
g := &gc{
tasks: &sync.Map{},
done: make(chan bool),
logger: logrus.New(),
done: make(chan bool),
}

for _, opt := range options {
opt(g)
}

if err := g.validate(); err != nil {
return nil, err
}

return g, nil
return g
}

func (g gc) Add(k string, t Task) {
g.tasks.Store(k, t)
func (g gc) Add(t Task) error {
if err := t.validate(); err != nil {
return err
}

g.tasks.Store(t.ID, t)
return nil
}

func (g gc) Run(k string) error {
v, ok := g.tasks.Load(k)
func (g gc) Run(id string) error {
v, ok := g.tasks.Load(id)
if !ok {
return errors.New("can not find the task")
}

go g.run(context.Background(), k, v.(Task))
go g.run(v.(Task))
return nil
}

func (g gc) RunAll() {
g.runAll(context.Background())
g.runAll()
}

func (g gc) Serve() {
go func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

tick := time.NewTicker(g.interval)
for {
select {
case <-tick.C:
g.runAll(ctx)
case <-g.done:
g.logger.Infof("GC stop")
return
g.tasks.Range(func(k interface{}, v interface{}) bool {
go func() {
task := v.(Task)
tick := time.NewTicker(task.Interval)
for {
select {
case <-tick.C:
g.run(task)
case <-g.done:
g.logger.Infof("%s GC stop", k)
return
}
}
}
}()
}()
return true
})
}

func (g gc) Stop() {
close(g.done)
}

func (g gc) validate() error {
if g.interval <= 0 {
return errors.New("interval value is greater than 0")
}

if g.timeout <= 0 {
return errors.New("timeout value is greater than 0")
}

if g.timeout >= g.interval {
return errors.New("timeout value needs to be less than the interval value")
}

return nil
}

func (g gc) runAll(ctx context.Context) {
func (g gc) runAll() {
g.tasks.Range(func(k, v interface{}) bool {
go g.run(ctx, k.(string), v.(Task))
go g.run(v.(Task))
return true
})
}

func (g gc) run(ctx context.Context, k string, t Task) {
func (g gc) run(t Task) {
done := make(chan struct{})

go func() {
g.logger.Infof("%s GC %s", k, "start")
g.logger.Infof("%s GC start", t.ID)
defer close(done)
if err := t.RunGC(); err != nil {
g.logger.Errorf("%s GC error: %v", k, err)

if err := t.Runner.RunGC(); err != nil {
g.logger.Errorf("%s GC error: %v", t.ID, err)
return
}
}()

select {
case <-time.After(g.timeout):
g.logger.Infof("%s GC %s", k, "timeout")
case <-time.After(t.Timeout):
g.logger.Infof("%s GC timeout", t.ID)
case <-done:
g.logger.Infof("%s GC %s", k, "done")
case <-ctx.Done():
g.logger.Infof("%s GC %s", k, "stop")
g.logger.Infof("%s GC done", t.ID)
}
}
Loading

0 comments on commit 5234b34

Please sign in to comment.