Skip to content

Commit

Permalink
feat: replicate the persistent cache task when delete host (#3787)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Jan 27, 2025
1 parent a1cb0a4 commit 20926e7
Show file tree
Hide file tree
Showing 12 changed files with 322 additions and 93 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
cloud.google.com/go/storage v1.50.0
d7y.io/api/v2 v2.1.18
d7y.io/api/v2 v2.1.23
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.50.0 h1:3TbVkzTooBvnZsk7WaAQfOsNrdoM8QHusXA1cpk6Q
cloud.google.com/go/storage v1.50.0/go.mod h1:l7XeiD//vx5lfqE3RavfmU9yvk5Pp0Zhcv482poyafY=
cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI=
cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io=
d7y.io/api/v2 v2.1.18 h1:5fpA94N7CihRdQxWPzUFx3qO7ScY76d0R2oxBtOt9PE=
d7y.io/api/v2 v2.1.18/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU=
d7y.io/api/v2 v2.1.23 h1:adUXI1QHNxWG38iwtefZN3j7DysGkHIc3/UmOYFJBgw=
d7y.io/api/v2 v2.1.23/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
12 changes: 12 additions & 0 deletions pkg/rpc/dfdaemon/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ type V2 interface {
// DownloadPersistentCacheTask downloads persistent cache task from p2p network.
DownloadPersistentCacheTask(context.Context, *dfdaemonv2.DownloadPersistentCacheTaskRequest, ...grpc.CallOption) (dfdaemonv2.DfdaemonUpload_DownloadPersistentCacheTaskClient, error)

// UpdatePersistentCacheTask updates persistent cache task information.
UpdatePersistentCacheTask(context.Context, *dfdaemonv2.UpdatePersistentCacheTaskRequest, ...grpc.CallOption) error

// StatPersistentCacheTask stats persistent cache task information.
StatPersistentCacheTask(context.Context, *dfdaemonv2.StatPersistentCacheTaskRequest, ...grpc.CallOption) (*commonv2.PersistentCacheTask, error)

Expand Down Expand Up @@ -212,6 +215,15 @@ func (v *v2) DownloadPersistentCacheTask(ctx context.Context, req *dfdaemonv2.Do
return v.DfdaemonUploadClient.DownloadPersistentCacheTask(ctx, req, opts...)
}

// UpdatePersistentCacheTask updates persistent cache task information.
func (v *v2) UpdatePersistentCacheTask(ctx context.Context, req *dfdaemonv2.UpdatePersistentCacheTaskRequest, opts ...grpc.CallOption) error {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

_, err := v.DfdaemonUploadClient.UpdatePersistentCacheTask(ctx, req, opts...)
return err
}

// StatPersistentCacheTask stats persistent cache task information.
func (v *v2) StatPersistentCacheTask(ctx context.Context, req *dfdaemonv2.StatPersistentCacheTaskRequest, opts ...grpc.CallOption) (*commonv2.PersistentCacheTask, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
Expand Down
19 changes: 19 additions & 0 deletions pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Empty file added q
Empty file.
1 change: 1 addition & 0 deletions scheduler/resource/persistentcache/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func NewPeer(id, state string, persistent bool, finishedPieces *bitset.BitSet, b
cost time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith) *Peer {
p := &Peer{
ID: id,
Persistent: persistent,
FinishedPieces: finishedPieces,
Task: task,
Host: host,
Expand Down
27 changes: 13 additions & 14 deletions scheduler/resource/persistentcache/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
return err
}

if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), ttl).Result(); err != nil {
if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), ttl).Result(); err != nil {
peer.Log.Errorf("set task joint-set ttl failed: %v", err)
return err
}
Expand All @@ -226,7 +226,7 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
return err
}

if _, err := pipe.Expire(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), ttl).Result(); err != nil {
if _, err := pipe.Expire(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), ttl).Result(); err != nil {
peer.Log.Errorf("set task joint-set ttl failed: %v", err)
return err
}
Expand All @@ -238,6 +238,11 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
return err
}

if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), ttl).Result(); err != nil {
peer.Log.Errorf("set task joint-set ttl failed: %v", err)
return err
}

return nil
}); err != nil {
peer.Log.Errorf("store peer failed: %v", err)
Expand All @@ -251,8 +256,8 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
func (p *peerManager) Delete(ctx context.Context, peerID string) error {
log := logger.WithPeerID(peerID)
if _, err := p.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
rawPeer, err := pipe.HGetAll(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID)).Result()
if err != nil {
peer, found := p.Load(ctx, peerID)
if !found {
return errors.New("getting peer failed from redis")
}

Expand All @@ -261,24 +266,18 @@ func (p *peerManager) Delete(ctx context.Context, peerID string) error {
return err
}

if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["task_id"]), peerID).Result(); err != nil {
if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), peerID).Result(); err != nil {
log.Errorf("delete peer id from task joint-set failed: %v", err)
return err
}

persistent, err := strconv.ParseBool(rawPeer["persistent"])
if err != nil {
log.Errorf("parsing persistent failed: %v", err)
return err
}

if persistent {
if _, err := pipe.SRem(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["task_id"]), peerID).Result(); err != nil {
if peer.Persistent {
if _, err := pipe.SRem(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), peerID).Result(); err != nil {
log.Errorf("delete persistent peer id from task joint-set failed: %v", err)
}
}

if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["host_id"]), peerID).Result(); err != nil {
if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), peerID).Result(); err != nil {
log.Errorf("delete peer id from host joint-set failed: %v", err)
return err
}
Expand Down
30 changes: 30 additions & 0 deletions scheduler/resource/persistentcache/peer_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions scheduler/resource/standard/seed_peer_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions scheduler/scheduling/mocks/scheduling_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 20926e7

Please sign in to comment.