Skip to content

Commit

Permalink
feat: add AllSeedPeersScope for preheating (#3698)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Dec 11, 2024
1 parent 79a845e commit 62b6c37
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 14 deletions.
2 changes: 1 addition & 1 deletion manager/service/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncP

func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheatJobRequest) (*models.Job, error) {
if json.Args.Scope == "" {
json.Args.Scope = types.SinglePeerScope
json.Args.Scope = types.SingleSeedPeerScope
}

if json.Args.ConcurrentCount == 0 {
Expand Down
11 changes: 7 additions & 4 deletions manager/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package types
import "time"

const (
// SinglePeerScope represents the scope that only single peer will be preheated.
SinglePeerScope = "single_peer"
// SingleSeedPeerScope represents the scope that only single seed peer will be preheated.
SingleSeedPeerScope = "single_seed_peer"

// AllSeedPeersScope represents the scope that all seed peers will be preheated.
AllSeedPeersScope = "all_seed_peers"

// AllPeersScope represents the scope that all peers will be preheated.
AllPeersScope = "all_peers"
Expand Down Expand Up @@ -126,8 +129,8 @@ type PreheatArgs struct {
// The image type preheating task can specify the image architecture type. eg: linux/amd64.
Platform string `json:"platform" binding:"omitempty"`

// Scope is the scope for preheating, default is single_peer.
Scope string `json:"scope" binding:"omitempty,oneof=single_peer all_peers"`
// Scope is the scope for preheating, default is single_seed_peer.
Scope string `json:"scope" binding:"omitempty"`

// ConcurrentCount is the concurrent count for preheating all peers, default is 50.
ConcurrentCount int64 `json:"concurrent_count" binding:"omitempty,gte=1,lte=500"`
Expand Down
179 changes: 170 additions & 9 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,22 @@ func (j *job) preheat(ctx context.Context, data string) (string, error) {
defer cancel()

switch req.Scope {
case managertypes.SinglePeerScope:
log.Info("preheat single peer")
case managertypes.SingleSeedPeerScope:
log.Info("preheat single seed peer")
resp, err := j.preheatSinglePeer(ctx, taskID, req, log)
if err != nil {
return "", err
}

resp.SchedulerClusterID = j.config.Manager.SchedulerClusterID
return internaljob.MarshalResponse(resp)
case managertypes.AllSeedPeersScope:
log.Info("preheat all seed peers")
resp, err := j.preheatAllSeedPeers(ctx, taskID, req, log)
if err != nil {
return "", err
}

resp.SchedulerClusterID = j.config.Manager.SchedulerClusterID
return internaljob.MarshalResponse(resp)
case managertypes.AllPeersScope:
Expand Down Expand Up @@ -237,26 +246,178 @@ func (j *job) preheatSinglePeer(ctx context.Context, taskID string, req *interna
return resp, nil
}

// preheatAllPeers preheats job by all peers, only supported by v2 protocol. Scheduler will trigger all peers to download task.
// If all the peer download task failed, return error. If some of the peer download task failed, return success tasks and failure tasks.
// preheatAllSeedPeers preheats job by all seed peers, only supported by v2 protocol. Scheduler will
// trigger all seed peers to download task. If all the seed peers download task failed, return error.
// If some of the seed peers download task failed, return success tasks and failure tasks. Notify the
// client that the preheat is successful.
//
// The log parameter is used for job-level messages; each seed peer gets its own host-scoped logger.
// Results are keyed by seed peer IP, so seed peers that share an IP collapse into a single entry.
func (j *job) preheatAllSeedPeers(ctx context.Context, taskID string, req *internaljob.PreheatRequest, log *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error) {
	// If seed peer is disabled, return error.
	if !j.config.SeedPeer.Enable {
		return nil, fmt.Errorf("cluster %d scheduler %s has disabled seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
	}

	// If scheduler has no available seed peer, return error.
	seedPeers := j.resource.SeedPeer().Client().SeedPeers()
	if len(seedPeers) == 0 {
		return nil, fmt.Errorf("cluster %d scheduler %s has no available seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
	}

	var (
		successTasks = sync.Map{}
		failureTasks = sync.Map{}
	)

	// errgroup treats a limit of zero as "admit no goroutines", which would make
	// eg.Go block forever. Fall back to sequential execution when the request
	// carries no concurrent count. Negative values keep their errgroup meaning
	// (no limit).
	concurrentCount := int(req.ConcurrentCount)
	if concurrentCount == 0 {
		concurrentCount = 1
	}

	// The group's derived context is discarded on purpose: downloads use the
	// parent ctx, so a failure on one seed peer does not cancel the others.
	eg, _ := errgroup.WithContext(ctx)
	eg.SetLimit(concurrentCount)
	for _, seedPeer := range seedPeers {
		var (
			hostname = seedPeer.Hostname
			ip       = seedPeer.Ip
			port     = seedPeer.Port
		)

		target := fmt.Sprintf("%s:%d", ip, port)
		peerLog := logger.WithHost(idgen.HostIDV2(ip, hostname, true), hostname, ip)

		// fail logs the error, records a failure task for this seed peer and
		// hands the error back so it can be returned to the errgroup.
		fail := func(err error) error {
			peerLog.Errorf("preheat failed: %s", err.Error())
			failureTasks.Store(ip, &internaljob.PreheatFailureTask{
				URL:         req.URL,
				Hostname:    hostname,
				IP:          ip,
				Description: fmt.Sprintf("task %s failed: %s", taskID, err.Error()),
			})

			return err
		}

		eg.Go(func() error {
			peerLog.Info("preheat started")
			dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
			dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, target, dialOptions...)
			if err != nil {
				return fail(err)
			}

			stream, err := dfdaemonClient.DownloadTask(
				ctx,
				taskID,
				&dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{
					Url:                 req.URL,
					Type:                commonv2.TaskType_STANDARD,
					Tag:                 &req.Tag,
					Application:         &req.Application,
					Priority:            commonv2.Priority(req.Priority),
					FilteredQueryParams: strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator),
					RequestHeader:       req.Headers,
					Timeout:             durationpb.New(req.Timeout),
					CertificateChain:    req.CertificateChain,
				}})
			if err != nil {
				return fail(err)
			}

			// Wait for the download task to complete. The stream ending with
			// io.EOF means success; any other error is a failure.
			for {
				_, err := stream.Recv()
				if err == nil {
					continue
				}

				if err == io.EOF {
					peerLog.Info("preheat succeeded")
					successTasks.Store(ip, &internaljob.PreheatSuccessTask{
						URL:      req.URL,
						Hostname: hostname,
						IP:       ip,
					})

					return nil
				}

				return fail(err)
			}
		})
	}

	// Wait for all tasks to complete and print the first error. Per-peer
	// failures are reported through failureTasks rather than this error.
	if err := eg.Wait(); err != nil {
		log.Errorf("preheat failed: %s", err.Error())
	}

	// If successTasks is not empty, return success tasks and failure tasks.
	// Notify the client that the preheat is successful.
	var preheatResponse internaljob.PreheatResponse
	failedIPs := make(map[string]struct{})
	failureTasks.Range(func(_, value any) bool {
		if failureTask, ok := value.(*internaljob.PreheatFailureTask); ok {
			preheatResponse.FailureTasks = append(preheatResponse.FailureTasks, failureTask)
			failedIPs[failureTask.IP] = struct{}{}
		}

		return true
	})

	successTasks.Range(func(_, value any) bool {
		successTask, ok := value.(*internaljob.PreheatSuccessTask)
		if !ok {
			return true
		}

		// An IP that also has a recorded failure is reported as failed only.
		if _, failed := failedIPs[successTask.IP]; failed {
			return true
		}

		preheatResponse.SuccessTasks = append(preheatResponse.SuccessTasks, successTask)
		return true
	})

	if len(preheatResponse.SuccessTasks) > 0 {
		return &preheatResponse, nil
	}

	msg := "no error message"
	if len(preheatResponse.FailureTasks) > 0 {
		first := preheatResponse.FailureTasks[0]
		msg = fmt.Sprintf("%s %s %s %s", taskID, first.IP, first.Hostname, first.Description)
	}

	return nil, fmt.Errorf("all seed peers preheat failed: %s", msg)
}

// preheatAllPeers preheats job by all peers, only supported by v2 protocol. Scheduler will trigger all peers to download task.
// If all the peers download task failed, return error. If some of the peers download task failed, return success tasks and
// failure tasks. Notify the client that the preheat is successful.
func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internaljob.PreheatRequest, log *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error) {
// If scheduler has no available peer, return error.
peers := j.resource.HostManager().LoadAll()
if len(peers) == 0 {
return nil, fmt.Errorf("cluster %d scheduler %s has no available peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
}

var (
successTasks = sync.Map{}
failureTasks = sync.Map{}
)

eg, _ := errgroup.WithContext(ctx)
eg.SetLimit(int(req.ConcurrentCount))
for _, host := range j.resource.HostManager().LoadAll() {
for _, peer := range peers {
var (
hostname = host.Hostname
ip = host.IP
port = host.Port
hostname = peer.Hostname
ip = peer.IP
port = peer.Port
)

target := fmt.Sprintf("%s:%d", ip, port)
log := logger.WithHost(host.ID, hostname, ip)
log := logger.WithHost(peer.ID, hostname, ip)

eg.Go(func() error {
log.Info("preheat started")
Expand Down
8 changes: 8 additions & 0 deletions scheduler/resource/standard/seed_peer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type SeedPeerClient interface {
// Addrs returns the addresses of seed peers.
Addrs() []string

// SeedPeers returns the seed peers working for the scheduler.
SeedPeers() []*managerv2.SeedPeer

// Client is cdnsystem grpc client interface.
cdnsystemclient.Client

Expand Down Expand Up @@ -132,6 +135,11 @@ func (sc *seedPeerClient) Addrs() []string {
return addrs
}

// SeedPeers returns the seed peer list held in the scheduler entry of the
// client's current dynconfig data.
func (sc *seedPeerClient) SeedPeers() []*managerv2.SeedPeer {
	peers := sc.data.Scheduler.SeedPeers
	return peers
}

// Dynamic config notify function.
func (sc *seedPeerClient) OnNotify(data *config.DynconfigData) {
if reflect.DeepEqual(sc.data, data) {
Expand Down
15 changes: 15 additions & 0 deletions scheduler/resource/standard/seed_peer_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 62b6c37

Please sign in to comment.