Skip to content

Commit

Permalink
feat: store persistent cache host by announce host api (#3640)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Nov 11, 2024
1 parent 071ab91 commit 8711108
Show file tree
Hide file tree
Showing 6 changed files with 872 additions and 189 deletions.
31 changes: 27 additions & 4 deletions scheduler/resource/persistentcache/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,19 @@ import (

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
)

// HostOption is a functional option for configuring the persistent cache host.
type HostOption func(h *Host)

// WithConcurrentUploadLimit sets persistent cache host's ConcurrentUploadLimit.
func WithConcurrentUploadLimit(limit int32) HostOption {
return func(h *Host) {
h.ConcurrentUploadLimit = limit
}
}

// Host contains content for host.
type Host struct {
// ID is host id.
Expand Down Expand Up @@ -248,11 +259,17 @@ type Disk struct {

// New host instance.
func NewHost(
id, hostname, ip, os, platform, platformFamily, platformVersion, kernelVersion string, port, downloadPort, concurrentUploadLimit, concurrentUploadCount int32,
id, hostname, ip, os, platform, platformFamily, platformVersion, kernelVersion string, port, downloadPort, concurrentUploadCount int32,
UploadCount, UploadFailedCount int64, disableShared bool, typ types.HostType, cpu CPU, memory Memory, network Network, disk Disk,
build Build, announceInterval time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith,
build Build, announceInterval time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith, options ...HostOption,
) *Host {
return &Host{
// Calculate default of the concurrent upload limit by host type.
concurrentUploadLimit := config.DefaultSeedPeerConcurrentUploadLimit
if typ == types.HostTypeNormal {
concurrentUploadLimit = config.DefaultPeerConcurrentUploadLimit
}

h := &Host{
ID: id,
Type: types.HostType(typ),
Hostname: hostname,
Expand All @@ -271,12 +288,18 @@ func NewHost(
Disk: disk,
Build: build,
AnnounceInterval: announceInterval,
ConcurrentUploadLimit: concurrentUploadLimit,
ConcurrentUploadLimit: int32(concurrentUploadLimit),
ConcurrentUploadCount: concurrentUploadCount,
UploadCount: UploadCount,
UploadFailedCount: UploadFailedCount,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
Log: logger.WithHost(id, hostname, ip),
}

for _, opt := range options {
opt(h)
}

return h
}
2 changes: 1 addition & 1 deletion scheduler/resource/persistentcache/host_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,6 @@ func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
rawHost["kernel_version"],
int32(port),
int32(downloadPort),
int32(concurrentUploadLimit),
int32(concurrentUploadCount),
uploadCount,
uploadFailedCount,
Expand All @@ -423,6 +422,7 @@ func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
createdAt,
updatedAt,
logger.WithHost(rawHost["id"], rawHost["hostname"], rawHost["ip"]),
WithConcurrentUploadLimit(int32(concurrentUploadLimit)),
), true
}

Expand Down
30 changes: 30 additions & 0 deletions scheduler/resource/persistentcache/task_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

99 changes: 0 additions & 99 deletions scheduler/service/service_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,9 @@ import (
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
"go.uber.org/mock/gomock"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"

commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
Expand Down Expand Up @@ -76,103 +74,6 @@ var (
TaskDownloadTimeout: 1 * time.Hour,
}

mockRawHost = resource.Host{
ID: mockHostID,
Type: pkgtypes.HostTypeNormal,
Hostname: "foo",
IP: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
OS: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
CPU: mockCPU,
Memory: mockMemory,
Network: mockNetwork,
Disk: mockDisk,
Build: mockBuild,
CreatedAt: atomic.NewTime(time.Now()),
UpdatedAt: atomic.NewTime(time.Now()),
}

mockRawSeedHost = resource.Host{
ID: mockSeedHostID,
Type: pkgtypes.HostTypeSuperSeed,
Hostname: "bar",
IP: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
OS: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
CPU: mockCPU,
Memory: mockMemory,
Network: mockNetwork,
Disk: mockDisk,
Build: mockBuild,
CreatedAt: atomic.NewTime(time.Now()),
UpdatedAt: atomic.NewTime(time.Now()),
}

mockCPU = resource.CPU{
LogicalCount: 4,
PhysicalCount: 2,
Percent: 1,
ProcessPercent: 0.5,
Times: resource.CPUTimes{
User: 240662.2,
System: 317950.1,
Idle: 3393691.3,
Nice: 0,
Iowait: 0,
Irq: 0,
Softirq: 0,
Steal: 0,
Guest: 0,
GuestNice: 0,
},
}

mockMemory = resource.Memory{
Total: 17179869184,
Available: 5962813440,
Used: 11217055744,
UsedPercent: 65.291858,
ProcessUsedPercent: 41.525125,
Free: 2749598908,
}

mockNetwork = resource.Network{
TCPConnectionCount: 10,
UploadTCPConnectionCount: 1,
Location: mockHostLocation,
IDC: mockHostIDC,
}

mockDisk = resource.Disk{
Total: 499963174912,
Free: 37226479616,
Used: 423809622016,
UsedPercent: 91.92547406065952,
InodesTotal: 4882452880,
InodesUsed: 7835772,
InodesFree: 4874617108,
InodesUsedPercent: 0.1604884305611568,
}

mockBuild = resource.Build{
GitVersion: "v1.0.0",
GitCommit: "221176b117c6d59366d68f2b34d38be50c935883",
GoVersion: "1.18",
Platform: "darwin",
}

mockInterval = durationpb.New(5 * time.Minute).AsDuration()

mockPeerHost = &schedulerv1.PeerHost{
Id: mockHostID,
Ip: "127.0.0.1",
Expand Down
Loading

0 comments on commit 8711108

Please sign in to comment.