From 871110877778c7345d7f35d2cdbbe0847b310c67 Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 11 Nov 2024 17:24:11 +0800 Subject: [PATCH] feat: store persistent cache host by announce host api (#3640) Signed-off-by: Gaius --- scheduler/resource/persistentcache/host.go | 31 +- .../resource/persistentcache/host_manager.go | 2 +- .../persistentcache/task_manager_mock.go | 30 + scheduler/service/service_v1_test.go | 99 --- scheduler/service/service_v2.go | 320 +++++++--- scheduler/service/service_v2_test.go | 579 +++++++++++++++++- 6 files changed, 872 insertions(+), 189 deletions(-) diff --git a/scheduler/resource/persistentcache/host.go b/scheduler/resource/persistentcache/host.go index ab0adacbe40..734f27fd588 100644 --- a/scheduler/resource/persistentcache/host.go +++ b/scheduler/resource/persistentcache/host.go @@ -21,8 +21,19 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/types" + "d7y.io/dragonfly/v2/scheduler/config" ) +// HostOption is a functional option for configuring the persistent cache host. +type HostOption func(h *Host) + +// WithConcurrentUploadLimit sets persistent cache host's ConcurrentUploadLimit. +func WithConcurrentUploadLimit(limit int32) HostOption { + return func(h *Host) { + h.ConcurrentUploadLimit = limit + } +} + // Host contains content for host. type Host struct { // ID is host id. @@ -248,11 +259,17 @@ type Disk struct { // New host instance. func NewHost( - id, hostname, ip, os, platform, platformFamily, platformVersion, kernelVersion string, port, downloadPort, concurrentUploadLimit, concurrentUploadCount int32, + id, hostname, ip, os, platform, platformFamily, platformVersion, kernelVersion string, port, downloadPort, concurrentUploadCount int32, UploadCount, UploadFailedCount int64, disableShared bool, typ types.HostType, cpu CPU, memory Memory, network Network, disk Disk, - build Build, announceInterval time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith, + build Build, announceInterval time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith, options ...HostOption, ) *Host { - return &Host{ + // Calculate default of the concurrent upload limit by host type. + concurrentUploadLimit := config.DefaultSeedPeerConcurrentUploadLimit + if typ == types.HostTypeNormal { + concurrentUploadLimit = config.DefaultPeerConcurrentUploadLimit + } + + h := &Host{ ID: id, Type: types.HostType(typ), Hostname: hostname, @@ -271,7 +288,7 @@ func NewHost( Disk: disk, Build: build, AnnounceInterval: announceInterval, - ConcurrentUploadLimit: concurrentUploadLimit, + ConcurrentUploadLimit: int32(concurrentUploadLimit), ConcurrentUploadCount: concurrentUploadCount, UploadCount: UploadCount, UploadFailedCount: UploadFailedCount, @@ -279,4 +296,10 @@ func NewHost( UpdatedAt: updatedAt, Log: logger.WithHost(id, hostname, ip), } + + for _, opt := range options { + opt(h) + } + + return h } diff --git a/scheduler/resource/persistentcache/host_manager.go b/scheduler/resource/persistentcache/host_manager.go index a338e7e0f02..50a28ae7fb3 100644 --- a/scheduler/resource/persistentcache/host_manager.go +++ b/scheduler/resource/persistentcache/host_manager.go @@ -408,7 +408,6 @@ func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { rawHost["kernel_version"], int32(port), int32(downloadPort), - int32(concurrentUploadLimit), int32(concurrentUploadCount), uploadCount, uploadFailedCount, @@ -423,6 +422,7 @@ func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { createdAt, updatedAt, logger.WithHost(rawHost["id"], rawHost["hostname"], rawHost["ip"]), + WithConcurrentUploadLimit(int32(concurrentUploadLimit)), ), true } diff --git a/scheduler/resource/persistentcache/task_manager_mock.go b/scheduler/resource/persistentcache/task_manager_mock.go index 4b5c30880e7..6580b7288c5 100644 --- a/scheduler/resource/persistentcache/task_manager_mock.go +++ b/scheduler/resource/persistentcache/task_manager_mock.go @@ -84,6 +84,36 @@ func (mr *MockTaskManagerMockRecorder) LoadAll(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadAll", reflect.TypeOf((*MockTaskManager)(nil).LoadAll), arg0) } +// LoadCorrentReplicaCount mocks base method. +func (m *MockTaskManager) LoadCorrentReplicaCount(arg0 context.Context, arg1 string) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LoadCorrentReplicaCount", arg0, arg1) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LoadCorrentReplicaCount indicates an expected call of LoadCorrentReplicaCount. +func (mr *MockTaskManagerMockRecorder) LoadCorrentReplicaCount(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadCorrentReplicaCount", reflect.TypeOf((*MockTaskManager)(nil).LoadCorrentReplicaCount), arg0, arg1) +} + +// LoadCurrentPersistentReplicaCount mocks base method. +func (m *MockTaskManager) LoadCurrentPersistentReplicaCount(arg0 context.Context, arg1 string) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LoadCurrentPersistentReplicaCount", arg0, arg1) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LoadCurrentPersistentReplicaCount indicates an expected call of LoadCurrentPersistentReplicaCount. +func (mr *MockTaskManagerMockRecorder) LoadCurrentPersistentReplicaCount(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadCurrentPersistentReplicaCount", reflect.TypeOf((*MockTaskManager)(nil).LoadCurrentPersistentReplicaCount), arg0, arg1) +} + // Store mocks base method. func (m *MockTaskManager) Store(arg0 context.Context, arg1 *Task) error { m.ctrl.T.Helper() diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index 8628aad6886..04913a65036 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -33,11 +33,9 @@ import ( "time" "github.com/stretchr/testify/assert" - "go.uber.org/atomic" "go.uber.org/mock/gomock" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/durationpb" commonv1 "d7y.io/api/v2/pkg/apis/common/v1" commonv2 "d7y.io/api/v2/pkg/apis/common/v2" @@ -76,103 +74,6 @@ var ( TaskDownloadTimeout: 1 * time.Hour, } - mockRawHost = resource.Host{ - ID: mockHostID, - Type: pkgtypes.HostTypeNormal, - Hostname: "foo", - IP: "127.0.0.1", - Port: 8003, - DownloadPort: 8001, - OS: "darwin", - Platform: "darwin", - PlatformFamily: "Standalone Workstation", - PlatformVersion: "11.1", - KernelVersion: "20.2.0", - CPU: mockCPU, - Memory: mockMemory, - Network: mockNetwork, - Disk: mockDisk, - Build: mockBuild, - CreatedAt: atomic.NewTime(time.Now()), - UpdatedAt: atomic.NewTime(time.Now()), - } - - mockRawSeedHost = resource.Host{ - ID: mockSeedHostID, - Type: pkgtypes.HostTypeSuperSeed, - Hostname: "bar", - IP: "127.0.0.1", - Port: 8003, - DownloadPort: 8001, - OS: "darwin", - Platform: "darwin", - PlatformFamily: "Standalone Workstation", - PlatformVersion: "11.1", - KernelVersion: "20.2.0", - CPU: mockCPU, - Memory: mockMemory, - Network: mockNetwork, - Disk: mockDisk, - Build: mockBuild, - CreatedAt: atomic.NewTime(time.Now()), - UpdatedAt: atomic.NewTime(time.Now()), - } - - mockCPU = resource.CPU{ - LogicalCount: 4, - PhysicalCount: 2, - Percent: 1, - ProcessPercent: 0.5, - Times: resource.CPUTimes{ - User: 240662.2, - System: 317950.1, - Idle: 3393691.3, - Nice: 0, - Iowait: 0, - Irq: 0, - Softirq: 0, - Steal: 0, - Guest: 0, - GuestNice: 0, - }, - } - - mockMemory = resource.Memory{ - Total: 17179869184, - Available: 5962813440, - Used: 11217055744, - UsedPercent: 65.291858, - ProcessUsedPercent: 41.525125, - Free: 2749598908, - } - - mockNetwork = resource.Network{ - TCPConnectionCount: 10, - UploadTCPConnectionCount: 1, - Location: mockHostLocation, - IDC: mockHostIDC, - } - - mockDisk = resource.Disk{ - Total: 499963174912, - Free: 37226479616, - Used: 423809622016, - UsedPercent: 91.92547406065952, - InodesTotal: 4882452880, - InodesUsed: 7835772, - InodesFree: 4874617108, - InodesUsedPercent: 0.1604884305611568, - } - - mockBuild = resource.Build{ - GitVersion: "v1.0.0", - GitCommit: "221176b117c6d59366d68f2b34d38be50c935883", - GoVersion: "1.18", - Platform: "darwin", - } - - mockInterval = durationpb.New(5 * time.Minute).AsDuration() - mockPeerHost = &schedulerv1.PeerHost{ Id: mockHostID, Ip: "127.0.0.1", diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 5b589904f4f..2f78a1e35b8 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -515,6 +515,7 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ } } + // Handle standard host. host, loaded := v.resource.HostManager().Load(req.Host.GetId()) if !loaded { options := []standard.HostOption{ @@ -613,94 +614,263 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ v.resource.HostManager().Store(host) host.Log.Infof("announce new host: %#v", req) - return nil + } else { + // Host already exists and updates properties. + host.Port = req.Host.GetPort() + host.DownloadPort = req.Host.GetDownloadPort() + host.Type = types.HostType(req.Host.GetType()) + host.DisableShared = req.Host.GetDisableShared() + host.OS = req.Host.GetOs() + host.Platform = req.Host.GetPlatform() + host.PlatformFamily = req.Host.GetPlatformFamily() + host.PlatformVersion = req.Host.GetPlatformVersion() + host.KernelVersion = req.Host.GetKernelVersion() + host.UpdatedAt.Store(time.Now()) + + if concurrentUploadLimit > 0 { + host.ConcurrentUploadLimit.Store(concurrentUploadLimit) + } + + if req.Host.GetCpu() != nil { + host.CPU = standard.CPU{ + LogicalCount: req.Host.Cpu.GetLogicalCount(), + PhysicalCount: req.Host.Cpu.GetPhysicalCount(), + Percent: req.Host.Cpu.GetPercent(), + ProcessPercent: req.Host.Cpu.GetProcessPercent(), + Times: standard.CPUTimes{ + User: req.Host.Cpu.Times.GetUser(), + System: req.Host.Cpu.Times.GetSystem(), + Idle: req.Host.Cpu.Times.GetIdle(), + Nice: req.Host.Cpu.Times.GetNice(), + Iowait: req.Host.Cpu.Times.GetIowait(), + Irq: req.Host.Cpu.Times.GetIrq(), + Softirq: req.Host.Cpu.Times.GetSoftirq(), + Steal: req.Host.Cpu.Times.GetSteal(), + Guest: req.Host.Cpu.Times.GetGuest(), + GuestNice: req.Host.Cpu.Times.GetGuestNice(), + }, + } + } + + if req.Host.GetMemory() != nil { + host.Memory = standard.Memory{ + Total: req.Host.Memory.GetTotal(), + Available: req.Host.Memory.GetAvailable(), + Used: req.Host.Memory.GetUsed(), + UsedPercent: req.Host.Memory.GetUsedPercent(), + ProcessUsedPercent: req.Host.Memory.GetProcessUsedPercent(), + Free: req.Host.Memory.GetFree(), + } + } + + if req.Host.GetNetwork() != nil { + host.Network = standard.Network{ + TCPConnectionCount: req.Host.Network.GetTcpConnectionCount(), + UploadTCPConnectionCount: req.Host.Network.GetUploadTcpConnectionCount(), + Location: req.Host.Network.GetLocation(), + IDC: req.Host.Network.GetIdc(), + DownloadRate: req.Host.Network.GetDownloadRate(), + DownloadRateLimit: req.Host.Network.GetDownloadRateLimit(), + UploadRate: req.Host.Network.GetUploadRate(), + UploadRateLimit: req.Host.Network.GetUploadRateLimit(), + } + } + + if req.Host.GetDisk() != nil { + host.Disk = standard.Disk{ + Total: req.Host.Disk.GetTotal(), + Free: req.Host.Disk.GetFree(), + Used: req.Host.Disk.GetUsed(), + UsedPercent: req.Host.Disk.GetUsedPercent(), + InodesTotal: req.Host.Disk.GetInodesTotal(), + InodesUsed: req.Host.Disk.GetInodesUsed(), + InodesFree: req.Host.Disk.GetInodesFree(), + InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(), + } + } + + if req.Host.GetBuild() != nil { + host.Build = standard.Build{ + GitVersion: req.Host.Build.GetGitVersion(), + GitCommit: req.Host.Build.GetGitCommit(), + GoVersion: req.Host.Build.GetGoVersion(), + Platform: req.Host.Build.GetPlatform(), + } + } + + if req.GetInterval() != nil { + host.AnnounceInterval = req.GetInterval().AsDuration() + } } - // Host already exists and updates properties. - host.Port = req.Host.GetPort() - host.DownloadPort = req.Host.GetDownloadPort() - host.Type = types.HostType(req.Host.GetType()) - host.DisableShared = req.Host.GetDisableShared() - host.OS = req.Host.GetOs() - host.Platform = req.Host.GetPlatform() - host.PlatformFamily = req.Host.GetPlatformFamily() - host.PlatformVersion = req.Host.GetPlatformVersion() - host.KernelVersion = req.Host.GetKernelVersion() - host.UpdatedAt.Store(time.Now()) - - if concurrentUploadLimit > 0 { - host.ConcurrentUploadLimit.Store(concurrentUploadLimit) - } - - if req.Host.GetCpu() != nil { - host.CPU = standard.CPU{ - LogicalCount: req.Host.Cpu.GetLogicalCount(), - PhysicalCount: req.Host.Cpu.GetPhysicalCount(), - Percent: req.Host.Cpu.GetPercent(), - ProcessPercent: req.Host.Cpu.GetProcessPercent(), - Times: standard.CPUTimes{ - User: req.Host.Cpu.Times.GetUser(), - System: req.Host.Cpu.Times.GetSystem(), - Idle: req.Host.Cpu.Times.GetIdle(), - Nice: req.Host.Cpu.Times.GetNice(), - Iowait: req.Host.Cpu.Times.GetIowait(), - Irq: req.Host.Cpu.Times.GetIrq(), - Softirq: req.Host.Cpu.Times.GetSoftirq(), - Steal: req.Host.Cpu.Times.GetSteal(), - Guest: req.Host.Cpu.Times.GetGuest(), - GuestNice: req.Host.Cpu.Times.GetGuestNice(), + // Handle the persistent cache host. + persistentCacheHost, loaded := v.persistentCacheResource.HostManager().Load(ctx, req.Host.GetId()) + if !loaded { + options := []persistentcache.HostOption{} + if concurrentUploadLimit > 0 { + options = append(options, persistentcache.WithConcurrentUploadLimit(concurrentUploadLimit)) + } + + persistentCacheHost = persistentcache.NewHost(req.Host.GetId(), req.Host.GetHostname(), req.Host.GetIp(), req.Host.GetOs(), + req.Host.GetPlatform(), req.Host.GetPlatformFamily(), req.Host.GetPlatformVersion(), req.Host.GetKernelVersion(), req.Host.GetPort(), + req.Host.GetDownloadPort(), 0, 0, 0, req.Host.GetDisableShared(), types.HostType(req.Host.GetType()), + persistentcache.CPU{ + LogicalCount: req.Host.Cpu.GetLogicalCount(), + PhysicalCount: req.Host.Cpu.GetPhysicalCount(), + Percent: req.Host.Cpu.GetPercent(), + ProcessPercent: req.Host.Cpu.GetProcessPercent(), + Times: persistentcache.CPUTimes{ + User: req.Host.Cpu.Times.GetUser(), + System: req.Host.Cpu.Times.GetSystem(), + Idle: req.Host.Cpu.Times.GetIdle(), + Nice: req.Host.Cpu.Times.GetNice(), + Iowait: req.Host.Cpu.Times.GetIowait(), + Irq: req.Host.Cpu.Times.GetIrq(), + Softirq: req.Host.Cpu.Times.GetSoftirq(), + Steal: req.Host.Cpu.Times.GetSteal(), + Guest: req.Host.Cpu.Times.GetGuest(), + GuestNice: req.Host.Cpu.Times.GetGuestNice(), + }, + }, + persistentcache.Memory{ + Total: req.Host.Memory.GetTotal(), + Available: req.Host.Memory.GetAvailable(), + Used: req.Host.Memory.GetUsed(), + UsedPercent: req.Host.Memory.GetUsedPercent(), + ProcessUsedPercent: req.Host.Memory.GetProcessUsedPercent(), + Free: req.Host.Memory.GetFree(), }, + persistentcache.Network{ + TCPConnectionCount: req.Host.Network.GetTcpConnectionCount(), + UploadTCPConnectionCount: req.Host.Network.GetUploadTcpConnectionCount(), + Location: req.Host.Network.GetLocation(), + IDC: req.Host.Network.GetIdc(), + DownloadRate: req.Host.Network.GetDownloadRate(), + DownloadRateLimit: req.Host.Network.GetDownloadRateLimit(), + UploadRate: req.Host.Network.GetUploadRate(), + UploadRateLimit: req.Host.Network.GetUploadRateLimit(), + }, + persistentcache.Disk{ + Total: req.Host.Disk.GetTotal(), + Free: req.Host.Disk.GetFree(), + Used: req.Host.Disk.GetUsed(), + UsedPercent: req.Host.Disk.GetUsedPercent(), + InodesTotal: req.Host.Disk.GetInodesTotal(), + InodesUsed: req.Host.Disk.GetInodesUsed(), + InodesFree: req.Host.Disk.GetInodesFree(), + InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(), + }, + persistentcache.Build{ + GitVersion: req.Host.Build.GetGitVersion(), + GitCommit: req.Host.Build.GetGitCommit(), + GoVersion: req.Host.Build.GetGoVersion(), + Platform: req.Host.Build.GetPlatform(), + }, + req.GetInterval().AsDuration(), + time.Now(), + time.Now(), + logger.WithHostID(req.Host.GetId()), + options..., + ) + + persistentCacheHost.Log.Infof("announce new persistent cache host: %#v", req) + if err := v.persistentCacheResource.HostManager().Store(ctx, persistentCacheHost); err != nil { + persistentCacheHost.Log.Errorf("store persistent cache host failed: %s", err) + return err } - } + } else { + // persistentCacheHost already exists and updates properties. + persistentCacheHost.Port = req.Host.GetPort() + persistentCacheHost.DownloadPort = req.Host.GetDownloadPort() + persistentCacheHost.Type = types.HostType(req.Host.GetType()) + persistentCacheHost.DisableShared = req.Host.GetDisableShared() + persistentCacheHost.OS = req.Host.GetOs() + persistentCacheHost.Platform = req.Host.GetPlatform() + persistentCacheHost.PlatformFamily = req.Host.GetPlatformFamily() + persistentCacheHost.PlatformVersion = req.Host.GetPlatformVersion() + persistentCacheHost.KernelVersion = req.Host.GetKernelVersion() + persistentCacheHost.UpdatedAt = time.Now() - if req.Host.GetMemory() != nil { - host.Memory = standard.Memory{ - Total: req.Host.Memory.GetTotal(), - Available: req.Host.Memory.GetAvailable(), - Used: req.Host.Memory.GetUsed(), - UsedPercent: req.Host.Memory.GetUsedPercent(), - ProcessUsedPercent: req.Host.Memory.GetProcessUsedPercent(), - Free: req.Host.Memory.GetFree(), + if concurrentUploadLimit > 0 { + persistentCacheHost.ConcurrentUploadLimit = concurrentUploadLimit } - } - if req.Host.GetNetwork() != nil { - host.Network = standard.Network{ - TCPConnectionCount: req.Host.Network.GetTcpConnectionCount(), - UploadTCPConnectionCount: req.Host.Network.GetUploadTcpConnectionCount(), - Location: req.Host.Network.GetLocation(), - IDC: req.Host.Network.GetIdc(), - DownloadRate: req.Host.Network.GetDownloadRate(), - DownloadRateLimit: req.Host.Network.GetDownloadRateLimit(), - UploadRate: req.Host.Network.GetUploadRate(), - UploadRateLimit: req.Host.Network.GetUploadRateLimit(), + if req.Host.GetCpu() != nil { + persistentCacheHost.CPU = persistentcache.CPU{ + LogicalCount: req.Host.Cpu.GetLogicalCount(), + PhysicalCount: req.Host.Cpu.GetPhysicalCount(), + Percent: req.Host.Cpu.GetPercent(), + ProcessPercent: req.Host.Cpu.GetProcessPercent(), + Times: persistentcache.CPUTimes{ + User: req.Host.Cpu.Times.GetUser(), + System: req.Host.Cpu.Times.GetSystem(), + Idle: req.Host.Cpu.Times.GetIdle(), + Nice: req.Host.Cpu.Times.GetNice(), + Iowait: req.Host.Cpu.Times.GetIowait(), + Irq: req.Host.Cpu.Times.GetIrq(), + Softirq: req.Host.Cpu.Times.GetSoftirq(), + Steal: req.Host.Cpu.Times.GetSteal(), + Guest: req.Host.Cpu.Times.GetGuest(), + GuestNice: req.Host.Cpu.Times.GetGuestNice(), + }, + } } - } - if req.Host.GetDisk() != nil { - host.Disk = standard.Disk{ - Total: req.Host.Disk.GetTotal(), - Free: req.Host.Disk.GetFree(), - Used: req.Host.Disk.GetUsed(), - UsedPercent: req.Host.Disk.GetUsedPercent(), - InodesTotal: req.Host.Disk.GetInodesTotal(), - InodesUsed: req.Host.Disk.GetInodesUsed(), - InodesFree: req.Host.Disk.GetInodesFree(), - InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(), + if req.Host.GetMemory() != nil { + persistentCacheHost.Memory = persistentcache.Memory{ + Total: req.Host.Memory.GetTotal(), + Available: req.Host.Memory.GetAvailable(), + Used: req.Host.Memory.GetUsed(), + UsedPercent: req.Host.Memory.GetUsedPercent(), + ProcessUsedPercent: req.Host.Memory.GetProcessUsedPercent(), + Free: req.Host.Memory.GetFree(), + } } - } - if req.Host.GetBuild() != nil { - host.Build = standard.Build{ - GitVersion: req.Host.Build.GetGitVersion(), - GitCommit: req.Host.Build.GetGitCommit(), - GoVersion: req.Host.Build.GetGoVersion(), - Platform: req.Host.Build.GetPlatform(), + if req.Host.GetNetwork() != nil { + persistentCacheHost.Network = persistentcache.Network{ + TCPConnectionCount: req.Host.Network.GetTcpConnectionCount(), + UploadTCPConnectionCount: req.Host.Network.GetUploadTcpConnectionCount(), + Location: req.Host.Network.GetLocation(), + IDC: req.Host.Network.GetIdc(), + DownloadRate: req.Host.Network.GetDownloadRate(), + DownloadRateLimit: req.Host.Network.GetDownloadRateLimit(), + UploadRate: req.Host.Network.GetUploadRate(), + UploadRateLimit: req.Host.Network.GetUploadRateLimit(), + } + } + + if req.Host.GetDisk() != nil { + persistentCacheHost.Disk = persistentcache.Disk{ + Total: req.Host.Disk.GetTotal(), + Free: req.Host.Disk.GetFree(), + Used: req.Host.Disk.GetUsed(), + UsedPercent: req.Host.Disk.GetUsedPercent(), + InodesTotal: req.Host.Disk.GetInodesTotal(), + InodesUsed: req.Host.Disk.GetInodesUsed(), + InodesFree: req.Host.Disk.GetInodesFree(), + InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(), + } } - } - if req.GetInterval() != nil { - host.AnnounceInterval = req.GetInterval().AsDuration() + if req.Host.GetBuild() != nil { + persistentCacheHost.Build = persistentcache.Build{ + GitVersion: req.Host.Build.GetGitVersion(), + GitCommit: req.Host.Build.GetGitCommit(), + GoVersion: req.Host.Build.GetGoVersion(), + Platform: req.Host.Build.GetPlatform(), + } + } + + if req.GetInterval() != nil { + persistentCacheHost.AnnounceInterval = req.GetInterval().AsDuration() + } + + persistentCacheHost.Log.Infof("update persistent cache host: %#v", req) + if err := v.persistentCacheResource.HostManager().Store(ctx, persistentCacheHost); err != nil { + persistentCacheHost.Log.Errorf("store persistent cache host failed: %s", err) + return err + } } return nil diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index 69417723167..4762629490b 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -30,6 +30,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "go.uber.org/atomic" "go.uber.org/mock/gomock" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -53,6 +54,181 @@ import ( storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks" ) +var ( + mockRawHost = standard.Host{ + ID: mockHostID, + Type: pkgtypes.HostTypeNormal, + Hostname: "foo", + IP: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + OS: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + CPU: mockCPU, + Memory: mockMemory, + Network: mockNetwork, + Disk: mockDisk, + Build: mockBuild, + CreatedAt: atomic.NewTime(time.Now()), + UpdatedAt: atomic.NewTime(time.Now()), + } + + mockRawSeedHost = standard.Host{ + ID: mockSeedHostID, + Type: pkgtypes.HostTypeSuperSeed, + Hostname: "bar", + IP: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + OS: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + CPU: mockCPU, + Memory: mockMemory, + Network: mockNetwork, + Disk: mockDisk, + Build: mockBuild, + CreatedAt: atomic.NewTime(time.Now()), + UpdatedAt: atomic.NewTime(time.Now()), + } + + mockCPU = standard.CPU{ + LogicalCount: 4, + PhysicalCount: 2, + Percent: 1, + ProcessPercent: 0.5, + Times: standard.CPUTimes{ + User: 240662.2, + System: 317950.1, + Idle: 3393691.3, + Nice: 0, + Iowait: 0, + Irq: 0, + Softirq: 0, + Steal: 0, + Guest: 0, + GuestNice: 0, + }, + } + + mockMemory = standard.Memory{ + Total: 17179869184, + Available: 5962813440, + Used: 11217055744, + UsedPercent: 65.291858, + ProcessUsedPercent: 41.525125, + Free: 2749598908, + } + + mockNetwork = standard.Network{ + TCPConnectionCount: 10, + UploadTCPConnectionCount: 1, + Location: mockHostLocation, + IDC: mockHostIDC, + } + + mockDisk = standard.Disk{ + Total: 499963174912, + Free: 37226479616, + Used: 423809622016, + UsedPercent: 91.92547406065952, + InodesTotal: 4882452880, + InodesUsed: 7835772, + InodesFree: 4874617108, + InodesUsedPercent: 0.1604884305611568, + } + + mockBuild = standard.Build{ + GitVersion: "v1.0.0", + GitCommit: "221176b117c6d59366d68f2b34d38be50c935883", + GoVersion: "1.18", + Platform: "darwin", + } + + mockInterval = durationpb.New(5 * time.Minute).AsDuration() + + mockRawPersistentCacheHost = persistentcache.Host{ + ID: mockHostID, + Type: pkgtypes.HostTypeNormal, + Hostname: "foo", + IP: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + OS: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + CPU: mockPersistentCacheCPU, + Memory: mockPersistentCacheMemory, + Network: mockPersistentCacheNetwork, + Disk: mockPersistentCacheDisk, + Build: mockPersistentCacheBuild, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + mockPersistentCacheCPU = persistentcache.CPU{ + LogicalCount: 4, + PhysicalCount: 2, + Percent: 1, + ProcessPercent: 0.5, + Times: persistentcache.CPUTimes{ + User: 240662.2, + System: 317950.1, + Idle: 3393691.3, + Nice: 0, + Iowait: 0, + Irq: 0, + Softirq: 0, + Steal: 0, + Guest: 0, + GuestNice: 0, + }, + } + + mockPersistentCacheMemory = persistentcache.Memory{ + Total: 17179869184, + Available: 5962813440, + Used: 11217055744, + UsedPercent: 65.291858, + ProcessUsedPercent: 41.525125, + Free: 2749598908, + } + + mockPersistentCacheNetwork = persistentcache.Network{ + TCPConnectionCount: 10, + UploadTCPConnectionCount: 1, + Location: mockHostLocation, + IDC: mockHostIDC, + } + + mockPersistentCacheDisk = persistentcache.Disk{ + Total: 499963174912, + Free: 37226479616, + Used: 423809622016, + UsedPercent: 91.92547406065952, + InodesTotal: 4882452880, + InodesUsed: 7835772, + InodesFree: 4874617108, + InodesUsedPercent: 0.1604884305611568, + } + + mockPersistentCacheBuild = persistentcache.Build{ + GitVersion: "v1.0.0", + GitCommit: "221176b117c6d59366d68f2b34d38be50c935883", + GoVersion: "1.18", + Platform: "darwin", + } + + mockPersistentCacheInterval = durationpb.New(5 * time.Minute).AsDuration() +) + func TestService_NewV2(t *testing.T) { tests := []struct { name string @@ -424,10 +600,10 @@ func TestServiceV2_AnnounceHost(t *testing.T) { tests := []struct { name string req *schedulerv2.AnnounceHostRequest - run func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *standard.Host, hostManager standard.HostManager, mr *standard.MockResourceMockRecorder, mh *standard.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) + run func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *standard.Host, persistentCacheHost *persistentcache.Host, hostManager standard.HostManager, persistentCacheHostManager persistentcache.HostManager, mr *standard.MockResourceMockRecorder, mpr *persistentcache.MockResourceMockRecorder, mh *standard.MockHostManagerMockRecorder, mph *persistentcache.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) }{ { - name: "host not found", + name: "host not found and persistent cache host not found", req: &schedulerv2.AnnounceHostRequest{ Host: &commonv2.Host{ Id: mockHostID, @@ -497,7 +673,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { }, Interval: durationpb.New(5 * time.Minute), }, - run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *standard.Host, hostManager standard.HostManager, mr *standard.MockResourceMockRecorder, mh *standard.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *standard.Host, persistentCacheHost *persistentcache.Host, hostManager standard.HostManager, persistentCacheHostManager persistentcache.HostManager, mr *standard.MockResourceMockRecorder, mpr *persistentcache.MockResourceMockRecorder, mh *standard.MockHostManagerMockRecorder, mph *persistentcache.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{LoadLimit: 10}, nil).Times(1), mr.HostManager().Return(hostManager).Times(1), @@ -532,6 +708,36 @@ func TestServiceV2_AnnounceHost(t *testing.T) { assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0) assert.NotNil(host.Log) }).Return().Times(1), + mpr.HostManager().Return(persistentCacheHostManager).Times(1), + mph.Load(gomock.Any(), gomock.Any()).Return(nil, false).Times(1), + mpr.HostManager().Return(persistentCacheHostManager).Times(1), + mph.Store(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, host *persistentcache.Host) { + assert := assert.New(t) + assert.Equal(host.ID, req.Host.Id) + assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type)) + assert.Equal(host.Hostname, req.Host.Hostname) + assert.Equal(host.IP, req.Host.Ip) + assert.Equal(host.Port, req.Host.Port) + assert.Equal(host.DownloadPort, req.Host.DownloadPort) + assert.Equal(host.DisableShared, req.Host.DisableShared) + assert.Equal(host.OS, req.Host.Os) + assert.Equal(host.Platform, req.Host.Platform) + assert.Equal(host.PlatformVersion, req.Host.PlatformVersion) + assert.Equal(host.KernelVersion, req.Host.KernelVersion) + assert.EqualValues(host.CPU, mockPersistentCacheCPU) + assert.EqualValues(host.Memory, mockPersistentCacheMemory) + assert.EqualValues(host.Network, mockPersistentCacheNetwork) + assert.EqualValues(host.Disk, mockPersistentCacheDisk) + assert.EqualValues(host.Build, mockPersistentCacheBuild) + assert.EqualValues(host.AnnounceInterval, mockPersistentCacheInterval) + assert.Equal(host.ConcurrentUploadLimit, int32(10)) + assert.Equal(host.ConcurrentUploadCount, int32(0)) + assert.Equal(host.UploadCount, int64(0)) + assert.Equal(host.UploadFailedCount, int64(0)) + assert.NotEqual(host.CreatedAt.Nanosecond(), 0) + assert.NotEqual(host.UpdatedAt.Nanosecond(), 0) + assert.NotNil(host.Log) + }).Return(nil).Times(1), ) assert := assert.New(t) @@ -539,7 +745,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { }, }, { - name: "host not found and dynconfig returns error", + name: "host not found and persistent cache host not found, dynconfig returns error", req: &schedulerv2.AnnounceHostRequest{ Host: &commonv2.Host{ Id: mockHostID, @@ -609,7 +815,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { }, Interval: durationpb.New(5 * time.Minute), }, - run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *standard.Host, hostManager standard.HostManager, mr *standard.MockResourceMockRecorder, mh *standard.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *standard.Host, persistentCacheHost *persistentcache.Host, hostManager standard.HostManager, persistentCacheHostManager persistentcache.HostManager, mr *standard.MockResourceMockRecorder, mpr *persistentcache.MockResourceMockRecorder, mh *standard.MockHostManagerMockRecorder, mph *persistentcache.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1), mr.HostManager().Return(hostManager).Times(1), @@ -644,6 +850,36 @@ func TestServiceV2_AnnounceHost(t *testing.T) { assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0) assert.NotNil(host.Log) }).Return().Times(1), + mpr.HostManager().Return(persistentCacheHostManager).Times(1), + mph.Load(gomock.Any(), gomock.Any()).Return(nil, false).Times(1), + mpr.HostManager().Return(persistentCacheHostManager).Times(1), + mph.Store(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, host *persistentcache.Host) { + assert := assert.New(t) + assert.Equal(host.ID, req.Host.Id) + assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type)) + assert.Equal(host.Hostname, req.Host.Hostname) + assert.Equal(host.IP, req.Host.Ip) + assert.Equal(host.Port, req.Host.Port) + assert.Equal(host.DownloadPort, req.Host.DownloadPort) + assert.Equal(host.DisableShared, req.Host.DisableShared) + assert.Equal(host.OS, req.Host.Os) + assert.Equal(host.Platform, req.Host.Platform) + assert.Equal(host.PlatformVersion, req.Host.PlatformVersion) + assert.Equal(host.KernelVersion, req.Host.KernelVersion) + assert.EqualValues(host.CPU, mockPersistentCacheCPU) + assert.EqualValues(host.Memory, mockPersistentCacheMemory) + assert.EqualValues(host.Network, mockPersistentCacheNetwork) + assert.EqualValues(host.Disk, mockPersistentCacheDisk) + assert.EqualValues(host.Build, mockPersistentCacheBuild) + assert.EqualValues(host.AnnounceInterval, mockPersistentCacheInterval) + assert.Equal(host.ConcurrentUploadLimit, int32(200)) + assert.Equal(host.ConcurrentUploadCount, int32(0)) + assert.Equal(host.UploadCount, int64(0)) + assert.Equal(host.UploadFailedCount, int64(0)) + assert.NotEqual(host.CreatedAt.Nanosecond(), 0) + assert.NotEqual(host.UpdatedAt.Nanosecond(), 0) + assert.NotNil(host.Log) + }).Return(nil).Times(1), ) assert := assert.New(t) @@ -651,7 +887,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { }, }, { - name: "host already exists", + name: "host already exists and persistent cache host already exists", req: &schedulerv2.AnnounceHostRequest{ Host: &commonv2.Host{ Id: mockHostID, @@ -721,11 +957,41 @@ func TestServiceV2_AnnounceHost(t *testing.T) { }, Interval: durationpb.New(5 * time.Minute), }, - run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *standard.Host, hostManager standard.HostManager, mr *standard.MockResourceMockRecorder, mh *standard.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *standard.Host, persistentCacheHost *persistentcache.Host, hostManager standard.HostManager, persistentCacheHostManager persistentcache.HostManager, mr *standard.MockResourceMockRecorder, mpr *persistentcache.MockResourceMockRecorder, mh *standard.MockHostManagerMockRecorder, mph *persistentcache.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{LoadLimit: 10}, nil).Times(1), mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mpr.HostManager().Return(persistentCacheHostManager).Times(1), + mph.Load(gomock.Any(), gomock.Any()).Return(persistentCacheHost, true).Times(1), + mpr.HostManager().Return(persistentCacheHostManager).Times(1), + mph.Store(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, host *persistentcache.Host) { + assert := assert.New(t) + assert.Equal(host.ID, req.Host.Id) + assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type)) + assert.Equal(host.Hostname, req.Host.Hostname) + assert.Equal(host.IP, req.Host.Ip) + assert.Equal(host.Port, req.Host.Port) + assert.Equal(host.DownloadPort, req.Host.DownloadPort) + assert.Equal(host.DisableShared, req.Host.DisableShared) + assert.Equal(host.OS, req.Host.Os) + assert.Equal(host.Platform, req.Host.Platform) + assert.Equal(host.PlatformVersion, req.Host.PlatformVersion) + assert.Equal(host.KernelVersion, req.Host.KernelVersion) + assert.EqualValues(host.CPU, mockPersistentCacheCPU) + assert.EqualValues(host.Memory, mockPersistentCacheMemory) + assert.EqualValues(host.Network, mockPersistentCacheNetwork) + assert.EqualValues(host.Disk, mockPersistentCacheDisk) + assert.EqualValues(host.Build, mockPersistentCacheBuild) + assert.EqualValues(host.AnnounceInterval, mockPersistentCacheInterval) + assert.Equal(host.ConcurrentUploadLimit, int32(10)) + assert.Equal(host.ConcurrentUploadCount, int32(0)) + assert.Equal(host.UploadCount, int64(0)) + assert.Equal(host.UploadFailedCount, int64(0)) + assert.NotEqual(host.CreatedAt.Nanosecond(), 0) + assert.NotEqual(host.UpdatedAt.Nanosecond(), 0) + assert.NotNil(host.Log) + }).Return(nil).Times(1), ) assert := assert.New(t) @@ -759,7 +1025,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { }, }, { - name: "host already exists and dynconfig returns error", + name: "host already exists and persistent cache host already exists, dynconfig returns error", req: &schedulerv2.AnnounceHostRequest{ Host: &commonv2.Host{ Id: mockHostID, @@ -829,11 +1095,41 @@ func TestServiceV2_AnnounceHost(t *testing.T) { }, Interval: durationpb.New(5 * time.Minute), }, - run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *standard.Host, hostManager standard.HostManager, mr *standard.MockResourceMockRecorder, mh *standard.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *standard.Host, persistentCacheHost *persistentcache.Host, hostManager standard.HostManager, persistentCacheHostManager persistentcache.HostManager, mr *standard.MockResourceMockRecorder, mpr *persistentcache.MockResourceMockRecorder, mh *standard.MockHostManagerMockRecorder, mph *persistentcache.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1), mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mpr.HostManager().Return(persistentCacheHostManager).Times(1), + mph.Load(gomock.Any(), gomock.Any()).Return(persistentCacheHost, true).Times(1), + mpr.HostManager().Return(persistentCacheHostManager).Times(1), + mph.Store(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, host *persistentcache.Host) { + assert := assert.New(t) + assert.Equal(host.ID, req.Host.Id) + assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type)) + assert.Equal(host.Hostname, req.Host.Hostname) + assert.Equal(host.IP, req.Host.Ip) + assert.Equal(host.Port, req.Host.Port) + assert.Equal(host.DownloadPort, req.Host.DownloadPort) + assert.Equal(host.DisableShared, req.Host.DisableShared) + assert.Equal(host.OS, req.Host.Os) + assert.Equal(host.Platform, req.Host.Platform) + assert.Equal(host.PlatformVersion, req.Host.PlatformVersion) + assert.Equal(host.KernelVersion, req.Host.KernelVersion) + assert.EqualValues(host.CPU, mockPersistentCacheCPU) + assert.EqualValues(host.Memory, mockPersistentCacheMemory) + assert.EqualValues(host.Network, mockPersistentCacheNetwork) + assert.EqualValues(host.Disk, mockPersistentCacheDisk) + assert.EqualValues(host.Build, mockPersistentCacheBuild) + assert.EqualValues(host.AnnounceInterval, mockPersistentCacheInterval) + assert.Equal(host.ConcurrentUploadLimit, int32(200)) + assert.Equal(host.ConcurrentUploadCount, int32(0)) + assert.Equal(host.UploadCount, int64(0)) + assert.Equal(host.UploadFailedCount, int64(0)) + assert.NotEqual(host.CreatedAt.Nanosecond(), 0) + assert.NotEqual(host.UpdatedAt.Nanosecond(), 0) + assert.NotNil(host.Log) + }).Return(nil).Times(1), ) assert := assert.New(t) @@ -866,6 +1162,260 @@ func TestServiceV2_AnnounceHost(t *testing.T) { assert.NotNil(host.Log) }, }, + { + name: "host not found and persistent cache host not found, store persistent cache host failed", + req: &schedulerv2.AnnounceHostRequest{ + Host: &commonv2.Host{ + Id: mockHostID, + Type: uint32(pkgtypes.HostTypeNormal), + Hostname: "hostname", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + DisableShared: true, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: &commonv2.CPU{ + LogicalCount: mockCPU.LogicalCount, + PhysicalCount: mockCPU.PhysicalCount, + Percent: mockCPU.Percent, + ProcessPercent: mockCPU.ProcessPercent, + Times: &commonv2.CPUTimes{ + User: mockCPU.Times.User, + System: mockCPU.Times.System, + Idle: mockCPU.Times.Idle, + Nice: mockCPU.Times.Nice, + Iowait: mockCPU.Times.Iowait, + Irq: mockCPU.Times.Irq, + Softirq: mockCPU.Times.Softirq, + Steal: mockCPU.Times.Steal, + Guest: mockCPU.Times.Guest, + GuestNice: mockCPU.Times.GuestNice, + }, + }, + Memory: &commonv2.Memory{ + Total: mockMemory.Total, + Available: mockMemory.Available, + Used: mockMemory.Used, + UsedPercent: mockMemory.UsedPercent, + ProcessUsedPercent: mockMemory.ProcessUsedPercent, + Free: mockMemory.Free, + }, + Network: &commonv2.Network{ + TcpConnectionCount: mockNetwork.TCPConnectionCount, + UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount, + Location: &mockNetwork.Location, + Idc: &mockNetwork.IDC, + DownloadRate: mockNetwork.DownloadRate, + DownloadRateLimit: mockNetwork.DownloadRateLimit, + UploadRate: mockNetwork.UploadRate, + UploadRateLimit: mockNetwork.UploadRateLimit, + }, + Disk: &commonv2.Disk{ + Total: mockDisk.Total, + Free: mockDisk.Free, + Used: mockDisk.Used, + UsedPercent: mockDisk.UsedPercent, + InodesTotal: mockDisk.InodesTotal, + InodesUsed: mockDisk.InodesUsed, + InodesFree: mockDisk.InodesFree, + InodesUsedPercent: mockDisk.InodesUsedPercent, + }, + Build: &commonv2.Build{ + GitVersion: mockBuild.GitVersion, + GitCommit: &mockBuild.GitCommit, + GoVersion: &mockBuild.GoVersion, + Platform: &mockBuild.Platform, + }, + }, + Interval: durationpb.New(5 * time.Minute), + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *standard.Host, persistentCacheHost *persistentcache.Host, hostManager standard.HostManager, persistentCacheHostManager persistentcache.HostManager, mr *standard.MockResourceMockRecorder, mpr *persistentcache.MockResourceMockRecorder, mh *standard.MockHostManagerMockRecorder, mph *persistentcache.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + gomock.InOrder( + md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{LoadLimit: 10}, nil).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Any()).Return(nil, false).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Store(gomock.Any()).Do(func(host *standard.Host) { + assert := assert.New(t) + assert.Equal(host.ID, req.Host.Id) + assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type)) + assert.Equal(host.Hostname, req.Host.Hostname) + assert.Equal(host.IP, req.Host.Ip) + assert.Equal(host.Port, req.Host.Port) + assert.Equal(host.DownloadPort, req.Host.DownloadPort) + assert.Equal(host.DisableShared, req.Host.DisableShared) + assert.Equal(host.OS, req.Host.Os) + assert.Equal(host.Platform, req.Host.Platform) + assert.Equal(host.PlatformVersion, req.Host.PlatformVersion) + assert.Equal(host.KernelVersion, req.Host.KernelVersion) + assert.EqualValues(host.CPU, mockCPU) + assert.EqualValues(host.Memory, mockMemory) + assert.EqualValues(host.Network, mockNetwork) + assert.EqualValues(host.Disk, mockDisk) + assert.EqualValues(host.Build, mockBuild) + assert.EqualValues(host.AnnounceInterval, mockInterval) + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEqual(host.CreatedAt.Load().Nanosecond(), 0) + assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0) + assert.NotNil(host.Log) + }).Return().Times(1), + mpr.HostManager().Return(persistentCacheHostManager).Times(1), + mph.Load(gomock.Any(), gomock.Any()).Return(nil, false).Times(1), + mpr.HostManager().Return(persistentCacheHostManager).Times(1), + mph.Store(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, host *persistentcache.Host) { + assert := assert.New(t) + assert.Equal(host.ID, req.Host.Id) + assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type)) + assert.Equal(host.Hostname, req.Host.Hostname) + assert.Equal(host.IP, req.Host.Ip) + assert.Equal(host.Port, req.Host.Port) + assert.Equal(host.DownloadPort, req.Host.DownloadPort) + assert.Equal(host.DisableShared, req.Host.DisableShared) + assert.Equal(host.OS, req.Host.Os) + assert.Equal(host.Platform, req.Host.Platform) + assert.Equal(host.PlatformVersion, req.Host.PlatformVersion) + assert.Equal(host.KernelVersion, req.Host.KernelVersion) + assert.EqualValues(host.CPU, mockPersistentCacheCPU) + assert.EqualValues(host.Memory, mockPersistentCacheMemory) + assert.EqualValues(host.Network, mockPersistentCacheNetwork) + assert.EqualValues(host.Disk, mockPersistentCacheDisk) + assert.EqualValues(host.Build, mockPersistentCacheBuild) + assert.EqualValues(host.AnnounceInterval, mockPersistentCacheInterval) + assert.Equal(host.ConcurrentUploadLimit, int32(10)) + assert.Equal(host.ConcurrentUploadCount, int32(0)) + assert.Equal(host.UploadCount, int64(0)) + assert.Equal(host.UploadFailedCount, int64(0)) + assert.NotEqual(host.CreatedAt.Nanosecond(), 0) + assert.NotEqual(host.UpdatedAt.Nanosecond(), 0) + assert.NotNil(host.Log) + }).Return(errors.New("bar")).Times(1), + ) + + assert := assert.New(t) + assert.Error(svc.AnnounceHost(context.Background(), req)) + }, + }, + { + name: "host already exists and persistent cache host already exists, store persistent cache host failed", + req: &schedulerv2.AnnounceHostRequest{ + Host: &commonv2.Host{ + Id: mockHostID, + Type: uint32(pkgtypes.HostTypeNormal), + Hostname: "foo", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + DisableShared: false, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: &commonv2.CPU{ + LogicalCount: mockCPU.LogicalCount, + PhysicalCount: mockCPU.PhysicalCount, + Percent: mockCPU.Percent, + ProcessPercent: mockCPU.ProcessPercent, + Times: &commonv2.CPUTimes{ + User: mockCPU.Times.User, + System: mockCPU.Times.System, + Idle: mockCPU.Times.Idle, + Nice: mockCPU.Times.Nice, + Iowait: mockCPU.Times.Iowait, + Irq: mockCPU.Times.Irq, + Softirq: mockCPU.Times.Softirq, + Steal: mockCPU.Times.Steal, + Guest: mockCPU.Times.Guest, + GuestNice: mockCPU.Times.GuestNice, + }, + }, + Memory: &commonv2.Memory{ + Total: mockMemory.Total, + Available: mockMemory.Available, + Used: mockMemory.Used, + UsedPercent: mockMemory.UsedPercent, + ProcessUsedPercent: mockMemory.ProcessUsedPercent, + Free: mockMemory.Free, + }, + Network: &commonv2.Network{ + TcpConnectionCount: mockNetwork.TCPConnectionCount, + UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount, + Location: &mockNetwork.Location, + Idc: &mockNetwork.IDC, + DownloadRate: mockNetwork.DownloadRate, + DownloadRateLimit: mockNetwork.DownloadRateLimit, + UploadRate: mockNetwork.UploadRate, + UploadRateLimit: mockNetwork.UploadRateLimit, + }, + Disk: &commonv2.Disk{ + Total: mockDisk.Total, + Free: mockDisk.Free, + Used: mockDisk.Used, + UsedPercent: mockDisk.UsedPercent, + InodesTotal: mockDisk.InodesTotal, + InodesUsed: mockDisk.InodesUsed, + InodesFree: mockDisk.InodesFree, + InodesUsedPercent: mockDisk.InodesUsedPercent, + }, + Build: &commonv2.Build{ + GitVersion: mockBuild.GitVersion, + GitCommit: &mockBuild.GitCommit, + GoVersion: &mockBuild.GoVersion, + Platform: &mockBuild.Platform, + }, + }, + Interval: durationpb.New(5 * time.Minute), + }, + run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *standard.Host, persistentCacheHost *persistentcache.Host, hostManager standard.HostManager, persistentCacheHostManager persistentcache.HostManager, mr *standard.MockResourceMockRecorder, mpr *persistentcache.MockResourceMockRecorder, mh *standard.MockHostManagerMockRecorder, mph *persistentcache.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + gomock.InOrder( + md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Any()).Return(host, true).Times(1), + mpr.HostManager().Return(persistentCacheHostManager).Times(1), + mph.Load(gomock.Any(), gomock.Any()).Return(persistentCacheHost, true).Times(1), + mpr.HostManager().Return(persistentCacheHostManager).Times(1), + mph.Store(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, host *persistentcache.Host) { + assert := assert.New(t) + assert.Equal(host.ID, req.Host.Id) + assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type)) + assert.Equal(host.Hostname, req.Host.Hostname) + assert.Equal(host.IP, req.Host.Ip) + assert.Equal(host.Port, req.Host.Port) + assert.Equal(host.DownloadPort, req.Host.DownloadPort) + assert.Equal(host.DisableShared, req.Host.DisableShared) + assert.Equal(host.OS, req.Host.Os) + assert.Equal(host.Platform, req.Host.Platform) + assert.Equal(host.PlatformVersion, req.Host.PlatformVersion) + assert.Equal(host.KernelVersion, req.Host.KernelVersion) + assert.EqualValues(host.CPU, mockPersistentCacheCPU) + assert.EqualValues(host.Memory, mockPersistentCacheMemory) + assert.EqualValues(host.Network, mockPersistentCacheNetwork) + assert.EqualValues(host.Disk, mockPersistentCacheDisk) + assert.EqualValues(host.Build, mockPersistentCacheBuild) + assert.EqualValues(host.AnnounceInterval, mockPersistentCacheInterval) + assert.Equal(host.ConcurrentUploadLimit, int32(200)) + assert.Equal(host.ConcurrentUploadCount, int32(0)) + assert.Equal(host.UploadCount, int64(0)) + assert.Equal(host.UploadFailedCount, int64(0)) + assert.NotEqual(host.CreatedAt.Nanosecond(), 0) + assert.NotEqual(host.UpdatedAt.Nanosecond(), 0) + assert.NotNil(host.Log) + }).Return(errors.New("bar")).Times(1), + ) + + assert := assert.New(t) + assert.Error(svc.AnnounceHost(context.Background(), req)) + }, + }, } for _, tc := range tests { @@ -878,12 +1428,21 @@ func TestServiceV2_AnnounceHost(t *testing.T) { dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) hostManager := standard.NewMockHostManager(ctl) + persistentcacheHostManager := persistentcache.NewMockHostManager(ctl) host := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + persistentCacheHost := persistentcache.NewHost( + mockRawPersistentCacheHost.ID, mockRawPersistentCacheHost.Hostname, mockRawPersistentCacheHost.IP, + mockRawPersistentCacheHost.OS, mockRawPersistentCacheHost.Platform, mockRawPersistentCacheHost.PlatformFamily, mockRawPersistentCacheHost.PlatformVersion, mockRawPersistentCacheHost.KernelVersion, + mockRawPersistentCacheHost.Port, mockRawPersistentCacheHost.DownloadPort, mockRawPersistentCacheHost.ConcurrentUploadCount, + mockRawPersistentCacheHost.UploadCount, mockRawPersistentCacheHost.UploadFailedCount, mockRawPersistentCacheHost.DisableShared, pkgtypes.HostType(mockRawPersistentCacheHost.Type), + mockRawPersistentCacheHost.CPU, mockRawPersistentCacheHost.Memory, mockRawPersistentCacheHost.Network, mockRawPersistentCacheHost.Disk, + mockRawPersistentCacheHost.Build, mockRawPersistentCacheHost.AnnounceInterval, mockRawPersistentCacheHost.CreatedAt, mockRawPersistentCacheHost.UpdatedAt, mockRawHost.Log) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig, storage) - tc.run(t, svc, tc.req, host, hostManager, resource.EXPECT(), hostManager.EXPECT(), dynconfig.EXPECT()) + tc.run(t, svc, tc.req, host, persistentCacheHost, hostManager, persistentcacheHostManager, resource.EXPECT(), persistentCacheResource.EXPECT(), hostManager.EXPECT(), persistentcacheHostManager.EXPECT(), dynconfig.EXPECT()) }) } }