From cad60c82dd70bad56c28450b0eaf06bb05a39d5c Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 22 Jan 2025 18:30:11 +0800 Subject: [PATCH] feat: replicate persistent cache task when task needs persistent replicas Signed-off-by: Gaius --- .../resource/persistentcache/host_manager.go | 198 ++++++++++++------ .../persistentcache/host_manager_mock.go | 16 ++ .../resource/persistentcache/peer_manager.go | 28 ++- .../persistentcache/peer_manager_mock.go | 15 ++ scheduler/resource/persistentcache/task.go | 10 +- .../resource/persistentcache/task_manager.go | 24 ++- .../persistentcache/task_manager_mock.go | 8 +- scheduler/scheduling/evaluator/evaluator.go | 4 +- .../scheduling/evaluator/evaluator_base.go | 10 +- .../evaluator/evaluator_base_test.go | 6 +- scheduler/scheduling/evaluator/plugin_test.go | 6 +- .../scheduling/evaluator/testdata/main.go | 4 +- .../evaluator/testdata/plugin/evaluator.go | 4 +- scheduler/scheduling/mocks/scheduling_mock.go | 14 +- scheduler/scheduling/scheduling.go | 166 ++++++++++++++- scheduler/service/service_v2.go | 140 ++++++++++--- 16 files changed, 503 insertions(+), 150 deletions(-) diff --git a/scheduler/resource/persistentcache/host_manager.go b/scheduler/resource/persistentcache/host_manager.go index 8ea2d449721..740eeba24e2 100644 --- a/scheduler/resource/persistentcache/host_manager.go +++ b/scheduler/resource/persistentcache/host_manager.go @@ -20,12 +20,14 @@ package persistentcache import ( "context" + "math/rand" "strconv" "time" redis "github.com/redis/go-redis/v9" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/container/set" pkggc "d7y.io/dragonfly/v2/pkg/gc" pkgredis "d7y.io/dragonfly/v2/pkg/redis" pkgtypes "d7y.io/dragonfly/v2/pkg/types" @@ -51,6 +53,9 @@ type HostManager interface { // LoadAll returns all hosts. LoadAll(context.Context) ([]*Host, error) + // LoadRandom loads host randomly through the set of redis. + LoadRandom(context.Context, int, set.SafeSet[string]) ([]*Host, error) + // RunGC runs garbage collection. RunGC() error } @@ -446,74 +451,102 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { // Store sets host. 
func (h *hostManager) Store(ctx context.Context, host *Host) error { - _, err := h.rdb.HSet(ctx, - pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, host.ID), - "id", host.ID, - "type", host.Type.Name(), - "hostname", host.Hostname, - "ip", host.IP, - "port", host.Port, - "download_port", host.DownloadPort, - "disable_shared", host.DisableShared, - "os", host.OS, - "platform", host.Platform, - "platform_family", host.PlatformFamily, - "platform_version", host.PlatformVersion, - "kernel_version", host.KernelVersion, - "cpu_logical_count", host.CPU.LogicalCount, - "cpu_physical_count", host.CPU.PhysicalCount, - "cpu_percent", host.CPU.Percent, - "cpu_processe_percent", host.CPU.ProcessPercent, - "cpu_times_user", host.CPU.Times.User, - "cpu_times_system", host.CPU.Times.System, - "cpu_times_idle", host.CPU.Times.Idle, - "cpu_times_nice", host.CPU.Times.Nice, - "cpu_times_iowait", host.CPU.Times.Iowait, - "cpu_times_irq", host.CPU.Times.Irq, - "cpu_times_softirq", host.CPU.Times.Softirq, - "cpu_times_steal", host.CPU.Times.Steal, - "cpu_times_guest", host.CPU.Times.Guest, - "cpu_times_guest_nice", host.CPU.Times.GuestNice, - "memory_total", host.Memory.Total, - "memory_available", host.Memory.Available, - "memory_used", host.Memory.Used, - "memory_used_percent", host.Memory.UsedPercent, - "memory_processe_used_percent", host.Memory.ProcessUsedPercent, - "memory_free", host.Memory.Free, - "network_tcp_connection_count", host.Network.TCPConnectionCount, - "network_upload_tcp_connection_count", host.Network.UploadTCPConnectionCount, - "network_location", host.Network.Location, - "network_idc", host.Network.IDC, - "network_download_rate", host.Network.DownloadRate, - "network_download_rate_limit", host.Network.DownloadRateLimit, - "network_upload_rate", host.Network.UploadRate, - "network_upload_rate_limit", host.Network.UploadRateLimit, - "disk_total", host.Disk.Total, - "disk_free", host.Disk.Free, - "disk_used", host.Disk.Used, - "disk_used_percent", host.Disk.UsedPercent, - "disk_inodes_total", host.Disk.InodesTotal, - "disk_inodes_used", host.Disk.InodesUsed, - "disk_inodes_free", host.Disk.InodesFree, - "disk_inodes_used_percent", host.Disk.InodesUsedPercent, - "disk_write_bandwidth", host.Disk.WriteBandwidth, - "disk_read_bandwidth", host.Disk.ReadBandwidth, - "build_git_version", host.Build.GitVersion, - "build_git_commit", host.Build.GitCommit, - "build_go_version", host.Build.GoVersion, - "build_platform", host.Build.Platform, - "scheduler_cluster_id", host.SchedulerClusterID, - "announce_interval", host.AnnounceInterval, - "created_at", host.CreatedAt.Format(time.RFC3339), - "updated_at", host.UpdatedAt.Format(time.RFC3339)).Result() - - return err + if _, err := h.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error { + if _, err := pipe.HSet(ctx, + pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, host.ID), + "id", host.ID, + "type", host.Type.Name(), + "hostname", host.Hostname, + "ip", host.IP, + "port", host.Port, + "download_port", host.DownloadPort, + "disable_shared", host.DisableShared, + "os", host.OS, + "platform", host.Platform, + "platform_family", host.PlatformFamily, + "platform_version", host.PlatformVersion, + "kernel_version", host.KernelVersion, + "cpu_logical_count", host.CPU.LogicalCount, + "cpu_physical_count", host.CPU.PhysicalCount, + "cpu_percent", host.CPU.Percent, + "cpu_processe_percent", host.CPU.ProcessPercent, + "cpu_times_user", host.CPU.Times.User, + "cpu_times_system", 
host.CPU.Times.System,
+			"cpu_times_idle", host.CPU.Times.Idle,
+			"cpu_times_nice", host.CPU.Times.Nice,
+			"cpu_times_iowait", host.CPU.Times.Iowait,
+			"cpu_times_irq", host.CPU.Times.Irq,
+			"cpu_times_softirq", host.CPU.Times.Softirq,
+			"cpu_times_steal", host.CPU.Times.Steal,
+			"cpu_times_guest", host.CPU.Times.Guest,
+			"cpu_times_guest_nice", host.CPU.Times.GuestNice,
+			"memory_total", host.Memory.Total,
+			"memory_available", host.Memory.Available,
+			"memory_used", host.Memory.Used,
+			"memory_used_percent", host.Memory.UsedPercent,
+			"memory_processe_used_percent", host.Memory.ProcessUsedPercent,
+			"memory_free", host.Memory.Free,
+			"network_tcp_connection_count", host.Network.TCPConnectionCount,
+			"network_upload_tcp_connection_count", host.Network.UploadTCPConnectionCount,
+			"network_location", host.Network.Location,
+			"network_idc", host.Network.IDC,
+			"network_download_rate", host.Network.DownloadRate,
+			"network_download_rate_limit", host.Network.DownloadRateLimit,
+			"network_upload_rate", host.Network.UploadRate,
+			"network_upload_rate_limit", host.Network.UploadRateLimit,
+			"disk_total", host.Disk.Total,
+			"disk_free", host.Disk.Free,
+			"disk_used", host.Disk.Used,
+			"disk_used_percent", host.Disk.UsedPercent,
+			"disk_inodes_total", host.Disk.InodesTotal,
+			"disk_inodes_used", host.Disk.InodesUsed,
+			"disk_inodes_free", host.Disk.InodesFree,
+			"disk_inodes_used_percent", host.Disk.InodesUsedPercent,
+			"disk_write_bandwidth", host.Disk.WriteBandwidth,
+			"disk_read_bandwidth", host.Disk.ReadBandwidth,
+			"build_git_version", host.Build.GitVersion,
+			"build_git_commit", host.Build.GitCommit,
+			"build_go_version", host.Build.GoVersion,
+			"build_platform", host.Build.Platform,
+			"scheduler_cluster_id", host.SchedulerClusterID,
+			"announce_interval", host.AnnounceInterval,
+			"created_at", host.CreatedAt.Format(time.RFC3339),
+			"updated_at", host.UpdatedAt.Format(time.RFC3339)).Result(); err != nil {
+			return err
+		}
+
+		if _, err := pipe.SAdd(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), host.ID).Result(); err != nil {
+			return err
+		}
+
+		return nil
+	}); err != nil {
+		host.Log.Errorf("store host failed: %v", err)
+		return err
+	}
+
+	return nil
 }
 
 // Delete deletes host by a key.
 func (h *hostManager) Delete(ctx context.Context, hostID string) error {
-	_, err := h.rdb.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, hostID)).Result()
-	return err
+	log := logger.WithHostID(hostID)
+	if _, err := h.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
+		if _, err := pipe.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, hostID)).Result(); err != nil {
+			return err
+		}
+
+		if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), hostID).Result(); err != nil {
+			return err
+		}
+
+		return nil
+	}); err != nil {
+		log.Errorf("delete host failed: %v", err)
+		return err
+	}
+
+	return nil
 }
 
 // LoadAll returns all hosts.
@@ -529,7 +562,7 @@ func (h *hostManager) LoadAll(ctx context.Context) ([]*Host, error) { err error ) - hostKeys, cursor, err = h.rdb.Scan(ctx, cursor, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), 10).Result() + hostKeys, cursor, err = h.rdb.SScan(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), cursor, "*", 10).Result() if err != nil { logger.Error("scan hosts failed") return nil, err @@ -553,6 +586,41 @@ func (h *hostManager) LoadAll(ctx context.Context) ([]*Host, error) { return hosts, nil } +// LoadRandom loads host randomly through the set of redis. +func (h *hostManager) LoadRandom(ctx context.Context, n int, blocklist set.SafeSet[string]) ([]*Host, error) { + hostKeys, err := h.rdb.SMembers(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID)).Result() + if err != nil { + logger.Error("smembers hosts failed") + return nil, err + } + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + r.Shuffle(len(hostKeys), func(i, j int) { + hostKeys[i], hostKeys[j] = hostKeys[j], hostKeys[i] + }) + + hosts := make([]*Host, 0, n) + for _, hostKey := range hostKeys { + if len(hosts) >= n { + break + } + + if blocklist.Contains(hostKey) { + continue + } + + host, loaded := h.Load(ctx, hostKey) + if !loaded { + logger.WithHostID(hostKey).Error("load host failed") + continue + } + + hosts = append(hosts, host) + } + + return hosts, nil +} + // RunGC runs garbage collection. func (h *hostManager) RunGC() error { hosts, err := h.LoadAll(context.Background()) diff --git a/scheduler/resource/persistentcache/host_manager_mock.go b/scheduler/resource/persistentcache/host_manager_mock.go index ac3a755fe2f..a111d384758 100644 --- a/scheduler/resource/persistentcache/host_manager_mock.go +++ b/scheduler/resource/persistentcache/host_manager_mock.go @@ -13,6 +13,7 @@ import ( context "context" reflect "reflect" + set "d7y.io/dragonfly/v2/pkg/container/set" gomock "go.uber.org/mock/gomock" ) @@ -84,6 +85,21 @@ func (mr *MockHostManagerMockRecorder) LoadAll(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadAll", reflect.TypeOf((*MockHostManager)(nil).LoadAll), arg0) } +// LoadRandom mocks base method. +func (m *MockHostManager) LoadRandom(arg0 context.Context, arg1 int, arg2 set.SafeSet[string]) ([]*Host, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LoadRandom", arg0, arg1, arg2) + ret0, _ := ret[0].([]*Host) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LoadRandom indicates an expected call of LoadRandom. +func (mr *MockHostManagerMockRecorder) LoadRandom(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadRandom", reflect.TypeOf((*MockHostManager)(nil).LoadRandom), arg0, arg1, arg2) +} + // RunGC mocks base method. func (m *MockHostManager) RunGC() error { m.ctrl.T.Helper() diff --git a/scheduler/resource/persistentcache/peer_manager.go b/scheduler/resource/persistentcache/peer_manager.go index d57a017da7a..b62bc181873 100644 --- a/scheduler/resource/persistentcache/peer_manager.go +++ b/scheduler/resource/persistentcache/peer_manager.go @@ -51,6 +51,9 @@ type PeerManager interface { // LoadAllByTaskID returns all peers by task id. LoadAllByTaskID(context.Context, string) ([]*Peer, error) + // LoadPersistentAllByTaskID returns all persistent peers by task id. 
+ LoadPersistentAllByTaskID(context.Context, string) ([]*Peer, error) + // DeleteAllByTaskID deletes all peers by task id. DeleteAllByTaskID(context.Context, string) error @@ -242,7 +245,7 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error { func (p *peerManager) Delete(ctx context.Context, peerID string) error { log := logger.WithPeerID(peerID) if _, err := p.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error { - rawPeer, err := p.rdb.HGetAll(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID)).Result() + rawPeer, err := pipe.HGetAll(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID)).Result() if err != nil { return errors.New("getting peer failed from redis") } @@ -343,6 +346,29 @@ func (p *peerManager) LoadAllByTaskID(ctx context.Context, taskID string) ([]*Pe return peers, nil } +// LoadPersistentAllByTaskID returns all persistent cache peers by task id. +func (p *peerManager) LoadPersistentAllByTaskID(ctx context.Context, taskID string) ([]*Peer, error) { + log := logger.WithTaskID(taskID) + peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, taskID)).Result() + if err != nil { + log.Error("get peer ids failed") + return nil, err + } + + peers := make([]*Peer, 0, len(peerIDs)) + for _, peerID := range peerIDs { + peer, loaded := p.Load(ctx, peerID) + if !loaded { + log.Errorf("load peer %s failed", peerID) + continue + } + + peers = append(peers, peer) + } + + return peers, nil +} + // DeleteAllByTaskID deletes all persistent cache peers by task id. func (p *peerManager) DeleteAllByTaskID(ctx context.Context, taskID string) error { log := logger.WithTaskID(taskID) diff --git a/scheduler/resource/persistentcache/peer_manager_mock.go b/scheduler/resource/persistentcache/peer_manager_mock.go index fe3634bbb01..63e31bc4cb8 100644 --- a/scheduler/resource/persistentcache/peer_manager_mock.go +++ b/scheduler/resource/persistentcache/peer_manager_mock.go @@ -142,6 +142,21 @@ func (mr *MockPeerManagerMockRecorder) LoadAllByTaskID(arg0, arg1 any) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadAllByTaskID", reflect.TypeOf((*MockPeerManager)(nil).LoadAllByTaskID), arg0, arg1) } +// LoadPersistentAllByTaskID mocks base method. +func (m *MockPeerManager) LoadPersistentAllByTaskID(arg0 context.Context, arg1 string) ([]*Peer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LoadPersistentAllByTaskID", arg0, arg1) + ret0, _ := ret[0].([]*Peer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LoadPersistentAllByTaskID indicates an expected call of LoadPersistentAllByTaskID. +func (mr *MockPeerManagerMockRecorder) LoadPersistentAllByTaskID(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadPersistentAllByTaskID", reflect.TypeOf((*MockPeerManager)(nil).LoadPersistentAllByTaskID), arg0, arg1) +} + // Store mocks base method. func (m *MockPeerManager) Store(arg0 context.Context, arg1 *Peer) error { m.ctrl.T.Helper() diff --git a/scheduler/resource/persistentcache/task.go b/scheduler/resource/persistentcache/task.go index a5579d2c953..ce23b4289be 100644 --- a/scheduler/resource/persistentcache/task.go +++ b/scheduler/resource/persistentcache/task.go @@ -77,13 +77,13 @@ type Task struct { Application string // Persistet cache task piece length. 
- PieceLength int32 + PieceLength uint64 // ContentLength is persistent cache task total content length. - ContentLength int64 + ContentLength uint64 // TotalPieceCount is total piece count. - TotalPieceCount int32 + TotalPieceCount uint32 // Persistent cache task state machine. FSM *fsm.FSM @@ -102,8 +102,8 @@ type Task struct { } // New persistent cache task instance. -func NewTask(id, tag, application, state string, persistentReplicaCount uint64, pieceLength int32, - contentLength int64, totalPieceCount int32, ttl time.Duration, createdAt, updatedAt time.Time, +func NewTask(id, tag, application, state string, persistentReplicaCount, pieceLength, contentLength uint64, + totalPieceCount uint32, ttl time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith) *Task { t := &Task{ ID: id, diff --git a/scheduler/resource/persistentcache/task_manager.go b/scheduler/resource/persistentcache/task_manager.go index 82f1cf45134..a5e44e03e21 100644 --- a/scheduler/resource/persistentcache/task_manager.go +++ b/scheduler/resource/persistentcache/task_manager.go @@ -36,10 +36,10 @@ type TaskManager interface { Load(context.Context, string) (*Task, bool) // LoadCorrentReplicaCount returns current replica count of the persistent cache task. - LoadCorrentReplicaCount(context.Context, string) (int64, error) + LoadCorrentReplicaCount(context.Context, string) (uint64, error) // LoadCurrentPersistentReplicaCount returns current persistent replica count of the persistent cache task. - LoadCurrentPersistentReplicaCount(context.Context, string) (int64, error) + LoadCurrentPersistentReplicaCount(context.Context, string) (uint64, error) // Store sets persistent cache task. Store(context.Context, *Task) error @@ -85,19 +85,19 @@ func (t *taskManager) Load(ctx context.Context, taskID string) (*Task, bool) { return nil, false } - pieceLength, err := strconv.ParseInt(rawTask["piece_length"], 10, 32) + pieceLength, err := strconv.ParseUint(rawTask["piece_length"], 10, 64) if err != nil { log.Errorf("parsing piece length failed: %v", err) return nil, false } - contentLength, err := strconv.ParseInt(rawTask["content_length"], 10, 64) + contentLength, err := strconv.ParseUint(rawTask["content_length"], 10, 64) if err != nil { log.Errorf("parsing content length failed: %v", err) return nil, false } - totalPieceCount, err := strconv.ParseInt(rawTask["total_piece_count"], 10, 32) + totalPieceCount, err := strconv.ParseUint(rawTask["total_piece_count"], 10, 32) if err != nil { log.Errorf("parsing total piece count failed: %v", err) return nil, false @@ -128,9 +128,9 @@ func (t *taskManager) Load(ctx context.Context, taskID string) (*Task, bool) { rawTask["application"], rawTask["state"], persistentReplicaCount, - int32(pieceLength), + pieceLength, contentLength, - int32(totalPieceCount), + uint32(totalPieceCount), time.Duration(ttl), createdAt, updatedAt, @@ -139,13 +139,15 @@ func (t *taskManager) Load(ctx context.Context, taskID string) (*Task, bool) { } // LoadCorrentReplicaCount returns current replica count of the persistent cache task. 
-func (t *taskManager) LoadCorrentReplicaCount(ctx context.Context, taskID string) (int64, error) { - return t.rdb.SCard(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(t.config.Manager.SchedulerClusterID, taskID)).Result() +func (t *taskManager) LoadCorrentReplicaCount(ctx context.Context, taskID string) (uint64, error) { + count, err := t.rdb.SCard(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(t.config.Manager.SchedulerClusterID, taskID)).Result() + return uint64(count), err } // LoadCurrentPersistentReplicaCount returns current persistent replica count of the persistent cache task. -func (t *taskManager) LoadCurrentPersistentReplicaCount(ctx context.Context, taskID string) (int64, error) { - return t.rdb.SCard(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(t.config.Manager.SchedulerClusterID, taskID)).Result() +func (t *taskManager) LoadCurrentPersistentReplicaCount(ctx context.Context, taskID string) (uint64, error) { + count, err := t.rdb.SCard(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(t.config.Manager.SchedulerClusterID, taskID)).Result() + return uint64(count), err } // Store sets persistent cache task. diff --git a/scheduler/resource/persistentcache/task_manager_mock.go b/scheduler/resource/persistentcache/task_manager_mock.go index 6580b7288c5..48b972f7fbd 100644 --- a/scheduler/resource/persistentcache/task_manager_mock.go +++ b/scheduler/resource/persistentcache/task_manager_mock.go @@ -85,10 +85,10 @@ func (mr *MockTaskManagerMockRecorder) LoadAll(arg0 any) *gomock.Call { } // LoadCorrentReplicaCount mocks base method. -func (m *MockTaskManager) LoadCorrentReplicaCount(arg0 context.Context, arg1 string) (int64, error) { +func (m *MockTaskManager) LoadCorrentReplicaCount(arg0 context.Context, arg1 string) (uint64, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "LoadCorrentReplicaCount", arg0, arg1) - ret0, _ := ret[0].(int64) + ret0, _ := ret[0].(uint64) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -100,10 +100,10 @@ func (mr *MockTaskManagerMockRecorder) LoadCorrentReplicaCount(arg0, arg1 any) * } // LoadCurrentPersistentReplicaCount mocks base method. -func (m *MockTaskManager) LoadCurrentPersistentReplicaCount(arg0 context.Context, arg1 string) (int64, error) { +func (m *MockTaskManager) LoadCurrentPersistentReplicaCount(arg0 context.Context, arg1 string) (uint64, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "LoadCurrentPersistentReplicaCount", arg0, arg1) - ret0, _ := ret[0].(int64) + ret0, _ := ret[0].(uint64) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/scheduler/scheduling/evaluator/evaluator.go b/scheduler/scheduling/evaluator/evaluator.go index 1707ac570bc..9507cb116ef 100644 --- a/scheduler/scheduling/evaluator/evaluator.go +++ b/scheduler/scheduling/evaluator/evaluator.go @@ -61,13 +61,13 @@ const ( // Evaluator is an interface that evaluates the parents. type Evaluator interface { // EvaluateParents sort parents by evaluating multiple feature scores. - EvaluateParents(parents []*standard.Peer, child *standard.Peer, taskPieceCount int32) []*standard.Peer + EvaluateParents(parents []*standard.Peer, child *standard.Peer, taskPieceCount uint32) []*standard.Peer // IsBadParent determine if peer is a bad parent, it can not be selected as a parent. IsBadParent(peer *standard.Peer) bool // EvaluatePersistentCacheParents sort persistent cache parents by evaluating multiple feature scores. 
- EvaluatePersistentCacheParents(parents []*persistentcache.Peer, child *persistentcache.Peer, taskPieceCount int32) []*persistentcache.Peer + EvaluatePersistentCacheParents(parents []*persistentcache.Peer, child *persistentcache.Peer, taskPieceCount uint32) []*persistentcache.Peer // IsBadPersistentCacheParent determine if persistent cache peer is a bad parent, it can not be selected as a parent. IsBadPersistentCacheParent(peer *persistentcache.Peer) bool diff --git a/scheduler/scheduling/evaluator/evaluator_base.go b/scheduler/scheduling/evaluator/evaluator_base.go index 9a9cc72488f..8613534aef6 100644 --- a/scheduler/scheduling/evaluator/evaluator_base.go +++ b/scheduler/scheduling/evaluator/evaluator_base.go @@ -57,7 +57,7 @@ func newEvaluatorBase() Evaluator { } // EvaluateParents sort parents by evaluating multiple feature scores. -func (e *evaluatorBase) EvaluateParents(parents []*standard.Peer, child *standard.Peer, totalPieceCount int32) []*standard.Peer { +func (e *evaluatorBase) EvaluateParents(parents []*standard.Peer, child *standard.Peer, totalPieceCount uint32) []*standard.Peer { sort.Slice( parents, func(i, j int) bool { @@ -69,7 +69,7 @@ func (e *evaluatorBase) EvaluateParents(parents []*standard.Peer, child *standar } // evaluateParents sort parents by evaluating multiple feature scores. -func (e *evaluatorBase) evaluateParents(parent *standard.Peer, child *standard.Peer, totalPieceCount int32) float64 { +func (e *evaluatorBase) evaluateParents(parent *standard.Peer, child *standard.Peer, totalPieceCount uint32) float64 { parentLocation := parent.Host.Network.Location parentIDC := parent.Host.Network.IDC childLocation := child.Host.Network.Location @@ -84,7 +84,7 @@ func (e *evaluatorBase) evaluateParents(parent *standard.Peer, child *standard.P } // EvaluatePersistentCacheParents sort persistent cache parents by evaluating multiple feature scores. -func (e *evaluatorBase) EvaluatePersistentCacheParents(parents []*persistentcache.Peer, child *persistentcache.Peer, totalPieceCount int32) []*persistentcache.Peer { +func (e *evaluatorBase) EvaluatePersistentCacheParents(parents []*persistentcache.Peer, child *persistentcache.Peer, totalPieceCount uint32) []*persistentcache.Peer { sort.Slice( parents, func(i, j int) bool { @@ -96,7 +96,7 @@ func (e *evaluatorBase) EvaluatePersistentCacheParents(parents []*persistentcach } // evaluatePersistentCacheParents sort persistent cache parents by evaluating multiple feature scores. -func (e *evaluatorBase) evaluatePersistentCacheParents(parent *persistentcache.Peer, child *persistentcache.Peer, totalPieceCount int32) float64 { +func (e *evaluatorBase) evaluatePersistentCacheParents(parent *persistentcache.Peer, child *persistentcache.Peer, totalPieceCount uint32) float64 { parentLocation := parent.Host.Network.Location parentIDC := parent.Host.Network.IDC childLocation := child.Host.Network.Location @@ -108,7 +108,7 @@ func (e *evaluatorBase) evaluatePersistentCacheParents(parent *persistentcache.P } // calculatePieceScore 0.0~unlimited larger and better. -func (e *evaluatorBase) calculatePieceScore(parentFinishedPieceCount uint, childFinishedPieceCount uint, totalPieceCount int32) float64 { +func (e *evaluatorBase) calculatePieceScore(parentFinishedPieceCount uint, childFinishedPieceCount uint, totalPieceCount uint32) float64 { // If the total piece is determined, normalize the number of // pieces downloaded by the parent node. 
if totalPieceCount > 0 { diff --git a/scheduler/scheduling/evaluator/evaluator_base_test.go b/scheduler/scheduling/evaluator/evaluator_base_test.go index 20b12f66f30..1b6f4ed79aa 100644 --- a/scheduler/scheduling/evaluator/evaluator_base_test.go +++ b/scheduler/scheduling/evaluator/evaluator_base_test.go @@ -176,7 +176,7 @@ func TestEvaluatorBase_EvaluateParents(t *testing.T) { name string parents []*resource.Peer child *resource.Peer - totalPieceCount int32 + totalPieceCount uint32 mock func(parent []*resource.Peer, child *resource.Peer) expect func(t *testing.T, parents []*resource.Peer) }{ @@ -340,7 +340,7 @@ func TestEvaluatorBase_evaluate(t *testing.T) { name string parent *resource.Peer child *resource.Peer - totalPieceCount int32 + totalPieceCount uint32 mock func(parent *resource.Peer, child *resource.Peer) expect func(t *testing.T, score float64) }{ @@ -406,7 +406,7 @@ func TestEvaluatorBase_calculatePieceScore(t *testing.T) { name string parent *resource.Peer child *resource.Peer - totalPieceCount int32 + totalPieceCount uint32 mock func(parent *resource.Peer, child *resource.Peer) expect func(t *testing.T, score float64) }{ diff --git a/scheduler/scheduling/evaluator/plugin_test.go b/scheduler/scheduling/evaluator/plugin_test.go index 5e1b002b22d..68c38e5f0cf 100644 --- a/scheduler/scheduling/evaluator/plugin_test.go +++ b/scheduler/scheduling/evaluator/plugin_test.go @@ -44,7 +44,7 @@ func TestPlugin_Load(t *testing.T) { output, err = cmd.CombinedOutput() assert.Nil(err) if err != nil { - t.Fatalf(string(output)) + t.Fatal(string(output)) return } @@ -53,7 +53,7 @@ func TestPlugin_Load(t *testing.T) { output, err = cmd.CombinedOutput() assert.Nil(err) if err != nil { - t.Fatalf(string(output)) + t.Fatal(string(output)) return } @@ -66,7 +66,7 @@ func TestPlugin_Load(t *testing.T) { output, err = cmd.CombinedOutput() assert.Nil(err) if err != nil { - t.Fatalf(string(output)) + t.Fatal(string(output)) return } } diff --git a/scheduler/scheduling/evaluator/testdata/main.go b/scheduler/scheduling/evaluator/testdata/main.go index cde111b53fd..29beabead6b 100644 --- a/scheduler/scheduling/evaluator/testdata/main.go +++ b/scheduler/scheduling/evaluator/testdata/main.go @@ -32,7 +32,7 @@ func main() { os.Exit(1) } - candidateParents := e.EvaluateParents([]*standard.Peer{&standard.Peer{}}, &standard.Peer{}, int32(0)) + candidateParents := e.EvaluateParents([]*standard.Peer{&standard.Peer{}}, &standard.Peer{}, uint32(0)) if len(candidateParents) != 1 { fmt.Println("EvaluateParents failed") os.Exit(1) @@ -43,7 +43,7 @@ func main() { os.Exit(1) } - candidatePersistentCacheParents := e.EvaluatePersistentCacheParents([]*persistentcache.Peer{&persistentcache.Peer{}}, &persistentcache.Peer{}, int32(0)) + candidatePersistentCacheParents := e.EvaluatePersistentCacheParents([]*persistentcache.Peer{&persistentcache.Peer{}}, &persistentcache.Peer{}, uint32(0)) if len(candidatePersistentCacheParents) != 1 { fmt.Println("EvaluatePersistentCacheParents failed") os.Exit(1) diff --git a/scheduler/scheduling/evaluator/testdata/plugin/evaluator.go b/scheduler/scheduling/evaluator/testdata/plugin/evaluator.go index 27ec98c8bf0..8c247455039 100644 --- a/scheduler/scheduling/evaluator/testdata/plugin/evaluator.go +++ b/scheduler/scheduling/evaluator/testdata/plugin/evaluator.go @@ -24,7 +24,7 @@ import ( type evaluator struct{} // EvaluateParents sort parents by evaluating multiple feature scores. 
-func (e *evaluator) EvaluateParents(parents []*standard.Peer, child *standard.Peer, taskPieceCount int32) []*standard.Peer { +func (e *evaluator) EvaluateParents(parents []*standard.Peer, child *standard.Peer, taskPieceCount uint32) []*standard.Peer { return []*standard.Peer{&standard.Peer{}} } @@ -34,7 +34,7 @@ func (e *evaluator) IsBadParent(peer *standard.Peer) bool { } // EvaluatePersistentCacheParents sort persistent cache parents by evaluating multiple feature scores. -func (e *evaluator) EvaluatePersistentCacheParents(parents []*persistentcache.Peer, child *persistentcache.Peer, taskPieceCount int32) []*persistentcache.Peer { +func (e *evaluator) EvaluatePersistentCacheParents(parents []*persistentcache.Peer, child *persistentcache.Peer, taskPieceCount uint32) []*persistentcache.Peer { return []*persistentcache.Peer{&persistentcache.Peer{}} } diff --git a/scheduler/scheduling/mocks/scheduling_mock.go b/scheduler/scheduling/mocks/scheduling_mock.go index cb6e864cb63..9d83724d1a8 100644 --- a/scheduler/scheduling/mocks/scheduling_mock.go +++ b/scheduler/scheduling/mocks/scheduling_mock.go @@ -88,19 +88,19 @@ func (mr *MockSchedulingMockRecorder) FindParentAndCandidateParents(arg0, arg1, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindParentAndCandidateParents", reflect.TypeOf((*MockScheduling)(nil).FindParentAndCandidateParents), arg0, arg1, arg2) } -// FindReplicatePersistentCacheParents mocks base method. -func (m *MockScheduling) FindReplicatePersistentCacheParents(arg0 context.Context, arg1 *persistentcache.Task, arg2 set.SafeSet[string]) ([]*persistentcache.Peer, bool) { +// FindReplicatePersistentCacheHosts mocks base method. +func (m *MockScheduling) FindReplicatePersistentCacheHosts(arg0 context.Context, arg1 *persistentcache.Task, arg2 set.SafeSet[string]) ([]*persistentcache.Host, bool) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FindReplicatePersistentCacheParents", arg0, arg1, arg2) - ret0, _ := ret[0].([]*persistentcache.Peer) + ret := m.ctrl.Call(m, "FindReplicatePersistentCacheHosts", arg0, arg1, arg2) + ret0, _ := ret[0].([]*persistentcache.Host) ret1, _ := ret[1].(bool) return ret0, ret1 } -// FindReplicatePersistentCacheParents indicates an expected call of FindReplicatePersistentCacheParents. -func (mr *MockSchedulingMockRecorder) FindReplicatePersistentCacheParents(arg0, arg1, arg2 any) *gomock.Call { +// FindReplicatePersistentCacheHosts indicates an expected call of FindReplicatePersistentCacheHosts. +func (mr *MockSchedulingMockRecorder) FindReplicatePersistentCacheHosts(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindReplicatePersistentCacheParents", reflect.TypeOf((*MockScheduling)(nil).FindReplicatePersistentCacheParents), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindReplicatePersistentCacheHosts", reflect.TypeOf((*MockScheduling)(nil).FindReplicatePersistentCacheHosts), arg0, arg1, arg2) } // FindSuccessParent mocks base method. diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go index 0ee61c130c0..b2cfe7b2367 100644 --- a/scheduler/scheduling/scheduling.go +++ b/scheduler/scheduling/scheduling.go @@ -61,8 +61,9 @@ type Scheduling interface { // FindSuccessParent finds success parent for the peer to download the task. 
 	FindSuccessParent(context.Context, *standard.Peer, set.SafeSet[string]) (*standard.Peer, bool)
 
-	// FindReplicatePersistentCacheParents finds replicate persistent cache parents for the peer to replicate the task.
-	FindReplicatePersistentCacheParents(context.Context, *persistentcache.Task, set.SafeSet[string]) ([]*persistentcache.Peer, bool)
+	// FindReplicatePersistentCacheHosts finds replicate persistent cache hosts for the task. It compares the current
+	// persistent replica count with the task's persistent replica count and tries to find enough hosts.
+	FindReplicatePersistentCacheHosts(context.Context, *persistentcache.Task, set.SafeSet[string]) ([]*persistentcache.Host, bool)
 
 	// FindCandidatePersistentCacheParents finds candidate persistent cache parents for the peer to download the task.
 	FindCandidatePersistentCacheParents(context.Context, *persistentcache.Peer, set.SafeSet[string]) ([]*persistentcache.Peer, bool)
@@ -410,7 +411,7 @@ func (s *scheduling) FindCandidateParents(ctx context.Context, peer *standard.Pe
 
 	// Sort candidate parents by evaluation score.
 	taskTotalPieceCount := peer.Task.TotalPieceCount.Load()
-	candidateParents = s.evaluator.EvaluateParents(candidateParents, peer, taskTotalPieceCount)
+	candidateParents = s.evaluator.EvaluateParents(candidateParents, peer, uint32(taskTotalPieceCount))
 
 	// Get the parents with candidateParentLimit.
 	candidateParentLimit := config.DefaultSchedulerCandidateParentLimit
@@ -452,7 +453,7 @@ func (s *scheduling) FindParentAndCandidateParents(ctx context.Context, peer *st
 
 	// Sort candidate parents by evaluation score.
 	taskTotalPieceCount := peer.Task.TotalPieceCount.Load()
-	candidateParents = s.evaluator.EvaluateParents(candidateParents, peer, taskTotalPieceCount)
+	candidateParents = s.evaluator.EvaluateParents(candidateParents, peer, uint32(taskTotalPieceCount))
 
 	// Get the parents with candidateParentLimit.
 	candidateParentLimit := config.DefaultSchedulerCandidateParentLimit
@@ -501,7 +502,7 @@ func (s *scheduling) FindSuccessParent(ctx context.Context, peer *standard.Peer,
 
 	// Sort candidate parents by evaluation score.
 	taskTotalPieceCount := peer.Task.TotalPieceCount.Load()
-	successParents = s.evaluator.EvaluateParents(successParents, peer, taskTotalPieceCount)
+	successParents = s.evaluator.EvaluateParents(successParents, peer, uint32(taskTotalPieceCount))
 
 	peer.Log.Infof("scheduling success parent is %s", successParents[0].ID)
 	return successParents[0], true
@@ -587,10 +588,76 @@ func (s *scheduling) filterCandidateParents(peer *standard.Peer, blocklist set.S
 	return candidateParents
 }
 
-// TODO(Gaius) Implement the following methods.
-// FindReplicatePersistentCacheParents finds replicate persistent cache parents for the peer to replicate the task.
-func (s *scheduling) FindReplicatePersistentCacheParents(ctx context.Context, task *persistentcache.Task, blocklist set.SafeSet[string]) ([]*persistentcache.Peer, bool) {
-	return nil, false
+// FindReplicatePersistentCacheHosts finds replicate persistent cache hosts for the task. It compares the current
+// persistent replica count with the task's persistent replica count and tries to find enough hosts.
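+// The selection runs in two phases: it first reuses hosts that already hold a cached non-persistent copy of the task,
+// then falls back to randomly sampled hosts that allow sharing and have enough free disk space for the task content.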
+func (s *scheduling) FindReplicatePersistentCacheHosts(ctx context.Context, task *persistentcache.Task, blocklist set.SafeSet[string]) ([]*persistentcache.Host, bool) {
+	currentPersistentReplicaCount, err := s.persistentCacheResource.TaskManager().LoadCurrentPersistentReplicaCount(ctx, task.ID)
+	if err != nil {
+		task.Log.Errorf("load current persistent replica count failed %s", err)
+		return nil, false
+	}
+
+	needPersistentReplicaCount := int(task.PersistentReplicaCount - currentPersistentReplicaCount)
+	if needPersistentReplicaCount <= 0 {
+		task.Log.Infof("persistent cache task %s has enough persistent replica count %d", task.ID, task.PersistentReplicaCount)
+		return nil, false
+	}
+
+	var (
+		replicateHosts   []*persistentcache.Host
+		replicateHostIDs []string
+	)
+	cachedHosts := s.filterCachedReplicatePersistentCacheHosts(ctx, task, blocklist)
+	cachedHostsCount := len(cachedHosts)
+
+	// If the number of cached hosts is greater than or equal to the needed persistent replica count,
+	// return the cached hosts directly; there is no need to find replicate hosts without the cache.
+	if cachedHostsCount >= needPersistentReplicaCount {
+		for _, cachedHost := range cachedHosts[:needPersistentReplicaCount] {
+			replicateHosts = append(replicateHosts, cachedHost)
+			replicateHostIDs = append(replicateHostIDs, cachedHost.ID)
+		}
+
+		task.Log.Infof("find persistent cache hosts is %#v", replicateHostIDs)
+		return replicateHosts, true
+	}
+
+	// If the cached hosts are not enough, use all of them and then find additional replicate hosts without the cache.
+	if cachedHostsCount > 0 {
+		for _, cachedHost := range cachedHosts {
+			replicateHosts = append(replicateHosts, cachedHost)
+			replicateHostIDs = append(replicateHostIDs, cachedHost.ID)
+			blocklist.Add(cachedHost.ID)
+		}
+	}
+
+	// Load all current persistent peers and add them to the blocklist to avoid scheduling the same host.
+	currentPersistentPeers, err := s.persistentCacheResource.PeerManager().LoadPersistentAllByTaskID(ctx, task.ID)
+	if err != nil {
+		task.Log.Errorf("load all persistent cache peers failed: %s", err.Error())
+		return nil, false
+	}
+
+	for _, currentPersistentPeer := range currentPersistentPeers {
+		blocklist.Add(currentPersistentPeer.Host.ID)
+	}
+
+	// Find the replicate hosts without cache. Calculate the number of persistent replicas needed without considering the cache.
+	// Formula: Needed persistent replica count without cache = Total persistent replica count - Current persistent replica count - Cached hosts count.
+	needPersistentReplicaCount -= cachedHostsCount
+	hosts := s.filterReplicatePersistentCacheHosts(ctx, task, needPersistentReplicaCount, blocklist)
+	for _, host := range hosts {
+		replicateHosts = append(replicateHosts, host)
+		replicateHostIDs = append(replicateHostIDs, host.ID)
+	}
+
+	if len(replicateHosts) == 0 {
+		task.Log.Info("can not find replicate persistent cache hosts")
+		return nil, false
+	}
+
+	task.Log.Infof("find persistent cache hosts is %#v", replicateHostIDs)
+	return replicateHosts, true
 }
 
 // FindCandidatePersistentCacheParents finds candidate persistent cache parents for the peer to download the task.
@@ -626,7 +693,7 @@ func (s *scheduling) FindCandidatePersistentCacheParents(ctx context.Context, pe
 	return candidateParents, true
 }
 
-// filterCandidateParents filters the candidate parents that can be scheduled.
+// filterCandidatePersistentCacheParents filters the candidate persistent cache parents that can be scheduled.
func (s *scheduling) filterCandidatePersistentCacheParents(ctx context.Context, peer *persistentcache.Peer, blocklist set.SafeSet[string]) []*persistentcache.Peer { parents, err := s.persistentCacheResource.PeerManager().LoadAllByTaskID(ctx, peer.Task.ID) if err != nil { @@ -665,6 +732,85 @@ func (s *scheduling) filterCandidatePersistentCacheParents(ctx context.Context, return candidateParents } +// filterCachedReplicatePersistentCacheHosts filters the cached replicate persistent cache hosts that can be scheduled. +func (s *scheduling) filterCachedReplicatePersistentCacheHosts(ctx context.Context, task *persistentcache.Task, blocklist set.SafeSet[string]) []*persistentcache.Host { + parents, err := s.persistentCacheResource.PeerManager().LoadAllByTaskID(ctx, task.ID) + if err != nil { + task.Log.Errorf("load all persistent cache parents failed: %s", err.Error()) + return nil + } + + var ( + replicateHosts []*persistentcache.Host + replicateHostIDs []string + ) + for _, replicateParent := range parents { + // Candidate persistent cache parent is in blocklist. + if blocklist.Contains(replicateParent.ID) { + task.Log.Debugf("persistent cache parent %s host %s is not selected because it is in blocklist", replicateParent.ID, replicateParent.Host.ID) + continue + } + + // If the parent is persistent, it cannot be selected. + if replicateParent.Persistent { + task.Log.Debugf("persistent cache parent %s host %s is not selected because it is persistent", replicateParent.ID, replicateParent.Host.ID) + continue + } + + // If the parent is not succeeded, it cannot be selected. + if !replicateParent.FSM.Is(standard.PeerStateSucceeded) { + task.Log.Debugf("persistent cache parent %s host %s is not selected because its download state is %s", replicateParent.ID, replicateParent.Host.ID, replicateParent.FSM.Current()) + continue + } + + // If the host is disable shared, it cannot be selected. + if replicateParent.Host.DisableShared { + task.Log.Debugf("persistent cache parent %s host %s is not selected because it is disable shared", replicateParent.ID, replicateParent.Host.ID) + continue + } + + replicateHosts = append(replicateHosts, replicateParent.Host) + replicateHostIDs = append(replicateHostIDs, replicateParent.Host.ID) + } + + task.Log.Infof("filter cached hosts is %#v", replicateHostIDs) + return replicateHosts +} + +// filterReplicatePersistentCacheHosts filters the replicate persistent cache hosts that can be scheduled. +func (s *scheduling) filterReplicatePersistentCacheHosts(ctx context.Context, task *persistentcache.Task, count int, blocklist set.SafeSet[string]) []*persistentcache.Host { + hosts, err := s.persistentCacheResource.HostManager().LoadRandom(ctx, count, blocklist) + if err != nil { + task.Log.Errorf("load all persistent cache hosts failed: %s", err.Error()) + return nil + } + + var ( + replicateHosts []*persistentcache.Host + replicateHostIDs []string + ) + for _, host := range hosts { + // If the host is disable shared, it cannot be selected. + if host.DisableShared { + task.Log.Debugf("persistent cache host %s is not selected because it is disable shared", host.ID) + continue + } + + // If the available disk space is not enough, it cannot be selected. 
+ if host.Disk.Free < task.ContentLength { + task.Log.Debugf("persistent cache host %s is not selected because its free disk space is not enough, free disk is %d, content length is %d", + host.ID, host.Disk.Free, task.ContentLength) + continue + } + + replicateHosts = append(replicateHosts, host) + replicateHostIDs = append(replicateHostIDs, host.ID) + } + + task.Log.Infof("filter hosts is %#v", replicateHostIDs) + return replicateHosts +} + // constructSuccessNormalTaskResponse constructs scheduling successful response of the normal task. // Used only in v2 version of the grpc. func constructSuccessNormalTaskResponse(candidateParents []*standard.Peer) *schedulerv2.AnnouncePeerResponse_NormalTaskResponse { diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 550e8b5af32..c3a1602daa1 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -23,7 +23,9 @@ import ( "time" "github.com/bits-and-blooms/bitset" + "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" @@ -37,6 +39,7 @@ import ( "d7y.io/dragonfly/v2/pkg/container/set" "d7y.io/dragonfly/v2/pkg/digest" "d7y.io/dragonfly/v2/pkg/net/http" + dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client" "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/metrics" @@ -1771,13 +1774,13 @@ func (v *V2) handleRegisterPersistentCachePeerRequest(ctx context.Context, strea Task: &commonv2.PersistentCacheTask{ Id: parent.Task.ID, PersistentReplicaCount: parent.Task.PersistentReplicaCount, - CurrentPersistentReplicaCount: uint64(currentPersistentReplicaCount), - CurrentReplicaCount: uint64(currentReplicaCount), + CurrentPersistentReplicaCount: currentPersistentReplicaCount, + CurrentReplicaCount: currentReplicaCount, Tag: &parent.Task.Tag, Application: &parent.Task.Application, - PieceLength: uint64(parent.Task.PieceLength), - ContentLength: uint64(parent.Task.ContentLength), - PieceCount: uint32(parent.Task.TotalPieceCount), + PieceLength: parent.Task.PieceLength, + ContentLength: parent.Task.ContentLength, + PieceCount: parent.Task.TotalPieceCount, State: parent.Task.FSM.Current(), CreatedAt: timestamppb.New(parent.Task.CreatedAt), UpdatedAt: timestamppb.New(parent.Task.UpdatedAt), @@ -1965,13 +1968,13 @@ func (v *V2) handleReschedulePersistentCachePeerRequest(ctx context.Context, str Task: &commonv2.PersistentCacheTask{ Id: parent.Task.ID, PersistentReplicaCount: parent.Task.PersistentReplicaCount, - CurrentPersistentReplicaCount: uint64(currentPersistentReplicaCount), - CurrentReplicaCount: uint64(currentReplicaCount), + CurrentPersistentReplicaCount: currentPersistentReplicaCount, + CurrentReplicaCount: currentReplicaCount, Tag: &parent.Task.Tag, Application: &parent.Task.Application, - PieceLength: uint64(parent.Task.PieceLength), - ContentLength: uint64(parent.Task.ContentLength), - PieceCount: uint32(parent.Task.TotalPieceCount), + PieceLength: parent.Task.PieceLength, + ContentLength: parent.Task.ContentLength, + PieceCount: parent.Task.TotalPieceCount, State: parent.Task.FSM.Current(), CreatedAt: timestamppb.New(parent.Task.CreatedAt), UpdatedAt: timestamppb.New(parent.Task.UpdatedAt), @@ -2226,12 +2229,12 @@ func (v *V2) StatPersistentCachePeer(ctx context.Context, req *schedulerv2.StatP Task: &commonv2.PersistentCacheTask{ Id: peer.Task.ID, 
PersistentReplicaCount: peer.Task.PersistentReplicaCount, - CurrentPersistentReplicaCount: uint64(currentPersistentReplicaCount), - CurrentReplicaCount: uint64(currentReplicaCount), + CurrentPersistentReplicaCount: currentPersistentReplicaCount, + CurrentReplicaCount: currentReplicaCount, Tag: &peer.Task.Tag, Application: &peer.Task.Application, - PieceLength: uint64(peer.Task.PieceLength), - ContentLength: uint64(peer.Task.ContentLength), + PieceLength: peer.Task.PieceLength, + ContentLength: peer.Task.ContentLength, PieceCount: uint32(peer.Task.TotalPieceCount), State: peer.Task.FSM.Current(), CreatedAt: timestamppb.New(peer.Task.CreatedAt), @@ -2319,13 +2322,26 @@ func (v *V2) DeletePersistentCachePeer(ctx context.Context, req *schedulerv2.Del log := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId()) log.Info("delete persistent cache peer") + task, founded := v.persistentCacheResource.TaskManager().Load(ctx, req.GetTaskId()) + if !founded { + log.Errorf("persistent cache task %s not found", req.GetTaskId()) + return status.Errorf(codes.NotFound, "persistent cache task %s not found", req.GetTaskId()) + } + if err := v.persistentCacheResource.PeerManager().Delete(ctx, req.GetPeerId()); err != nil { log.Errorf("delete persistent cache peer %s error %s", req.GetPeerId(), err) return status.Error(codes.Internal, err.Error()) } - // TODO(gaius) Implement copy replica to the other peers. // Select the remote peer to copy the replica and trigger the download task with asynchronous. + blocklist := set.NewSafeSet[string]() + blocklist.Add(req.GetHostId()) + go func(ctx context.Context, task *persistentcache.Task, blocklist set.SafeSet[string]) { + if err := v.replicatePersistentCacheTask(ctx, task, blocklist); err != nil { + log.Errorf("replicate persistent cache task failed %s", err) + } + }(context.Background(), task, blocklist) + return nil } @@ -2352,7 +2368,7 @@ func (v *V2) UploadPersistentCacheTaskStarted(ctx context.Context, req *schedule } task = persistentcache.NewTask(req.GetTaskId(), req.GetTag(), req.GetApplication(), persistentcache.TaskStatePending, req.GetPersistentReplicaCount(), - int32(req.GetPieceLength()), int64(req.GetContentLength()), int32(req.GetPieceCount()), req.GetTtl().AsDuration(), time.Now(), time.Now(), log) + req.GetPieceLength(), req.GetContentLength(), req.GetPieceCount(), req.GetTtl().AsDuration(), time.Now(), time.Now(), log) if err := task.FSM.Event(ctx, persistentcache.TaskEventUpload); err != nil { log.Errorf("task fsm event failed: %s", err.Error()) @@ -2438,22 +2454,86 @@ func (v *V2) UploadPersistentCacheTaskFinished(ctx context.Context, req *schedul return nil, status.Error(codes.Internal, err.Error()) } - // TODO(gaius) Implement copy multiple replicas to the other peers. - // Select the remote peer to copy the replica and trigger the download task with asynchronous. 
-	return &commonv2.PersistentCacheTask{
+	persistentCacheTask := &commonv2.PersistentCacheTask{
 		Id: peer.Task.ID,
 		PersistentReplicaCount: peer.Task.PersistentReplicaCount,
-		CurrentPersistentReplicaCount: uint64(currentPersistentReplicaCount),
-		CurrentReplicaCount: uint64(currentReplicaCount),
+		CurrentPersistentReplicaCount: currentPersistentReplicaCount,
+		CurrentReplicaCount: currentReplicaCount,
 		Tag: &peer.Task.Tag,
 		Application: &peer.Task.Application,
-		PieceLength: uint64(peer.Task.PieceLength),
-		ContentLength: uint64(peer.Task.ContentLength),
-		PieceCount: uint32(peer.Task.TotalPieceCount),
+		PieceLength: peer.Task.PieceLength,
+		ContentLength: peer.Task.ContentLength,
+		PieceCount: peer.Task.TotalPieceCount,
 		State: peer.Task.FSM.Current(),
 		CreatedAt: timestamppb.New(peer.Task.CreatedAt),
 		UpdatedAt: timestamppb.New(peer.Task.UpdatedAt),
-	}, nil
+	}
+
+	// Select the remote peer to copy the replica and trigger the download task asynchronously.
+	blocklist := set.NewSafeSet[string]()
+	blocklist.Add(peer.Host.ID)
+	go func(ctx context.Context, task *persistentcache.Task, blocklist set.SafeSet[string]) {
+		if err := v.replicatePersistentCacheTask(ctx, task, blocklist); err != nil {
+			log.Errorf("replicate persistent cache task failed %s", err)
+		}
+	}(context.Background(), peer.Task, blocklist)
+
+	return persistentCacheTask, nil
+}
+
+// replicatePersistentCacheTask replicates the persistent cache task to the selected remote hosts.
+func (v *V2) replicatePersistentCacheTask(ctx context.Context, task *persistentcache.Task, blocklist set.SafeSet[string]) error {
+	hosts, found := v.scheduling.FindReplicatePersistentCacheHosts(ctx, task, blocklist)
+	if !found {
+		task.Log.Warn("no replicate hosts found")
+		return nil
+	}
+
+	for _, host := range hosts {
+		go func(ctx context.Context, task *persistentcache.Task, host *persistentcache.Host) {
+			if err := v.downloadPersistentCacheTaskByPeer(ctx, task, host); err != nil {
+				task.Log.Errorf("replicate to host %s failed %s", host.ID, err)
+			}
+		}(context.Background(), task, host)
+	}
+
+	return nil
+}
+
+// downloadPersistentCacheTaskByPeer asks the peer on the given host to download the persistent cache task.
+func (v *V2) downloadPersistentCacheTaskByPeer(ctx context.Context, task *persistentcache.Task, host *persistentcache.Host) error {
+	addr := fmt.Sprintf("%s:%d", host.IP, host.DownloadPort)
+	dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
+	dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, addr, dialOptions...)
+	if err != nil {
+		task.Log.Errorf("get dfdaemon client failed %s", err)
+		return err
+	}
+
+	stream, err := dfdaemonClient.DownloadPersistentCacheTask(ctx, &dfdaemonv2.DownloadPersistentCacheTaskRequest{
+		TaskId: task.ID,
+		Persistent: true,
+		Tag: &task.Tag,
+		Application: &task.Application,
+	})
+	if err != nil {
+		task.Log.Errorf("download persistent cache task failed %s", err)
+		return err
+	}
+
+	// Wait for the persistent cache task download to complete.
+	for {
+		_, err := stream.Recv()
+		if err != nil {
+			if err == io.EOF {
+				task.Log.Info("download persistent cache task finished")
+				return nil
+			}
+
+			task.Log.Errorf("download persistent cache task failed %s", err)
+			return err
+		}
+	}
 }
 
 // UploadPersistentCacheTaskFailed uploads the metadata of the persistent cache task failed.
@@ -2528,13 +2608,13 @@ func (v *V2) StatPersistentCacheTask(ctx context.Context, req *schedulerv2.StatP return &commonv2.PersistentCacheTask{ Id: task.ID, PersistentReplicaCount: task.PersistentReplicaCount, - CurrentPersistentReplicaCount: uint64(currentPersistentReplicaCount), - CurrentReplicaCount: uint64(currentReplicaCount), + CurrentPersistentReplicaCount: currentPersistentReplicaCount, + CurrentReplicaCount: currentReplicaCount, Tag: &task.Tag, Application: &task.Application, - PieceLength: uint64(task.PieceLength), - ContentLength: uint64(task.ContentLength), - PieceCount: uint32(task.TotalPieceCount), + PieceLength: task.PieceLength, + ContentLength: task.ContentLength, + PieceCount: task.TotalPieceCount, State: task.FSM.Current(), CreatedAt: timestamppb.New(task.CreatedAt), UpdatedAt: timestamppb.New(task.UpdatedAt),