From 62b6c3709d04d5ba6eb52f533574f145fb9f184f Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 11 Dec 2024 11:51:51 +0800 Subject: [PATCH] feat: add AllSeedPeersScope for preheating (#3698) Signed-off-by: Gaius --- manager/service/job.go | 2 +- manager/types/job.go | 11 +- scheduler/job/job.go | 179 +++++++++++++++++- .../resource/standard/seed_peer_client.go | 8 + .../standard/seed_peer_client_mock.go | 15 ++ 5 files changed, 201 insertions(+), 14 deletions(-) diff --git a/manager/service/job.go b/manager/service/job.go index 8677b661cf0..64f068f8e07 100644 --- a/manager/service/job.go +++ b/manager/service/job.go @@ -45,7 +45,7 @@ func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncP func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheatJobRequest) (*models.Job, error) { if json.Args.Scope == "" { - json.Args.Scope = types.SinglePeerScope + json.Args.Scope = types.SingleSeedPeerScope } if json.Args.ConcurrentCount == 0 { diff --git a/manager/types/job.go b/manager/types/job.go index 58160664aaf..f9421196feb 100644 --- a/manager/types/job.go +++ b/manager/types/job.go @@ -19,8 +19,11 @@ package types import "time" const ( - // SinglePeerScope represents the scope that only single peer will be preheated. - SinglePeerScope = "single_peer" + // SingleSeedPeerScope represents the scope that only single seed peer will be preheated. + SingleSeedPeerScope = "single_seed_peer" + + // AllSeedPeersScope represents the scope that all seed peers will be preheated. + AllSeedPeersScope = "all_seed_peers" // AllPeersScope represents the scope that all peers will be preheated. AllPeersScope = "all_peers" @@ -126,8 +129,8 @@ type PreheatArgs struct { // The image type preheating task can specify the image architecture type. eg: linux/amd64. Platform string `json:"platform" binding:"omitempty"` - // Scope is the scope for preheating, default is single_peer. - Scope string `json:"scope" binding:"omitempty,oneof=single_peer all_peers"` + // Scope is the scope for preheating, default is single_seed_peer. + Scope string `json:"scope" binding:"omitempty"` // BatchSize is the batch size for preheating all peers, default is 50. ConcurrentCount int64 `json:"concurrent_count" binding:"omitempty,gte=1,lte=500"` diff --git a/scheduler/job/job.go b/scheduler/job/job.go index a2392643d1f..c748515ac9b 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -178,13 +178,22 @@ func (j *job) preheat(ctx context.Context, data string) (string, error) { defer cancel() switch req.Scope { - case managertypes.SinglePeerScope: - log.Info("preheat single peer") + case managertypes.SingleSeedPeerScope: + log.Info("preheat single seed peer") resp, err := j.preheatSinglePeer(ctx, taskID, req, log) if err != nil { return "", err } + resp.SchedulerClusterID = j.config.Manager.SchedulerClusterID + return internaljob.MarshalResponse(resp) + case managertypes.AllSeedPeersScope: + log.Info("preheat all seed peers") + resp, err := j.preheatAllSeedPeers(ctx, taskID, req, log) + if err != nil { + return "", err + } + resp.SchedulerClusterID = j.config.Manager.SchedulerClusterID return internaljob.MarshalResponse(resp) case managertypes.AllPeersScope: @@ -237,10 +246,162 @@ func (j *job) preheatSinglePeer(ctx context.Context, taskID string, req *interna return resp, nil } -// preheatAllPeers preheats job by all peers, only suoported by v2 protocol. Scheduler will trigger all peers to download task. -// If all the peer download task failed, return error. If some of the peer download task failed, return success tasks and failure tasks. +// preheatAllSeedPeers preheats job by all peer seed peers, only suoported by v2 protocol. Scheduler will trigger all seed peers to download task. +// If all the seed peers download task failed, return error. If some of the seed peers download task failed, return success tasks and failure tasks. // Notify the client that the preheat is successful. +func (j *job) preheatAllSeedPeers(ctx context.Context, taskID string, req *internaljob.PreheatRequest, log *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error) { + // If seed peer is disabled, return error. + if !j.config.SeedPeer.Enable { + return nil, fmt.Errorf("cluster %d scheduler %s has disabled seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP) + } + + // If scheduler has no available seed peer, return error. + seedPeers := j.resource.SeedPeer().Client().SeedPeers() + if len(seedPeers) == 0 { + return nil, fmt.Errorf("cluster %d scheduler %s has no available seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP) + } + + var ( + successTasks = sync.Map{} + failureTasks = sync.Map{} + ) + + eg, _ := errgroup.WithContext(ctx) + eg.SetLimit(int(req.ConcurrentCount)) + for _, seedPeer := range seedPeers { + var ( + hostname = seedPeer.Hostname + ip = seedPeer.Ip + port = seedPeer.Port + ) + + target := fmt.Sprintf("%s:%d", ip, port) + log := logger.WithHost(idgen.HostIDV2(ip, hostname, true), hostname, ip) + + eg.Go(func() error { + log.Info("preheat started") + dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} + dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, target, dialOptions...) + if err != nil { + log.Errorf("preheat failed: %s", err.Error()) + failureTasks.Store(ip, &internaljob.PreheatFailureTask{ + URL: req.URL, + Hostname: hostname, + IP: ip, + Description: fmt.Sprintf("task %s failed: %s", taskID, err.Error()), + }) + + return err + } + + stream, err := dfdaemonClient.DownloadTask( + ctx, + taskID, + &dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{ + Url: req.URL, + Type: commonv2.TaskType_STANDARD, + Tag: &req.Tag, + Application: &req.Application, + Priority: commonv2.Priority(req.Priority), + FilteredQueryParams: strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator), + RequestHeader: req.Headers, + Timeout: durationpb.New(req.Timeout), + CertificateChain: req.CertificateChain, + }}) + if err != nil { + log.Errorf("preheat failed: %s", err.Error()) + failureTasks.Store(ip, &internaljob.PreheatFailureTask{ + URL: req.URL, + Hostname: hostname, + IP: ip, + Description: fmt.Sprintf("task %s failed: %s", taskID, err.Error()), + }) + + return err + } + + // Wait for the download task to complete. + for { + _, err := stream.Recv() + if err != nil { + if err == io.EOF { + log.Info("preheat succeeded") + successTasks.Store(ip, &internaljob.PreheatSuccessTask{ + URL: req.URL, + Hostname: hostname, + IP: ip, + }) + + return nil + } + + log.Errorf("preheat failed: %s", err.Error()) + failureTasks.Store(ip, &internaljob.PreheatFailureTask{ + URL: req.URL, + Hostname: hostname, + IP: ip, + Description: fmt.Sprintf("task %s failed: %s", taskID, err.Error()), + }) + + return err + } + } + }) + } + + // Wait for all tasks to complete and print the errors. + if err := eg.Wait(); err != nil { + log.Errorf("preheat failed: %s", err.Error()) + } + + // If successTasks is not empty, return success tasks and failure tasks. + // Notify the client that the preheat is successful. + var preheatResponse internaljob.PreheatResponse + failureTasks.Range(func(_, value any) bool { + if failureTask, ok := value.(*internaljob.PreheatFailureTask); ok { + preheatResponse.FailureTasks = append(preheatResponse.FailureTasks, failureTask) + } + + return true + }) + + successTasks.Range(func(_, value any) bool { + if successTask, ok := value.(*internaljob.PreheatSuccessTask); ok { + for _, failureTask := range preheatResponse.FailureTasks { + if failureTask.IP == successTask.IP { + return true + } + } + + preheatResponse.SuccessTasks = append(preheatResponse.SuccessTasks, successTask) + } + + return true + }) + + if len(preheatResponse.SuccessTasks) > 0 { + return &preheatResponse, nil + } + + msg := "no error message" + if len(preheatResponse.FailureTasks) > 0 { + msg = fmt.Sprintf("%s %s %s %s", taskID, preheatResponse.FailureTasks[0].IP, preheatResponse.FailureTasks[0].Hostname, + preheatResponse.FailureTasks[0].Description) + } + + return nil, fmt.Errorf("all peers preheat failed: %s", msg) +} + +// preheatAllPeers preheats job by all peers, only suoported by v2 protocol. Scheduler will trigger all peers to download task. +// If all the peers download task failed, return error. If some of the peers download task failed, return success tasks and +// failure tasks. Notify the client that the preheat is successful. func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internaljob.PreheatRequest, log *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error) { + // If scheduler has no available peer, return error. + peers := j.resource.HostManager().LoadAll() + if len(peers) == 0 { + return nil, fmt.Errorf("cluster %d scheduler %s has no available peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP) + } + var ( successTasks = sync.Map{} failureTasks = sync.Map{} @@ -248,15 +409,15 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj eg, _ := errgroup.WithContext(ctx) eg.SetLimit(int(req.ConcurrentCount)) - for _, host := range j.resource.HostManager().LoadAll() { + for _, peer := range peers { var ( - hostname = host.Hostname - ip = host.IP - port = host.Port + hostname = peer.Hostname + ip = peer.IP + port = peer.Port ) target := fmt.Sprintf("%s:%d", ip, port) - log := logger.WithHost(host.ID, hostname, ip) + log := logger.WithHost(peer.ID, hostname, ip) eg.Go(func() error { log.Info("preheat started") diff --git a/scheduler/resource/standard/seed_peer_client.go b/scheduler/resource/standard/seed_peer_client.go index ee5d662d234..027dd98a4ed 100644 --- a/scheduler/resource/standard/seed_peer_client.go +++ b/scheduler/resource/standard/seed_peer_client.go @@ -42,6 +42,9 @@ type SeedPeerClient interface { // Addrs returns the addresses of seed peers. Addrs() []string + // SeedPeers returns the seed peers working for the scheduler. + SeedPeers() []*managerv2.SeedPeer + // Client is cdnsystem grpc client interface. cdnsystemclient.Client @@ -132,6 +135,11 @@ func (sc *seedPeerClient) Addrs() []string { return addrs } +// SeedPeers returns the seed peers working for the scheduler. +func (sc *seedPeerClient) SeedPeers() []*managerv2.SeedPeer { + return sc.data.Scheduler.SeedPeers +} + // Dynamic config notify function. func (sc *seedPeerClient) OnNotify(data *config.DynconfigData) { if reflect.DeepEqual(sc.data, data) { diff --git a/scheduler/resource/standard/seed_peer_client_mock.go b/scheduler/resource/standard/seed_peer_client_mock.go index 3eb7a6f4509..2922e25e13f 100644 --- a/scheduler/resource/standard/seed_peer_client_mock.go +++ b/scheduler/resource/standard/seed_peer_client_mock.go @@ -17,6 +17,7 @@ import ( common "d7y.io/api/v2/pkg/apis/common/v1" common0 "d7y.io/api/v2/pkg/apis/common/v2" dfdaemon "d7y.io/api/v2/pkg/apis/dfdaemon/v2" + manager "d7y.io/api/v2/pkg/apis/manager/v2" config "d7y.io/dragonfly/v2/scheduler/config" gomock "go.uber.org/mock/gomock" grpc "google.golang.org/grpc" @@ -224,6 +225,20 @@ func (mr *MockSeedPeerClientMockRecorder) OnNotify(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnNotify", reflect.TypeOf((*MockSeedPeerClient)(nil).OnNotify), arg0) } +// SeedPeers mocks base method. +func (m *MockSeedPeerClient) SeedPeers() []*manager.SeedPeer { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SeedPeers") + ret0, _ := ret[0].([]*manager.SeedPeer) + return ret0 +} + +// SeedPeers indicates an expected call of SeedPeers. +func (mr *MockSeedPeerClientMockRecorder) SeedPeers() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SeedPeers", reflect.TypeOf((*MockSeedPeerClient)(nil).SeedPeers)) +} + // StatPersistentCacheTask mocks base method. func (m *MockSeedPeerClient) StatPersistentCacheTask(arg0 context.Context, arg1 *dfdaemon.StatPersistentCacheTaskRequest, arg2 ...grpc.CallOption) (*common0.PersistentCacheTask, error) { m.ctrl.T.Helper()