feat: replicate persistent cache task when task needs persistent replicas #3784

Merged Jan 26, 2025 · 1 commit
feat: replicate persistent cache task when task needs persistent replicas

Signed-off-by: Gaius <gaius.qi@gmail.com>
gaius-qi committed Jan 24, 2025
commit 855809e9a59055910ec97607d1cb6f832789ec9e
2 changes: 1 addition & 1 deletion go.mod
@@ -4,7 +4,7 @@ go 1.23.0
 
 require (
 	cloud.google.com/go/storage v1.50.0
-	d7y.io/api/v2 v2.1.12
+	d7y.io/api/v2 v2.1.16
 	github.com/MysteriousPotato/go-lockable v1.0.0
 	github.com/RichardKnop/machinery v1.10.8
 	github.com/Showmax/go-fqdn v1.0.0
4 changes: 2 additions & 2 deletions go.sum
@@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.50.0 h1:3TbVkzTooBvnZsk7WaAQfOsNrdoM8QHusXA1cpk6Q
 cloud.google.com/go/storage v1.50.0/go.mod h1:l7XeiD//vx5lfqE3RavfmU9yvk5Pp0Zhcv482poyafY=
 cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI=
 cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io=
-d7y.io/api/v2 v2.1.12 h1:jFo4TA6sRVSbcjPlFrig8S+7P37pww4bFbeTxcGhd54=
-d7y.io/api/v2 v2.1.12/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU=
+d7y.io/api/v2 v2.1.16 h1:ql4PaC17eG0NSteu+4cijrQ7vA/P/Xki4we66Q7FbQw=
+d7y.io/api/v2 v2.1.16/go.mod h1:zPZ7m8yC1LZH9VR4ACcvrphhPIVKSS2c3QHG+PRSixU=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
198 changes: 133 additions & 65 deletions scheduler/resource/persistentcache/host_manager.go
@@ -20,12 +20,14 @@ package persistentcache
 
 import (
 	"context"
+	"math/rand"
 	"strconv"
 	"time"
 
 	redis "github.com/redis/go-redis/v9"
 
 	logger "d7y.io/dragonfly/v2/internal/dflog"
+	"d7y.io/dragonfly/v2/pkg/container/set"
 	pkggc "d7y.io/dragonfly/v2/pkg/gc"
 	pkgredis "d7y.io/dragonfly/v2/pkg/redis"
 	pkgtypes "d7y.io/dragonfly/v2/pkg/types"
@@ -51,6 +53,9 @@ type HostManager interface {
 	// LoadAll returns all hosts.
 	LoadAll(context.Context) ([]*Host, error)
 
+	// LoadRandom loads hosts at random from the host set in Redis.
+	LoadRandom(context.Context, int, set.SafeSet[string]) ([]*Host, error)
+
 	// RunGC runs garbage collection.
 	RunGC() error
 }
@@ -446,74 +451,102 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {

 // Store sets host.
 func (h *hostManager) Store(ctx context.Context, host *Host) error {
-	_, err := h.rdb.HSet(ctx,
-		pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, host.ID),
-		"id", host.ID,
-		"type", host.Type.Name(),
-		"hostname", host.Hostname,
-		"ip", host.IP,
-		"port", host.Port,
-		"download_port", host.DownloadPort,
-		"disable_shared", host.DisableShared,
-		"os", host.OS,
-		"platform", host.Platform,
-		"platform_family", host.PlatformFamily,
-		"platform_version", host.PlatformVersion,
-		"kernel_version", host.KernelVersion,
-		"cpu_logical_count", host.CPU.LogicalCount,
-		"cpu_physical_count", host.CPU.PhysicalCount,
-		"cpu_percent", host.CPU.Percent,
-		"cpu_processe_percent", host.CPU.ProcessPercent,
-		"cpu_times_user", host.CPU.Times.User,
-		"cpu_times_system", host.CPU.Times.System,
-		"cpu_times_idle", host.CPU.Times.Idle,
-		"cpu_times_nice", host.CPU.Times.Nice,
-		"cpu_times_iowait", host.CPU.Times.Iowait,
-		"cpu_times_irq", host.CPU.Times.Irq,
-		"cpu_times_softirq", host.CPU.Times.Softirq,
-		"cpu_times_steal", host.CPU.Times.Steal,
-		"cpu_times_guest", host.CPU.Times.Guest,
-		"cpu_times_guest_nice", host.CPU.Times.GuestNice,
-		"memory_total", host.Memory.Total,
-		"memory_available", host.Memory.Available,
-		"memory_used", host.Memory.Used,
-		"memory_used_percent", host.Memory.UsedPercent,
-		"memory_processe_used_percent", host.Memory.ProcessUsedPercent,
-		"memory_free", host.Memory.Free,
-		"network_tcp_connection_count", host.Network.TCPConnectionCount,
-		"network_upload_tcp_connection_count", host.Network.UploadTCPConnectionCount,
-		"network_location", host.Network.Location,
-		"network_idc", host.Network.IDC,
-		"network_download_rate", host.Network.DownloadRate,
-		"network_download_rate_limit", host.Network.DownloadRateLimit,
-		"network_upload_rate", host.Network.UploadRate,
-		"network_upload_rate_limit", host.Network.UploadRateLimit,
-		"disk_total", host.Disk.Total,
-		"disk_free", host.Disk.Free,
-		"disk_used", host.Disk.Used,
-		"disk_used_percent", host.Disk.UsedPercent,
-		"disk_inodes_total", host.Disk.InodesTotal,
-		"disk_inodes_used", host.Disk.InodesUsed,
-		"disk_inodes_free", host.Disk.InodesFree,
-		"disk_inodes_used_percent", host.Disk.InodesUsedPercent,
-		"disk_write_bandwidth", host.Disk.WriteBandwidth,
-		"disk_read_bandwidth", host.Disk.ReadBandwidth,
-		"build_git_version", host.Build.GitVersion,
-		"build_git_commit", host.Build.GitCommit,
-		"build_go_version", host.Build.GoVersion,
-		"build_platform", host.Build.Platform,
-		"scheduler_cluster_id", host.SchedulerClusterID,
-		"announce_interval", host.AnnounceInterval,
-		"created_at", host.CreatedAt.Format(time.RFC3339),
-		"updated_at", host.UpdatedAt.Format(time.RFC3339)).Result()
-
-	return err
+	if _, err := h.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
+		if _, err := pipe.HSet(ctx,
+			pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, host.ID),
+			"id", host.ID,
+			"type", host.Type.Name(),
+			"hostname", host.Hostname,
+			"ip", host.IP,
+			"port", host.Port,
+			"download_port", host.DownloadPort,
+			"disable_shared", host.DisableShared,
+			"os", host.OS,
+			"platform", host.Platform,
+			"platform_family", host.PlatformFamily,
+			"platform_version", host.PlatformVersion,
+			"kernel_version", host.KernelVersion,
+			"cpu_logical_count", host.CPU.LogicalCount,
+			"cpu_physical_count", host.CPU.PhysicalCount,
+			"cpu_percent", host.CPU.Percent,
+			"cpu_processe_percent", host.CPU.ProcessPercent,
+			"cpu_times_user", host.CPU.Times.User,
+			"cpu_times_system", host.CPU.Times.System,
+			"cpu_times_idle", host.CPU.Times.Idle,
+			"cpu_times_nice", host.CPU.Times.Nice,
+			"cpu_times_iowait", host.CPU.Times.Iowait,
+			"cpu_times_irq", host.CPU.Times.Irq,
+			"cpu_times_softirq", host.CPU.Times.Softirq,
+			"cpu_times_steal", host.CPU.Times.Steal,
+			"cpu_times_guest", host.CPU.Times.Guest,
+			"cpu_times_guest_nice", host.CPU.Times.GuestNice,
+			"memory_total", host.Memory.Total,
+			"memory_available", host.Memory.Available,
+			"memory_used", host.Memory.Used,
+			"memory_used_percent", host.Memory.UsedPercent,
+			"memory_processe_used_percent", host.Memory.ProcessUsedPercent,
+			"memory_free", host.Memory.Free,
+			"network_tcp_connection_count", host.Network.TCPConnectionCount,
+			"network_upload_tcp_connection_count", host.Network.UploadTCPConnectionCount,
+			"network_location", host.Network.Location,
+			"network_idc", host.Network.IDC,
+			"network_download_rate", host.Network.DownloadRate,
+			"network_download_rate_limit", host.Network.DownloadRateLimit,
+			"network_upload_rate", host.Network.UploadRate,
+			"network_upload_rate_limit", host.Network.UploadRateLimit,
+			"disk_total", host.Disk.Total,
+			"disk_free", host.Disk.Free,
+			"disk_used", host.Disk.Used,
+			"disk_used_percent", host.Disk.UsedPercent,
+			"disk_inodes_total", host.Disk.InodesTotal,
+			"disk_inodes_used", host.Disk.InodesUsed,
+			"disk_inodes_free", host.Disk.InodesFree,
+			"disk_inodes_used_percent", host.Disk.InodesUsedPercent,
+			"disk_write_bandwidth", host.Disk.WriteBandwidth,
+			"disk_read_bandwidth", host.Disk.ReadBandwidth,
+			"build_git_version", host.Build.GitVersion,
+			"build_git_commit", host.Build.GitCommit,
+			"build_go_version", host.Build.GoVersion,
+			"build_platform", host.Build.Platform,
+			"scheduler_cluster_id", host.SchedulerClusterID,
+			"announce_interval", host.AnnounceInterval,
+			"created_at", host.CreatedAt.Format(time.RFC3339),
+			"updated_at", host.UpdatedAt.Format(time.RFC3339)).Result(); err != nil {
+			return err
+		}
+
+		if _, err := pipe.SAdd(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), host.ID).Result(); err != nil {
+			return err
+		}
+
+		return nil
+	}); err != nil {
+		host.Log.Errorf("store host failed: %v", err)
+		return err
+	}
+
+	return nil
 }
 
 // Delete deletes host by a key.
 func (h *hostManager) Delete(ctx context.Context, hostID string) error {
-	_, err := h.rdb.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, hostID)).Result()
-	return err
+	log := logger.WithHostID(hostID)
+	if _, err := h.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
+		if _, err := pipe.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, hostID)).Result(); err != nil {
+			return err
+		}
+
+		if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), hostID).Result(); err != nil {
+			return err
+		}
+
+		return nil
+	}); err != nil {
+		log.Errorf("delete host failed: %v", err)
+		return err
+	}
+
+	return nil
 }
 
 // LoadAll returns all hosts.
@@ -529,7 +562,7 @@ func (h *hostManager) LoadAll(ctx context.Context) ([]*Host, error) {
 		err      error
 	)
 
-	hostKeys, cursor, err = h.rdb.Scan(ctx, cursor, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), 10).Result()
+	hostKeys, cursor, err = h.rdb.SScan(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), cursor, "*", 10).Result()
 	if err != nil {
 		logger.Error("scan hosts failed")
 		return nil, err
@@ -553,6 +586,41 @@ func (h *hostManager) LoadAll(ctx context.Context) ([]*Host, error) {
 	return hosts, nil
 }
 
+// LoadRandom loads hosts at random from the host set in Redis.
+func (h *hostManager) LoadRandom(ctx context.Context, n int, blocklist set.SafeSet[string]) ([]*Host, error) {
+	hostKeys, err := h.rdb.SMembers(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID)).Result()
+	if err != nil {
+		logger.Error("smembers hosts failed")
+		return nil, err
+	}
+
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	r.Shuffle(len(hostKeys), func(i, j int) {
+		hostKeys[i], hostKeys[j] = hostKeys[j], hostKeys[i]
+	})
+
+	hosts := make([]*Host, 0, n)
+	for _, hostKey := range hostKeys {
+		if len(hosts) >= n {
+			break
+		}
+
+		if blocklist.Contains(hostKey) {
+			continue
+		}
+
+		host, loaded := h.Load(ctx, hostKey)
+		if !loaded {
+			logger.WithHostID(hostKey).Error("load host failed")
+			continue
+		}
+
+		hosts = append(hosts, host)
+	}
+
+	return hosts, nil
+}
+
 // RunGC runs garbage collection.
 func (h *hostManager) RunGC() error {
 	hosts, err := h.LoadAll(context.Background())
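
For orientation, here is a minimal sketch (not code from this PR) of how a caller might combine the new LoadRandom API with a blocklist when a task needs more persistent replicas. The replication package, the pickReplicaCandidates helper, the set.NewSafeSet constructor, and the peer.Host.ID accessor are assumptions for illustration.

// Sketch only: pick replica candidates at random, excluding hosts that
// already hold a persistent replica of the task.
package replication

import (
	"context"

	"d7y.io/dragonfly/v2/pkg/container/set"
	"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
)

// pickReplicaCandidates returns up to needed hosts chosen at random.
func pickReplicaCandidates(ctx context.Context, hostManager persistentcache.HostManager,
	replicas []*persistentcache.Peer, needed int) ([]*persistentcache.Host, error) {
	// Exclude hosts that already store a persistent replica.
	blocklist := set.NewSafeSet[string]()
	for _, peer := range replicas {
		blocklist.Add(peer.Host.ID)
	}

	return hostManager.LoadRandom(ctx, needed, blocklist)
}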
16 changes: 16 additions & 0 deletions scheduler/resource/persistentcache/host_manager_mock.go

(Generated mock file; diff not rendered.)

28 changes: 27 additions & 1 deletion scheduler/resource/persistentcache/peer_manager.go
@@ -51,6 +51,9 @@ type PeerManager interface {
 	// LoadAllByTaskID returns all peers by task id.
 	LoadAllByTaskID(context.Context, string) ([]*Peer, error)
 
+	// LoadPersistentAllByTaskID returns all persistent peers by task id.
+	LoadPersistentAllByTaskID(context.Context, string) ([]*Peer, error)
+
 	// DeleteAllByTaskID deletes all peers by task id.
 	DeleteAllByTaskID(context.Context, string) error
 
@@ -242,7 +245,7 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
 func (p *peerManager) Delete(ctx context.Context, peerID string) error {
 	log := logger.WithPeerID(peerID)
 	if _, err := p.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
-		rawPeer, err := p.rdb.HGetAll(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID)).Result()
+		rawPeer, err := pipe.HGetAll(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID)).Result()
 		if err != nil {
 			return errors.New("getting peer failed from redis")
 		}
@@ -343,6 +346,29 @@ func (p *peerManager) LoadAllByTaskID(ctx context.Context, taskID string) ([]*Pe
 	return peers, nil
 }
 
+// LoadPersistentAllByTaskID returns all persistent cache peers by task id.
+func (p *peerManager) LoadPersistentAllByTaskID(ctx context.Context, taskID string) ([]*Peer, error) {
+	log := logger.WithTaskID(taskID)
+	peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, taskID)).Result()
+	if err != nil {
+		log.Error("get peer ids failed")
+		return nil, err
+	}
+
+	peers := make([]*Peer, 0, len(peerIDs))
+	for _, peerID := range peerIDs {
+		peer, loaded := p.Load(ctx, peerID)
+		if !loaded {
+			log.Errorf("load peer %s failed", peerID)
+			continue
+		}
+
+		peers = append(peers, peer)
+	}
+
+	return peers, nil
+}
+
 // DeleteAllByTaskID deletes all persistent cache peers by task id.
 func (p *peerManager) DeleteAllByTaskID(ctx context.Context, taskID string) error {
 	log := logger.WithTaskID(taskID)
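
A hedged sketch of how LoadPersistentAllByTaskID supports the replication flow in the PR title: compare the task's desired persistent replica count against the persistent peers currently registered. The helper and package name are illustrative, and the sketch assumes Task exposes ID and PersistentReplicaCount fields.

// Sketch only: compute how many persistent replicas a task is still missing.
package replication

import (
	"context"

	"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
)

// missingReplicaCount loads the task's persistent peers and compares the
// count against the desired persistent replica count.
func missingReplicaCount(ctx context.Context, peerManager persistentcache.PeerManager,
	task *persistentcache.Task) (int, error) {
	peers, err := peerManager.LoadPersistentAllByTaskID(ctx, task.ID)
	if err != nil {
		return 0, err
	}

	if missing := int(task.PersistentReplicaCount) - len(peers); missing > 0 {
		return missing, nil
	}

	return 0, nil
}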
15 changes: 15 additions & 0 deletions scheduler/resource/persistentcache/peer_manager_mock.go

(Generated mock file; diff not rendered.)

10 changes: 5 additions & 5 deletions scheduler/resource/persistentcache/task.go
@@ -77,13 +77,13 @@ type Task struct {
 	Application string
 
 	// Persistent cache task piece length.
-	PieceLength int32
+	PieceLength uint64
 
 	// ContentLength is persistent cache task total content length.
-	ContentLength int64
+	ContentLength uint64
 
 	// TotalPieceCount is total piece count.
-	TotalPieceCount int32
+	TotalPieceCount uint32
 
 	// Persistent cache task state machine.
 	FSM *fsm.FSM
@@ -102,8 +102,8 @@ type Task struct {
 }
 
 // New persistent cache task instance.
-func NewTask(id, tag, application, state string, persistentReplicaCount uint64, pieceLength int32,
-	contentLength int64, totalPieceCount int32, ttl time.Duration, createdAt, updatedAt time.Time,
+func NewTask(id, tag, application, state string, persistentReplicaCount, pieceLength, contentLength uint64,
+	totalPieceCount uint32, ttl time.Duration, createdAt, updatedAt time.Time,
 	log *logger.SugaredLoggerOnWith) *Task {
 	t := &Task{
 		ID: id,
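
For reference, a sketch of the updated NewTask call shape after the signedness change: persistentReplicaCount, pieceLength, and contentLength are all uint64 now, and totalPieceCount is uint32. The literal values and the "Pending" state string are illustrative only.

// Sketch only: constructing a task with the new NewTask signature.
package example

import (
	"time"

	logger "d7y.io/dragonfly/v2/internal/dflog"
	"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
)

func newExampleTask() *persistentcache.Task {
	now := time.Now()
	return persistentcache.NewTask(
		"task-id", "tag", "application", "Pending",
		3,             // persistentReplicaCount (uint64)
		4*1024*1024,   // pieceLength (uint64, was int32)
		128*1024*1024, // contentLength (uint64, was int64)
		32,            // totalPieceCount (uint32, was int32)
		24*time.Hour,  // ttl
		now, now,      // createdAt, updatedAt
		logger.WithTaskID("task-id"),
	)
}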