Skip to content

Commit

Permalink
feat: replicate the persistent cache task when delete host
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Jan 26, 2025
1 parent a1cb0a4 commit b4734d5
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 60 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
cloud.google.com/go/storage v1.50.0
d7y.io/api/v2 v2.1.18
d7y.io/api/v2 v2.1.22
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.50.0 h1:3TbVkzTooBvnZsk7WaAQfOsNrdoM8QHusXA1cpk6Q
cloud.google.com/go/storage v1.50.0/go.mod h1:l7XeiD//vx5lfqE3RavfmU9yvk5Pp0Zhcv482poyafY=
cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI=
cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io=
d7y.io/api/v2 v2.1.18 h1:5fpA94N7CihRdQxWPzUFx3qO7ScY76d0R2oxBtOt9PE=
d7y.io/api/v2 v2.1.18/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU=
d7y.io/api/v2 v2.1.22 h1:uz5fACtHBx+TPlQz36Bcqa3Zs86MhcWlFKHtyyHVlzk=
d7y.io/api/v2 v2.1.22/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
12 changes: 12 additions & 0 deletions pkg/rpc/dfdaemon/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ type V2 interface {
// DownloadPersistentCacheTask downloads persistent cache task from p2p network.
DownloadPersistentCacheTask(context.Context, *dfdaemonv2.DownloadPersistentCacheTaskRequest, ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_DownloadPersistentCacheTaskClient, error)

// UpdatePersistentCacheTask updates persistent cache task information.
UpdatePersistentCacheTask(context.Context, *dfdaemonv2.UpdatePersistentCacheTaskRequest, ...grpc.CallOption) error

// StatPersistentCacheTask stats persistent cache task information.
StatPersistentCacheTask(context.Context, *dfdaemonv2.StatPersistentCacheTaskRequest, ...grpc.CallOption) (*commonv2.PersistentCacheTask, error)

Expand Down Expand Up @@ -212,6 +215,15 @@ func (v *v2) DownloadPersistentCacheTask(ctx context.Context, req *dfdaemonv2.Do
return v.DfdaemonUploadClient.DownloadPersistentCacheTask(ctx, req, opts...)
}

// UpdatePersistentCacheTask updates persistent cache task information.
func (v *v2) UpdatePersistentCacheTask(ctx context.Context, req *dfdaemonv2.UpdatePersistentCacheTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

_, err := v.DfdaemonUploadClient.UpdatePersistentCacheTask(ctx, req, opts...)
return err
}

// StatPersistentCacheTask stats persistent cache task information.
func (v *v2) StatPersistentCacheTask(ctx context.Context, req *dfdaemonv2.StatPersistentCacheTaskRequest, opts ...grpc.CallOption) (*commonv2.PersistentCacheTask, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
Expand Down
19 changes: 19 additions & 0 deletions pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions scheduler/resource/persistentcache/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func NewPeer(id, state string, persistent bool, finishedPieces *bitset.BitSet, b
cost time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith) *Peer {
p := &Peer{
ID: id,
Persistent: persistent,
FinishedPieces: finishedPieces,
Task: task,
Host: host,
Expand Down
27 changes: 13 additions & 14 deletions scheduler/resource/persistentcache/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
return err
}

if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), ttl).Result(); err != nil {
if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), ttl).Result(); err != nil {
peer.Log.Errorf("set task joint-set ttl failed: %v", err)
return err
}
Expand All @@ -226,7 +226,7 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
return err
}

if _, err := pipe.Expire(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), ttl).Result(); err != nil {
if _, err := pipe.Expire(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), ttl).Result(); err != nil {
peer.Log.Errorf("set task joint-set ttl failed: %v", err)
return err
}
Expand All @@ -238,6 +238,11 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
return err
}

if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), ttl).Result(); err != nil {
peer.Log.Errorf("set task joint-set ttl failed: %v", err)
return err
}

return nil
}); err != nil {
peer.Log.Errorf("store peer failed: %v", err)
Expand All @@ -251,8 +256,8 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
func (p *peerManager) Delete(ctx context.Context, peerID string) error {
log := logger.WithPeerID(peerID)
if _, err := p.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
rawPeer, err := pipe.HGetAll(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID)).Result()
if err != nil {
peer, found := p.Load(ctx, peerID)
if !found {
return errors.New("getting peer failed from redis")
}

Expand All @@ -261,24 +266,18 @@ func (p *peerManager) Delete(ctx context.Context, peerID string) error {
return err
}

if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["task_id"]), peerID).Result(); err != nil {
if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), peerID).Result(); err != nil {
log.Errorf("delete peer id from task joint-set failed: %v", err)
return err
}

persistent, err := strconv.ParseBool(rawPeer["persistent"])
if err != nil {
log.Errorf("parsing persistent failed: %v", err)
return err
}

if persistent {
if _, err := pipe.SRem(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["task_id"]), peerID).Result(); err != nil {
if peer.Persistent {
if _, err := pipe.SRem(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), peerID).Result(); err != nil {
log.Errorf("delete persistent peer id from task joint-set failed: %v", err)
}
}

if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["host_id"]), peerID).Result(); err != nil {
if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), peerID).Result(); err != nil {
log.Errorf("delete peer id from host joint-set failed: %v", err)
return err
}
Expand Down
30 changes: 30 additions & 0 deletions scheduler/resource/persistentcache/peer_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions scheduler/resource/standard/seed_peer_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions scheduler/scheduling/mocks/scheduling_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 25 additions & 21 deletions scheduler/scheduling/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ type Scheduling interface {
FindSuccessParent(context.Context, *standard.Peer, set.SafeSet[string]) (*standard.Peer, bool)

// FindReplicatePersistentCacheHosts finds replicate persistent cache hosts for the peer to replicate the task. It will compare the current
// persistent replica count with the persistent replica count and try to find enough hosts.
FindReplicatePersistentCacheHosts(context.Context, *persistentcache.Task, set.SafeSet[string]) ([]*persistentcache.Host, bool)
// persistent replica count with the persistent replica count and try to find enough parents. Then function will return the cached replicate hosts,
// the replicate hosts without cache and found flag.
FindReplicatePersistentCacheHosts(context.Context, *persistentcache.Task, set.SafeSet[string]) ([]*persistentcache.Host, []*persistentcache.Host, bool)

// FindCandidatePersistentCacheParents finds candidate persistent cache parents for the peer to download the task.
FindCandidatePersistentCacheParents(context.Context, *persistentcache.Peer, set.SafeSet[string]) ([]*persistentcache.Peer, bool)
Expand Down Expand Up @@ -589,23 +590,26 @@ func (s *scheduling) filterCandidateParents(peer *standard.Peer, blocklist set.S
}

// FindReplicatePersistentCacheHosts finds replicate persistent cache hosts for the peer to replicate the task. It will compare the current
// persistent replica count with the persistent replica count and try to find enough parents.
func (s *scheduling) FindReplicatePersistentCacheHosts(ctx context.Context, task *persistentcache.Task, blocklist set.SafeSet[string]) ([]*persistentcache.Host, bool) {
// persistent replica count with the persistent replica count and try to find enough parents. Then function will return the cached replicate hosts,
// the replicate hosts without cache and found flag.
func (s *scheduling) FindReplicatePersistentCacheHosts(ctx context.Context, task *persistentcache.Task, blocklist set.SafeSet[string]) ([]*persistentcache.Host, []*persistentcache.Host, bool) {
currentPersistentReplicaCount, err := s.persistentCacheResource.TaskManager().LoadCurrentPersistentReplicaCount(ctx, task.ID)
if err != nil {
task.Log.Errorf("load current persistent replica count failed %s", err)
return nil, false
return nil, nil, false
}

needPersistentReplicaCount := int(task.PersistentReplicaCount - currentPersistentReplicaCount)
if needPersistentReplicaCount <= 0 {
task.Log.Infof("persistent cache task %s has enough persistent replica count %d", task.ID, task.PersistentReplicaCount)
return nil, false
return nil, nil, false
}

var (
replicateHosts []*persistentcache.Host
replicateHostIDs []string
replicateHosts []*persistentcache.Host
replicateHostIDs []string
cachedReplicateHosts []*persistentcache.Host
cachedReplicateHostIDs []string
)
cachedHosts := s.filterCachedReplicatePersistentCacheHosts(ctx, task, blocklist)
cachedHostsCount := len(cachedHosts)
Expand All @@ -614,19 +618,19 @@ func (s *scheduling) FindReplicatePersistentCacheHosts(ctx context.Context, task
// return the cached hosts directly and no need to find the replicate hosts without cache.
if cachedHostsCount >= needPersistentReplicaCount {
for _, cachedHost := range cachedHosts[:needPersistentReplicaCount] {
replicateHosts = append(replicateHosts, cachedHost)
replicateHostIDs = append(replicateHostIDs, cachedHost.ID)
cachedReplicateHosts = append(cachedReplicateHosts, cachedHost)
cachedReplicateHostIDs = append(cachedReplicateHostIDs, cachedHost.ID)
}

task.Log.Infof("find persistent cache hosts is %#v", replicateHostIDs)
return replicateHosts, true
task.Log.Infof("find cached hosts is %#v", cachedReplicateHostIDs)
return cachedReplicateHosts, nil, true
}

// If cached hosts are not enough, append the replicate cached hosts and find the replicate hosts without cache.
if cachedHostsCount > 0 {
for _, cachedHost := range cachedHosts {
replicateHosts = append(replicateHosts, cachedHost)
replicateHostIDs = append(replicateHostIDs, cachedHost.ID)
cachedReplicateHosts = append(cachedReplicateHosts, cachedHost)
cachedReplicateHostIDs = append(cachedReplicateHostIDs, cachedHost.ID)
blocklist.Add(cachedHost.ID)
}
}
Expand All @@ -635,7 +639,7 @@ func (s *scheduling) FindReplicatePersistentCacheHosts(ctx context.Context, task
currentPersistentPeers, err := s.persistentCacheResource.PeerManager().LoadPersistentAllByTaskID(ctx, task.ID)
if err != nil {
task.Log.Errorf("load all persistent cache peers failed: %s", err.Error())
return nil, false
return nil, nil, false
}

for _, currentPersistentPeer := range currentPersistentPeers {
Expand All @@ -651,13 +655,13 @@ func (s *scheduling) FindReplicatePersistentCacheHosts(ctx context.Context, task
replicateHostIDs = append(replicateHostIDs, host.ID)
}

if len(replicateHosts) == 0 {
task.Log.Info("can not find replicate persistent cache hosts")
return nil, false
if len(cachedReplicateHosts) == 0 && len(replicateHosts) == 0 {
task.Log.Info("can not find replicate hosts")
return nil, nil, false
}

task.Log.Infof("find persistent cache hosts is %#v", replicateHostIDs)
return replicateHosts, false
task.Log.Infof("find cached hosts is %#v and hosts is %#v", cachedReplicateHostIDs, replicateHostIDs)
return cachedReplicateHosts, replicateHosts, true
}

// FindCandidatePersistentCacheParents finds candidate persistent cache parents for the peer to download the task.
Expand Down Expand Up @@ -758,7 +762,7 @@ func (s *scheduling) filterCachedReplicatePersistentCacheHosts(ctx context.Conte
}

// If the parent is not succeeded, it cannot be selected.
if !replicateParent.FSM.Is(standard.PeerStateSucceeded) {
if !replicateParent.FSM.Is(persistentcache.PeerStateSucceeded) {
task.Log.Debugf("persistent cache parent %s host %s is not selected because its download state is %s", replicateParent.ID, replicateParent.Host.ID, replicateParent.FSM.Current())
continue
}
Expand Down
Loading

0 comments on commit b4734d5

Please sign in to comment.