
feat: replicate persistent cache task when task needs persistent replicas

Signed-off-by: Gaius <[email protected]>
gaius-qi committed Jan 24, 2025
1 parent 9320a7f commit cad60c8
Showing 16 changed files with 503 additions and 150 deletions.
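The gist of the change, before the per-file diffs: when a persistent cache task has fewer persistent replicas than requested, the scheduler can now look up the task's current persistent peers (LoadPersistentAllByTaskID) and pick replacement hosts at random (LoadRandom). A minimal sketch of how the two new calls could compose; the topUpPersistentReplicas helper, the replicateToHost call, set.NewSafeSet, and the Task.PersistentReplicaCount / Peer.Host.ID field names are assumptions for illustration, not code from this commit.

// Sketch only: top up a task's persistent replicas using the two APIs added here.
func topUpPersistentReplicas(ctx context.Context, task *Task, peerManager PeerManager, hostManager HostManager) error {
    // Count the persistent replicas that already exist.
    peers, err := peerManager.LoadPersistentAllByTaskID(ctx, task.ID)
    if err != nil {
        return err
    }

    missing := int(task.PersistentReplicaCount) - len(peers)
    if missing <= 0 {
        return nil // enough persistent replicas already
    }

    // Hosts that already hold a persistent replica must not be picked again.
    blocklist := set.NewSafeSet[string]()
    for _, peer := range peers {
        blocklist.Add(peer.Host.ID)
    }

    hosts, err := hostManager.LoadRandom(ctx, missing, blocklist)
    if err != nil {
        return err
    }

    for _, host := range hosts {
        // replicateToHost is hypothetical; the real scheduling path lives elsewhere.
        if err := replicateToHost(ctx, task, host); err != nil {
            return err
        }
    }

    return nil
}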
198 changes: 133 additions & 65 deletions scheduler/resource/persistentcache/host_manager.go
@@ -20,12 +20,14 @@ package persistentcache

import (
    "context"
+    "math/rand"
    "strconv"
    "time"

    redis "github.com/redis/go-redis/v9"

    logger "d7y.io/dragonfly/v2/internal/dflog"
+    "d7y.io/dragonfly/v2/pkg/container/set"
    pkggc "d7y.io/dragonfly/v2/pkg/gc"
    pkgredis "d7y.io/dragonfly/v2/pkg/redis"
    pkgtypes "d7y.io/dragonfly/v2/pkg/types"
@@ -51,6 +53,9 @@ type HostManager interface {
    // LoadAll returns all hosts.
    LoadAll(context.Context) ([]*Host, error)

+    // LoadRandom loads up to n hosts at random from the host set in Redis.
+    LoadRandom(context.Context, int, set.SafeSet[string]) ([]*Host, error)

    // RunGC runs garbage collection.
    RunGC() error
}
@@ -446,74 +451,102 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {

// Store sets host.
func (h *hostManager) Store(ctx context.Context, host *Host) error {
-    _, err := h.rdb.HSet(ctx,
-        pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, host.ID),
-        "id", host.ID,
-        "type", host.Type.Name(),
-        "hostname", host.Hostname,
-        "ip", host.IP,
-        "port", host.Port,
-        "download_port", host.DownloadPort,
-        "disable_shared", host.DisableShared,
-        "os", host.OS,
-        "platform", host.Platform,
-        "platform_family", host.PlatformFamily,
-        "platform_version", host.PlatformVersion,
-        "kernel_version", host.KernelVersion,
-        "cpu_logical_count", host.CPU.LogicalCount,
-        "cpu_physical_count", host.CPU.PhysicalCount,
-        "cpu_percent", host.CPU.Percent,
-        "cpu_processe_percent", host.CPU.ProcessPercent,
-        "cpu_times_user", host.CPU.Times.User,
-        "cpu_times_system", host.CPU.Times.System,
-        "cpu_times_idle", host.CPU.Times.Idle,
-        "cpu_times_nice", host.CPU.Times.Nice,
-        "cpu_times_iowait", host.CPU.Times.Iowait,
-        "cpu_times_irq", host.CPU.Times.Irq,
-        "cpu_times_softirq", host.CPU.Times.Softirq,
-        "cpu_times_steal", host.CPU.Times.Steal,
-        "cpu_times_guest", host.CPU.Times.Guest,
-        "cpu_times_guest_nice", host.CPU.Times.GuestNice,
-        "memory_total", host.Memory.Total,
-        "memory_available", host.Memory.Available,
-        "memory_used", host.Memory.Used,
-        "memory_used_percent", host.Memory.UsedPercent,
-        "memory_processe_used_percent", host.Memory.ProcessUsedPercent,
-        "memory_free", host.Memory.Free,
-        "network_tcp_connection_count", host.Network.TCPConnectionCount,
-        "network_upload_tcp_connection_count", host.Network.UploadTCPConnectionCount,
-        "network_location", host.Network.Location,
-        "network_idc", host.Network.IDC,
-        "network_download_rate", host.Network.DownloadRate,
-        "network_download_rate_limit", host.Network.DownloadRateLimit,
-        "network_upload_rate", host.Network.UploadRate,
-        "network_upload_rate_limit", host.Network.UploadRateLimit,
-        "disk_total", host.Disk.Total,
-        "disk_free", host.Disk.Free,
-        "disk_used", host.Disk.Used,
-        "disk_used_percent", host.Disk.UsedPercent,
-        "disk_inodes_total", host.Disk.InodesTotal,
-        "disk_inodes_used", host.Disk.InodesUsed,
-        "disk_inodes_free", host.Disk.InodesFree,
-        "disk_inodes_used_percent", host.Disk.InodesUsedPercent,
-        "disk_write_bandwidth", host.Disk.WriteBandwidth,
-        "disk_read_bandwidth", host.Disk.ReadBandwidth,
-        "build_git_version", host.Build.GitVersion,
-        "build_git_commit", host.Build.GitCommit,
-        "build_go_version", host.Build.GoVersion,
-        "build_platform", host.Build.Platform,
-        "scheduler_cluster_id", host.SchedulerClusterID,
-        "announce_interval", host.AnnounceInterval,
-        "created_at", host.CreatedAt.Format(time.RFC3339),
-        "updated_at", host.UpdatedAt.Format(time.RFC3339)).Result()
-
-    return err
+    if _, err := h.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
+        if _, err := pipe.HSet(ctx,
+            pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, host.ID),
+            "id", host.ID,
+            "type", host.Type.Name(),
+            "hostname", host.Hostname,
+            "ip", host.IP,
+            "port", host.Port,
+            "download_port", host.DownloadPort,
+            "disable_shared", host.DisableShared,
+            "os", host.OS,
+            "platform", host.Platform,
+            "platform_family", host.PlatformFamily,
+            "platform_version", host.PlatformVersion,
+            "kernel_version", host.KernelVersion,
+            "cpu_logical_count", host.CPU.LogicalCount,
+            "cpu_physical_count", host.CPU.PhysicalCount,
+            "cpu_percent", host.CPU.Percent,
+            "cpu_processe_percent", host.CPU.ProcessPercent,
+            "cpu_times_user", host.CPU.Times.User,
+            "cpu_times_system", host.CPU.Times.System,
+            "cpu_times_idle", host.CPU.Times.Idle,
+            "cpu_times_nice", host.CPU.Times.Nice,
+            "cpu_times_iowait", host.CPU.Times.Iowait,
+            "cpu_times_irq", host.CPU.Times.Irq,
+            "cpu_times_softirq", host.CPU.Times.Softirq,
+            "cpu_times_steal", host.CPU.Times.Steal,
+            "cpu_times_guest", host.CPU.Times.Guest,
+            "cpu_times_guest_nice", host.CPU.Times.GuestNice,
+            "memory_total", host.Memory.Total,
+            "memory_available", host.Memory.Available,
+            "memory_used", host.Memory.Used,
+            "memory_used_percent", host.Memory.UsedPercent,
+            "memory_processe_used_percent", host.Memory.ProcessUsedPercent,
+            "memory_free", host.Memory.Free,
+            "network_tcp_connection_count", host.Network.TCPConnectionCount,
+            "network_upload_tcp_connection_count", host.Network.UploadTCPConnectionCount,
+            "network_location", host.Network.Location,
+            "network_idc", host.Network.IDC,
+            "network_download_rate", host.Network.DownloadRate,
+            "network_download_rate_limit", host.Network.DownloadRateLimit,
+            "network_upload_rate", host.Network.UploadRate,
+            "network_upload_rate_limit", host.Network.UploadRateLimit,
+            "disk_total", host.Disk.Total,
+            "disk_free", host.Disk.Free,
+            "disk_used", host.Disk.Used,
+            "disk_used_percent", host.Disk.UsedPercent,
+            "disk_inodes_total", host.Disk.InodesTotal,
+            "disk_inodes_used", host.Disk.InodesUsed,
+            "disk_inodes_free", host.Disk.InodesFree,
+            "disk_inodes_used_percent", host.Disk.InodesUsedPercent,
+            "disk_write_bandwidth", host.Disk.WriteBandwidth,
+            "disk_read_bandwidth", host.Disk.ReadBandwidth,
+            "build_git_version", host.Build.GitVersion,
+            "build_git_commit", host.Build.GitCommit,
+            "build_go_version", host.Build.GoVersion,
+            "build_platform", host.Build.Platform,
+            "scheduler_cluster_id", host.SchedulerClusterID,
+            "announce_interval", host.AnnounceInterval,
+            "created_at", host.CreatedAt.Format(time.RFC3339),
+            "updated_at", host.UpdatedAt.Format(time.RFC3339)).Result(); err != nil {
+            return err
+        }
+
+        if _, err := pipe.SAdd(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), host.ID).Result(); err != nil {
+            return err
+        }
+
+        return nil
+    }); err != nil {
+        host.Log.Errorf("store host failed: %v", err)
+        return err
+    }
+
+    return nil
}
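Store now writes the host hash and registers the host ID in the cluster's host set inside a single MULTI/EXEC transaction via go-redis's TxPipelined: no other client's commands can interleave between the two writes, and nothing is sent at all if the callback returns an error before EXEC. A standalone sketch of the pattern, with illustrative key names:

package main

import (
    "context"
    "fmt"

    redis "github.com/redis/go-redis/v9"
)

func main() {
    ctx := context.Background()
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

    // Both commands are queued locally, then sent wrapped in MULTI/EXEC.
    _, err := rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
        pipe.HSet(ctx, "hosts:host-1", "id", "host-1", "ip", "10.0.0.1")
        pipe.SAdd(ctx, "hosts", "host-1")
        return nil
    })
    if err != nil {
        fmt.Println("transaction failed:", err)
    }
}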

// Delete deletes host by a key.
func (h *hostManager) Delete(ctx context.Context, hostID string) error {
-    _, err := h.rdb.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, hostID)).Result()
-    return err
+    log := logger.WithHostID(hostID)
+    if _, err := h.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
+        if _, err := pipe.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(h.config.Manager.SchedulerClusterID, hostID)).Result(); err != nil {
+            return err
+        }
+
+        if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), hostID).Result(); err != nil {
+            return err
+        }
+
+        return nil
+    }); err != nil {
+        log.Errorf("delete host failed: %v", err)
+        return err
+    }
+
+    return nil
}

// LoadAll returns all hosts.
@@ -529,7 +562,7 @@ func (h *hostManager) LoadAll(ctx context.Context) ([]*Host, error) {
            err error
        )

-        hostKeys, cursor, err = h.rdb.Scan(ctx, cursor, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), 10).Result()
+        hostKeys, cursor, err = h.rdb.SScan(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID), cursor, "*", 10).Result()
        if err != nil {
            logger.Error("scan hosts failed")
            return nil, err
        }
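The old call used SCAN, a whole-keyspace iterator, and passed the set key where SCAN expects a match pattern; SSCAN instead cursors over the members of that one set, which is what LoadAll wants. The shape of a complete SSCAN loop, sketched with an illustrative key name and assuming the go-redis v9 import shown above:

// scanMembers collects every member of a Redis set in batches of ten.
func scanMembers(ctx context.Context, rdb *redis.Client, key string) ([]string, error) {
    var (
        members []string
        cursor  uint64
    )
    for {
        batch, next, err := rdb.SScan(ctx, key, cursor, "*", 10).Result()
        if err != nil {
            return nil, err
        }
        members = append(members, batch...)

        // A returned cursor of 0 marks the end of the iteration.
        if cursor = next; cursor == 0 {
            return members, nil
        }
    }
}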
@@ -553,6 +586,41 @@ func (h *hostManager) LoadAll(ctx context.Context) ([]*Host, error) {
    return hosts, nil
}

+// LoadRandom loads up to n hosts at random from the host set in Redis.
+func (h *hostManager) LoadRandom(ctx context.Context, n int, blocklist set.SafeSet[string]) ([]*Host, error) {
+    hostKeys, err := h.rdb.SMembers(ctx, pkgredis.MakePersistentCacheHostsInScheduler(h.config.Manager.SchedulerClusterID)).Result()
+    if err != nil {
+        logger.Error("smembers hosts failed")
+        return nil, err
+    }
+
+    r := rand.New(rand.NewSource(time.Now().UnixNano()))
+    r.Shuffle(len(hostKeys), func(i, j int) {
+        hostKeys[i], hostKeys[j] = hostKeys[j], hostKeys[i]
+    })
+
+    hosts := make([]*Host, 0, n)
+    for _, hostKey := range hostKeys {
+        if len(hosts) >= n {
+            break
+        }
+
+        if blocklist.Contains(hostKey) {
+            continue
+        }
+
+        host, loaded := h.Load(ctx, hostKey)
+        if !loaded {
+            logger.WithHostID(hostKey).Error("load host failed")
+            continue
+        }
+
+        hosts = append(hosts, host)
+    }
+
+    return hosts, nil
+}
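Worth noting as a design choice: Redis can sample a set server-side, but SRANDMEMBER cannot honor a blocklist, which is presumably why LoadRandom shuffles the full membership client-side and filters as it walks. The server-side alternative, for contrast (illustrative key name, go-redis v9 assumed):

// sampleHostIDs returns up to n distinct members chosen by the server.
// There is no blocklist support: filtering would have to happen afterwards,
// with re-sampling whenever too many picks are filtered out.
func sampleHostIDs(ctx context.Context, rdb *redis.Client, key string, n int) ([]string, error) {
    return rdb.SRandMemberN(ctx, key, int64(n)).Result()
}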

// RunGC runs garbage collection.
func (h *hostManager) RunGC() error {
    hosts, err := h.LoadAll(context.Background())
16 changes: 16 additions & 0 deletions scheduler/resource/persistentcache/host_manager_mock.go

Some generated files are not rendered by default.

28 changes: 27 additions & 1 deletion scheduler/resource/persistentcache/peer_manager.go
@@ -51,6 +51,9 @@ type PeerManager interface {
    // LoadAllByTaskID returns all peers by task id.
    LoadAllByTaskID(context.Context, string) ([]*Peer, error)

+    // LoadPersistentAllByTaskID returns all persistent peers by task id.
+    LoadPersistentAllByTaskID(context.Context, string) ([]*Peer, error)

    // DeleteAllByTaskID deletes all peers by task id.
    DeleteAllByTaskID(context.Context, string) error

@@ -242,7 +245,7 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
func (p *peerManager) Delete(ctx context.Context, peerID string) error {
    log := logger.WithPeerID(peerID)
    if _, err := p.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
-        rawPeer, err := p.rdb.HGetAll(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID)).Result()
+        rawPeer, err := pipe.HGetAll(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID)).Result()
        if err != nil {
            return errors.New("getting peer failed from redis")
        }
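One caveat on the p.rdb to pipe change above: a go-redis pipeliner only queues commands until the callback returns and EXEC fires, so the Result() of a command issued on pipe is not yet populated inside the callback. When later writes depend on the value read, a read-before-transaction shape avoids the problem; a hedged sketch, with the key builder and the follow-up writes elided:

func deletePeer(ctx context.Context, rdb *redis.Client, peerKey string) error {
    // Read on the plain client first: this executes immediately, so rawPeer
    // is usable below. Issued on the pipeliner, it would still be empty here.
    rawPeer, err := rdb.HGetAll(ctx, peerKey).Result()
    if err != nil {
        return err
    }

    _, err = rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
        pipe.Del(ctx, peerKey)
        _ = rawPeer // queue whatever set removals the peer's fields require
        return nil
    })
    return err
}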
@@ -343,6 +346,29 @@ func (p *peerManager) LoadAllByTaskID(ctx context.Context, taskID string) ([]*Peer, error) {
    return peers, nil
}

+// LoadPersistentAllByTaskID returns all persistent cache peers by task id.
+func (p *peerManager) LoadPersistentAllByTaskID(ctx context.Context, taskID string) ([]*Peer, error) {
+    log := logger.WithTaskID(taskID)
+    peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentPeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, taskID)).Result()
+    if err != nil {
+        log.Error("get peer ids failed")
+        return nil, err
+    }
+
+    peers := make([]*Peer, 0, len(peerIDs))
+    for _, peerID := range peerIDs {
+        peer, loaded := p.Load(ctx, peerID)
+        if !loaded {
+            log.Errorf("load peer %s failed", peerID)
+            continue
+        }
+
+        peers = append(peers, peer)
+    }
+
+    return peers, nil
+}

// DeleteAllByTaskID deletes all persistent cache peers by task id.
func (p *peerManager) DeleteAllByTaskID(ctx context.Context, taskID string) error {
    log := logger.WithTaskID(taskID)
15 changes: 15 additions & 0 deletions scheduler/resource/persistentcache/peer_manager_mock.go

Some generated files are not rendered by default.

10 changes: 5 additions & 5 deletions scheduler/resource/persistentcache/task.go
@@ -77,13 +77,13 @@ type Task struct {
    Application string

    // Persistent cache task piece length.
-    PieceLength int32
+    PieceLength uint64

    // ContentLength is persistent cache task total content length.
-    ContentLength int64
+    ContentLength uint64

    // TotalPieceCount is total piece count.
-    TotalPieceCount int32
+    TotalPieceCount uint32

    // Persistent cache task state machine.
    FSM *fsm.FSM
@@ -102,8 +102,8 @@ type Task struct {
}

// New persistent cache task instance.
-func NewTask(id, tag, application, state string, persistentReplicaCount uint64, pieceLength int32,
-    contentLength int64, totalPieceCount int32, ttl time.Duration, createdAt, updatedAt time.Time,
+func NewTask(id, tag, application, state string, persistentReplicaCount, pieceLength, contentLength uint64,
+    totalPieceCount uint32, ttl time.Duration, createdAt, updatedAt time.Time,
    log *logger.SugaredLoggerOnWith) *Task {
    t := &Task{
        ID: id,
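With the signature change, piece length and content length ride in the same uint64 group as the replica count, and the piece count becomes uint32. An illustrative call site under those assumptions; the values and the "Pending" state string are invented, while logger.WithTaskID returning *SugaredLoggerOnWith is taken from the dflog usage seen above:

// Illustrative only: a caller creating a task with three persistent replicas.
task := persistentcache.NewTask(
    "task-1", "tag", "application", "Pending", // id, tag, application, state
    3,             // persistentReplicaCount (uint64)
    4*1024*1024,   // pieceLength (uint64), 4 MiB pieces
    128*1024*1024, // contentLength (uint64), 128 MiB total
    32,            // totalPieceCount (uint32), 32 pieces of 4 MiB
    24*time.Hour,  // ttl
    time.Now(), time.Now(),
    logger.WithTaskID("task-1"),
)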
